Source code for ns_gym.benchmark_algorithms.rats

"""
Risk Averse Tree Search (RATS) algorithm.

Reference: Lecarpentier and Rachelson, "Non-Stationary Markov Decision
Processes: a Worst-Case Approach using Model-Based Reinforcement Learning",
NeurIPS 2019.

Compatible with NS-Gym discrete environments that expose a transition
table on the unwrapped env. The agent reads the model directly from the
env -- it does not call env.step -- so the env must provide:

    env.unwrapped.s         - current discrete state
    env.unwrapped.P[s][a]   - list of (prob, next_state, reward, done) tuples
    env.action_space        - gymnasium.spaces.Discrete

This contract is satisfied by ns_gym.wrappers.NSFrozenLakeWrapper,
ns_gym.wrappers.NSCliffWalkingWrapper, and the ns_gym Bridge env.
"""

import numpy as np
from copy import deepcopy
from gymnasium import spaces

import ns_gym.base as base


def _assert_types(p, types_list):
    assert len(p) == len(types_list), \
        'Error: expected {} parameters received {}'.format(len(types_list), len(p))
    for i, item in enumerate(p):
        assert type(item) == types_list[i], \
            'Error: wrong type, expected {}, received {}'.format(types_list[i], type(item))


def _node_value(node):
    assert node.value is not None, 'Error: node value={}'.format(node.value)
    return node.value


def _get_state(env):
    return getattr(env, 'unwrapped', env).s


def _get_P(env):
    return getattr(env, 'unwrapped', env).P


class DecisionNode:
    """Decision node, labelled by a state."""
    def __init__(self, parent, state, weight, is_terminal, reward=0.0):
        self.parent = parent
        self.state = state
        self.weight = weight
        self.initial_weight = weight
        self.is_terminal = is_terminal
        self.reward = reward
        self.depth = 0 if parent is None else parent.depth + 1
        self.children = []
        self.value = None


class ChanceNode:
    """Chance node, labelled by a state-action pair (state via parent)."""
    def __init__(self, parent, action):
        self.parent = parent
        self.action = action
        self.depth = parent.depth
        self.children = []
        self.value = None


[docs] class RATS(base.Agent): """Risk-Averse Tree Search agent. Args: action_space (gymnasium.spaces.Discrete): action space of the env. gamma (float): discount factor. max_depth (int): planning depth of the minimax tree. L_p (float): Lipschitz constant of the transition kernel in time. L_r (float): Lipschitz constant of the reward in time. tau (float): non-stationarity time scale. """ def __init__(self, action_space, gamma=0.9, max_depth=4, L_p=1.0, L_r=0.0, tau=1.0): super().__init__() self.action_space = action_space self.n_actions = action_space.n self.gamma = gamma self.max_depth = max_depth self.L_p = L_p self.L_r = L_r self.tau = tau self.t_call = 0
[docs] def reset(self, p=None): if p is None: self.__init__(self.action_space) else: _assert_types(p, [spaces.Discrete, float, int]) self.__init__(p[0], p[1], p[2])
[docs] def display(self): print('Displaying RATS agent:') print('Action space :', self.action_space) print('Number of actions:', self.n_actions) print('Gamma :', self.gamma) print('Maximum depth :', self.max_depth)
[docs] def build_tree(self, node, env): P = _get_P(env) if isinstance(node, DecisionNode): if node.depth < self.max_depth: for a in range(self.n_actions): node.children.append(ChanceNode(node, a)) else: return None else: # ChanceNode for prob, next_state, reward, done in P[node.parent.state][node.action]: node.children.append( DecisionNode( parent=node, state=next_state, weight=prob, is_terminal=bool(done), reward=float(reward), ) ) for ch in node.children: if isinstance(ch, DecisionNode): if not ch.is_terminal: self.build_tree(ch, env) else: self.build_tree(ch, env)
[docs] def initialize_tree(self, env, done): root = DecisionNode(None, _get_state(env), 1.0, done) self.build_tree(root, env) return root
[docs] def minimax(self, node, env): if isinstance(node, DecisionNode): assert node.value is None, 'Error: node value={}'.format(node.value) if node.depth == self.max_depth: node.value = self.heuristic_value(node, env) elif node.is_terminal: node.value = node.reward else: v = -np.inf for ch in node.children: v = max(v, self.minimax(ch, env)) node.value = v else: # ChanceNode self.set_worst_case_distribution(node, env) v = 0.0 for ch in node.children: v += ch.weight * ch.value v *= self.gamma R = 0.0 for ch in node.children: R += ch.reward * ch.initial_weight v += R - self.L_r * self.tau * node.depth assert node.value is None, 'Error: node value={}'.format(node.value) node.value = v return node.value
[docs] def set_worst_case_distribution(self, node, env): assert isinstance(node, ChanceNode), \ 'Error: node type={}'.format(type(node)) for ch in node.children: self.minimax(ch, env) v = np.asarray([ch.value for ch in node.children]) w0 = np.asarray([ch.initial_weight for ch in node.children]) c = node.depth * self.L_p * self.tau w = self.worstcase_distribution_direct_method(v, w0, c) for i, ch in enumerate(node.children): ch.weight = w[i]
[docs] def worstcase_distribution_direct_method(self, v, w0, c): n = len(v) w_worst = np.zeros(n) w_worst[int(np.argmin(v))] = 1.0 return w_worst
[docs] def heuristic_value(self, node, env): return 0.0
[docs] def get_depth_list(self, node, d_list): d_list.append(node.depth) for ch in node.children: self.get_depth_list(ch, d_list)
[docs] def act(self, observation=None, env=None, done=False): """Run the RATS planning procedure and return the chosen action. Args: observation: ignored (state is read from env.unwrapped.s); kept for signature parity with other ns_gym agents. env: NS-Gym env exposing env.unwrapped.s and env.unwrapped.P. done (bool): True if the current state is terminal. """ assert env is not None, "RATS.act requires an env argument" self.t_call = 0 root = self.initialize_tree(deepcopy(env), done) self.minimax(root, deepcopy(env)) return max(root.children, key=_node_value).action