Source code for ns_gym.schedulers

import ns_gym.base as base
import numpy as np

"""
The schdulers can be initialized at different times to we need to maintian an internal clock as well.
"""


[docs] class RandomScheduler(base.Scheduler): """Random event scheduler: Events occur randomly with a given probability at each time step. Args: probability (float): The probability of an event occurring at each time step. start (int, optional): The start time for the scheduler. Defaults to 0. end (int, optional): The end time for the scheduler. Defaults to infinity. seed (int, optional): Random generator seed. Defaults to None. """ def __init__( self, probability: float = 0.5, start=0, end=np.inf, seed=None ) -> None: super().__init__(start, end) self.probability = probability self.rng = np.random.default_rng(seed=seed) def __call__(self, t: int) -> bool: if self.start <= t <= self.end: return self.rng.random() < self.probability else: return False
[docs] class CustomScheduler(base.Scheduler): """Custom event scheduler: Allows for custom event logic based on a user-defined function.""" def __init__(self, event_function, start=0, end=np.inf) -> None: """ Args: event_function (function): A function that takes the current time step as input and returns a boolean indicating whether an event should occur. """ super().__init__(start, end) self.event_function = event_function def __call__(self, t: int) -> bool: if self.start <= t <= self.end: return self.event_function(t) else: return False
[docs] class ContinuousScheduler(base.Scheduler): """Continuous Event Scheduler : At every time step return true""" def __init__(self, start=0, end=np.inf) -> None: super().__init__(start, end) def __call__(self, t: int) -> bool: return self.start <= t <= self.end
[docs] class DiscreteScheduler(base.Scheduler): """A discrete event scheduler returns a bool indicating where the system should transition at this time step Args: event_list (set): List of time steps to make a transition """ def __init__(self, event_list: set, start=0, end=np.inf) -> None: super().__init__(start, end) self.event_list = event_list assert min(event_list) >= start, ( "Scheduler start time occurs after first event in event list" ) assert max(event_list) <= end, ( "Scheduler end time occurs before last event in event list" ) def __call__(self, t: int) -> bool: if self.start <= t <= self.end: return t in self.event_list
[docs] class PeriodicScheduler(base.Scheduler): """Periodic event scheduler: At periodic steps return true. Args: period (int): Period of event transition times. """ def __init__(self, period: int, start=0, end=np.inf) -> None: super().__init__(start, end) self.period = period def __call__(self, t: int) -> bool: return t % self.period == 0
[docs] class MemorylessScheduler(base.Scheduler): """Memoryless Scheduler: Events happen at intervals according to a Geometric distribution This scheduler models the number of trials that must be run before a success. The scheduler samples from a geometric distribution then records the new time an event will occur. After a transition we resample from the geometric distribution. Args: p (float): The probability of success of an individual trial. seed (int, optional): Random generator seed. Defaults to None. """ def __init__(self, p: float, start=0, end=np.inf, seed=None) -> None: super().__init__(start, end) self.p = p self.rng = np.random.default_rng(seed=seed) self.transition_time = self.rng.geometric(p=self.p, size=(1,)) def __call__(self, t: int) -> bool: if t == self.transition_time: delta_t = self.rng.geometric(p=self.p, size=(1,)) self.transition_time = delta_t + t return True else: return False
if __name__ == "__main__": print(base.Scheduler.__subclasses__())