# simulation_runner.py 3.24 KiB
# Authored by Fabian Heinrich
import logging
import time
from threading import Thread
import numpy as np
from overcooked_simulator import ROOT_DIR
from overcooked_simulator.overcooked_environment import Environment, Action
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
class Simulator(Thread):
    """Run the game environment in its own thread at a fixed step frequency.

    The simulator owns an ``Environment`` instance and forwards player
    registrations, actions and state queries to it. Once the thread is
    started, ``run`` calls ``step`` repeatedly until ``stop`` is called.

    Typical usage example:
    ```python
    sim = Simulator(env_config_path, layout_path, frequency=200)
    sim.register_player("p1", [x, y])
    sim.start()
    ...
    sim.stop()
    sim.join()
    ```
    """

    def __init__(
        self,
        env_config_path,
        layout_path,
        frequency: int,
        item_info_path=ROOT_DIR / "game_content" / "item_info.yaml",
        seed: int = 8654321,
    ):
        """Create the environment and configure the step timing.

        Args:
            env_config_path: Passed through to ``Environment``.
            layout_path: Passed through to ``Environment``.
            frequency: Desired number of simulation steps per second.
                Must be positive.
            item_info_path: Passed through to ``Environment``.
            seed: Seed for NumPy's global random number generator.

        Raises:
            ValueError: If ``frequency`` is not positive.
        """
        # A zero frequency would divide by zero below, and a negative one
        # would produce a negative target period, so ``run`` would never sleep.
        if frequency <= 0:
            raise ValueError(f"frequency must be positive, got {frequency!r}")

        # TODO look at https://builtin.com/data-science/numpy-random-seed to change to other random
        np.random.seed(seed)

        # Flag polled by ``run``; set to True by ``stop`` to end the loop.
        self.finished: bool = False

        self.step_frequency: int = frequency
        # Target duration of one loop iteration (step + sleep) in nanoseconds.
        self.preferred_sleep_time_ns: float = 1e9 / self.step_frequency

        self.env: Environment = Environment(
            env_config_path, layout_path, item_info_path
        )
        super().__init__()

    def step(self) -> None:
        """Advance the environment by one simulation step."""
        self.env.step()

    def enter_action(self, action: Action) -> None:
        """Forward an action to the environment for execution.

        Args:
            action (Action): The action object to be executed.
        """
        self.env.perform_action(action)

    def get_state(self):
        """Get the current game state as python objects.

        Returns:
            Whatever ``Environment.get_state`` returns.
        """
        return self.env.get_state()

    def get_state_json(self):
        """Get the current game state in a json-like form.

        Returns:
            Whatever ``Environment.get_state_json`` returns.
        """
        return self.env.get_state_json()

    def register_player(self, player_name: str, pos=None) -> None:
        """Add a player to the environment.

        Args:
            player_name: Name of the player to be added.
            pos: Optional position, passed unchanged to
                ``Environment.add_player``. Defaults to ``None``.
        """
        self.env.add_player(player_name, pos)

    def register_players(self, players: list[str]) -> None:
        """Register multiple players from a list of names.

        Each player is added through ``register_player`` without a position.

        Args:
            players: Names of the players to be added.
        """
        for p in players:
            self.register_player(p)

    def run(self) -> None:
        """Thread body: step the environment in a loop until stopped.

        Invoked by ``Thread.start()``. Each iteration performs one step and
        then sleeps for the remainder of the target period. The amount by
        which the previous sleep overshot (or the time budget that was
        exceeded) is subtracted from the next sleep so that the average step
        rate stays close to ``step_frequency``.
        """
        overslept_in_ns = 0
        while not self.finished:
            # Use a monotonic clock for interval measurement: the wall clock
            # (time.time_ns) may jump when the system time is adjusted, which
            # would yield negative durations or a burst of unthrottled steps.
            step_start = time.perf_counter_ns()
            self.step()
            step_duration = time.perf_counter_ns() - step_start

            # Remaining budget for this iteration; may be negative when the
            # step (plus carried-over debt) exceeded the target period.
            time_to_sleep_ns = self.preferred_sleep_time_ns - (
                step_duration + overslept_in_ns
            )

            sleep_start = time.perf_counter_ns()
            # time.sleep rejects negative values, so clamp at zero.
            time.sleep(max(time_to_sleep_ns / 1e9, 0))
            sleep_function_duration = time.perf_counter_ns() - sleep_start

            # Positive when we slept longer than intended or were already
            # over budget; carried into the next iteration.
            overslept_in_ns = sleep_function_duration - time_to_sleep_ns

    def stop(self) -> None:
        """Signal the run loop to end after its current iteration.

        Only sets the ``finished`` flag; it does not wait for the thread.
        """
        log.debug("Stopping the simulation")
        self.finished = True