"""
This module contains methods for training agents.
"""
from typing import Tuple, List
import gym
import numpy as np
from scipy.special import expit
from agent import Agent, StochasticAgent
from utils import np_seed
np.random.seed(np_seed)


def random(num_trials: int, mean: float, std_dev: float, max_reward: float) -> Tuple[Agent, List[Tuple[int, float]]]:
    """
    Re-initialize the agent's weights at random until its average reward reaches <max_reward>.

    Hyperparameters:
        num_trials: number of trials to sample for the agent's average reward.
        mean: mean of the Gaussian that new weights are sampled from.
        std_dev: standard deviation of the Gaussian that new weights are sampled from.
        max_reward: train the agent until it achieves <max_reward>.
    """
    agent = Agent()
    trajectory = []
    t, reward = 0, get_avg_reward(agent, num_trials)
    while reward < max_reward:
        trajectory.append((t, reward))
        agent.init_weights(mean, std_dev)
        reward = get_avg_reward(agent, num_trials)
        t += 1
    return agent, trajectory


def hill_climb(num_trials: int, mean: float, std_dev: float, max_reward: float) -> Tuple[Agent, List[Tuple[int, float]]]:
    """
    Initialize an agent randomly, then repeatedly perturb its weights at random. If a
    perturbation improves performance, keep it; otherwise revert it.

    Hyperparameters:
        num_trials: number of trials to sample for the agent's average reward.
        mean: mean of the Gaussian that weight perturbations are sampled from.
        std_dev: standard deviation of the Gaussian that weight perturbations are sampled from.
        max_reward: train the agent until it achieves <max_reward>.
    """
    agent = Agent()
    trajectory = []
    t, reward = 0, get_avg_reward(agent, num_trials)
    while reward < max_reward:
        trajectory.append((t, reward))
        perturb = std_dev * np.random.randn(4) + mean
        agent.set_weights(agent.get_weights() + perturb)
        # Evaluate once and reuse the result, so the accept/reject decision and
        # the recorded reward are based on the same sample.
        new_reward = get_avg_reward(agent, num_trials)
        if new_reward <= reward:
            agent.set_weights(agent.get_weights() - perturb)  # revert the perturbation
        else:
            reward = new_reward
        t += 1
    return agent, trajectory


def reinforce(lr: float, num_trials: int, horizon: int, max_reward: float) \
        -> Tuple[StochasticAgent, List[Tuple[int, float]]]:
    """
    Trains an agent with a stochastic policy using the standard REINFORCE policy
    gradient algorithm.

    Hyperparameters:
        lr: learning rate.
        num_trials: number of trials to sample.
        horizon: time horizon for each trial.
        max_reward: train the agent until it achieves <max_reward>.
    """
    agent = StochasticAgent()
    trajectory = []
    t = 0
    reward, data = sample_trials(agent, num_trials, horizon)
    while reward < max_reward:
        trajectory.append((t, reward))
        grad = np.zeros(4)
        for state, action in data:
            z = np.dot(agent.weights, state)  # pre-sigmoid activation
            sigmoid = expit(z)  # probability of action 1 given this state
            # Gradient of log pi(action | state) for a Bernoulli policy:
            # action * (1 - sigmoid) + (action - 1) * sigmoid simplifies to action - sigmoid.
            grad += (action - sigmoid) * state
        # The batch's average reward is used as the return estimate for every step.
        agent.weights += lr * reward * grad
        reward, data = sample_trials(agent, num_trials, horizon)
        t += 1
        if t > 1000:
            # Give up after 1000 updates; mark the failed run with reward -1.
            trajectory.append((t, -1))
            break
    return agent, trajectory


def sample_trials(agent: Agent, num_trials: int, horizon: int) -> Tuple[float, List[Tuple[np.ndarray, int]]]:
    """
    Samples <num_trials> trials with time horizon <horizon>, and returns a tuple
    (average reward, list of (state, action) pairs).
    """
    env = gym.make('CartPole-v0')
    cumulative_reward = 0
    state_action = []
    for _ in range(num_trials):
        t = 0
        done = False
        observation = env.reset()
        while (t < horizon) and not done:
            action = agent.get_action(observation)
            state_action.append((observation, action))
            observation, reward, done, info = env.step(action)
            cumulative_reward += reward
            t += 1
    return cumulative_reward / num_trials, state_action


def get_reward(agent: Agent) -> float:
    """
    Returns the cumulative reward gained by <agent> in one episode in the training environment.
    """
    env = gym.make('CartPole-v0')
    observation = env.reset()
    cumulative_reward = 0
    while True:
        action = agent.get_action(observation)
        observation, reward, done, info = env.step(action)
        cumulative_reward += reward
        if done:
            break
    return cumulative_reward


def get_avg_reward(agent: Agent, num_trials: int) -> float:
    """
    Returns the average cumulative reward over <num_trials> trials.
    """
    total = 0
    for _ in range(num_trials):
        total += get_reward(agent)
    return total / num_trials


def render(agent: Agent) -> None:
    """
    Renders <agent> interacting with the training environment on screen.
    """
    env = gym.make("CartPole-v0")
    observation = env.reset()
    t = 0
    while True:
        env.render()
        action = agent.get_action(observation)
        observation, reward, done, info = env.step(action)
        if done:
            print("Episode finished after {} timesteps".format(t + 1))
            break
        t += 1
    env.close()
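

# Usage sketch (an assumption, not part of the original module): train an agent
# with hill climbing, report how many weight updates it took, and watch the
# result. The hyperparameter values below are illustrative only; 195 is the
# CartPole-v0 "solved" threshold.
if __name__ == "__main__":
    trained_agent, reward_trajectory = hill_climb(num_trials=20, mean=0.0, std_dev=0.1, max_reward=195)
    print("hill_climb finished after {} weight updates".format(len(reward_trajectory)))
    render(trained_agent)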