Simple Reinforcement Learning Algorithms From Scratch

import numpy as np
import random
import matplotlib.pyplot as plt
import gym

1. Incremental Implementation of Average

We have finished the incremental implementation of the average for you. Please call the function estimate once with a 1/step step size and once with a fixed step size, and compare the two on a simulated bandit problem.
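
For reference (not part of the code you need to write), note what the two step-size choices do to the estimate. With step size \(1/n\) the update reproduces the plain sample average,
\[Q_{n+1} = Q_n + \tfrac{1}{n}\,(R_n - Q_n) = \tfrac{1}{n}\sum_{i=1}^{n} R_i,\]
so every reward counts equally, while a fixed step size \(\alpha\) yields an exponentially weighted average,
\[Q_{n+1} = (1-\alpha)^n Q_1 + \sum_{i=1}^{n} \alpha (1-\alpha)^{n-i} R_i,\]
which forgets old rewards and therefore tracks the shift in the simulated reward mean below much faster.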

def estimate(OldEstimate, StepSize, Target):
    'An incremental implementation of average.'
    
    NewEstimate = OldEstimate + StepSize * (Target - OldEstimate)
    return NewEstimate

random.seed(6885)
numTimeStep = 10000
q_h = np.zeros(numTimeStep + 1) # Q Value estimate with 1/step step size
q_f = np.zeros(numTimeStep + 1) # Q value estimate with fixed step size
FixedStepSize = 0.5 #A large number to exaggerate the difference
for step in range(1, numTimeStep + 1):
    if step < numTimeStep / 2:
        r = random.gauss(mu = 1, sigma = 0.1)
    else:
        r = random.gauss(mu = 3, sigma = 0.1)
    
    q_h[step] = estimate(q_h[step-1], 1/step, r)
    q_f[step] = estimate(q_f[step-1], FixedStepSize, r)
    
q_h = q_h[1:]
q_f = q_f[1:]
import plotly.express as px
import pandas as pd
df = pd.DataFrame({"Decaying Step Size":q_h, "Fixed Step Size":q_f, "step": range(1, numTimeStep + 1)})
df = df.melt(id_vars=["step"], value_vars=["Decaying Step Size", "Fixed Step Size"], value_name="Q Value Estimate", var_name="Estimate Step Size")
fig = px.line(df, x='step', color='Estimate Step Size', y='Q Value Estimate', title="Q Value Estimate for estimators using a fixed or decaying step size")
fig.show()

2. \(\epsilon\)-Greedy for Exploration

In Reinforcement Learning we constantly face the dilemma of exploration versus exploitation, and \(\epsilon\)-Greedy is a trade-off between the two. You will implement Greedy and \(\epsilon\)-Greedy. We combine these two policies in one function by treating Greedy as \(\epsilon\)-Greedy with \(\epsilon = 0\). Edit the function epsilon_greedy in ./RLalgs/utils.py.

def epsilon_greedy(value, e, seed=None):
    '''
    Implement Epsilon-Greedy policy.
    
    Inputs:
    value: numpy ndarray
            A vector of values of actions to choose from
    e: float
            Epsilon
    seed: None or int
            Assign an integer value to remove the randomness
    
    Outputs:
    action: int
            Index of the chosen action
    '''
    
    assert len(value.shape) == 1
    assert 0 <= e <= 1
    
    if seed is not None:
        np.random.seed(seed)
    
    # Probability of each action: every action receives e/num_actions exploration mass,
    # and the remaining (1 - e) exploitation mass is split evenly among the maximising actions.
    max_val = max(value)
    num_actions = len(value)
    num_max_vals = len([True for v in value if v == max_val])
    high_prob = (1 - e) / num_max_vals
    lower_prob = e / num_actions
    action_probs = [lower_prob if v != max_val else high_prob + lower_prob for v in value]
    assert abs(1 - sum(action_probs)) < 0.001
    action = np.random.choice(range(len(value)), p=action_probs)
    return action
np.random.seed(6885) #Set the seed to cancel the randomness
q = np.random.normal(0, 1, size = 5)
greedy_action = epsilon_greedy(q, 0, seed=334535) #Use epsilon = 0 for Greedy
e_greedy_action = epsilon_greedy(q, 0.1, seed=334535) #Use epsilon = 0.1
print('Values:')
print(q)
print('Greedy Choice =', greedy_action)
print('Epsilon-Greedy Choice =', e_greedy_action)
Values:
[ 0.61264537  0.27923079 -0.84600857  0.05469574 -1.09990968]
Greedy Choice = 0
Epsilon-Greedy Choice = 0

You should get the following results.
Values:
[ 0.61264537 0.27923079 -0.84600857 0.05469574 -1.09990968]
Greedy Choice = 0
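
As an optional sanity check on your implementation, you can estimate the empirical action frequencies under \(\epsilon = 0.1\). With five actions and a single maximising action, the greedy action should be selected with probability \(1 - \epsilon + \epsilon/5 = 0.92\) and every other action with probability \(\epsilon/5 = 0.02\) (a quick sketch, reusing the q vector defined above):

counts = np.zeros(len(q), dtype=int)
for _ in range(10000):
    counts[epsilon_greedy(q, 0.1)] += 1
print('Empirical action frequencies:', counts / 10000)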

3. Frozen Lake Environment

env = gym.make('FrozenLake-v0')
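
Before implementing anything, it can help to inspect the model the algorithms below rely on. The attributes used throughout this section (env.P, env.nS, env.nA) live on the unwrapped environment, which is why the later cells pass env.env (a small exploratory sketch):

raw_env = env.env  # unwrap the TimeLimit wrapper to reach the tabular model
print('Number of states:', raw_env.nS)
print('Number of actions:', raw_env.nA)
# Transitions from state 0 under action 0: a list of (probability, next_state, reward, terminal) tuples
print(raw_env.P[0][0])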

3.1 Derive Q value from V value

Edit function action_evaluation in ./RLalgs/utils.py.
TIPS: \(q(s, a)=\sum_{s',r}p(s',r|s,a)(r+\gamma v(s'))\)

def action_evaluation(env, gamma, v):
    '''
    Convert V value to Q value with model.
    
    Inputs:
    env: OpenAI Gym environment
            env.P: dictionary
                    P[state][action] is list of tuples. Each tuple contains probability, nextstate, reward, terminal
                    probability: float
                    nextstate: int
                    reward: float
                    terminal: boolean
            env.nS: int
                    number of states
            env.nA: int
                    number of actions
    gamma: float
            Discount value
    v: numpy ndarray
            Values of states
            
    Outputs:
    q: numpy ndarray
            Q values of all state-action pairs
    '''
    
    nS = env.nS
    nA = env.nA
    q = np.zeros((nS, nA))
    for s in range(nS):
        for a in range(nA):
            q_i = 0
            for prob, next_state, reward, is_terminal in env.P[s][a]:
                q_i += prob*(reward + gamma*v[next_state])
            q[s][a] = q_i
    return q
v = np.ones(16)
q = action_evaluation(env = env.env, gamma = 1, v = v)
print('Action values:')
print(q)

np.apply_along_axis(epsilon_greedy, 1, q, e=0)
Action values:
[[1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.         1.         1.        ]
 [1.         1.33333333 1.33333333 1.33333333]
 [1.         1.         1.         1.        ]]
array([1, 0, 1, 1, 3, 3, 3, 3, 3, 3, 1, 0, 3, 0, 2, 0])

You should get Q values all equal to one, except at State 14: with \(v \equiv 1\) and \(\gamma = 1\), every action's value is its expected immediate reward plus one, and the three actions at State 14 that can slip into the goal state (reward 1) pick that reward up with probability 1/3, giving \(1 + 1/3 \approx 1.33\).

Pseudocode for the following four algorithms can be found on pages 80, 83, 130, and 131 of Sutton's book.

3.2 Model-based RL algorithms

def action_selection(q):
    '''
    Select action from the Q value
    
    Inputs:
    q: numpy ndarray
    
    Outputs:
    actions: int
            The chosen action of each state
    '''
    actions = np.apply_along_axis(epsilon_greedy, 1, q, e=0)
    # actions = np.argmax(q, axis = 1)
    return actions 


def render(env, policy):
    '''
    Play games with the given policy
    
    Inputs:
    env: OpenAI Gym environment 
            env.P: dictionary
                    P[state][action] is list of tuples. Each tuple contains probability, nextstate, reward, terminal
                    probability: float
                    nextstate: int
                    reward: float
                    terminal: boolean
            env.nS: int
                    number of states
            env.nA: int
                    number of actions
    policy: numpy ndarray
            Maps state to action
    '''
    
    from time import sleep  # pause between rendered frames
    state = env.reset()
    terminal = False
    
    while not terminal:
        action = policy[state]
        state, reward, terminal, prob = env.step(action)
        env.render()
        sleep(1)
    
    print('Episode ends. Reward =', reward)

3.2.1 Policy Iteration

Edit the function policy_iteration and relevant functions in ./RLalgs/pi.py to implement the Policy Iteration Algorithm.

def policy_iteration(env, gamma, max_iteration, theta):
    """
    Implement Policy iteration algorithm.

    Inputs:
    env: OpenAI Gym environment.
            env.P: dictionary
                    P[state][action] is list of tuples. Each tuple contains probability, nextstate, reward, terminal
                    probability: float
                    nextstate: int
                    reward: float
                    terminal: boolean
            env.nS: int
                    number of states
            env.nA: int
                    number of actions
    gamma: float
            Discount factor.
    max_iteration: int
            The maximum number of iterations to run before stopping.
    theta: float
            The threshold of convergence.
            
    Outputs:
    V: numpy.ndarray
    policy: numpy.ndarray
    numIterations: int
    """

    V = np.zeros(env.nS)
    policy = np.zeros(env.nS, dtype = np.int32)
    policy_stable = False
    numIterations = 0

    while not policy_stable and numIterations < max_iteration:
        #Implement it with function policy_evaluation and policy_improvement
        ############################
        # YOUR CODE STARTS HERE

        V = policy_evaluation(env, policy, gamma, theta)
        new_policy, policy_stable = policy_improvement(env, V, policy, gamma)
        
        policy = new_policy
        # YOUR CODE ENDS HERE
        ############################
        numIterations += 1
        
    return V, policy, numIterations


def policy_evaluation(env, policy, gamma, theta):
    """
    Evaluate the value function from a given policy.

    Inputs:
    env: OpenAI Gym environment.
            env.P: dictionary
                    
            env.nS: int
                    number of states
            env.nA: int
                    number of actions

    gamma: float
            Discount factor.
    policy: numpy.ndarray
            The policy to evaluate. Maps states to actions.
    theta: float
            The threshold of convergence.
    
    Outputs:
    V: numpy.ndarray
            The value function from the given policy.
    """
    nS = env.nS
    nA = env.nA
    v_old = np.zeros(nS)
    converged = False
    max_iteration = 10000
    numIterations = 0
    V = np.zeros(nS)
    
    while not converged and numIterations < max_iteration:
        q = action_evaluation(env, gamma, v_old)
        V = q[np.arange(nS), policy]  # value of the action the policy takes in each state
        value_change = np.max(np.abs(V - v_old))
        converged = value_change < theta
        numIterations += 1
        v_old = V

    return V


def policy_improvement(env, value_from_policy, policy, gamma):
    """
    Given the value function from policy, improve the policy.

    Inputs:
    env: OpenAI Gym environment
            env.P: dictionary
                    P[state][action] is a list of tuples (probability, nextstate, reward, terminal)
                    probability: float
                    nextstate: int
                    reward: float
                    terminal: boolean
            env.nS: int
                    number of states
            env.nA: int
                    number of actions

    value_from_policy: numpy.ndarray
            The value calculated from the policy
    policy: numpy.ndarray
            The previous policy.
    gamma: float
            Discount factor.

    Outputs:
    new policy: numpy.ndarray
            An array of integers. Each integer is the optimal action to take
            in that state according to the environment dynamics and the
            given value function.
    policy_stable: boolean
            True if the "optimal" policy is found, otherwise false
    """
    q = action_evaluation(env, gamma, value_from_policy)
    new_policy = action_selection(q)
    policy_stable = all(policy == new_policy)
    return new_policy, policy_stable
V, policy, numIterations = policy_iteration(env = env.env, gamma = 1, max_iteration = 500, theta = 1e-7)
print('State values:')
print(V)
print('Number of iterations to converge =', numIterations)
State values:
[0.82352709 0.82352631 0.82352576 0.82352547 0.82352726 0.
 0.5294101  0.         0.82352758 0.82352804 0.76470468 0.
 0.         0.88235197 0.94117597 0.        ]
Number of iterations to converge = 500

You should get values close to:
State values:
[0.82352774 0.8235272 0.82352682 0.82352662 0.82352791 0.
0.52941063 0. 0.82352817 0.82352851 0.76470509 0.
0. 0.88235232 0.94117615 0.]
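
If you want to watch the resulting policy, the render helper from Section 3.2 can replay one episode with it (a usage sketch; the one-second pause between frames makes it slow but easy to follow):

render(env.env, policy)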

3.2.2 Value Iteration

Edit the function value_iteration and relevant functions in ./RLalgs/vi.py to implement the Value Iteration Algorithm.

def value_iteration(env, gamma, max_iteration, theta):
    """
    Implement value iteration algorithm. 

    Inputs:
    env: OpenAI Gym environment.
            env.P: dictionary
                    the transition probabilities of the environment
                    P[state][action] is list of tuples. Each tuple contains probability, nextstate, reward, terminal
            env.nS: int
                    number of states
            env.nA: int
                    number of actions
    gamma: float
            Discount factor.
    max_iteration: int
            The maximum number of iterations to run before stopping.
    theta: float
            The threshold of convergence.
    
    Outputs:
    V: numpy.ndarray
    policy: numpy.ndarray
    numIterations: int
            Number of iterations
    """

    V = np.zeros(env.nS)
    v_old = np.zeros(env.nS)
    max_iteration = 10000  # raise the iteration cap so the sweep can run to full convergence
    numIterations = 0
    converged = False
    
    while not converged and numIterations < max_iteration:
        for s in range(env.nS):
            # One full sweep: back up each state with the best one-step lookahead value
            max_v_i = -np.inf
            for a in range(env.nA):
                v_i = 0
                for prob, next_state, reward, is_terminal in env.P[s][a]:
                    v_i += prob * (reward + gamma * v_old[next_state])
                max_v_i = max(max_v_i, v_i)
            V[s] = max_v_i
        value_change = np.max(np.abs(V - v_old))
        converged = value_change < theta
        v_old = V.copy()
        numIterations += 1
    
    #Extract the "optimal" policy from the value function
    policy = extract_policy(env, V, gamma)
    
    return V, policy, numIterations

def extract_policy(env, v, gamma):

    """ 
    Extract the optimal policy given the optimal value-function.

    Inputs:
    env: OpenAI Gym environment.
            env.P: dictionary
                    P[state][action] is a list of tuples (probability, nextstate, reward, terminal)
                    probability: float
                    nextstate: int
                    reward: float
                    terminal: boolean
            env.nS: int
                    number of states
            env.nA: int
                    number of actions
    v: numpy.ndarray
        value function
    gamma: float
        Discount factor. Number in range [0, 1)
    
    Outputs:
    policy: numpy.ndarray
    """

    policy = np.zeros(env.nS, dtype = np.int32)
    for s in range(env.nS):
        best_value = -np.inf
        for a in range(env.nA):
            # One-step lookahead: q(s, a) under the given value function
            q_sa = 0
            for prob, next_state, reward, is_terminal in env.P[s][a]:
                q_sa += prob * (reward + gamma * v[next_state])
            if q_sa > best_value:
                best_value = q_sa
                policy[s] = a
    return policy
V, policy, numIterations = value_iteration(env = env.env, gamma = 1, max_iteration = 500, theta = 1e-7)
print('State values:')
print(V)
print('Number of iterations to converge =', numIterations)
State values:
[0.82352709 0.82352631 0.82352576 0.82352548 0.82352726 0.
 0.52941011 0.         0.82352759 0.82352804 0.76470468 0.
 0.         0.88235197 0.94117597 0.        ]
Number of iterations to converge = 525

You should get values close to:
State values:
[0.82352773 0.82352718 0.8235268 0.8235266 0.8235279 0.
0.52941062 0. 0.82352816 0.8235285 0.76470509 0.
0. 0.88235231 0.94117615 0.]
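
Since Policy Iteration and Value Iteration solve the same MDP, an optional consistency check is to confirm that the two value functions agree up to the convergence threshold:

V_pi, policy_pi, _ = policy_iteration(env = env.env, gamma = 1, max_iteration = 500, theta = 1e-7)
V_vi, policy_vi, _ = value_iteration(env = env.env, gamma = 1, max_iteration = 500, theta = 1e-7)
print('Max difference between the two value functions:', np.max(np.abs(V_pi - V_vi)))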

3.3 Model-free RL algorithms

3.3.1 Q-Learning

Edit the function QLearning in ./RLalgs/ql.py to implement the Q-Learning Algorithm.

def QLearning(env, num_episodes, gamma, lr, e):
    """
    Implement the Q-learning algorithm following the epsilon-greedy exploration.

    Inputs:
    env: OpenAI Gym environment 
            env.P: dictionary
                    P[state][action] is a list of tuples (probability, nextstate, reward, terminal)
                    probability: float
                    nextstate: int
                    reward: float
                    terminal: boolean
            env.nS: int
                    number of states
            env.nA: int
                    number of actions
    num_episodes: int
            Number of episodes of training
    gamma: float
            Discount factor.
    lr: float
            Learning rate.
    e: float
            Epsilon value used in the epsilon-greedy method.

    Outputs:
    Q: numpy.ndarray
    """

    Q = np.zeros((env.nS, env.nA))

    for epi in range(num_episodes):
        terminal = False
        state_i = env.reset()
        action_i = epsilon_greedy(Q[state_i], e=e)
        while not terminal:
                last_state, last_action = state_i, action_i
                state_i, last_reward, terminal, prob = env.step(last_action)
                action_i = epsilon_greedy(Q[state_i], e=e)  # behaviour action (epsilon-greedy)
                greedy_action_i = epsilon_greedy(Q[state_i], e=0)  # greedy action, so Q[state_i][greedy_action_i] = max_a Q(s', a)
                Q[last_state][last_action] = Q[last_state][last_action] + lr*(last_reward + gamma*Q[state_i][greedy_action_i] - Q[last_state][last_action])

    return Q
Q = QLearning(env = env.env, num_episodes = 1000, gamma = 1, lr = 0.1, e = 0.1)
print('Action values:')
print(Q)
Action values:
[[3.58539292e-01 2.39550887e-01 2.98189563e-01 2.56742930e-01]
 [6.64188268e-02 4.18412961e-02 2.29107752e-02 1.51815956e-01]
 [1.62123746e-02 3.26191309e-02 4.27164044e-02 9.30353299e-03]
 [1.10126088e-02 3.35943485e-04 0.00000000e+00 1.96347873e-03]
 [3.59428466e-01 1.45038233e-01 2.09359658e-01 1.68825983e-01]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [2.25644101e-02 7.98677168e-02 3.14864096e-03 0.00000000e+00]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [1.84506113e-01 2.44236935e-01 2.02371465e-01 3.58998308e-01]
 [1.16150788e-01 3.55291016e-01 1.74411426e-01 1.70310615e-01]
 [3.82189738e-01 1.98516795e-01 1.89756109e-01 9.77759359e-02]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [1.19007991e-01 2.29408571e-01 1.06694202e-01 3.81043329e-01]
 [2.10428202e-01 3.74655709e-01 7.65887974e-01 4.36852489e-01]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]]
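
A natural follow-up, not required by the assignment, is to turn the learned Q table into a greedy policy and estimate how often it reaches the goal. The helper below is a hypothetical addition using the same old-style gym reset/step API as the rest of this notebook:

def evaluate_policy(env, policy, num_episodes = 1000):
    'Run a deterministic policy and report the average episode return (the success rate on FrozenLake).'
    total = 0.0
    for _ in range(num_episodes):
        state = env.reset()
        terminal = False
        while not terminal:
            state, reward, terminal, _ = env.step(policy[state])
        total += reward
    return total / num_episodes

greedy_policy = np.argmax(Q, axis = 1)
print('Estimated success rate of the Q-learning policy:', evaluate_policy(env.env, greedy_policy))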

3.3.2 SARSA

Edit the function SARSA in ./RLalgs/sarsa.py to implement the SARSA Algorithm.

def SARSA(env, num_episodes, gamma, lr, e):
    """
    Implement the SARSA algorithm following epsilon-greedy exploration.

    Inputs:
    env: OpenAI Gym environment 
            env.P: dictionary
                    P[state][action] is a list of tuples (probability, nextstate, reward, terminal)
                    probability: float
                    nextstate: int
                    reward: float
                    terminal: boolean
            env.nS: int
                    number of states
            env.nA: int
                    number of actions
    num_episodes: int
            Number of episodes of training
    gamma: float
            Discount factor.
    lr: float
            Learning rate.
    e: float
            Epsilon value used in the epsilon-greedy method.

    Outputs:
    Q: numpy.ndarray
            State-action values
    """
    
    Q = np.zeros((env.nS, env.nA))
    
    for epi in range(num_episodes):
        terminal = False
        state_i = env.reset()
        action_i = epsilon_greedy(Q[state_i], e=e)
        while not terminal:
                last_state, last_action = state_i, action_i
                state_i, last_reward, terminal, prob = env.step(last_action)
                action_i = epsilon_greedy(Q[state_i], e=e)  # the next action comes from the same epsilon-greedy behaviour policy (on-policy)
                Q[last_state][last_action] = Q[last_state][last_action] + lr*(last_reward + gamma*Q[state_i][action_i] - Q[last_state][last_action])
        
    return Q
Q = SARSA(env = env.env, num_episodes = 1000, gamma = 1, lr = 0.1, e = 0.1)
print('Action values:')
print(Q)
(The terminal states of FrozenLake are {5, 7, 11, 12, 15}, which is why those rows of the Q table stay at zero.)
Action values:
[[2.67582935e-01 2.18605347e-01 2.19526187e-01 2.12913140e-01]
 [4.11010140e-02 4.91476686e-02 2.68654550e-02 1.28709183e-01]
 [1.48023510e-02 1.36887908e-02 3.85594964e-03 5.04395713e-02]
 [8.80898724e-03 1.31326087e-02 3.25511308e-03 2.85275602e-03]
 [2.78687763e-01 1.74856389e-01 1.50320541e-01 1.75055753e-01]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 0.00000000e+00 6.43856070e-04 1.46441032e-02]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [1.65101120e-01 1.61979515e-01 1.81075025e-01 3.33316157e-01]
 [2.26313070e-01 3.61044914e-01 2.42705704e-01 1.83277620e-01]
 [8.15992551e-02 2.74304836e-01 6.77059130e-02 1.56483139e-01]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [9.99441343e-02 3.44255788e-01 5.10698706e-01 2.54869103e-01]
 [4.07597092e-01 2.88810844e-01 6.99259866e-01 3.88493659e-01]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]]
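
The same evaluate_policy helper sketched after the Q-learning cell can be reused to compare the two learned policies:

sarsa_policy = np.argmax(Q, axis = 1)
print('Estimated success rate of the SARSA policy:', evaluate_policy(env.env, sarsa_policy))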

4. Exploration vs. Exploitation

Below is a study of the trade-off between exploration and exploitation on a 10-armed bandit problem.

import gym
from gym import spaces
from gym.utils import seeding
import tqdm

class ArmedBanditEnv(gym.Env):
    
    def __init__(self, reward_dist):
        self.reward_dist = reward_dist
        self.action_space = spaces.Discrete(len(reward_dist))
        self.info = {"Armed Bandit Test Env": ""}
        self.seed()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        assert self.action_space.contains(action)
        done = True  # every pull of an arm is a one-step episode
        reward = np.random.normal(self.reward_dist[action][0], self.reward_dist[action][1])
        return [0], reward, done, self.info

    def reset(self):
        return [0]

    def render(self, mode='human', close=False):
        pass
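
A quick smoke test of the environment (illustrative only; the two arm means used here are arbitrary):

test_env = ArmedBanditEnv([[0.0, 1.0], [1.0, 1.0]])  # two arms with means 0 and 1, std 1
_, r, done, _ = test_env.step(1)
print('Sampled reward from arm 1:', r, '| episode done:', done)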

def simulate_bandits(num_band_problems = 500, num_iterations = 2000, reward_mean_std=1):
  output = []
  for bandit_prob_i in tqdm.tqdm(range(num_band_problems)):
    bandit_arm_avg_std = [[np.random.normal(0, reward_mean_std), 1] for _ in range(10)]
    env = ArmedBanditEnv(bandit_arm_avg_std)
    for e in [0, 0.01, 0.1]:
      Q = np.zeros(10)
      avg_reward = 0
      for iter_i in range(1, num_iterations + 1):
          action_i = epsilon_greedy(Q, e=e)
          state_i, reward, terminal, info = env.step(action_i)
          Q[action_i] = estimate(Q[action_i], 1/iter_i, reward)
          avg_reward = estimate(avg_reward, 1/iter_i, reward)
          output.append({"bandit_prob_i": bandit_prob_i, "step": iter_i, "epsilon": str(e), "avg_reward": avg_reward})

  df = pd.DataFrame(output)
  return df
df_2 = simulate_bandits(num_band_problems=1000, num_iterations=1000, reward_mean_std=1)
100%|██████████| 1000/1000 [04:58<00:00,  3.35it/s]
import plotly.express as px
import pandas as pd
df_2_grouped = df_2.groupby(["epsilon", "step"], as_index=False).mean()
df_2_grouped = df_2_grouped.drop("bandit_prob_i", axis=1)
df_2_grouped = df_2_grouped.sort_values("step")
fig = px.line(df_2_grouped, x='step', color='epsilon', y='avg_reward', title="Average Reward on the 10-Armed Bandit Problem (arm means drawn from Normal(0, 1))")
fig.show()