Simple Reinforcement Algorithms From Scratch¶
import numpy as np
import random
import matplotlib.pyplot as plt
import gym
1. Incremental Implementation of Average¶
We’ve finished the incremental implementation of average for you. Please call the function estimate with a 1/step step size and with a fixed step size to compare the difference between these two on a simulated bandit problem.
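The function below implements the standard incremental update \(Q_{n+1} = Q_n + \alpha_n (R_n - Q_n)\). With the decaying step size \(\alpha_n = 1/n\) it computes the exact sample average \(Q_{n+1} = \frac{1}{n}\sum_{i=1}^{n} R_i\), weighting all rewards equally; with a fixed \(\alpha\) it forms an exponential recency-weighted average, which can track a target that changes over time (as the simulated reward below does, jumping from mean 1 to mean 3 halfway through).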
def estimate(OldEstimate, StepSize, Target):
'An incremental implementation of average.'
NewEstimate = OldEstimate + StepSize * (Target - OldEstimate)
return NewEstimate
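As a quick sanity check (a minimal sketch with made-up targets, not part of the assignment): with step sizes 1, 1/2, 1/3 the update reproduces the running mean.
est = 0.0
for n, target in enumerate([2.0, 4.0, 6.0], start=1):  # hypothetical targets
    est = estimate(est, 1 / n, target)
print(est)  # 4.0, the mean of [2, 4, 6]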
random.seed(6885)
numTimeStep = 10000
q_h = np.zeros(numTimeStep + 1) # Q Value estimate with 1/step step size
q_f = np.zeros(numTimeStep + 1) # Q value estimate with fixed step size
FixedStepSize = 0.5 #A large number to exaggerate the difference
for step in range(1, numTimeStep + 1):
if step < numTimeStep / 2:
r = random.gauss(mu = 1, sigma = 0.1)
else:
r = random.gauss(mu = 3, sigma = 0.1)
q_h[step] = estimate(q_h[step-1], 1/step, r)
q_f[step] = estimate(q_f[step-1], FixedStepSize, r)
q_h = q_h[1:]
q_f = q_f[1:]
import plotly.express as px
import pandas as pd
df = pd.DataFrame({"Decaying Step Size":q_h, "Fixed Step Size":q_f, "step": range(1, numTimeStep + 1)})
df = df.melt(id_vars=["step"], value_vars=["Decaying Step Size", "Fixed Step Size"], value_name="Q Value Estimate", var_name="Estimate Step Size")
fig = px.line(df, x='step', color='Estimate Step Size', y='Q Value Estimate', title="Q Value Estimate for estimators using a fixed or decaying step size")
fig.show()
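As a rough check on the plot (exact numbers depend on the sampled rewards): the decaying-step estimate averages the whole history, so it should end near the overall mean of about 2, while the fixed-step estimate forgets old rewards exponentially and should end close to the recent mean of 3.
print(q_h[-1])  # expect roughly (1 + 3) / 2 = 2
print(q_f[-1])  # expect close to 3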
2. \(\epsilon\)-Greedy for Exploration¶
In Reinforcement Learning, we are always faced with the dilemma of exploration versus exploitation. \(\epsilon\)-Greedy is a trade-off between them. You are going to implement Greedy and \(\epsilon\)-Greedy. We combine these two policies in one function by treating Greedy as \(\epsilon\)-Greedy with \(\epsilon = 0\). Edit the function epsilon_greedy in ./RLalgs/utils.py.
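Concretely, with \(m\) actions tied for the maximum value out of \(|\mathcal{A}|\) actions in total, the implementation below assigns each action the probability \(\pi(a) = \epsilon/|\mathcal{A}| + (1-\epsilon)/m\) if \(Q(a) = \max_b Q(b)\), and \(\pi(a) = \epsilon/|\mathcal{A}|\) otherwise, so the probabilities sum to one (which the assert in the code checks).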
def epsilon_greedy(value, e, seed=None):
'''
Implement Epsilon-Greedy policy.
Inputs:
value: numpy ndarray
A vector of values of actions to choose from
e: float
Epsilon
seed: None or int
Assign an integer value to remove the randomness
Outputs:
action: int
Index of the chosen action
'''
assert len(value.shape) == 1
assert 0 <= e <= 1
    if seed is not None:
np.random.seed(seed)
max_val = max(value)
num_actions = len(value)
num_max_vals = len([True for v in value if v == max_val])
high_prob = (1-e)/num_max_vals
lower_prob = e/num_actions
action_probs = [lower_prob if v != max_val else high_prob + lower_prob for v in value]
assert abs(1 - sum(action_probs)) < 0.001
action = np.random.choice(range(len(value)), p=action_probs)
return action
np.random.seed(6885) #Fix the seed for reproducibility
q = np.random.normal(0, 1, size = 5)
greedy_action = epsilon_greedy(q, 0, seed=334535) #Use epsilon = 0 for Greedy
e_greedy_action = epsilon_greedy(q, 0.1, seed=334535) #Use epsilon = 0.1
print('Values:')
print(q)
print('Greedy Choice =', greedy_action)
print('Epsilon-Greedy Choice =', e_greedy_action)
Values:
[ 0.61264537 0.27923079 -0.84600857 0.05469574 -1.09990968]
Greedy Choice = 0
Epsilon-Greedy Choice = 0
You should get the following results.
Values:
[ 0.61264537 0.27923079 -0.84600857 0.05469574 -1.09990968]
Greedy Choice = 0
3. Frozen Lake Environment¶
env = gym.make('FrozenLake-v0')
3.1 Derive Q value from V value¶
Edit function action_evaluation in ./RLalgs/utils.py.
TIPS: \(q(s, a)=\sum_{s',r}p(s',r|s,a)(r+\gamma v(s'))\)
def action_evaluation(env, gamma, v):
'''
Convert V value to Q value with model.
Inputs:
env: OpenAI Gym environment
env.P: dictionary
            P[state][action] is a list of tuples. Each tuple contains probability, nextstate, reward, terminal
probability: float
nextstate: int
reward: float
terminal: boolean
env.nS: int
number of states
env.nA: int
number of actions
gamma: float
Discount value
v: numpy ndarray
Values of states
Outputs:
q: numpy ndarray
Q values of all state-action pairs
'''
nS = env.nS
nA = env.nA
q = np.zeros((nS, nA))
for s in range(nS):
for a in range(nA):
q_i = 0
for prob, next_state, reward, is_terminal in env.P[s][a]:
q_i += prob*(reward + gamma*v[next_state])
q[s][a] = q_i
return q
v = np.ones(16)
q = action_evaluation(env = env.env, gamma = 1, v = v)
print('Action values:')
print(q)
np.apply_along_axis(epsilon_greedy, 1, q, e=0)  # greedy action for each state
Action values:
[[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1.33333333 1.33333333 1.33333333]
[1. 1. 1. 1. ]]
array([1, 0, 1, 1, 3, 3, 3, 3, 3, 3, 1, 0, 3, 0, 2, 0])
You should get Q values all equal to one except at State 14: with \(v \equiv 1\) and \(\gamma = 1\), \(q(s,a) = \mathbb{E}[r] + 1\), and only actions from State 14 can reach the goal (reward 1, reached with probability 1/3 under the slippery dynamics), giving \(1 + 1/3 \approx 1.33\).
Pseudocode for the following four algorithms can be found on pages 80, 83, 130, and 131 of Sutton and Barto’s book.
3.2 Model-based RL algorithms¶
def action_selection(q):
'''
Select action from the Q value
Inputs:
q: numpy ndarray
Outputs:
        actions: numpy ndarray
            The greedy action chosen for each state
'''
actions = np.apply_along_axis(epsilon_greedy, 1, q, e=0)
# actions = np.argmax(q, axis = 1)
return actions
from time import sleep

def render(env, policy):
'''
Play games with the given policy
Inputs:
env: OpenAI Gym environment
env.P: dictionary
            P[state][action] is a list of tuples. Each tuple contains probability, nextstate, reward, terminal
probability: float
nextstate: int
reward: float
terminal: boolean
env.nS: int
number of states
env.nA: int
number of actions
policy: numpy ndarray
Maps state to action
'''
state = env.reset()
terminal = False
while not terminal:
action = policy[state]
state, reward, terminal, prob = env.step(action)
env.render()
sleep(1)
print('Episode ends. Reward =', reward)
3.2.1 Policy Iteration¶
Edit the function policy_iteration and relevant functions in ./RLalgs/pi.py to implement the Policy Iteration Algorithm.
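Policy iteration alternates two steps until the policy stops changing: policy evaluation, which solves the Bellman expectation equation for the current policy \(\pi\),
\(v_\pi(s)=\sum_{s',r}p(s',r|s,\pi(s))(r+\gamma v_\pi(s'))\)
and policy improvement, which replaces \(\pi\) with the policy that is greedy with respect to \(v_\pi\).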
def policy_iteration(env, gamma, max_iteration, theta):
"""
Implement Policy iteration algorithm.
Inputs:
env: OpenAI Gym environment.
env.P: dictionary
            P[state][action] is a list of tuples. Each tuple contains probability, nextstate, reward, terminal
probability: float
nextstate: int
reward: float
terminal: boolean
env.nS: int
number of states
env.nA: int
number of actions
gamma: float
Discount factor.
max_iteration: int
The maximum number of iterations to run before stopping.
theta: float
The threshold of convergence.
Outputs:
        V: numpy.ndarray
            Value function of the final policy
        policy: numpy.ndarray
            The final policy, mapping each state to an action
        numIterations: int
            Number of policy improvement iterations performed
"""
V = np.zeros(env.nS)
policy = np.zeros(env.nS, dtype = np.int32)
policy_stable = False
numIterations = 0
while not policy_stable and numIterations < max_iteration:
#Implement it with function policy_evaluation and policy_improvement
############################
# YOUR CODE STARTS HERE
V = policy_evaluation(env, policy, gamma, theta)
new_policy, policy_stable = policy_improvement(env, V, policy, gamma)
policy = new_policy
# YOUR CODE ENDS HERE
############################
numIterations += 1
return V, policy, numIterations
def policy_evaluation(env, policy, gamma, theta):
"""
Evaluate the value function from a given policy.
Inputs:
env: OpenAI Gym environment.
env.P: dictionary
env.nS: int
number of states
env.nA: int
number of actions
gamma: float
Discount factor.
policy: numpy.ndarray
The policy to evaluate. Maps states to actions.
theta: float
The threshold of convergence.
Outputs:
V: numpy.ndarray
The value function from the given policy.
"""
nS = env.nS
nA = env.nA
v_old = np.zeros(nS)
converged = False
max_iteration = 10000
numIterations = 0
V = np.zeros(nS)
while not converged and numIterations < max_iteration:
q = action_evaluation(env, gamma, v_old)
V = np.array([q[i, x] for i, x in enumerate(policy.tolist())])
        value_change = np.max(np.abs(V - v_old))
converged = value_change < theta
numIterations += 1
v_old = V
return V
def policy_improvement(env, value_from_policy, policy, gamma):
"""
Given the value function from policy, improve the policy.
Inputs:
env: OpenAI Gym environment
env.P: dictionary
            P[state][action] is a list of tuples (probability, nextstate, reward, terminal)
probability: float
nextstate: int
reward: float
terminal: boolean
env.nS: int
number of states
env.nA: int
number of actions
value_from_policy: numpy.ndarray
The value calculated from the policy
policy: numpy.ndarray
The previous policy.
gamma: float
Discount factor.
Outputs:
new policy: numpy.ndarray
An array of integers. Each integer is the optimal action to take
in that state according to the environment dynamics and the
given value function.
policy_stable: boolean
True if the "optimal" policy is found, otherwise false
"""
q = action_evaluation(env, gamma, value_from_policy)
new_policy = action_selection(q)
policy_stable = all(policy == new_policy)
return new_policy, policy_stable
V, policy, numIterations = policy_iteration(env = env.env, gamma = 1, max_iteration = 500, theta = 1e-7)
print('State values:')
print(V)
print('Number of iterations to converge =', numIterations)
State values:
[0.82352709 0.82352631 0.82352576 0.82352547 0.82352726 0.
0.5294101 0. 0.82352758 0.82352804 0.76470468 0.
0. 0.88235197 0.94117597 0. ]
Number of iterations to converge = 500
You should get values close to:
State values:
[0.82352774 0.8235272 0.82352682 0.82352662 0.82352791 0.
0.52941063 0. 0.82352817 0.82352851 0.76470509 0.
0. 0.88235232 0.94117615 0.]
3.2.2 Value Iteration¶
Edit the function value_iteration and relevant functions in ./RLalgs/vi.py to implement the Value Iteration Algorithm.
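Value iteration collapses evaluation and improvement into a single Bellman optimality backup per sweep,
\(v_{k+1}(s)=\max_a\sum_{s',r}p(s',r|s,a)(r+\gamma v_k(s'))\)
repeated until the largest change across states drops below theta; the greedy policy is then extracted from the final \(v\).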
def value_iteration(env, gamma, max_iteration, theta):
"""
Implement value iteration algorithm.
Inputs:
env: OpenAI Gym environment.
env.P: dictionary
the transition probabilities of the environment
            P[state][action] is a list of tuples. Each tuple contains probability, nextstate, reward, terminal
env.nS: int
number of states
env.nA: int
number of actions
gamma: float
Discount factor.
max_iteration: int
The maximum number of iterations to run before stopping.
theta: float
The threshold of convergence.
Outputs:
        V: numpy.ndarray
            Value function of the optimal policy
        policy: numpy.ndarray
            The optimal policy extracted from V
        numIterations: int
            Number of iterations
"""
V = np.zeros(env.nS)
v_old = np.zeros(env.nS)
    numIterations = 0
    max_iteration = 10000  # Note: overrides the max_iteration argument
converged = False
while not converged and numIterations < max_iteration:
for s in range(env.nS):
            max_v_i = -np.inf
            for a in range(env.nA):
                # One-step lookahead: expected return of action a under v_old
                v_i = 0
                for prob, next_state, reward, is_terminal in env.P[s][a]:
                    v_i += prob * (reward + gamma * v_old[next_state])
                max_v_i = max(max_v_i, v_i)
            V[s] = max_v_i
        value_change = np.max(np.abs(V - v_old))
        converged = value_change < theta
        v_old = V.copy()
numIterations += 1
#Extract the "optimal" policy from the value function
policy = extract_policy(env, V, gamma)
return V, policy, numIterations
def extract_policy(env, v, gamma):
"""
Extract the optimal policy given the optimal value-function.
Inputs:
env: OpenAI Gym environment.
env.P: dictionary
            P[state][action] is a list of tuples (probability, nextstate, reward, terminal)
probability: float
nextstate: int
reward: float
terminal: boolean
env.nS: int
number of states
env.nA: int
number of actions
v: numpy.ndarray
value function
gamma: float
Discount factor. Number in range [0, 1)
Outputs:
policy: numpy.ndarray
"""
    policy = np.zeros(env.nS, dtype = np.int32)
    for s in range(env.nS):
        max_q = -np.inf
        for a in range(env.nA):
            # Expected return of taking action a in state s, given v
            q_sa = 0
            for prob, next_state, reward, is_terminal in env.P[s][a]:
                q_sa += prob * (reward + gamma * v[next_state])
            if q_sa > max_q:
                max_q = q_sa
                policy[s] = a
    return policy
V, policy, numIterations = value_iteration(env = env.env, gamma = 1, max_iteration = 500, theta = 1e-7)
print('State values:')
print(V)
print('Number of iterations to converge =', numIterations)
State values:
[0.82352709 0.82352631 0.82352576 0.82352548 0.82352726 0.
0.52941011 0. 0.82352759 0.82352804 0.76470468 0.
0. 0.88235197 0.94117597 0. ]
Number of iterations to converge = 525
You should get values close to:
State values:
[0.82352773 0.82352718 0.8235268 0.8235266 0.8235279 0.
0.52941062 0. 0.82352816 0.8235285 0.76470509 0.
0. 0.88235231 0.94117615 0.]
3.3 Model-free RL algorithms¶
3.3.1 Q-Learning¶
Edit the function QLearning in ./RLalgs/ql.py to implement the Q-Learning Algorithm.
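Q-Learning is off-policy: actions are selected \(\epsilon\)-greedily, but the update bootstraps from the greedy (max) value of the next state,
\(Q(s,a) \leftarrow Q(s,a)+\alpha(r+\gamma \max_{a'}Q(s',a')-Q(s,a))\)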
def QLearning(env, num_episodes, gamma, lr, e):
"""
Implement the Q-learning algorithm following the epsilon-greedy exploration.
Inputs:
env: OpenAI Gym environment
env.P: dictionary
            P[state][action] is a list of tuples (probability, nextstate, reward, terminal)
probability: float
nextstate: int
reward: float
terminal: boolean
env.nS: int
number of states
env.nA: int
number of actions
num_episodes: int
Number of episodes of training
gamma: float
Discount factor.
lr: float
Learning rate.
e: float
Epsilon value used in the epsilon-greedy method.
Outputs:
        Q: numpy.ndarray
            State-action values
"""
Q = np.zeros((env.nS, env.nA))
for epi in range(num_episodes):
terminal = False
state_i = env.reset()
action_i = epsilon_greedy(Q[state_i], e=e)
while not terminal:
last_state, last_action = state_i, action_i
state_i, last_reward, terminal, prob = env.step(last_action)
action_i = epsilon_greedy(Q[state_i], e=e)
            # Off-policy target: bootstrap from the greedy (max) value of the new state
            Q[last_state][last_action] += lr * (last_reward + gamma * np.max(Q[state_i]) - Q[last_state][last_action])
return Q
Q = QLearning(env = env.env, num_episodes = 1000, gamma = 1, lr = 0.1, e = 0.1)
print('Action values:')
print(Q)
Action values:
[[3.58539292e-01 2.39550887e-01 2.98189563e-01 2.56742930e-01]
[6.64188268e-02 4.18412961e-02 2.29107752e-02 1.51815956e-01]
[1.62123746e-02 3.26191309e-02 4.27164044e-02 9.30353299e-03]
[1.10126088e-02 3.35943485e-04 0.00000000e+00 1.96347873e-03]
[3.59428466e-01 1.45038233e-01 2.09359658e-01 1.68825983e-01]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
[2.25644101e-02 7.98677168e-02 3.14864096e-03 0.00000000e+00]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
[1.84506113e-01 2.44236935e-01 2.02371465e-01 3.58998308e-01]
[1.16150788e-01 3.55291016e-01 1.74411426e-01 1.70310615e-01]
[3.82189738e-01 1.98516795e-01 1.89756109e-01 9.77759359e-02]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
[1.19007991e-01 2.29408571e-01 1.06694202e-01 3.81043329e-01]
[2.10428202e-01 3.74655709e-01 7.65887974e-01 4.36852489e-01]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]]
3.3.2 SARSA¶
Edit the function SARSA in ./RLalgs/sarsa.py to implement the SARSA Algorithm.
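SARSA is the on-policy counterpart: the update bootstraps from the value of the action \(a'\) that the \(\epsilon\)-greedy policy actually takes in the next state,
\(Q(s,a) \leftarrow Q(s,a)+\alpha(r+\gamma Q(s',a')-Q(s,a))\)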
def SARSA(env, num_episodes, gamma, lr, e):
"""
Implement the SARSA algorithm following epsilon-greedy exploration.
Inputs:
env: OpenAI Gym environment
env.P: dictionary
            P[state][action] is a list of tuples (probability, nextstate, reward, terminal)
probability: float
nextstate: int
reward: float
terminal: boolean
env.nS: int
number of states
env.nA: int
number of actions
num_episodes: int
Number of episodes of training
gamma: float
Discount factor.
lr: float
Learning rate.
e: float
Epsilon value used in the epsilon-greedy method.
Outputs:
Q: numpy.ndarray
State-action values
"""
Q = np.zeros((env.nS, env.nA))
for epi in range(num_episodes):
terminal = False
state_i = env.reset()
action_i = epsilon_greedy(Q[state_i], e=e)
while not terminal:
last_state, last_action = state_i, action_i
state_i, last_reward, terminal, prob = env.step(last_action)
action_i = epsilon_greedy(Q[state_i], e=e)
            # On-policy target: bootstrap from the action actually chosen next
            Q[last_state][last_action] += lr * (last_reward + gamma * Q[state_i][action_i] - Q[last_state][last_action])
return Q
Q = SARSA(env = env.env, num_episodes = 1000, gamma = 1, lr = 0.1, e = 0.1)
print('Action values:')
print(Q)
Terminal states are {5, 7, 11, 12, 15}.
Action values:
[[2.67582935e-01 2.18605347e-01 2.19526187e-01 2.12913140e-01]
[4.11010140e-02 4.91476686e-02 2.68654550e-02 1.28709183e-01]
[1.48023510e-02 1.36887908e-02 3.85594964e-03 5.04395713e-02]
[8.80898724e-03 1.31326087e-02 3.25511308e-03 2.85275602e-03]
[2.78687763e-01 1.74856389e-01 1.50320541e-01 1.75055753e-01]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
[0.00000000e+00 0.00000000e+00 6.43856070e-04 1.46441032e-02]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
[1.65101120e-01 1.61979515e-01 1.81075025e-01 3.33316157e-01]
[2.26313070e-01 3.61044914e-01 2.42705704e-01 1.83277620e-01]
[8.15992551e-02 2.74304836e-01 6.77059130e-02 1.56483139e-01]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
[9.99441343e-02 3.44255788e-01 5.10698706e-01 2.54869103e-01]
[4.07597092e-01 2.88810844e-01 6.99259866e-01 3.88493659e-01]
[0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]]
4. Exploration vs. Exploitation¶
Below is a study of the balance between exploration and exploitation on a 10-armed bandit problem.
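The setup mirrors Sutton and Barto’s 10-armed testbed: for each bandit problem the arm means are drawn once as \(q_*(a) \sim \mathcal{N}(0, 1)\), and each pull then returns \(R_t \sim \mathcal{N}(q_*(A_t), 1)\) (assuming reward_mean_std = 1, as in the run below).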
import gym
from gym import spaces
from gym.utils import seeding
import tqdm
class ArmedBanditEnv(gym.Env):
def __init__(self, reward_dist):
self.reward_dist = reward_dist
self.action_space = spaces.Discrete(len(reward_dist))
self.info = {"Armed Bandit Test Env": ""}
self.seed()
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def step(self, action):
assert self.action_space.contains(action)
        done = True  # a bandit "episode" ends after a single pull
        # Sample a Gaussian reward from the chosen arm's (mean, std)
        reward = np.random.normal(self.reward_dist[action][0], self.reward_dist[action][1])
return [0], reward, done, self.info
def reset(self):
return [0]
def render(self, mode='human', close=False):
pass
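A minimal smoke test of the environment above (a hypothetical two-arm setup, separate from the study below):
test_env = ArmedBanditEnv([[0.0, 1.0], [5.0, 1.0]])  # two arms: means 0 and 5, std 1
_, reward, done, _ = test_env.step(1)
print(reward, done)  # a Gaussian draw around 5; done is always True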
def simulate_bandits(num_band_problems = 500, num_iterations = 2000, reward_mean_std=1):
output = []
for bandit_prob_i in tqdm.tqdm(range(num_band_problems)):
bandit_arm_avg_std = [[np.random.normal(0, reward_mean_std), 1] for _ in range(10)]
env = ArmedBanditEnv(bandit_arm_avg_std)
for e in [0, 0.01, 0.1]:
Q = np.zeros(10)
avg_reward = 0
for iter_i in range(1, num_iterations + 1):
action_i = epsilon_greedy(Q, e=e)
state_i, reward, terminal, info = env.step(action_i)
                # Note: 1/iter_i decays with the global step count, a simplification
                # of the per-action sample-average (1/N(a)) step size
                Q[action_i] = estimate(Q[action_i], 1/iter_i, reward)
avg_reward = estimate(avg_reward, 1/iter_i, reward)
output.append({"bandit_prob_i": bandit_prob_i, "step": iter_i, "epsilon": str(e), "avg_reward": avg_reward})
df = pd.DataFrame(output)
return df
df_2 = simulate_bandits(num_band_problems=1000, num_iterations=1000, reward_mean_std=1)
100%|██████████| 1000/1000 [04:58<00:00, 3.35it/s]
import plotly.express as px
import pandas as pd
df_2_grouped = df_2.groupby(["epsilon", "step"], as_index=False).mean()
df_2_grouped = df_2_grouped.drop("bandit_prob_i", axis=1)
df_2_grouped = df_2_grouped.sort_values("step")
fig = px.line(df_2_grouped, x='step', color='epsilon', y='avg_reward', title="Average Reward on the 10-Armed Bandit Problem (arm means drawn from Normal(0, 1))")
fig.show()