Skip to content
Snippets Groups Projects
simulation_runner.py 2.88 KiB
import time
from threading import Thread

from overcooked_simulator.overcooked_environment import Environment, Action
from overcooked_simulator.player import Player


class Simulator(Thread):
    """Simulator main class which runs manages the environment and player inputs and gamestate outputs.

    Main Simulator class which runs the game environment. Players can be registered in the game.
    The simulator is run as its own thread.

    Typical usage example:

      sim = Simulator()
      sim.register_player(Player("p1", [x,y]))
      sim.start()
    """

    def __init__(self, env_layout_path, frequency: int):
        self.finished: bool = False

        self.step_frequency: int = frequency
        self.prefered_sleeptime_ns: float = 1e9 / self.step_frequency
        self.env: Environment = Environment(env_layout_path)

        super().__init__()

    def step(self):
        """One simulation step of the environment."""
        self.env.step()

    def enter_action(self, action: Action):
        """Takes an action and executes it in the environment.

        Args:
            action (Action): The action object to be executed.
        """
        self.env.perform_action(action)

    def get_state(self):
        """Get the current gamestate as python objects.

        Returns:
            The current state of the game. Currently as dict with lists of environment objects.
        """

        return self.env.get_state()

    def get_state_json(self):
        """Get the current gamestate in json-like dict.

        Returns:
            The gamestate encoded in a json style nested dict.
        """

        return self.env.get_state_json()

    def register_player(self, player: Player):
        """Adds a player to the environment.

        Args:
            player: The player to be added.
        """
        # print(f"Added player {player.name} to the game.")

        self.env.players[player.name] = player

    def register_players(self, players: list[Player]):
        """Registers multiple players from a list

        Args:
            players: List of players to be added.
        """

        for p in players:
            self.register_player(p)

    def run(self):
        """Starts the simulator thread. Runs in a loop until stopped."""
        overslept_in_ns = 0

        while not self.finished:
            step_start = time.time_ns()
            self.step()
            step_duration = time.time_ns() - step_start

            time_to_sleep_ns = self.prefered_sleeptime_ns - (
                step_duration + overslept_in_ns
            )

            sleep_start = time.time_ns()
            time.sleep(max(time_to_sleep_ns / 1e9, 0))
            sleep_function_duration = time.time_ns() - sleep_start
            overslept_in_ns = sleep_function_duration - time_to_sleep_ns

    def stop(self):
        """Stops the simulator"""
        print("Stopping the simulation.")
        self.finished = True