Skip to content
Snippets Groups Projects
simulation_runner.py 3.24 KiB
import logging
import time
from threading import Thread

import numpy as np

from overcooked_simulator import ROOT_DIR
from overcooked_simulator.overcooked_environment import Environment, Action

log = logging.getLogger(__name__)


class Simulator(Thread):
    """Simulator main class which runs manages the environment and player inputs and game state outputs.

    Main Simulator class which runs the game environment. Players can be registered in the game.
    The simulator is run as its own thread.

    Typical usage example:
    ```python
    sim = Simulator()
    sim.register_player(Player("p1", [x,y]))
    sim.start()
    ```
    """

    def __init__(
        self,
        env_config_path,
        layout_path,
        frequency: int,
        item_info_path=ROOT_DIR / "game_content" / "item_info.yaml",
        seed: int = 8654321,
    ):
        # TODO look at https://builtin.com/data-science/numpy-random-seed to change to other random
        np.random.seed(seed)
        self.finished: bool = False

        self.step_frequency: int = frequency
        self.preferred_sleep_time_ns: float = 1e9 / self.step_frequency
        self.env: Environment = Environment(
            env_config_path, layout_path, item_info_path
        )

        super().__init__()

    def step(self):
        """One simulation step of the environment."""
        self.env.step()

    def enter_action(self, action: Action):
        """Takes an action and executes it in the environment.

        Args:
            action (Action): The action object to be executed.
        """
        self.env.perform_action(action)

    def get_state(self):
        """Get the current game state as python objects.

        Returns:
            The current state of the game. Currently, as dict with lists of environment objects.
        """

        return self.env.get_state()

    def get_state_json(self):
        """Get the current game state in json-like dict.

        Returns:
            The gamest ate encoded in a json style nested dict.
        """

        return self.env.get_state_json()

    def register_player(self, player_name: str, pos=None):
        """Adds a player to the environment.

        Args:
            player: The player to be added.
        """
        self.env.add_player(player_name, pos)

    def register_players(self, players: list[str]):
        """Registers multiple players from a list

        Args:
            players: List of players to be added.
        """

        for p in players:
            self.register_player(p)

    def run(self):
        """Starts the simulator thread. Runs in a loop until stopped."""

        overslept_in_ns = 0

        while not self.finished:
            step_start = time.time_ns()
            self.step()
            step_duration = time.time_ns() - step_start

            time_to_sleep_ns = self.preferred_sleep_time_ns - (
                step_duration + overslept_in_ns
            )

            sleep_start = time.time_ns()
            time.sleep(max(time_to_sleep_ns / 1e9, 0))
            sleep_function_duration = time.time_ns() - sleep_start
            overslept_in_ns = sleep_function_duration - time_to_sleep_ns

    def stop(self):
        """Stops the simulator"""
        log.debug("Stopping the simulation")
        self.finished = True