""" # Usage - Set `CONNECT_WITH_STUDY_SERVER` in overcooked_gui.py to True. - Run this script. Copy the manager id that is printed - Run the game_server.py script with the manager id copied from the terminal ``` python game_server.py --manager_ids COPIED_UUID ``` - Run 2 overcooked_gui.py scripts in different terminals. For more players change `NUMBER_PLAYER_PER_ENV` and start more guis. The environment starts when all players connected. """ import argparse import asyncio import logging import os import random import signal import subprocess from pathlib import Path from subprocess import Popen from typing import Tuple, TypedDict import requests import uvicorn import yaml from fastapi import FastAPI from cooperative_cuisine import ROOT_DIR from cooperative_cuisine.environment import EnvironmentConfig from cooperative_cuisine.game_server import CreateEnvironmentConfig from cooperative_cuisine.server_results import PlayerInfo from cooperative_cuisine.utils import ( url_and_port_arguments, add_list_of_manager_ids_arguments, ) NUMBER_PLAYER_PER_ENV = 2 log = logging.getLogger(__name__) app = FastAPI() # HARDCODED_MANAGER_ID = "1234" USE_AAAMBOS_AGENT = False class LevelConfig(TypedDict): name: str config_path: str layout_path: str item_info_path: str class LevelInfo(TypedDict): name: str last_level: bool recipes: list[str] recipe_graphs: list[dict] class StudyConfig(TypedDict): levels: list[LevelConfig] num_players: int num_bots: int class StudyState: def __init__(self, study_config_path: str | Path, game_url, game_port): with open(study_config_path, "r") as file: env_config_f = file.read() self.study_config: StudyConfig = yaml.load( str(env_config_f), Loader=yaml.SafeLoader ) self.levels: list[LevelConfig] = self.study_config["levels"] self.current_level_idx: int = 0 self.participant_id_to_player_info = {} self.player_ids = {} self.num_connected_players: int = 0 self.current_running_env = None self.next_level_env = None self.players_done = {} self.USE_AAAMBOS_AGENT = False self.websocket_url = f"ws://{game_url}:{game_port}/ws/player/" print("WS:", self.websocket_url) self.sub_processes = [] @property def study_done(self): return self.current_level_idx >= len(self.levels) @property def last_level(self): return self.current_level_idx >= len(self.levels) - 1 @property def is_full(self): return ( len(self.participant_id_to_player_info) == self.study_config["num_players"] ) def can_add_participant(self, num_participants: int) -> bool: filled = ( self.num_connected_players + num_participants <= self.study_config["num_players"] ) return filled and not self.is_full def create_env(self, level): with open(ROOT_DIR / "game_content" / level["item_info_path"], "r") as file: item_info = file.read() self.current_item_info: EnvironmentConfig = yaml.load( item_info, Loader=yaml.Loader ) with open( ROOT_DIR / "game_content" / "layouts" / level["layout_path"], "r" ) as file: layout = file.read() with open(ROOT_DIR / "game_content" / level["config_path"], "r") as file: environment_config = file.read() self.current_config: EnvironmentConfig = yaml.load( environment_config, Loader=yaml.Loader ) seed = int(random.random() * 1000000) print(seed) creation_json = CreateEnvironmentConfig( manager_id=study_manager.server_manager_id, number_players=self.study_config["num_players"] + self.study_config["num_bots"], environment_settings={"all_player_can_pause_game": False}, item_info_config=item_info, environment_config=environment_config, layout_config=layout, seed=seed, ).model_dump(mode="json") env_info = requests.post( study_manager.game_server_url + "/manage/create_env/", json=creation_json ) if env_info.status_code == 403: raise ValueError(f"Forbidden Request: {env_info.json()['detail']}") env_info = env_info.json() player_info = env_info["player_info"] for idx, (player_id, player_info) in enumerate(player_info.items()): if idx >= self.study_config["num_players"]: self.create_and_connect_bot(player_id, player_info) return env_info def start(self): level = self.levels[self.current_level_idx] self.current_running_env = self.create_env(level) def next_level(self): requests.post( f"{study_manager.game_server_url}/manage/stop_env/", json={ "manager_id": study_manager.server_manager_id, "env_id": self.current_running_env["env_id"], "reason": "Next level", }, ) self.current_level_idx += 1 if not self.study_done: level = self.levels[self.current_level_idx] self.current_running_env = self.create_env(level) for ( participant_id, player_info, ) in self.participant_id_to_player_info.items(): new_player_info = { player_name: self.current_running_env["player_info"][player_name] for player_name in player_info.keys() } self.participant_id_to_player_info[participant_id] = new_player_info for key in self.players_done: self.players_done[key] = False def add_participant(self, participant_id: str, number_players: int): player_names = [ str(self.num_connected_players + i) for i in range(number_players) ] player_info = { player_name: self.current_running_env["player_info"][player_name] for player_name in player_names } self.participant_id_to_player_info[participant_id] = player_info self.num_connected_players += number_players return player_info def player_finished_level(self, participant_id): self.players_done[participant_id] = True level_done = all(self.players_done.values()) if level_done: self.next_level() def get_connection(self, participant_id: str): player_info = self.participant_id_to_player_info[participant_id] current_level = self.levels[self.current_level_idx] if self.current_config["meals"]["all"]: recipes = ["all"] else: recipes = self.current_config["meals"]["list"] level_info = LevelInfo( name=current_level["name"], last_level=self.last_level, recipes=recipes, recipe_graphs=self.current_running_env["recipe_graphs"], ) return player_info, level_info def create_and_connect_bot(self, player_id, player_info): player_hash = player_info["player_hash"] print( f'--general_plus="agent_websocket:{self.websocket_url + player_info["client_id"]};player_hash:{player_hash};agent_id:{player_id}"' ) if self.USE_AAAMBOS_AGENT: sub = Popen( " ".join( [ "exec", "aaambos", "run", "--arch_config", str(ROOT_DIR / "game_content" / "agents" / "arch_config.yml"), "--run_config", str(ROOT_DIR / "game_content" / "agents" / "run_config.yml"), f'--general_plus="agent_websocket:{self.websocket_url + player_info["client_id"]};player_hash:{player_hash};agent_id:{player_id}"', f"--instance={player_hash}", ] ), shell=True, ) else: sub = Popen( " ".join( [ "python", str(ROOT_DIR / "game_content" / "agents" / "random_agent.py"), f'--uri {self.websocket_url + player_info["client_id"]}', f"--player_hash {player_hash}", f"--player_id {player_id}", ] ), shell=True, ) self.sub_processes.append(sub) def kill_bots(self): for sub in self.sub_processes: try: if self.USE_AAAMBOS_AGENT: pgrp = os.getpgid(sub.pid) os.killpg(pgrp, signal.SIGINT) subprocess.run( "kill $(ps aux | grep 'aaambos' | awk '{print $2}')", shell=True ) else: sub.kill() except ProcessLookupError: pass self.sub_processes = [] for websocket in self.websockets.values(): websocket.close() def __repr__(self): return f"Study({self.current_running_env['env_id']})" class StudyManager: def __init__(self): self.game_host: str | None = None self.game_port: str | None = None self.game_server_url: str | None = None self.server_manager_id: str | None = None self.running_studies: list[StudyState] = [] self.participant_id_to_study_map: dict[str, StudyState] = {} self.running_envs: dict[str, Tuple[int, dict[str, PlayerInfo], list[str]]] = {} self.current_free_envs = [] self.running_tutorials: dict[ str, Tuple[int, dict[str, PlayerInfo], list[str]] ] = {} def create_study(self): study = StudyState( ROOT_DIR / "game_content" / "study" / "study_config.yaml", self.game_host, self.game_port, ) study.start() self.running_studies.append(study) def add_participant(self, participant_id: str, number_players: int): player_info = None if not self.running_studies or all( [not s.can_add_participant(number_players) for s in self.running_studies] ): self.create_study() for study in self.running_studies: if study.can_add_participant(number_players): player_info = study.add_participant(participant_id, number_players) self.participant_id_to_study_map[participant_id] = study return player_info def player_finished_level(self, participant_id: str): assigned_study = self.participant_id_to_study_map[participant_id] assigned_study.player_finished_level(participant_id) def get_participant_game_connection(self, participant_id: str): assigned_study = self.participant_id_to_study_map[participant_id] player_info, level_info = assigned_study.get_connection(participant_id) return player_info, level_info def set_game_server_url(self, game_host, game_port): self.game_host = game_host self.game_port = game_port self.game_server_url = f"http://{self.game_host}:{self.game_port}" def set_manager_id(self, manager_id: str): self.server_manager_id = manager_id study_manager = StudyManager() @app.post("/start_study/{participant_id}/{number_players}") async def start_study(participant_id: str, number_players: int): player_info = study_manager.add_participant(participant_id, number_players) return player_info @app.post("/level_done/{participant_id}") async def level_done(participant_id: str): last_level = study_manager.player_finished_level(participant_id) @app.post("/get_game_connection/{participant_id}") async def get_game_connection(participant_id: str): player_info, level_info = study_manager.get_participant_game_connection( participant_id ) return {"player_info": player_info, "level_info": level_info} @app.post("/connect_to_tutorial/{participant_id}") async def want_to_play_tutorial(participant_id: str): environment_config_path = ROOT_DIR / "game_content" / "tutorial_env_config.yaml" layout_path = ROOT_DIR / "game_content" / "layouts" / "tutorial.layout" item_info_path = ROOT_DIR / "game_content" / "item_info.yaml" with open(item_info_path, "r") as file: item_info = file.read() with open(layout_path, "r") as file: layout = file.read() with open(environment_config_path, "r") as file: environment_config = file.read() print("STUDY MANAGER ID", study_manager.server_manager_id) creation_json = CreateEnvironmentConfig( manager_id=study_manager.server_manager_id, number_players=1, environment_settings={"all_player_can_pause_game": False}, item_info_config=item_info, environment_config=environment_config, layout_config=layout, seed=1234567890, ).model_dump(mode="json") # todo async env_info = requests.post( study_manager.game_server_url + "/manage/create_env/", json=creation_json ) if env_info.status_code == 403: raise ValueError(f"Forbidden Request: {env_info.json()['detail']}") env_info = env_info.json() study_manager.running_tutorials[participant_id] = env_info return env_info["player_info"]["0"] @app.post("/disconnect_from_tutorial/{participant_id}") async def want_to_play_tutorial(participant_id: str): requests.post( f"{study_manager.game_server_url}/manage/stop_env/", json={ "manager_id": study_manager.server_manager_id, "env_id": study_manager.running_tutorials[participant_id]["env_id"], "reason": "Finished tutorial", }, ) def main(study_host, study_port, game_host, game_port, manager_ids): study_manager.set_game_server_url(game_host=game_host, game_port=game_port) study_manager.set_manager_id(manager_id=manager_ids[0]) print( f"Use {study_manager.server_manager_id=} for game_server_url=http://{game_host}:{game_port}" ) loop = asyncio.new_event_loop() config = uvicorn.Config(app, host=study_host, port=study_port, loop=loop) server = uvicorn.Server(config) loop.run_until_complete(server.serve()) if __name__ == "__main__": parser = argparse.ArgumentParser( prog="Overcooked Simulator Study Server", description="Study Server: Match Making, client pre and post managing.", epilog="For further information, see https://scs.pages.ub.uni-bielefeld.de/cocosy/overcooked-simulator/overcooked_simulator.html", ) url_and_port_arguments( parser=parser, server_name="Study Server", default_study_port=8080, default_game_port=8000, ) add_list_of_manager_ids_arguments(parser=parser) args = parser.parse_args() game_server_url = f"https://{args.game_url}:{args.game_port}" main( args.study_url, args.port, game_host=args.game_url, game_port=args.game_port, manager_ids=args.manager_ids, )