Commit aa7c6eb5 authored by S1710567010 (Haminger-Huber Nikolaus)
Merge branch 'pong_impl' into 'master'

Added Pong environment implementation

See merge request !1
parents 7ebcd225 d4276fda
The new environment wraps the existing pong module (ball, paddles, and movement helpers) in the gym.Env interface:

import pygame
import gym
import numpy as np

from pong import *


class PongEnv(gym.Env):
    """
    Encapsulation of the Pong game as an RL environment.
    """

    def __init__(self):
        # Actions: move the paddle
        self.action_space = gym.spaces.Box(low=-1., high=1., shape=(1,), dtype=np.float32)
        # States: [agent.y, ball.x, ball.y, ball.direction_x, ball.direction_y, ball.speed]
        obs_high = np.array([SCREEN_HEIGHT, SCREEN_WIDTH, SCREEN_HEIGHT, 1, 1, 15])
        obs_low = np.array([0, 0, 0, -1, -1, 3])
        self.observation_space = gym.spaces.Box(low=obs_low, high=obs_high, dtype=np.float32)
        self._ball = Ball()
        self._agent = Paddle(x=30, color=LIGHT_BLUE)
        self._opponent = Paddle(x=SCREEN_WIDTH - 60, color=BROWN)
        self._screen = None
        self._round_winner = None
        self._step_num = 0
        self._total_steps = 0
        self._speed_up_num = 500
        self._episode_reward = 0
        self._episode_steps = 5000

    def step(self, action):
        """
        Performs a step of the environment with respect to the agent action.
        :param action: Agent action
        :return: (state, reward, done, info)
        """
        reward = 0
        done = False
        # Move the paddle according to the agent action
        movement = agent_move(self._ball, self._agent, int(action[0] * PLAYER_MOVEMENT_UNITS))
        # Move the ball
        self._ball.move()
        self._ball.keep_on_screen()
        # Check whether the round is finished
        if self._round_winner and self._ball.leaves_screen():
            # Add score to the round winner
            self._round_winner.score += 1
            # Signal episode end
            done = True
        else:
            # Evaluate the game status and get the reward
            reward += self._evaluate_game_status(movement)
            # Move the opponent paddle
            opponent_move(self._ball, self._opponent)
        # Count steps and speed up the game after some steps
        self._step_num += 1
        self._total_steps += 1
        if self._step_num % self._speed_up_num == 0:
            self._ball.speed += 1
        # Episode bookkeeping
        self._episode_reward += reward
        if self._step_num == self._episode_steps:
            done = True
        # Some verbose output
        if done:
            print('Round finished.')
            print('Steps :', self._total_steps)
            print('Reward :', self._episode_reward)
            print('---')
        return np.array(self._state()), reward, done, {}

    def reset(self):
        """
        Resets the environment.
        :return: state
        """
        self._step_num = 0
        self._episode_reward = 0
        self._round_winner = None
        self._ball.throw_in()
        self._ball.move()
        self._ball.keep_on_screen()
        return np.array(self._state())

    def render(self, mode='human'):
        """
        Renders the environment for real-time simulation. Should be called after a step.
        :param mode: Rendering mode. Not relevant for this environment.
        """
        if not pygame.get_init():
            pygame.init()
            self._screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
            pygame.display.set_caption('Pong Environment')
            pygame.font.init()
        pygame.event.get()
        pygame.time.delay(FRAME_DELAY)
        a, o, b = self._agent, self._opponent, self._ball
        self._screen.fill(BLACK)
        self._display_text('Agent', 150, 0)
        self._display_text(str(a.score), 180, 45)
        self._display_text('Opponent', SCREEN_WIDTH - 240, 0)
        self._display_text(str(o.score), SCREEN_WIDTH - 180, 45)
        pygame.draw.rect(self._screen, a.color, (a.x, a.y, a.width, a.height), 0)
        pygame.draw.rect(self._screen, o.color, (o.x, o.y, o.width, o.height), 0)
        pygame.draw.circle(self._screen, b.color, (b.x, b.y), b.radius)
        # Draw the dashed center line
        for i in range(1, SCREEN_HEIGHT // 10):
            if i % 2 == 0:
                pygame.draw.rect(self._screen, GRAY, (SCREEN_WIDTH // 2, i * 10, 10, 10), 0)
        pygame.display.update()

    def close(self):
        """
        Environment cleanup.
        """
        if pygame.get_init():
            pygame.quit()

    def _evaluate_game_status(self, movement):
        reward = 0
        # Evaluate agent game status
        agent_goal_reached, agent_defended = self._agent.collides_with_ball(self._ball)
        # The ball crosses the agent's goal line
        if agent_goal_reached:
            # The round hasn't been won yet, the agent hits the ball back
            if not self._round_winner and agent_defended:
                reward += 1
                self._ball.flip_direction_x()
                if movement:
                    self._ball.direction_y = movement
            # The round hasn't been won yet, the agent fails to hit back
            elif not self._round_winner and not agent_defended:
                reward -= 10
                self._round_winner = self._opponent
            # The round has already been won, though the agent hits the ball
            elif self._round_winner and agent_defended:
                self._ball.flip_direction_y()
        # Evaluate opponent game status
        opponent_goal_reached, opponent_defended = self._opponent.collides_with_ball(self._ball)
        # The ball crosses the opponent's goal line
        if opponent_goal_reached:
            # The round hasn't been won yet, the opponent hits the ball back
            if not self._round_winner and opponent_defended:
                self._ball.flip_direction_x()
            # The round hasn't been won yet, the opponent fails to hit back
            elif not self._round_winner and not opponent_defended:
                self._round_winner = self._agent
            # The round has already been won, though the opponent hits the ball
            elif self._round_winner and opponent_defended:
                self._ball.flip_direction_y()
        return reward

    def _state(self):
        return [self._agent.y,
                self._ball.x,
                self._ball.y,
                self._ball.direction_x,
                self._ball.direction_y,
                self._ball.speed]

    def _display_text(self, text, x, y):
        myfont = pygame.font.SysFont('Comic Sans MS', 30)
        textsurface = myfont.render(text, False, WHITE)
        self._screen.blit(textsurface, (x, y))
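
For reference, a minimal sketch of driving the environment by hand with random actions; it assumes the class above is saved as pong_env.py (the name the training script below imports) and that the pong module is available:

from pong_env import PongEnv

env = PongEnv()
obs = env.reset()
done = False
while not done:
    # Random paddle movement in [-1, 1]; a trained agent would compute the action from obs instead
    action = env.action_space.sample()
    obs, reward, done, info = env.step(action)
    env.render()
env.close()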

The accompanying training script wraps the environment in a DummyVecEnv and trains a PPO2 agent from stable_baselines:

from pong_env import PongEnv
from stable_baselines import PPO2 as agent
from stable_baselines.common.policies import MlpPolicy
from stable_baselines.common.vec_env import DummyVecEnv


def train(env, steps, model_file):
    # Continue training from a previously saved model if one exists, otherwise start from scratch
    try:
        model = agent.load(model_file, env)
    except ValueError:
        model = agent(MlpPolicy, env)
    model.learn(total_timesteps=steps)
    model.save(model_file)


def evaluate(env, steps, model_file):
    model = agent.load(model_file, env)
    obs = env.reset()
    for i in range(steps):
        action, _ = model.predict(obs)
        obs, _, _, _ = env.step(action)
        env.render()


def main():
    env = DummyVecEnv([lambda: PongEnv()])
    model_file = 'models/pong_1000k_ppo2'
    train(env, int(2e5), model_file)
    evaluate(env, 5000, model_file)


if __name__ == '__main__':
    main()
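
As a possible follow-up (not part of this commit), the environment could also be registered with gym so it can be created via gym.make; a sketch, where 'PongRL-v0' is only an illustrative id:

import gym
from gym.envs.registration import register

# Illustrative id; a distinct name avoids clashing with gym's built-in Atari 'Pong-v0'
register(id='PongRL-v0', entry_point='pong_env:PongEnv')

env = gym.make('PongRL-v0')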