Source code for or_suite.agents.rl.discrete_ql

import numpy as np
from .. import Agent
from gym import spaces

''' Discrete Q-Learning agent '''


class DiscreteQl(Agent):
    """Tabular Q-learning agent for environments with discrete states and actions.

    One optimistic Q-value estimate and one visit counter are stored for every
    (timestep, state, action) tuple. Actions are chosen greedily with respect
    to the current estimates, with ties broken uniformly at random.

    Attributes:
        epLen: (int) number of steps per episode
        scaling: (float) scaling parameter for the confidence bonus
        action_space: (MultiDiscrete) the action space; a Discrete space passed
            to the constructor is wrapped into a one-component MultiDiscrete
        state_space: (MultiDiscrete) the state space
        multiAction: (bool) False when the constructor was given a Discrete
            action space (actions are then scalars), True otherwise
        matrix_dim: (np.ndarray) concatenation of epLen, state_space.nvec and
            action_space.nvec, used as the shape of the estimate arrays
        qVals: (np.ndarray) Q-value estimate for each
            (timestep, state, action) tuple
        num_visits: (np.ndarray) number of times each
            (timestep, state, action) tuple has been visited
    """

    def __init__(self, action_space, observation_space, epLen, scaling):
        """Set up the spaces and allocate the estimate arrays.

        Args:
            action_space: (Discrete or MultiDiscrete) the action space
            observation_space: (MultiDiscrete) the state space; its ``nvec``
                attribute is read to size the estimate arrays
            epLen: (int) number of steps per episode
            scaling: (float) scaling parameter for the confidence bonus
        """
        self.state_space = observation_space

        # Wrap a plain Discrete space so that `nvec` is always available below.
        if isinstance(action_space, spaces.Discrete):
            self.action_space = spaces.MultiDiscrete(
                nvec=np.array([action_space.n]))
            self.multiAction = False
        else:
            self.action_space = action_space
            self.multiAction = True

        self.epLen = epLen
        self.scaling = scaling

        # Shape of the estimate arrays: (epLen, *state dims, *action dims).
        self.matrix_dim = np.concatenate((
            np.array([self.epLen]),
            self.state_space.nvec,
            self.action_space.nvec))

        # Allocate through reset() so the initial estimates and the estimates
        # after a reset are guaranteed to be identical.
        self.reset()

    def update_config(self, env, config):
        """Update agent information based on the config file.

        Only the environment is stored; ``config`` is not used.
        """
        self.environment = env

    def update_parameters(self, param):
        """Update the scaling parameter.

        Args:
            param: (float) The new scaling value to use
        """
        self.scaling = param

    def reset(self):
        """Reset the agent by overwriting all estimates with their initial values."""
        # Set max_reward as 1 assuming that the reward is normalized.
        max_reward = 1
        # Optimistic initialization: H * max_one_step_reward upper-bounds the
        # return of any episode.
        self.qVals = self.epLen * max_reward * \
            np.ones(self.matrix_dim, dtype=np.float32)
        self.num_visits = np.zeros(self.matrix_dim, dtype=np.float32)

    def update_obs(self, obs, action, reward, newObs, timestep, info):
        """Add an observation to the records using the Q-learning update formula.

        Args:
            obs: (list) The current state
            action: (list) The action taken
            reward: (int) The calculated reward
            newObs: (list) The next observed state
            timestep: (int) The current timestep
            info: unused
        """
        if not self.multiAction:
            action = [action]

        # Index of the visited (timestep, state, action) entry.
        dim = tuple(np.append(np.append([timestep], obs), action))
        self.num_visits[dim] += 1
        t = self.num_visits[dim]
        lr = (self.epLen + 1) / (self.epLen + t)
        bonus = self.scaling * np.sqrt(1 / t)

        if timestep == self.epLen - 1:
            # Last step of the episode: nothing left to collect afterwards.
            vFn = 0
        else:
            # The index must be a tuple: indexing with an array would select
            # whole timestep slices along axis 0 instead of the action values
            # of the next (timestep, state) pair.
            next_dim = tuple(np.append([timestep + 1], newObs))
            vFn = min(self.epLen, np.max(self.qVals[next_dim]))

        self.qVals[dim] = (1 - lr) * self.qVals[dim] + \
            lr * (reward + vFn + bonus)

    def update_policy(self, k):
        """Update internal policy based upon records (no-op for this agent)."""
        pass

    def pick_action(self, state, step):
        """Select an action according to a greedy policy.

        Args:
            state: (list) the current state
            step: (int) timestep *within* the episode

        Returns:
            The action with maximal estimated Q-value, ties broken uniformly
            at random: an array with one entry per action component, or a
            single integer when the agent was built with a Discrete action
            space.
        """
        # Q-value estimates of every action at this (step, state) pair.
        qFn = self.qVals[tuple(np.append([step], state))]

        # Each column holds the coordinates of one maximizing action.
        action = np.asarray(np.where(qFn == qFn.max()))
        index = np.random.choice(len(action[0]))
        action = action[:, index]

        if not self.multiAction:
            action = action[0]
        return action