import numpy as np
from gym import spaces
from .. import Agent
import itertools
class DiscreteMB(Agent):
    """
    Uniform model-based algorithm implemented for MultiDiscrete environments
    and actions using the metric induced by the l_inf norm.

    Attributes:
        epLen: (int) number of steps per episode
        scaling: (float) scaling parameter for confidence intervals
        action_space: (MultiDiscrete) the action space
        state_space: (MultiDiscrete) the state space
        action_size: (np.ndarray) size of each action dimension (action_space.nvec)
        state_size: (np.ndarray) size of each state dimension (state_space.nvec)
        alpha: (float) parameter for prior on transition kernel
        flag: (bool) True to do full step updates in update_policy, False to do
            one step updates in pick_action
        multiAction: (bool) False when the agent was built from a Discrete
            action space, in which case pick_action returns a scalar
        matrix_dim: (np.ndarray) a concatenation of epLen, state_size, and
            action_size used to create the estimate arrays of the appropriate size
        qVals: (np.ndarray) Q-value estimates for each step, state, action tuple
        num_visits: (np.ndarray) number of times each step, state, action tuple
            has been visited
        vVals: (np.ndarray) value function estimates for every step, state pair
        rEst: (np.ndarray) running mean of the reward for a step, state, action tuple
        pEst: (np.ndarray) number of times each step, state, action, new_state
            tuple has been observed
    """

    def __init__(self, action_space, state_space, epLen, scaling, alpha, flag):
        """Store the hyper-parameters and allocate the estimate arrays.

        Args:
            action_space: (Discrete or MultiDiscrete) the action space
            state_space: (Discrete or MultiDiscrete) the state space
            epLen: (int) number of steps per episode
            scaling: (float) scaling parameter for confidence intervals
            alpha: (float) parameter for prior on transition kernel
            flag: (bool) whether to do full step updates or not
        """
        self.epLen = epLen
        self.scaling = scaling
        self.alpha = alpha
        self.flag = flag

        # A Discrete space is wrapped into a one-dimensional MultiDiscrete so
        # the rest of the class only ever deals with `nvec`.
        if isinstance(action_space, spaces.Discrete):
            self.action_space = spaces.MultiDiscrete(
                nvec=np.array([action_space.n]))
            self.multiAction = False
        else:
            self.action_space = action_space
            self.multiAction = True

        # Same treatment for the state space, so a Discrete state space does
        # not fail on the `.nvec` access below.
        if isinstance(state_space, spaces.Discrete):
            self.state_space = spaces.MultiDiscrete(
                nvec=np.array([state_space.n]))
        else:
            self.state_space = state_space

        # sizes of action and state spaces
        self.action_size = self.action_space.nvec
        self.state_size = self.state_space.nvec
        self.matrix_dim = np.concatenate((
            np.array([self.epLen]), self.state_size, self.action_size))

        self._init_estimates()

    def _init_estimates(self):
        """Create every estimate array at its initial value."""
        value_dim = np.append(np.array([self.epLen]), self.state_size)
        transition_dim = np.concatenate((self.matrix_dim, self.state_size))

        # h*S*A, optimistic start: no return can exceed epLen after clipping
        self.qVals = np.ones(self.matrix_dim, dtype=np.float32) * self.epLen
        # h*S
        self.vVals = np.ones(value_dim, dtype=np.float32) * self.epLen
        # h*S*A
        self.rEst = np.zeros(self.matrix_dim, dtype=np.float32)
        # h*S*A
        self.num_visits = np.zeros(self.matrix_dim, dtype=np.float32)
        # h*S*A*S
        self.pEst = np.zeros(transition_dim, dtype=np.float32)

    @staticmethod
    def _index(*parts):
        """Flatten scalars/sequences into one tuple usable as an array index."""
        return tuple(np.concatenate([np.ravel(part) for part in parts]))

    def reset(self):
        '''
        Resets the agent by overwriting all of the estimates back to initial values
        '''
        self._init_estimates()

    def update_parameters(self, param):
        """Update the scaling parameter.

        Args:
            param: (int) The new scaling value to use"""
        self.scaling = param

    def update_obs(self, obs, action, reward, newObs, timestep, info):
        '''Add observation to records

        Args:
            obs: (list) The current state
            action: (list) The action taken
            reward: (int) The calculated reward
            newObs: (list) The next observed state
            timestep: (int) The current timestep
            info: unused by this agent
        '''
        sa_index = self._index([timestep], obs, action)
        self.num_visits[sa_index] += 1
        self.pEst[self._index([timestep], obs, action, newObs)] += 1

        # Incremental mean: the visit count already includes this sample.
        visits = self.num_visits[sa_index]
        self.rEst[sa_index] = (
            (visits - 1) * self.rEst[sa_index] + reward) / visits

    def _update_q(self, step, dim):
        """Apply one optimistic backup to the Q estimate stored at `dim`.

        Args:
            step: (int) the timestep that `dim` belongs to
            dim: (tuple) index of a (step, state, action) entry
        """
        visits = self.num_visits[dim]
        if visits == 0:
            # Never tried: keep the optimistic upper bound so that the greedy
            # policy still explores this action.
            self.qVals[dim] = self.epLen
            return

        # empirical reward plus confidence bonus
        optimistic_reward = self.rEst[dim] + self.scaling / np.sqrt(visits)
        if step == self.epLen - 1:
            # last step of the episode: nothing to bootstrap from
            self.qVals[dim] = min(self.qVals[dim], optimistic_reward)
        else:
            # expected next-step value under the alpha-smoothed transition counts
            weights = self.pEst[dim] + self.alpha
            vEst = min(self.epLen, np.sum(
                np.multiply(self.vVals[step + 1], weights) / np.sum(weights)))
            self.qVals[dim] = min(
                self.qVals[dim], self.epLen, optimistic_reward + vEst)

    def _update_v(self, step, state):
        """Set V(step, state) to the best Q value over actions, capped at epLen."""
        index = self._index([step], state)
        self.vVals[index] = min(self.epLen, self.qVals[index].max())

    def update_policy(self, k):
        '''Update internal policy based upon records

        Only does work when `flag` is truthy: every (step, state, action)
        estimate is then recomputed, going backwards from the last step.

        Args:
            k: unused by this agent
        '''
        if not self.flag:
            return

        for h in range(self.epLen - 1, -1, -1):
            for state in itertools.product(*(range(n) for n in self.state_size)):
                for action in itertools.product(*(range(n) for n in self.action_size)):
                    self._update_q(h, self._index([h], state, action))
                self._update_v(h, state)

    def pick_action(self, state, step):
        '''
        Select action according to a greedy policy

        When `flag` is falsy the estimates of the current (step, state) pair
        are refreshed with a one step update before the action is chosen.

        Args:
            state: (list) current state
            step: (int) timestep *within* episode

        Returns:
            The greedy action, ties broken uniformly at random: an array with
            one entry per action dimension, or a scalar when the agent was
            built from a Discrete action space.
        '''
        if not self.flag:
            for action in itertools.product(*(range(n) for n in self.action_size)):
                self._update_q(step, self._index([step], state, action))
            self._update_v(step, state)

        qFn = self.qVals[self._index([step], state)]
        # one column per maximiser, one row per action dimension
        best = np.asarray(np.where(qFn == qFn.max()))
        choice = np.random.choice(best.shape[1])
        action = best[:, choice]
        if not self.multiAction:
            action = action[0]
        return action