-
Florian Schröder authored
The code now contains a new hook named "additional_state_update" and accommodates accepting additional state content. This functionality is used to better manage player readiness states in the game_server module and includes updates to the get_state and get_json_state methods in the environment module. New changes also impact the environment configuration files.
Florian Schröder authoredThe code now contains a new hook named "additional_state_update" and accommodates accepting additional state content. This functionality is used to better manage player readiness states in the game_server module and includes updates to the get_state and get_json_state methods in the environment module. New changes also impact the environment configuration files.
game_server.py 31.42 KiB
"""A game server that manages the environments.
Study server or single instance games/GUIs can create environments. The game server returns client ids for the player
individual websockets. These players should then connect to the websockets under `/ws/player/{client_id}`. Each
player can state that them are ready to play, request the current game state, and pass the players action. See
`PlayerRequestType` and `manage_websocket_message`.
"""
from __future__ import annotations
import argparse
import asyncio
import dataclasses
import json
import logging
import time
import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from enum import Enum
from typing import Set
import numpy as np
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi import WebSocket
from pydantic import BaseModel
from starlette.websockets import WebSocketDisconnect
from typing_extensions import TypedDict
from cooperative_cuisine.action import Action
from cooperative_cuisine.environment import Environment
from cooperative_cuisine.server_results import (
CreateEnvResult,
PlayerInfo,
PlayerRequestResult,
)
from cooperative_cuisine.utils import (
url_and_port_arguments,
add_list_of_manager_ids_arguments,
disable_websocket_logging_arguments,
setup_logging,
UUID_CUTOFF,
)
log = logging.getLogger(__name__)
"""The logger for this module."""
app = FastAPI()
"""The FastAPI app that runs the game server."""
TIME_AFTER_STOP_TO_DEL_ENV = 30
"""Time after stopping an environment how long it takes to delete the env data from the game server. In seconds."""
@dataclasses.dataclass
class PlayerData:
"""Information about a player in an environment."""
player_id: str
"""ID of the player."""
env_id: str
"""ID of the environment the player is in."""
websocket_id: str | None = None
"""The websocket id for connecting to the game server."""
connected: bool = False
"""Is the player currently connected?"""
ready: bool = False
"""Did the player send 'ready'?"""
last_action: datetime | None = None
"""Time of the last action of the player."""
name: str = ""
"""Name of the player."""
class EnvironmentSettings(TypedDict):
"""Dict for settings for the environment in the game server"""
all_player_can_pause_game: bool
"""Setting if all players can pause the game."""
# env_steps_per_second: int
class EnvironmentStatus(Enum):
"""4 possible states of an environment."""
WAITING_FOR_PLAYERS = "waitingForPlayers"
"""The environment is waiting for all players to send 'ready'."""
PAUSED = "paused"
"""The environment is paused."""
RUNNING = "running"
"""The environment is running."""
STOPPED = "stopped"
"""The environment is stopped."""
@dataclasses.dataclass
class EnvironmentData:
"""The environment and additional information about it."""
environment: Environment
"""The actual environment."""
player_hashes: Set[str] = dataclasses.field(default_factory=set)
"""Hashes of players for checking permission."""
environment_settings: EnvironmentSettings = dataclasses.field(default_factory=dict)
"""Settings for the environment on the game server."""
status: EnvironmentStatus = EnvironmentStatus.WAITING_FOR_PLAYERS
"""Status of the environment."""
stop_reason: str = ""
"""Reason why the environment was stopped."""
start_time: datetime | None = None
"""Time of when the environment was started."""
last_step_time: int | None = None
"""Time of the last performed step of the environment."""
# add manager_id?
class EnvironmentHandler:
"""Running several environments for a game server."""
def __init__(self, env_step_frequency: int = 200):
"""Constructor of EnvironmentHandler.
Args:
env_step_frequency (int): The frequency at which the environment steps. Defaults to 200.
"""
self.envs: dict[str, EnvironmentData] = {}
"""A dictionary of environment IDs and their respective data."""
self.player_data: dict[str, PlayerData] = {}
"""A dictionary of player hashes and their respective data."""
self.manager_envs: dict[str, Set[str]] = defaultdict(set)
"""A dictionary of manager IDs and the environment IDs managed by each manager."""
self.env_step_frequency = env_step_frequency
"""The frequency at which the environment steps."""
self.preferred_sleep_time_ns = 1e9 / self.env_step_frequency
"""The preferred sleep time between environment steps in nanoseconds based on the `env_step_frequency`."""
self.client_ids_to_player_hashes = {}
"""A dictionary mapping client IDs to player hashes."""
self.allowed_manager: list[str] = []
"""List of manager ids that are allowed to manage/create environments."""
self.host: str = ""
"""The host string (e.g., localhost) of the game server."""
self.port: int = 8000
"""The port of the game server."""
def create_env(
self, environment_config: CreateEnvironmentConfig
) -> CreateEnvResult | int:
"""Create a new environment.
Args:
environment_config: An instance of CreateEnvironmentConfig class that contains the configuration for creating the environment.
Returns:
A dictionary containing the created environment ID and player information.
"""
if environment_config.manager_id not in self.allowed_manager:
return 1
env_id = f"{environment_config.env_name}_env_{uuid.uuid4().hex[:UUID_CUTOFF]}" # todo uuid cutoff
if environment_config.number_players < 1:
raise HTTPException(
status_code=409, detail="Number players need to be positive."
)
env = Environment(
env_config=environment_config.environment_config,
layout_config=environment_config.layout_config,
item_info=environment_config.item_info_config,
as_files=False,
env_name=env_id,
seed=environment_config.seed,
)
player_info = {}
for player_id in range(environment_config.number_players):
player_id = str(player_id)
player_info[player_id] = self.create_player(env, env_id, player_id)
self.envs[env_id] = EnvironmentData(
environment=env,
player_hashes={info["player_hash"] for info in player_info.values()},
)
self.manager_envs[environment_config.manager_id].update([env_id])
graphs = env.recipe_validation.get_recipe_graphs()
kitchen_size = (env.kitchen_width, env.kitchen_height)
env.update_additional_state_content(all_players_ready=False)
res = CreateEnvResult(
env_id=env_id,
player_info=player_info,
recipe_graphs=graphs,
kitchen_size=kitchen_size,
)
return res
def create_player(
self, env: Environment, env_id: str, player_id: str
) -> PlayerInfo:
"""Create a player in an environment and the information for the client to send the control data to the server.
This method creates a player within the specified environment. It generates a unique player hash
and client ID using the uuid module. The player data, including the player ID, environment ID, and websocket
ID (client ID), is stored in the player_data dictionary. The client ID and player hash are also stored in the
client_ids_to_player_hashes dictionary for easy lookup. Finally, the player is added to the environment using
the add_player() method. The method returns a dictionary containing the client ID, player hash, and player ID.
Args:
env (Environment): The instance of the Environment class in which the player is being created.
env_id (str): The identifier for the environment.
player_id (str): The identifier for the player being created.
Returns:
PlayerInfo: A dictionary containing the client ID, player hash, and player ID.
"""
player_hash = uuid.uuid4().hex
client_id = uuid.uuid4().hex
player_data = PlayerData(
player_id=player_id,
env_id=env_id,
websocket_id=client_id,
)
self.player_data[player_hash] = player_data
self.client_ids_to_player_hashes[client_id] = player_hash
env.add_player(player_id)
return {
"client_id": client_id,
"player_hash": player_hash,
"player_id": player_id,
"websocket_url": f"ws://{self.host}:{self.port}/ws/player/{client_id}",
}
def add_player(self, config: AdditionalPlayer) -> dict[str, PlayerInfo]:
"""Add new player(s) to the environment.
Args:
config (AdditionalPlayer): Configuration for adding new player(s) to the environment.
Returns:
dict[str, PlayerInfo]: A dictionary containing information about the newly added player(s).
Example Usage:
config = AdditionalPlayer(manager_id='manager_1', env_id='env_1', number_players=2)
new_players = add_player(config)
"""
new_player_info = {}
if (
config.manager_id in self.manager_envs
and config.env_id in self.manager_envs[config.manager_id]
and self.envs[config.env_id].status != EnvironmentStatus.STOPPED
):
n_players = len(self.envs[config.env_id].player_hashes)
for player_id in range(n_players, n_players + config.number_players):
player_id = str(player_id)
new_player_info[player_id] = self.create_player(
env=self.envs[config.env_id].environment,
env_id=config.env_id,
player_id=player_id,
)
log.debug(f"Added player {player_id} to env {config.env_id}")
return new_player_info
def start_env(self, env_id: str):
"""Start the specified environment and already created environment.
Args:
env_id (str): The ID of the environment to start.
"""
if env_id in self.envs:
log.info(f"Start environment {env_id=}")
start_time = datetime.now()
self.envs[env_id].status = EnvironmentStatus.RUNNING
self.envs[env_id].start_time = start_time
self.envs[env_id].last_step_time = time.time_ns()
self.envs[env_id].environment.reset_env_time()
self.envs[env_id].environment.update_additional_state_content(
all_players_ready=True
)
def get_state(
self, player_hash: str
) -> str | int: # -> StateRepresentation as json
"""Get the current state representation of the environment for a player.
Args:
player_hash (str): The unique identifier of the player.
Returns:
str: The state representation of the environment for a player. Is
`cooperative_cuisine.state_representation.StateRepresentation` as a json.
"""
if (
player_hash in self.player_data
and self.player_data[player_hash].env_id in self.envs
):
env_data = self.envs[self.player_data[player_hash].env_id]
state = env_data.environment.get_json_state(
self.player_data[player_hash].player_id,
)
return state
if player_hash not in self.player_data:
return 1
if self.player_data[player_hash].env_id not in self.envs:
return 2
def pause_env(self, manager_id: str, env_id: str, reason: str):
"""Pause the specified environment.
Args:
manager_id (str): The ID of the manager that manages the environment.
env_id (str): The ID of the environment.
reason (str): The reason for pausing the environment.
"""
if (
manager_id in self.manager_envs
and env_id in self.manager_envs[manager_id]
and self.envs[env_id].status
not in [EnvironmentStatus.STOPPED, EnvironmentStatus.PAUSED]
):
self.envs[env_id].status = EnvironmentStatus.PAUSED
def unpause_env(self, manager_id: str, env_id: str, reason: str):
"""Unpause the specified environment.
Args:
manager_id (str): The ID of the manager that manages the environment.
env_id (str): The ID of the environment.
reason (str): The reason for un-pausing the environment.
"""
if (
manager_id in self.manager_envs
and env_id in self.manager_envs[manager_id]
and self.envs[env_id].status == EnvironmentStatus.PAUSED
):
self.envs[env_id].status = EnvironmentStatus.RUNNING
self.envs[env_id].last_step_time = time.time_ns()
def stop_env(self, manager_id: str, env_id: str, reason: str) -> int:
"""Stop the specified environment.
Args:
manager_id: A string representing the id of the manager.
env_id: A string representing the id of the environment.
reason: A string representing the reason for stopping the environment.
Returns:
An integer code indicating the result of stopping the environment.
- 0: The environment was successfully stopped.
- 1: The manager_id or env_id is invalid.
- 2: The environment is already stopped.
"""
if manager_id in self.manager_envs and env_id in self.manager_envs[manager_id]:
if self.envs[env_id].status != EnvironmentStatus.STOPPED:
self.envs[env_id].status = EnvironmentStatus.STOPPED
self.envs[env_id].stop_reason = reason
log.debug(f"Stopped environment: id={env_id}, reason={reason}")
return 0
log.debug(f"Could not stop environment: id={env_id}, env is not running")
return 2
log.debug(f"Could not stop environment: id={env_id}, no env with this id")
return 1
def set_player_ready(self, player_hash) -> bool:
"""Set the specified player as ready.
Args: player_hash (str): The hash that allows access to the player data (should only know the player client
and not other players).
Returns:
bool: True if the player is successfully set as ready, False otherwise.
"""
if player_hash in self.player_data:
self.player_data[player_hash].ready = True
return True
return False
def set_player_connected(self, client_id: str) -> bool:
"""Set the connected status of a player.
Args:
client_id (str): The client ID of the player.
Returns:
bool: True if the connected status was successfully set, False otherwise.
"""
if client_id in self.client_ids_to_player_hashes:
self.player_data[
self.client_ids_to_player_hashes[client_id]
].connected = True
return True
return False
def set_player_disconnected(self, client_id: str) -> bool:
"""Set player as disconnected.
Args:
client_id: The ID of the client.
Returns:
True if the player was successfully set as disconnected, False otherwise.
"""
if client_id in self.client_ids_to_player_hashes:
log.warning(
f"Player {self.player_data[self.client_ids_to_player_hashes[client_id]].player_id} in env {self.player_data[self.client_ids_to_player_hashes[client_id]].env_id} disconnected"
)
self.player_data[
self.client_ids_to_player_hashes[client_id]
].connected = False
return True
return False
def check_all_player_ready(self, env_id: str) -> bool:
"""Check if all players in the specified environment are ready.
Args:
self (object): The current object instance.
env_id (str): The ID of the environment to check.
Returns:
bool: True if all players are ready, False otherwise.
"""
self: EnvironmentHandler # Pycharm bug?
return env_id in self.envs and all(
self.player_data[player_hash].connected
and self.player_data[player_hash].ready
for player_hash in self.envs[env_id].player_hashes
)
def check_all_players_connected(self, env_id: str) -> bool:
"""Check if all players in a given environment are connected.
Args:
env_id: The ID of the environment to check.
Returns:
bool: True if all players are connected, False otherwise.
"""
return env_id in self.envs and all(
self.player_data[player_hash].connected
for player_hash in self.envs[env_id].player_hashes
)
def list_not_connected_players(self, env_id: str) -> list[str]:
"""List player_ids of all players that aren't connected to the server.
Args:
env_id: The ID of the environment for which to retrieve the list of not connected players
Returns:
A list of player IDs of players who are not connected to the specified environment
"""
if env_id in self.envs:
return [
self.player_data[player_hash].player_id
for player_hash in self.envs[env_id].player_hashes
if not self.player_data[player_hash].connected
]
def list_not_ready_players(self, env_id: str) -> list[str]:
"""List player IDs for players who are not ready to play in a specific environment.
Args:
env_id (str): The ID of the environment.
Returns:
list[str]: A list of player IDs who are not ready to play in the specified environment.
"""
if env_id in self.envs:
return [
self.player_data[player_hash].player_id
for player_hash in self.envs[env_id].player_hashes
if not self.player_data[player_hash].ready
]
async def environment_steps(self):
"""Asynchronous method that performs environmental steps for all running environments.
Should run asynchronously alongside the server.
"""
# TODO environment dependent steps.
overslept_in_ns = 0
# TODO add checking if player disconnects
# - also what should happen when all disconnect for a time -> stop env.
while True:
pre_step_start = time.time_ns()
to_remove = []
for env_id, env_data in self.envs.items():
if env_data.status == EnvironmentStatus.RUNNING:
step_start = time.time_ns()
env_data.environment.step(
timedelta(
seconds=(step_start - env_data.last_step_time)
/ 1_000_000_000
)
)
env_data.last_step_time = step_start
if env_data.environment.game_ended:
log.info(f"Env {env_id} ended. Set env to STOPPED.")
env_data.status = EnvironmentStatus.STOPPED
elif (
env_data.status == EnvironmentStatus.WAITING_FOR_PLAYERS
and self.check_all_player_ready(env_id)
):
self.start_env(env_id)
elif (
env_data.status == EnvironmentStatus.STOPPED
and env_data.last_step_time + (TIME_AFTER_STOP_TO_DEL_ENV * 1e9)
< pre_step_start
):
to_remove.append(env_id)
if to_remove:
for env_id in to_remove:
del self.envs[env_id]
step_duration = time.time_ns() - pre_step_start
time_to_sleep_ns = self.preferred_sleep_time_ns - (
step_duration + overslept_in_ns
)
sleep_start = time.time_ns()
await asyncio.sleep(max(time_to_sleep_ns / 1e9, 0))
sleep_function_duration = time.time_ns() - sleep_start
overslept_in_ns = sleep_function_duration - time_to_sleep_ns
def is_known_client_id(self, client_id: str) -> bool:
"""Check if a client ID is known.
Client IDs are generated by the server for players to connect to a websocket.
Therefore, unknown IDs are ignored.
Args:
client_id (str): The client ID to be checked.
Returns:
bool: True if the client ID is known, False otherwise.
"""
return client_id in self.client_ids_to_player_hashes
def player_action(self, player_hash: str, action: Action) -> bool:
"""Pass an action of a player to the environment.
Args:
player_hash (str): The hash that allows access to the player data
(should only know the player client and not other players).
action (Action): The action to be performed.
Returns:
bool: True if the action was performed successfully, False otherwise.
"""
if (
player_hash in self.player_data
and action.player == self.player_data[player_hash].player_id
and self.player_data[player_hash].env_id in self.envs
and player_hash
in self.envs[self.player_data[player_hash].env_id].player_hashes
):
self.envs[self.player_data[player_hash].env_id].environment.perform_action(
action
)
return True
return False
def extend_allowed_manager(self, manager: list[str]):
self.allowed_manager.extend(manager)
def set_host_and_port(self, host, port):
self.host = host
self.port = port
class PlayerConnectionManager:
"""
PlayerConnectionManager is a class responsible for managing player connections in a server.
"""
def __init__(self):
"""Initializes the PlayerConnectionManager object."""
self.player_connections: dict[str, WebSocket] = {}
"""
A dictionary holding the client ID as the key and the corresponding WebSocket connection as the value.
"""
async def connect_player(self, websocket: WebSocket, client_id: str) -> bool:
"""Connect a player to the server by adding their WebSocket connection to the player_connections dictionary.
Args:
websocket (WebSocket): The WebSocket connection of the player.
client_id (str): The ID of the player.
Returns:
bool: True if the player is successfully connected, False if the player is already connected.
"""
if client_id not in self.player_connections:
await websocket.accept()
self.player_connections[client_id] = websocket
return True
return False
def disconnect(self, client_id: str):
"""Disconnect a player from the server by removing their entry from the player_connections dictionary.
Args:
client_id (str): The ID of the player.
"""
if client_id in self.player_connections:
del self.player_connections[client_id]
@staticmethod
async def send_personal_message(message: str, websocket: WebSocket):
"""Send a personal message to a specific player.
Args:
message (str): The message to send.
websocket (WebSocket): The WebSocket connection of the player.
"""
await websocket.send_text(message)
async def broadcast(self, message: str):
"""Send a message to all connected players.
Args:
message (str): The message to broadcast.
"""
for connection in self.player_connections.values():
await connection.send_text(message)
connection_manager = PlayerConnectionManager()
frequency = 200
environment_handler: EnvironmentHandler = EnvironmentHandler(
env_step_frequency=frequency
)
class PlayerRequestType(Enum):
"""Enumerates the possible types of websocket messages for a connected player."""
READY = "ready"
"""Indicates that the player is ready to play the game."""
GET_STATE = "get_state"
"""Indicates a request to get the current (player-specific) state."""
ACTION = "action"
"""Indicates a request to pass an action of a player to the environment."""
class WebsocketMessage(BaseModel):
type: str
action: None | Action = None
player_hash: str
class Config:
arbitrary_types_allowed = True
def manage_websocket_message(message: str, client_id: str) -> PlayerRequestResult | str:
"""Manage WebSocket Message by validating the message and passing it to the environment.
Args:
message: The WebSocket message.
client_id: The client ID.
Returns:
PlayerRequestResult or str: The result of the managed message.
"""
message_dict = json.loads(message)
request_type = None
try:
ws_message = WebsocketMessage(**message_dict)
request_type = PlayerRequestType(ws_message.type)
match request_type:
case PlayerRequestType.GET_STATE:
state = environment_handler.get_state(ws_message.player_hash)
if isinstance(state, int):
return {
"request_type": ws_message.type,
"status": 400,
"msg": "env id of player not in running envs"
if state == 2
else "player hash unknown",
"player_hash": None,
}
return state
case PlayerRequestType.READY:
accepted = environment_handler.set_player_ready(ws_message.player_hash)
return {
"request_type": request_type.value,
"msg": f"ready{' ' if accepted else ' not '}accepted",
"status": 200 if accepted else 400,
"player_hash": ws_message.player_hash,
}
case PlayerRequestType.ACTION:
assert (
ws_message.action is not None
), "websocket msg type action needs field action filled"
if isinstance(ws_message.action.action_data, list):
ws_message.action.action_data = np.array(
ws_message.action.action_data, dtype=float
)
accepted = environment_handler.player_action(
ws_message.player_hash, ws_message.action
)
return {
"request_type": request_type.value,
"status": 200 if accepted else 400,
"msg": f"action{' ' if accepted else ' not '}accepted",
"player_hash": ws_message.player_hash,
}
return {
"request_type": request_type.value,
"status": 400,
"msg": "request not handled",
"player_hash": ws_message.player_hash,
}
except ValueError as e:
return {
"request_type": message_dict["type"],
"status": 400,
"msg": e.args[0],
"player_hash": None,
}
except AssertionError as e:
return {
"request_type": request_type.value if request_type else None,
"status": 400,
"msg": e.args[0],
"player_hash": None,
}
@app.get("/")
def read_root():
return {"Cooperative": "Cuisine"}
class CreateEnvironmentConfig(BaseModel):
manager_id: str
number_players: int
same_websocket_player: list[list[str]] | None = None
environment_settings: EnvironmentSettings
item_info_config: str # file content
environment_config: str # file content
layout_config: str # file content
seed: int
env_name: str
class ManageEnv(BaseModel):
manager_id: str
env_id: str
reason: str
class AdditionalPlayer(BaseModel):
manager_id: str
env_id: str
number_players: int
existing_websocket: str | None = None
@app.post("/manage/create_env/")
async def create_env(creation: CreateEnvironmentConfig) -> CreateEnvResult:
result = environment_handler.create_env(creation)
if result == 1:
raise HTTPException(status_code=403, detail="Manager ID not known/registered.")
return result
@app.post("/manage/additional_player/")
async def additional_player(creation: AdditionalPlayer) -> dict[str, PlayerInfo]:
result = environment_handler.add_player(creation)
return result
@app.post("/manage/stop_env/")
async def stop_env(manage_env: ManageEnv) -> str:
accept = environment_handler.stop_env(
manage_env.manager_id, manage_env.env_id, manage_env.reason
)
if accept:
raise HTTPException(
status_code=403 if accept == 1 else 409,
detail="Environment does not belong to manager"
if accept == 1
else "Environment already stopped",
)
return "Ok"
# pause / unpause
# close all envs for a manager
# control access / functions / data
@app.websocket("/ws/player/{client_id}")
async def websocket_player_endpoint(websocket: WebSocket, client_id: str):
"""The method that receives messages from the websocket of a player and sends the results back to the client.
Args:
websocket (WebSocket): The WebSocket connection object.
client_id (str): The ID of the client.
"""
if not environment_handler.is_known_client_id(client_id):
log.warning(f"wrong websocket connection with {client_id=}")
return
await connection_manager.connect_player(websocket, client_id)
log.debug(f"Client #{client_id} connected")
environment_handler.set_player_connected(client_id)
try:
while True:
message = await websocket.receive_text()
answer = manage_websocket_message(message, client_id)
if isinstance(answer, dict):
answer = json.dumps(answer)
await connection_manager.send_personal_message(answer, websocket)
except WebSocketDisconnect:
connection_manager.disconnect(client_id)
environment_handler.set_player_disconnected(client_id)
log.debug(f"Client #{client_id} disconnected")
def main(
host: str, port: int, manager_ids: list[str], enable_websocket_logging: bool = False
):
setup_logging(enable_websocket_logging)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
environment_handler.extend_allowed_manager(manager_ids)
environment_handler.set_host_and_port(host=host, port=port)
loop.create_task(environment_handler.environment_steps())
config = uvicorn.Config(app, host=host, port=port, loop=loop)
server = uvicorn.Server(config)
loop.run_until_complete(server.serve())
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="Cooperative Cuisine Game Server",
description="Game Engine Server: Starts overcooked game engine server.",
epilog="For further information, see "
"https://scs.pages.ub.uni-bielefeld.de/cocosy/overcooked-simulator/cooperative_cuisine.html",
)
url_and_port_arguments(parser)
disable_websocket_logging_arguments(parser)
add_list_of_manager_ids_arguments(parser)
args = parser.parse_args()
main(args.game_url, args.game_port, args.manager_ids, args.enable_websocket_logging)
"""
Or in console:
uvicorn cooperative_cuisine.fastapi_game_server:app --reload
"""