Source code for or_suite.envs.ambulance.ambulance_graph

"""Implementation of an RL environment in a discrete graph space.

An ambulance environment over a simple graph.  An agent interacts through 
the environment by selecting locations for various ambulances over the graph.  Afterwards 
a patient arrives and the ambulance most go and serve the arrival, paying a 
cost to travel.
"""

import numpy as np
import gym
from gym import spaces
import networkx as nx
import math

from .. import env_configs

# ------------------------------------------------------------------------------


[docs]class AmbulanceGraphEnvironment(gym.Env): """ A graph of nodes V with edges between the nodes E; each node represents a location where an ambulance could be stationed or a call could come in. The edges between nodes are undirected and have a weight representing the distance between those two nodes. The nearest ambulance to a call is determined by computing the shortest path from each ambulance to the call, and choosing the ambulance with the minimum length path. The calls arrive according to a prespecified iid probability distribution that can change over time. Attributes: epLen: The int number of time steps to run the experiment for. arrival_dist: A lambda arrival distribution for calls over the observation space; takes an integer (step) and returns an integer that corresponds to a node in the observation space. alpha: A float controlling proportional difference in cost to move between calls and to respond to a call. from_data: A bool indicator for whether the arrivals will be read from data or randomly generated. arrival_data: An int list only used if from_data is True, this is a list of arrivals, where each arrival corresponds to a node in the observation space. episode_num: The (int) current episode number, increments every time the environment is reset. graph: A networkx Graph representing the observation space. num_nodes: The (int) number of nodes in the graph. state: An int list representing the current state of the environment. timestep: The (int) timestep the current episode is on. lengths: A symmetric float matrix containing the distance between each pair of nodes. starting_state: An int list containing the starting locations for each ambulance. num_ambulance: The (int) number of ambulances in the environment. action_space: (Gym.spaces MultiDiscrete) Actions must be the length of the number of ambulances, every entry is an int corresponding to a node in the graph. observation_space: (Gym.spaces MultiDiscrete) The environment state must be the length of the number of ambulances, every entry is an int corresponding to a node in the graph. """ metadata = {'render.modes': ['human']}
[docs] def __init__(self, config=env_configs.ambulance_graph_default_config): """ Args: config: A dictionary (dict) containing the parameters required to set up a metric ambulance environment. epLen: The (int) number of time steps to run the experiment for. arrival_dist: A (lambda) arrival distribution for calls over the observation space; takes an integer (step) and returns an integer that corresponds to a node in the observation space. alpha: A float controlling proportional difference in cost to move between calls and to respond to a call. from_data: A bool indicator for whether the arrivals will be read from data or randomly generated. data: An int list only needed if from_data is True, this is a list of arrivals, where each arrival corresponds to a node in the observation space. edges: A tuple list where each tuple corresponds to an edge in the graph. The tuples are of the form (int1, int2, {'travel_time': int3}). int1 and int2 are the two endpoints of the edge, and int3 is the time it takes to travel from one endpoint to the other. starting_state: An int list containing the starting locations for each ambulance. num_ambulance: The (int) number of ambulances in the environment. """ super(AmbulanceGraphEnvironment, self).__init__() self.config = config self.epLen = config['epLen'] self.alpha = config['alpha'] self.graph = nx.Graph(config['edges']) self.num_nodes = self.graph.number_of_nodes() self.starting_state = config['starting_state'] self.state = self.starting_state self.timestep = 0 self.num_ambulance = config['num_ambulance'] self.arrival_dist = config['arrival_dist'] self.from_data = config['from_data'] self.lengths = self.find_lengths(self.graph, self.num_nodes) if self.from_data: self.arrival_data = config['data'] self.episode_num = 0 # Creates an array stored in space_array the length of the number of ambulances # where every entry is the number of nodes in the graph num_nodes = self.graph.number_of_nodes() space_array = np.full(self.num_ambulance, num_nodes) # Creates a space where every ambulance can be located at any of the nodes self.action_space = spaces.MultiDiscrete(space_array) # The definition of the observation space is the same as the action space self.observation_space = spaces.MultiDiscrete(space_array)
[docs] def reset(self): """Reinitializes variables and returns the starting state.""" self.timestep = 0 self.state = self.starting_state if self.from_data: self.episode_num += 1 return np.asarray(self.starting_state)
def get_config(self): return self.config
[docs] def step(self, action): """ Move one step in the environment. Args: action: An int list of nodes the same length as the number of ambulances, where each entry i in the list corresponds to the chosen location for ambulance i. Returns: float, int, bool: reward: A float representing the reward based on the action chosen. newState: An int list representing the state of the environment after the action and call arrival. done: A bool flag indicating the end of the episode. """ assert self.action_space.contains(action) old_state = self.state # The location of the new arrival is chosen randomly from among the nodes # in the graph according to the arrival distribution prob_list = [] if self.from_data: dataset_step = (self.episode_num * self.epLen + self.timestep) % len(self.arrival_data) prob_list = self.arrival_dist( dataset_step, self.num_nodes, self.arrival_data) else: prob_list = self.arrival_dist(self.timestep, self.num_nodes) new_arrival = np.random.choice(self.num_nodes, p=prob_list) # Finds the distance traveled by all the ambulances from the old state to # the chosen action, assuming that each ambulance takes the shortest path, # which is stored in total_dist_oldstate_to_action # Also finds the closest ambulance to the call based on their locations at # the end of the action, using shortest paths shortest_length = 999999999 closest_amb_idx = 0 closest_amb_loc = action[closest_amb_idx] total_dist_oldstate_to_action = 0 for amb_idx in range(len(action)): new_length = nx.shortest_path_length( self.graph, action[amb_idx], new_arrival, weight='travel_time') total_dist_oldstate_to_action += nx.shortest_path_length( self.graph, self.state[amb_idx], action[amb_idx], weight='dist') if new_length < shortest_length: shortest_length = new_length closest_amb_idx = amb_idx closest_amb_loc = action[closest_amb_idx] else: continue # Update the state of the system according to the action taken and change # the location of the closest ambulance to the call to the call location newState = np.array(action) newState[closest_amb_idx] = new_arrival obs = newState # The reward is a linear combination of the distance traveled to the action # and the distance traveled to the call # alpha controls the tradeoff between cost to travel between arrivals and # cost to travel to a call # The reward is negated so that maximizing it will minimize the distance reward = -1 * (self.alpha * total_dist_oldstate_to_action + (1 - self.alpha) * shortest_length) # The info dictionary is used to pass the location of the most recent arrival # so it can be used by the agent info = {'arrival': new_arrival} if self.timestep != (self.epLen-1): done = False else: done = True self.state = newState self.timestep += 1 assert self.observation_space.contains(self.state) return self.state, reward, done, info
[docs] def render(self, mode='console'): if mode != 'console': raise NotImplementedError()
[docs] def close(self): pass
[docs] def find_lengths(self, graph, num_nodes): """ Given a graph, find_lengths first calculates the pairwise shortest distance between all the nodes, which is stored in a (symmetric) matrix. """ dict_lengths = dict(nx.all_pairs_dijkstra_path_length( graph, cutoff=None, weight='travel_time')) lengths = np.zeros((num_nodes, num_nodes)) for node1 in range(num_nodes): for node2 in range(num_nodes): lengths[node1, node2] = dict_lengths[node1][node2] return lengths