# Source code for or_suite.agents.rl.enet_ql

import numpy as np
from .. import Agent

''' epsilon Net agent '''


class eNetQL(Agent):
    """Uniform-discretization (epsilon-net) Q-learning agent.

    Q-learning algorithm implemented for environments with continuous states
    and actions, using the metric induced by the l_inf norm.  The same
    one-dimensional net is used for every coordinate of the state space, and
    likewise for every coordinate of the action space.

    Attributes:
        epLen: (int) number of steps per episode
        scaling: (float) scaling parameter for confidence intervals
        action_net: (np.ndarray) discretization of the action space, stored
            with shape (len(action_net), d_2)
        state_net: (np.ndarray) discretization of the state space, stored
            with shape (len(state_net), d_1)
        state_action_dim: (d_1, d_2) dimensions of the state and action
            space respectively
        matrix_dim: (list) shape of the estimate tables:
            [epLen] + d_1 * [len(state_net)] + d_2 * [len(action_net)]
        qVals: (np.ndarray, float32) Q-value estimates, initialised to epLen
        num_visits: (np.ndarray, float32) visit counts, initialised to zero
    """

    def __init__(self, action_net, state_net, epLen, scaling, state_action_dim):
        """Store the nets and allocate the Q-value and visit-count tables.

        Args:
            action_net: sequence of grid points discretizing one action
                coordinate.
            state_net: sequence of grid points discretizing one state
                coordinate.
            epLen: (int) number of steps per episode.
            scaling: (float) multiplier of the exploration bonus.
            state_action_dim: indexable pair (d_1, d_2) with the state and
                action dimensions.
        """
        state_dim, action_dim = state_action_dim[0], state_action_dim[1]

        # np.resize fills a (dim, n) array by repeating the input; after the
        # transpose each net has shape (n, dim), one column per coordinate.
        self.state_net = np.resize(state_net, (state_dim, len(state_net))).T
        self.action_net = np.resize(
            action_net, (action_dim, len(action_net))).T
        self.epLen = epLen
        self.scaling = scaling
        self.state_action_dim = state_action_dim

        # One axis for the timestep, then one axis per state coordinate and
        # one axis per action coordinate.
        self.matrix_dim = (
            [self.epLen]
            + state_dim * [len(state_net)]
            + action_dim * [len(action_net)]
        )

        # Allocate qVals / num_visits in their initial state.
        self.reset()

    @staticmethod
    def _nearest_index(net, point):
        """Return, per coordinate, the index of the net point closest to point."""
        return np.argmin(np.abs(np.asarray(net) - np.asarray(point)), axis=0)

    def update_config(self, env, config):
        """Update agent information based on the config file.

        Only the environment is stored; ``config`` is not used here.
        """
        self.environment = env

    def update_parameters(self, param):
        """Replace the bonus scaling parameter with ``param``."""
        self.scaling = param

    def reset(self):
        """Reset the agent by overwriting all of its estimates.

        Q-values go back to their initial value ``epLen`` and the visit
        counts go back to zero.
        """
        self.qVals = np.ones(self.matrix_dim, dtype=np.float32) * self.epLen
        self.num_visits = np.zeros(self.matrix_dim, dtype=np.float32)

    def update_obs(self, obs, action, reward, newObs, timestep, info):
        """Add the observation to the records using the Q-learning update.

        Args:
            obs: state in which the action was taken.
            action: action that was taken.
            reward: reward received.
            newObs: state reached after the action.
            timestep: (int) step within the episode at which this happened.
            info: unused.
        """
        # Discretized locations of the state, the action and the next state.
        state_idx = self._nearest_index(self.state_net, obs)
        action_idx = self._nearest_index(self.action_net, action)
        next_state_idx = self._nearest_index(self.state_net, newObs)

        cell = (timestep,) + tuple(state_idx) + tuple(action_idx)
        self.num_visits[cell] += 1
        t = self.num_visits[cell]
        lr = (self.epLen + 1) / (self.epLen + t)
        bonus = self.scaling * np.sqrt(1 / t)

        if timestep == self.epLen - 1:
            # Last step of the episode: nothing left to collect afterwards.
            vFn = 0
        else:
            vFn = np.max(self.qVals[(timestep + 1,) + tuple(next_state_idx)])
        vFn = min(self.epLen, vFn)

        self.qVals[cell] = (
            (1 - lr) * self.qVals[cell] + lr * (reward + vFn + bonus)
        )

    def get_num_arms(self):
        """Return the number of arms, i.e. the number of Q-table entries."""
        num_state_cells = len(self.state_net) ** self.state_action_dim[0]
        num_action_cells = len(self.action_net) ** self.state_action_dim[1]
        return self.epLen * num_state_cells * num_action_cells

    def update_policy(self, k):
        """Update internal policy based upon records (nothing to do here)."""
        pass

    def pick_action(self, state, step):
        """Select an action according to a greedy policy.

        Args:
            state: current state.
            step: (int) timestep *within* the episode.

        Returns:
            np.ndarray: the chosen action, one entry per action coordinate.
            Ties between maximising actions are broken uniformly at random.
        """
        # Discretized state location; the action is taken from the maximum
        # Q-value over the action axes at that location.
        state_idx = self._nearest_index(self.state_net, state)
        qFn = self.qVals[(step,) + tuple(state_idx)]

        # One row per maximiser, one column per action coordinate.
        maximisers = np.argwhere(qFn == qFn.max())
        chosen = maximisers[np.random.choice(len(maximisers))]

        # Every column of action_net holds the same net, so column 0 maps the
        # grid indices back to action values for all coordinates.
        return self.action_net[chosen, 0]