-
Fabian Heinrich authoredFabian Heinrich authored
gui.py 78.92 KiB
import argparse
import json
import logging
import os
import random
import re
import signal
import subprocess
import sys
import uuid
from enum import Enum
from subprocess import Popen
import numpy as np
import pygame
import pygame_gui
import requests
import yaml
from pygame import mixer
from websockets.sync.client import connect
from cooperative_cuisine import ROOT_DIR
from cooperative_cuisine.action import ActionType, InterActionData, Action
from cooperative_cuisine.game_server import (
CreateEnvironmentConfig,
WebsocketMessage,
PlayerRequestType,
)
from cooperative_cuisine.pygame_2d_vis.drawing import Visualizer
from cooperative_cuisine.pygame_2d_vis.game_colors import colors
from cooperative_cuisine.state_representation import StateRepresentation
from cooperative_cuisine.utils import (
url_and_port_arguments,
disable_websocket_logging_arguments,
add_list_of_manager_ids_arguments,
setup_logging,
add_gui_arguments,
)
class MenuStates(Enum):
"""Enumeration of "Page" types in the 2D pygame vis."""
Start = "Start"
ControllerTutorial = "ControllerTutorial"
PreGame = "PreGame"
Game = "Game"
PostGame = "PostGame"
End = "End"
log = logging.getLogger(__name__)
"""The logger for this module."""
class PlayerKeySet:
"""Set of keyboard keys for controlling a player.
First four keys are for movement. Order: Down, Up, Left, Right.
5th key is for interacting with counters.
6th key ist for picking up things or dropping them.
"""
def __init__(
self,
move_keys: list[pygame.key],
interact_key: pygame.key,
pickup_key: pygame.key,
switch_key: pygame.key,
players: list[str],
joystick: int,
):
"""Creates a player key set which contains information about which keyboard keys control the player.
Movement keys in the following order: Down, Up, Left, Right
Args:
move_keys: The keys which control this players movement in the following order: Down, Up, Left, Right.
interact_key: The key to interact with objects in the game.
pickup_key: The key to pick items up or put them down.
switch_key: The key for switching through controllable players.
players: The player indices which this keyset can control.
joystick: number of joystick (later check if available)
"""
self.move_vectors: list[list[int]] = [[-1, 0], [1, 0], [0, -1], [0, 1]]
self.key_to_movement: dict[pygame.key, list[int]] = {
key: vec for (key, vec) in zip(move_keys, self.move_vectors)
}
self.move_keys: list[pygame.key] = move_keys
self.interact_key: pygame.key = interact_key
self.pickup_key: pygame.key = pickup_key
self.switch_key: pygame.key = switch_key
self.controlled_players: list[str] = players
self.current_player: str = players[0] if players else "0"
self.current_idx = 0
self.other_keyset: list[PlayerKeySet] = []
self.joystick = joystick
def set_controlled_players(self, controlled_players: list[str]) -> None:
self.controlled_players = controlled_players
self.current_player = self.controlled_players[0]
self.current_idx = 0
def next_player(self) -> None:
self.current_idx = (self.current_idx + 1) % len(self.controlled_players)
if self.other_keyset:
for ok in self.other_keyset:
if ok.current_idx == self.current_idx:
self.next_player()
return
self.current_player = self.controlled_players[self.current_idx]
class PyGameGUI:
"""Visualisation of the overcooked environment and reading keyboard inputs using pygame."""
def __init__(
self,
study_host: str,
study_port: int,
game_host: str,
game_port: int,
manager_ids: list[str],
CONNECT_WITH_STUDY_SERVER: bool,
USE_AAAMBOS_AGENT: bool,
debug: bool,
):
self.CONNECT_WITH_STUDY_SERVER = CONNECT_WITH_STUDY_SERVER
self.USE_AAAMBOS_AGENT = USE_AAAMBOS_AGENT
self.show_debug_elements = debug
pygame.init()
pygame.display.set_icon(
pygame.image.load(ROOT_DIR / "pygame_2d_vis" / "images" / "brain_icon.png")
)
self.participant_id = uuid.uuid4().hex
self.game_screen: pygame.Surface | None = None
self.running = True
self.key_sets: list[PlayerKeySet] = []
self.websockets = {}
if CONNECT_WITH_STUDY_SERVER:
self.request_url = f"http://{study_host}:{study_port}"
else:
self.request_url = f"http://{game_host}:{game_port}"
self.manager_id = random.choice(manager_ids)
with open(ROOT_DIR / "pygame_2d_vis" / "visualization.yaml", "r") as file:
self.visualization_config = yaml.safe_load(file)
self.language = self.visualization_config["Gui"]["language"]
# self.fluent_bundle: FluentBundle = create_bundle(
# self.visualization_config["Gui"]["language"],
# ROOT_DIR
# / "pygame_2d_vis"
# / "locales"
# / f"gui_messages_{self.language}.ftl",
# )
self.FPS = self.visualization_config["GameWindow"]["FPS"]
self.screen_margin = self.visualization_config["GameWindow"]["screen_margin"]
self.min_width = self.visualization_config["GameWindow"]["min_width"]
self.min_height = self.visualization_config["GameWindow"]["min_height"]
self.buttons_width = self.visualization_config["GameWindow"]["buttons_width"]
self.buttons_height = self.visualization_config["GameWindow"]["buttons_height"]
self.order_bar_height = self.visualization_config["GameWindow"][
"order_bar_height"
]
(
self.window_width_fullscreen,
self.window_height_fullscreen,
) = pygame.display.get_desktop_sizes()[0]
if (
self.window_width_fullscreen >= 3840
and self.window_height_fullscreen >= 2160
):
self.window_width_fullscreen /= 2
self.window_height_fullscreen /= 2
self.window_width_windowed = self.min_width
self.window_height_windowed = self.min_height
self.kitchen_width = 1
self.kitchen_height = 1
self.kitchen_aspect_ratio = 1
self.images_path = ROOT_DIR / "pygame_gui" / "images"
self.vis = Visualizer(self.visualization_config)
self.fullscreen = False if self.show_debug_elements else True
self.menu_state = MenuStates.Start
self.manager: pygame_gui.UIManager
self.sub_processes = []
self.layout_file_paths_dict = {
p.name: p for p in (ROOT_DIR / "configs" / "layouts").rglob("*.layout")
}
self.layout_file_paths = sorted(list(self.layout_file_paths_dict.keys()))
self.current_layout_idx = 0
self.last_state: StateRepresentation
self.player_info = {"0": {"name": "0"}}
self.level_info = {
"name": "Level",
"recipe_graphs": [],
"number_players": -1,
"kitchen_size": (0, 0),
}
self.last_level = False
self.beeped_once = False
self.all_completed_meals = []
self.last_completed_meals = []
self.all_recipes_labels = []
self.last_recipes_labels = []
def setup_player_keys(self, players: list[str], number_key_sets=1, disjunct=False):
# First four keys are for movement. Order: Down, Up, Left, Right.
# 5th key is for interacting with counters.
# 6th key ist for picking up things or dropping them.
if number_key_sets:
key_set1 = PlayerKeySet(
move_keys=[pygame.K_a, pygame.K_d, pygame.K_w, pygame.K_s],
interact_key=pygame.K_f,
pickup_key=pygame.K_e,
switch_key=pygame.K_SPACE,
players=players,
joystick=0,
)
key_set2 = PlayerKeySet(
move_keys=[pygame.K_LEFT, pygame.K_RIGHT, pygame.K_UP, pygame.K_DOWN],
interact_key=pygame.K_i,
pickup_key=pygame.K_o,
switch_key=pygame.K_p,
players=players,
joystick=1,
)
key_sets = [key_set1, key_set2]
if disjunct:
key_set1.set_controlled_players(players[::2])
key_set2.set_controlled_players(players[1::2])
elif number_key_sets > 1:
key_set1.set_controlled_players(players)
key_set2.set_controlled_players(players)
key_set1.other_keyset = [key_set2]
key_set2.other_keyset = [key_set1]
key_set2.next_player()
return key_sets[:number_key_sets]
else:
return []
def handle_keys(self):
"""Handles keyboard inputs. Sends action for the respective players. When a key is held down, every frame
an action is sent in this function.
"""
keys = pygame.key.get_pressed()
for key_set in self.key_sets:
current_player_name = str(key_set.current_player)
relevant_keys = [keys[k] for k in key_set.move_keys]
if any(relevant_keys):
move_vec = np.zeros(2)
for idx, pressed in enumerate(relevant_keys):
if pressed:
move_vec += key_set.move_vectors[idx]
if np.linalg.norm(move_vec) != 0:
move_vec = move_vec / np.linalg.norm(move_vec)
action = Action(
current_player_name,
ActionType.MOVEMENT.value,
move_vec,
duration=self.time_delta,
)
self.send_action(action)
def handle_joy_stick_input(self, joysticks):
"""Handles joystick inputs for movement every frame
Args:
joysticks: list of joysticks
"""
# Axis 0: joy stick left: -1 = left, ~0 = center, 1 = right
# Axis 1: joy stick left: -1 = up, ~0 = center, 1 = down
# see control stuff here (at the end of the page): https://www.pygame.org/docs/ref/joystick.html
for key_set in self.key_sets:
current_player_name = str(key_set.current_player)
# if a joystick is connected for current player
if key_set.joystick in joysticks:
# Usually axis run in pairs, up/down for one, and left/right for the other. Triggers count as axes.
# You may want to take into account some tolerance to handle jitter, and
# joystick drift may keep the joystick from centering at 0 or using the full range of position values.
tolerance_threshold = 0.2
# axis 0 = joy stick left --> left & right
axis_left_right = joysticks[key_set.joystick].get_axis(0)
axis_up_down = joysticks[key_set.joystick].get_axis(1)
if (
abs(axis_left_right) > tolerance_threshold
or abs(axis_up_down) > tolerance_threshold
):
move_vec = np.zeros(2)
if abs(axis_left_right) > tolerance_threshold:
move_vec[0] += axis_left_right
# axis 1 = joy stick right --> up & down
if abs(axis_up_down) > tolerance_threshold:
move_vec[1] += axis_up_down
# if np.linalg.norm(move_vec) != 0:
# move_vec = move_vec / np.linalg.norm(move_vec)
action = Action(
current_player_name,
ActionType.MOVEMENT.value,
move_vec,
duration=self.time_delta,
)
self.send_action(action)
def handle_key_event(self, event):
"""Handles key events for the pickup and interaction keys. Pickup is a single action,
for interaction keydown and keyup is necessary, because the player has to be able to hold
the key down.
Args:
event: Pygame event for extracting the key action.
"""
for key_set in self.key_sets:
current_player_name = str(key_set.current_player)
if event.key == key_set.pickup_key and event.type == pygame.KEYDOWN:
action = Action(current_player_name, ActionType.PICK_UP_DROP, None)
self.send_action(action)
if event.key == key_set.interact_key:
if event.type == pygame.KEYDOWN:
action = Action(
current_player_name, ActionType.INTERACT, InterActionData.START
)
self.send_action(action)
elif event.type == pygame.KEYUP:
action = Action(
current_player_name, ActionType.INTERACT, InterActionData.STOP
)
self.send_action(action)
if event.key == key_set.switch_key:
if event.type == pygame.KEYDOWN:
key_set.next_player()
def handle_joy_stick_event(self, event, joysticks):
"""Handles joy stick events for the pickup and interaction keys. Pickup is a single action,
for interaction buttondown and buttonup is necessary, because the player has to be able to hold
the button down.
Args:
event: Pygame event for extracting the button action.
joysticks: list of joysticks
"""
for key_set in self.key_sets:
current_player_name = str(key_set.current_player)
# if a joystick is connected for current player
if key_set.joystick in joysticks:
# pickup = Button A <-> 0
if (
joysticks[key_set.joystick].get_button(0)
and event.type == pygame.JOYBUTTONDOWN
):
action = Action(current_player_name, ActionType.PICK_UP_DROP, None)
self.send_action(action)
# interact = Button X <-> 2
if (
joysticks[key_set.joystick].get_button(2)
and event.type == pygame.JOYBUTTONDOWN
):
action = Action(
current_player_name, ActionType.INTERACT, InterActionData.START
)
self.send_action(action)
# stop interaction if last pressed button was X <-> 2
if event.button == 2 and event.type == pygame.JOYBUTTONUP:
action = Action(
current_player_name, ActionType.INTERACT, InterActionData.STOP
)
self.send_action(action)
# switch button Y <-> 3
if joysticks[key_set.joystick].get_button(3):
if event.type == pygame.JOYBUTTONDOWN:
key_set.next_player()
def set_window_size(self):
if self.fullscreen:
flags = pygame.FULLSCREEN
self.window_width = self.window_width_fullscreen
self.window_height = self.window_height_fullscreen
else:
flags = 0
self.window_width = self.window_width_windowed
self.window_height = self.window_height_windowed
self.main_window = pygame.display.set_mode(
(
self.window_width,
self.window_height,
),
flags=flags,
)
def reset_window_size(self):
self.game_width = 0
self.game_height = 0
self.set_window_size()
def set_game_size(self, max_width=None, max_height=None):
if max_width is None:
max_width = self.window_width - (2 * self.screen_margin)
if max_height is None:
max_height = self.window_height - (2 * self.screen_margin)
self.kitchen_aspect_ratio = self.kitchen_height / self.kitchen_width
if self.kitchen_width > self.kitchen_height:
self.game_width = max_width
self.game_height = self.game_width * self.kitchen_aspect_ratio
if self.game_height > max_height:
self.game_height = max_height
self.game_width = self.game_height / self.kitchen_aspect_ratio
else:
self.game_height = max_height
self.game_width = self.game_height / self.kitchen_aspect_ratio
if self.game_width > max_width:
self.game_width = max_width
self.game_height = self.game_width * self.kitchen_aspect_ratio
self.grid_size = int(self.game_width / self.kitchen_width)
self.game_width = max(self.game_width, 100)
self.game_height = max(self.game_height, 100)
self.grid_size = max(self.grid_size, 1)
residual_x = self.game_width - (self.kitchen_width * self.grid_size)
residual_y = self.game_height - (self.kitchen_height * self.grid_size)
self.game_width -= residual_x
self.game_height -= residual_y
self.game_screen = pygame.Surface(
(
self.game_width,
self.game_height,
)
)
def init_ui_elements(self):
self.manager = pygame_gui.UIManager(
(self.window_width, self.window_height),
starting_language=self.language,
translation_directory_paths=[ROOT_DIR / "pygame_2d_vis" / "locales"],
)
self.manager.get_theme().load_theme(
ROOT_DIR / "pygame_2d_vis" / "gui_theme.json"
)
self.elements_margin = self.window_height * 0.02
########################################################################
# All screens
########################################################################
fullscreen_button_rect = pygame.Rect(
(0, 0), (self.buttons_width * 0.7, self.buttons_height)
)
fullscreen_button_rect.topright = (-self.buttons_width, 0)
self.fullscreen_button = pygame_gui.elements.UIButton(
relative_rect=fullscreen_button_rect,
text="translations.fullscreen",
manager=self.manager,
object_id="#fullscreen_button",
anchors={"right": "right", "top": "top"},
)
rect = pygame.Rect((0, 0), (self.buttons_width, self.buttons_height))
rect.topright = (0, 0)
self.quit_button = pygame_gui.elements.UIButton(
relative_rect=rect,
text="translations.quit_game",
manager=self.manager,
object_id="#quit_button",
anchors={"right": "right", "top": "top"},
)
########################################################################
# Start screen
########################################################################
self.start_button = pygame_gui.elements.UIButton(
relative_rect=pygame.Rect(
(0, 0), (self.buttons_width, self.buttons_height)
),
text="translations.start_game",
text_kwargs={},
manager=self.manager,
anchors={"center": "center"},
object_id="#start_button",
)
img = pygame.image.load(
ROOT_DIR / "pygame_2d_vis" / "gui_images" / f"continue_{self.language}.png"
).convert_alpha()
image_rect = img.get_rect()
img_width = self.buttons_width * 1.5
img_height = img_width * (image_rect.height / image_rect.width)
new_dims = (img_width, img_height)
img = pygame.transform.smoothscale(img, new_dims)
image_rect = img.get_rect()
image_rect.centery += 80
self.press_a_image = pygame_gui.elements.UIImage(
image_rect,
img,
manager=self.manager,
anchors={"centerx": "centerx", "centery": "centery"},
)
# self.press_a_image.set_dimensions(new_dims)
if not self.CONNECT_WITH_STUDY_SERVER:
assert len(self.layout_file_paths) != 0, "No layout files."
dropdown_width, dropdown_height = 200, 40
self.layout_selection = pygame_gui.elements.UIDropDownMenu(
relative_rect=pygame.Rect(
(
0,
0,
),
(dropdown_width, dropdown_height),
),
manager=self.manager,
options_list=self.layout_file_paths,
starting_option="basic.layout"
if "basic.layout" in self.layout_file_paths
else random.choice(self.layout_file_paths),
)
player_selection_rect = pygame.Rect(
(0, 0),
(
self.window_width * 0.9,
(self.window_height // 4),
),
)
player_selection_rect.bottom = -self.elements_margin
self.player_selection_container = pygame_gui.elements.UIPanel(
player_selection_rect,
manager=self.manager,
object_id="#players",
anchors={"bottom": "bottom", "centerx": "centerx"},
)
multiple_keysets_button_rect = pygame.Rect((0, 0), (190, 50))
self.multiple_keysets_button = pygame_gui.elements.UIButton(
relative_rect=multiple_keysets_button_rect,
manager=self.manager,
container=self.player_selection_container,
text="not set",
anchors={"left": "left", "centery": "centery"},
object_id="#multiple_keysets_button",
)
split_players_button_rect = pygame.Rect((0, 0), (190, 50))
self.split_players_button = pygame_gui.elements.UIButton(
relative_rect=split_players_button_rect,
manager=self.manager,
container=self.player_selection_container,
text="not set",
anchors={"centerx": "centerx", "centery": "centery"},
object_id="#split_players_button",
)
rect = pygame.Rect(
(0, 0),
(
self.window_width * 0.6,
self.player_selection_container.get_abs_rect().height * 0.35,
),
)
self.player_number_container = pygame_gui.elements.UIPanel(
relative_rect=rect,
manager=self.manager,
object_id="#players_players",
container=self.player_selection_container,
anchors={"top": "top", "centerx": "centerx"},
)
rect = pygame.Rect(
(0, 0),
(
self.window_width * 0.6,
self.player_selection_container.get_abs_rect().height * 0.35,
),
)
rect.bottom = 0
self.bot_number_container = pygame_gui.elements.UIPanel(
relative_rect=rect,
manager=self.manager,
object_id="#players_bots",
container=self.player_selection_container,
anchors={"bottom": "bottom", "centerx": "centerx"},
)
number_players_rect = pygame.Rect((0, 0), (200, 200))
self.added_players_label = pygame_gui.elements.UILabel(
number_players_rect,
manager=self.manager,
object_id="#number_players_label",
container=self.player_number_container,
text=f"not set: -",
anchors={"center": "center"},
)
number_bots_rect = pygame.Rect((0, 0), (200, 200))
self.added_bots_label = pygame_gui.elements.UILabel(
number_bots_rect,
manager=self.manager,
object_id="#number_bots_label",
container=self.bot_number_container,
text=f"not set: -",
anchors={"center": "center"},
)
size = 50
add_player_button_rect = pygame.Rect((0, 0), (size, size))
self.add_human_player_button = pygame_gui.elements.UIButton(
relative_rect=add_player_button_rect,
text="+",
manager=self.manager,
object_id="#quantity_button",
container=self.player_number_container,
anchors={"left_target": self.added_bots_label, "centery": "centery"},
)
remove_player_button_rect = pygame.Rect((0, 0), (size, size))
remove_player_button_rect.right = 0
self.remove_human_button = pygame_gui.elements.UIButton(
relative_rect=remove_player_button_rect,
text="-",
manager=self.manager,
object_id="#quantity_button",
container=self.player_number_container,
anchors={
"right": "right",
"right_target": self.added_bots_label,
"centery": "centery",
},
)
add_bot_button_rect = pygame.Rect((0, 0), (size, size))
self.add_bot_button = pygame_gui.elements.UIButton(
relative_rect=add_bot_button_rect,
text="+",
manager=self.manager,
object_id="#quantity_button",
container=self.bot_number_container,
anchors={"left_target": self.added_bots_label, "centery": "centery"},
)
remove_bot_button_rect = pygame.Rect((0, 0), (size, size))
remove_bot_button_rect.right = 0
self.remove_bot_button = pygame_gui.elements.UIButton(
relative_rect=remove_bot_button_rect,
text="-",
manager=self.manager,
object_id="#quantity_button",
container=self.bot_number_container,
anchors={
"right": "right",
"right_target": self.added_bots_label,
"centery": "centery",
},
)
########################################################################
# Tutorial screen
########################################################################
button_rect = pygame.Rect((0, 0), (220, 80))
button_rect.bottom = -self.elements_margin
self.continue_button = pygame_gui.elements.UIButton(
relative_rect=button_rect,
text="translations.continue",
manager=self.manager,
anchors={"centerx": "centerx", "bottom": "bottom"},
)
image = pygame.image.load(
ROOT_DIR / "pygame_2d_vis" / "gui_images" / f"controls_{self.language}.png"
).convert_alpha()
image_rect = image.get_rect()
# img_width = self.window_width * 0.68
# img_height = img_width * (image_rect.height / image_rect.width)
img_height = self.window_height * 0.95
img_width = img_height * (image_rect.width / image_rect.height)
new_dims = (img_width, img_height)
image = pygame.transform.smoothscale(image, new_dims)
image_rect = image.get_rect()
image_rect.left = self.elements_margin
image_rect.top = self.elements_margin
self.tutorial_image = pygame_gui.elements.UIImage(
image_rect,
image,
manager=self.manager,
anchors={"top": "top", "left": "left"},
)
arrow_img = pygame.image.load(
ROOT_DIR / "pygame_2d_vis" / "gui_images" / f"try_{self.language}.png"
).convert_alpha()
image_rect = arrow_img.get_rect()
img_width = self.window_width * 0.2
img_height = img_width * (image_rect.height / image_rect.width)
new_dims = (img_width, img_height)
arrow_img = pygame.transform.smoothscale(arrow_img, new_dims)
rect = arrow_img.get_rect()
rect.left = self.window_width * 0.55
rect.top = self.window_height * 0.7
self.arrow_img = pygame_gui.elements.UIImage(
relative_rect=rect,
image_surface=arrow_img,
manager=self.manager,
anchors={
"left": "left",
"top": "top",
},
)
rect = pygame.Rect(
(0, 0),
(self.window_width * 0.25, self.window_height * 0.4),
)
rect.right = -self.elements_margin
self.tutorial_recipe_container = pygame_gui.elements.UIPanel(
relative_rect=rect,
manager=self.manager,
object_id="#graph_container",
anchors={"right": "right", "top": "top", "top_target": self.quit_button},
)
self.tutorial_recipe_graph_rect = pygame.Rect(
(0, 0), (self.window_width * 0.25, self.window_height * 0.2)
)
self.tutorial_recipe_graph_rect.bottom = 0
self.tutorial_graph_image = pygame_gui.elements.UIImage(
relative_rect=self.tutorial_recipe_graph_rect,
image_surface=pygame.Surface(self.tutorial_recipe_graph_rect.size),
manager=self.manager,
object_id="#recipe_graph",
container=self.tutorial_recipe_container,
anchors={"centerx": "centerx", "bottom": "bottom"},
)
r = pygame.Rect((0, 0), (self.window_width * 0.25, self.window_height * 0.05))
r.bottom = 0
text = pygame_gui.elements.UILabel(
text="translations.salad_recipe",
relative_rect=r,
manager=self.manager,
container=self.tutorial_recipe_container,
object_id="#recipe_name",
anchors={
"centerx": "centerx",
"bottom": "bottom",
"bottom_target": self.tutorial_graph_image,
},
)
########################################################################
# PreGame screen
########################################################################
rect = pygame.Rect(
(0, 0),
(self.window_width, 50),
)
rect.top = 20
self.level_name_label = pygame_gui.elements.UILabel(
text=f"not set",
relative_rect=rect,
manager=self.manager,
object_id="#level_name",
anchors={"centerx": "centerx", "top": "top"},
)
self.text_recipes_label = pygame_gui.elements.UILabel(
text="translations.recipes_in_this_level",
relative_rect=pygame.Rect(
(0, 0),
(self.window_width * 0.5, 50),
),
manager=self.manager,
anchors={"centerx": "centerx", "top_target": self.level_name_label},
)
scroll_height = (
self.continue_button.get_abs_rect().top
- self.text_recipes_label.get_abs_rect().bottom
)
self.scroll_width = self.window_width
self.scroll_space_recipes = pygame_gui.elements.UIScrollingContainer(
relative_rect=pygame.Rect((0, 0), (self.scroll_width, scroll_height)),
manager=self.manager,
anchors={"centerx": "centerx", "top_target": self.text_recipes_label},
)
########################################################################
# Game screen
########################################################################
self.orders_label = pygame_gui.elements.UILabel(
text="translations.orders",
relative_rect=pygame.Rect(0, 0, self.buttons_width, self.screen_margin),
manager=self.manager,
object_id="#orders_label",
)
self.orders_container_width = (
self.window_width - (2 * self.buttons_width) - (self.buttons_width * 0.7)
)
# rect = pygame.Rect(
# 0,
# 0,
# self.orders_container_width,
# self.screen_margin,
# )
# self.orders_container = pygame_gui.elements.UIPanel(
# relative_rect=rect,
# manager=self.manager,
# object_id="#graph_container",
# anchors={
# "top": "top",
# "left": "left",
# "left_target": self.orders_label,
# },
# )
self.orders_image = pygame_gui.elements.UIImage(
relative_rect=pygame.Rect(
0, 0, self.orders_container_width, self.screen_margin
),
image_surface=pygame.Surface(
(self.orders_container_width, self.screen_margin)
),
manager=self.manager,
object_id="#recipe_graph",
anchors={"top": "top", "left_target": self.orders_label},
)
rect = pygame.Rect(
(0, 0),
(self.window_width * 0.3, self.buttons_height),
)
rect.bottomleft = (0, 0)
self.score_label = pygame_gui.elements.UILabel(
text=f"Score not set",
relative_rect=rect,
manager=self.manager,
object_id="#score_label",
anchors={"bottom": "bottom", "left": "left"},
)
rect = pygame.Rect(
(0, 0),
(self.window_width * 0.4, self.buttons_height),
)
rect.bottom = 0
self.timer_label = pygame_gui.elements.UILabel(
text="GAMETIME not set",
relative_rect=rect,
manager=self.manager,
object_id="#timer_label",
anchors={"bottom": "bottom", "centerx": "centerx"},
)
rect = pygame.Rect(
(0, 0),
(self.window_width, self.screen_margin),
)
rect.right = 20
self.wait_players_label = pygame_gui.elements.UILabel(
text="translations.waiting_for_players",
relative_rect=rect,
manager=self.manager,
object_id="#wait_players_label",
anchors={"centery": "centery", "right": "right"},
)
########################################################################
# PostGame screen
########################################################################
rect = pygame.Rect((0, 0), (220, 80))
rect.bottom = -self.elements_margin
self.next_game_button = pygame_gui.elements.UIButton(
relative_rect=rect,
manager=self.manager,
text="translations.next_game",
anchors={"centerx": "centerx", "bottom": "bottom"},
object_id="#split_players_button",
)
rect = pygame.Rect(
(0, 0),
(self.window_width, self.window_height * 0.07),
)
self.score_conclusion = pygame_gui.elements.UILabel(
text=f"not set",
relative_rect=rect,
manager=self.manager,
object_id="#score_conclusion",
anchors={"centerx": "centerx", "top_target": self.level_name_label},
)
self.completed_meals_text_label = pygame_gui.elements.UILabel(
text="translations.completed_meals",
relative_rect=pygame.Rect(
(0, 0),
(self.window_width, self.window_height * 0.05),
),
manager=self.manager,
object_id="#completed_meals_label",
anchors={"centerx": "centerx", "top_target": self.score_conclusion},
)
scroll_height = (
self.continue_button.get_abs_rect().top
- self.completed_meals_text_label.get_abs_rect().bottom
- 10
)
self.scroll_width_completed_meals = self.window_width
self.scroll_space_completed_meals = pygame_gui.elements.UIScrollingContainer(
relative_rect=pygame.Rect((0, 0), (self.scroll_width, scroll_height)),
manager=self.manager,
anchors={
"centerx": "centerx",
"top_target": self.completed_meals_text_label,
},
)
rect = pygame.Rect((0, 0), (220, 80))
rect.bottom = -self.elements_margin
self.finish_study_button = pygame_gui.elements.UIButton(
relative_rect=rect,
manager=self.manager,
text="translations.finish_study",
anchors={"centerx": "centerx", "bottom": "bottom"},
object_id="#split_players_button",
)
########################################################################
# End screen
########################################################################
rect = pygame.Rect(0, 0, self.window_width * 0.9, self.window_height * 0.4)
final_text_container = pygame_gui.elements.UIPanel(
relative_rect=rect,
manager=self.manager,
object_id="#graph_container",
anchors={"center": "center"},
)
height = self.window_height * 0.1
rect = pygame.Rect((0, 0), (self.window_width * 0.9, height))
rect.centery -= height / 2
text1 = pygame_gui.elements.UILabel(
text="translations.thank_you",
relative_rect=rect,
manager=self.manager,
object_id="#score_label",
container=final_text_container,
anchors={"centery": "centery", "centerx": "centerx"},
)
rect = pygame.Rect((0, 0), (self.window_width * 0.9, self.window_height * 0.1))
text2 = pygame_gui.elements.UILabel(
text="translations.signal_supervisor",
relative_rect=rect,
manager=self.manager,
object_id="#score_label",
container=final_text_container,
anchors={"top_target": text1, "centerx": "centerx"},
)
########################################################################
if self.show_debug_elements:
self.start_screen_elements = [
self.start_button,
self.quit_button,
self.fullscreen_button,
self.player_selection_container,
self.bot_number_container,
self.press_a_image,
]
self.on_all_screens = [
self.fullscreen_button,
self.quit_button,
]
self.other_elements = []
else:
self.start_screen_elements = [
self.start_button,
self.press_a_image,
]
self.on_all_screens = []
self.other_elements = [
self.player_selection_container,
self.bot_number_container,
self.press_a_image,
self.quit_button,
self.fullscreen_button,
]
if not self.CONNECT_WITH_STUDY_SERVER:
self.start_screen_elements.append(self.layout_selection)
else:
self.other_elements.append(self.layout_selection)
self.tutorial_screen_elements = [
self.tutorial_image,
self.continue_button,
self.tutorial_recipe_container,
self.arrow_img,
]
self.pregame_screen_elements = [
self.level_name_label,
self.text_recipes_label,
self.scroll_space_recipes,
self.continue_button,
]
self.game_screen_elements = [
self.orders_label,
self.orders_image,
self.score_label,
self.timer_label,
self.wait_players_label,
]
self.postgame_screen_elements = [
self.score_conclusion,
self.scroll_space_completed_meals,
self.level_name_label,
self.next_game_button,
self.finish_study_button,
self.completed_meals_text_label,
]
self.end_screen_elements = [
final_text_container,
]
def show_screen_elements(self, elements: list):
all_elements = (
self.start_screen_elements
+ self.tutorial_screen_elements
+ self.pregame_screen_elements
+ self.game_screen_elements
+ self.postgame_screen_elements
+ self.end_screen_elements
+ self.on_all_screens
+ self.other_elements
)
for element in all_elements:
element.hide()
for element in elements + self.on_all_screens:
element.show()
def update_tutorial_screen(self):
self.show_screen_elements(self.tutorial_screen_elements)
self.set_game_size(
max_height=self.window_height * 0.4,
max_width=self.window_width * 0.3,
)
self.game_center = (
self.window_width - self.game_width / 2 - (self.window_height * 0.04),
self.window_height - self.game_height / 2 - (self.window_height * 0.04),
)
width, height = self.tutorial_recipe_graph_rect.size
tutorial_graph_surface = pygame.Surface(
self.tutorial_recipe_graph_rect.size, flags=pygame.SRCALPHA
)
self.vis.draw_recipe_image(
tutorial_graph_surface,
self.level_info["recipe_graphs"][0],
width,
height,
grid_size=self.window_height / 18,
)
self.tutorial_graph_image.set_image(tutorial_graph_surface)
# self.tutorial_graph_image.set_dimensions((self.game_width, self.game_height))
def update_screen_elements(self):
match self.menu_state:
case MenuStates.Start:
self.show_screen_elements(self.start_screen_elements)
if self.CONNECT_WITH_STUDY_SERVER:
self.bot_number_container.hide()
self.update_selection_elements()
case MenuStates.ControllerTutorial:
self.update_tutorial_screen()
case MenuStates.PreGame:
self.init_ui_elements()
self.show_screen_elements(self.pregame_screen_elements)
self.update_pregame_screen()
case MenuStates.Game:
self.show_screen_elements(self.game_screen_elements)
case MenuStates.PostGame:
self.init_ui_elements()
self.show_screen_elements(self.postgame_screen_elements)
self.update_postgame_screen(self.last_state)
if self.last_level:
self.next_game_button.hide()
self.finish_study_button.show()
else:
self.next_game_button.show()
self.finish_study_button.hide()
case MenuStates.End:
self.show_screen_elements(self.end_screen_elements)
def draw_main_window(self):
self.main_window.fill(
colors[self.visualization_config["GameWindow"]["background_color"]]
)
match self.menu_state:
case MenuStates.ControllerTutorial:
self.draw_tutorial_screen_frame()
case MenuStates.Game:
self.draw_game_screen_frame()
self.manager.draw_ui(self.main_window)
self.manager.update(self.time_delta)
pygame.display.flip()
def draw_tutorial_screen_frame(self):
self.handle_keys()
self.handle_joy_stick_input(joysticks=self.joysticks)
state = self.request_state()
self.vis.draw_gamescreen(
self.game_screen,
state,
self.grid_size,
[int(k.current_player) for k in self.key_sets],
)
game_screen_rect = self.game_screen.get_rect()
game_screen_rect.center = self.game_center
self.main_window.blit(self.game_screen, game_screen_rect)
def update_postgame_screen(self, state):
score = state["score"]
# self.score_conclusion.set_text(f"Your final score is {score}. Hurray!")
self.score_conclusion.set_text(
"translations.final_score", text_kwargs={"score": str(score)}
)
self.level_name_label.set_text(
"translations.completed_level",
text_kwargs={"level": str(self.level_info["name"])},
)
served_meals = state["served_meals"]
row_height = self.window_height * 0.07
container_width = self.scroll_width_completed_meals
container_height = len(served_meals) * row_height
main_container = pygame_gui.elements.UIPanel(
relative_rect=pygame.Rect(
(0, 0),
(
container_width,
container_height,
),
),
object_id="#graph_container",
manager=self.manager,
container=self.scroll_space_completed_meals,
)
last_completed_meals = []
for idx, (player, meal) in enumerate(served_meals):
if idx == 0:
anchors = {"centerx": "centerx", "top": "top"}
else:
anchors = {
"centerx": "centerx",
"top_target": last_completed_meals[idx - 1],
}
rect = pygame.Rect(
(0, 0),
(
container_width / 2,
row_height,
),
)
container = pygame_gui.elements.UIPanel(
relative_rect=rect,
object_id="#graph_container",
manager=self.manager,
container=main_container,
anchors=anchors,
)
text = ":"
rect = pygame.Rect(
(0, 0),
(container_width / 10, row_height),
)
rect.left = 0
meal_label = pygame_gui.elements.UILabel(
text=text,
relative_rect=rect,
manager=self.manager,
container=container,
object_id="#served_meal",
anchors={"center": "center"},
)
cook_surface = pygame.Surface(
(row_height, row_height), flags=pygame.SRCALPHA
)
player_idx = int(player)
player_color = colors[self.vis.player_colors[player_idx]]
self.vis.draw_cook(
screen=cook_surface,
grid_size=row_height,
pos=np.array([row_height / 2, row_height / 2]),
color=player_color,
facing=np.array([0, 1]),
)
rect = cook_surface.get_rect()
rect.right = 0
cook_image = pygame_gui.elements.UIImage(
relative_rect=rect,
image_surface=cook_surface,
manager=self.manager,
container=container,
anchors={
"centery": "centery",
"right": "right",
"right_target": meal_label,
},
)
meal_surface = pygame.Surface(
(row_height, row_height), flags=pygame.SRCALPHA
)
self.vis.draw_item(
pos=np.array([row_height / 2, row_height / 2]),
item={"type": "Plate"},
plate=True,
screen=meal_surface,
grid_size=row_height,
)
self.vis.draw_item(
pos=np.array([row_height / 2, row_height / 2]),
item={"type": meal},
plate=True,
screen=meal_surface,
grid_size=row_height,
)
rect = meal_surface.get_rect()
# rect.left = 0
meal_image = pygame_gui.elements.UIImage(
relative_rect=rect,
image_surface=meal_surface,
manager=self.manager,
container=container,
anchors={"centery": "centery", "left_target": meal_label},
)
last_completed_meals.append(container)
self.scroll_space_completed_meals.set_scrollable_area_dimensions(
(self.scroll_width_completed_meals * 0.95, container_height)
)
def exit_game(self):
self.menu_state = MenuStates.PostGame
if self.CONNECT_WITH_STUDY_SERVER:
self.send_level_done()
self.disconnect_websockets()
self.update_postgame_screen(self.last_state)
self.update_screen_elements()
self.beeped_once = False
def draw_game_screen_frame(self):
self.last_state = self.request_state()
self.handle_keys()
self.handle_joy_stick_input(joysticks=self.joysticks)
if not self.beeped_once and self.last_state["all_players_ready"]:
self.beeped_once = True
self.play_bell_sound()
if self.last_state["ended"]:
self.exit_game()
else:
self.draw_game(self.last_state)
game_screen_rect = self.game_screen.get_rect()
game_screen_rect.center = [
self.window_width // 2,
self.window_height // 2,
]
self.main_window.blit(self.game_screen, game_screen_rect)
if not self.last_state["all_players_ready"]:
self.wait_players_label.show()
else:
self.wait_players_label.hide()
def draw_game(self, state):
"""Main visualization function.
Args: state: The game state returned by the environment."""
self.vis.draw_gamescreen(
self.game_screen,
state,
self.grid_size,
[int(k.current_player) for k in self.key_sets],
)
# orders_surface = pygame.Surface((self.orders_container_width, self.screen_margin))
self.vis.draw_orders(
screen=self.orders_image.image,
state=state,
grid_size=self.buttons_height,
width=self.orders_container_width,
height=self.screen_margin,
config=self.visualization_config,
)
# self.orders_image.set_image(orders_surface)
border = self.visualization_config["GameWindow"]["game_border_size"]
border_rect = pygame.Rect(
self.window_width // 2 - (self.game_width // 2) - border,
self.window_height // 2 - (self.game_height // 2) - border,
self.game_width + 2 * border,
self.game_height + 2 * border,
)
pygame.draw.rect(
self.main_window,
colors[self.visualization_config["GameWindow"]["game_border_color"]],
border_rect,
width=border,
)
self.update_score_label(state)
self.update_remaining_time(state["remaining_time"])
if state["info_msg"]:
for idx, msg in enumerate(reversed(state["info_msg"])):
text_surface = self.comic_sans.render(
msg[0],
antialias=True,
color=(0, 0, 0)
if msg[1] == "Normal"
else ((255, 0, 0) if msg[1] == "Warning" else (0, 255, 0)),
# bgcolor=(255, 255, 255),
)
self.main_window.blit(
text_surface,
(
self.window_width / 4,
self.window_height - self.screen_margin + 5 + (20 * idx),
),
)
def update_score_label(self, state):
score = state["score"]
self.score_label.set_text(
"translations.score", text_kwargs={"score": str(score)}
)
def update_remaining_time(self, remaining_time: float):
hours, rem = divmod(int(remaining_time), 3600)
minutes, seconds = divmod(rem, 60)
display_time = f"{minutes}:{'%02d' % seconds}"
self.timer_label.set_text(
"translations.time_remaining", text_kwargs={"time": display_time}
)
def create_env_on_game_server(self, tutorial):
if tutorial:
layout_path = ROOT_DIR / "configs" / "layouts" / "tutorial.layout"
environment_config_path = ROOT_DIR / "configs" / "tutorial_env_config.yaml"
else:
environment_config_path = ROOT_DIR / "configs" / "environment_config.yaml"
layout_path = self.layout_file_paths_dict[
self.layout_selection.selected_option
]
# layout_path = self.layout_file_paths[self.current_layout_idx]
item_info_path = ROOT_DIR / "configs" / "item_info.yaml"
with open(item_info_path, "r") as file:
item_info = file.read()
with open(layout_path, "r") as file:
layout = file.read()
with open(environment_config_path, "r") as file:
environment_config = file.read()
num_players = 1 if tutorial else self.number_players
seed = int(random.random() * 100000)
creation_json = CreateEnvironmentConfig(
manager_id=self.manager_id,
number_players=num_players,
environment_settings={"all_player_can_pause_game": False},
item_info_config=item_info,
environment_config=environment_config,
layout_config=layout,
seed=seed,
).model_dump(mode="json")
# print(CreateEnvironmentConfig.model_validate_json(json_data=creation_json))
env_info = requests.post(
f"{self.request_url}/manage/create_env/",
json=creation_json,
)
if env_info.status_code == 403:
raise ValueError(f"Forbidden Request: {env_info.json()['detail']}")
elif env_info.status_code == 409:
print("CONFLICT")
env_info = env_info.json()
assert isinstance(env_info, dict), "Env info must be a dictionary"
self.current_env_id = env_info["env_id"]
self.player_info = env_info["player_info"]
if tutorial:
self.player_ids = [str(list(self.player_info.keys())[0])]
if tutorial:
self.key_sets = self.setup_player_keys(
players=["0"],
number_key_sets=1,
disjunct=False,
)
else:
num_key_set = 2 if self.multiple_keysets else 1
self.key_sets = self.setup_player_keys(
players=[str(i) for i in range(self.number_humans_to_be_added)],
number_key_sets=min(self.number_humans_to_be_added, num_key_set),
disjunct=self.split_players,
)
self.level_info = env_info
self.level_info["name"] = self.layout_selection.selected_option
self.level_info["number_players"] = num_players
def update_pregame_screen(self):
self.level_name_label.set_text(
"translations.level_name", text_kwargs={"level": self.level_info["name"]}
)
graph_width = self.window_width * 0.55
rows = 0
for rg in self.level_info["recipe_graphs"]:
rows += len(np.unique(np.array(list(rg["layout"].values()))[:, 1]))
row_height = self.window_height / 14
container_width = self.scroll_width * 0.9
container_height = rows * row_height
icon_size = row_height * 0.9
main_container = pygame_gui.elements.UIPanel(
relative_rect=pygame.Rect(
(0, 0),
(
container_width,
container_height,
),
),
object_id="#graph_container",
manager=self.manager,
container=self.scroll_space_recipes,
)
last_recipes_labels = []
for idx, rg in enumerate(self.level_info["recipe_graphs"]):
meal = rg["meal"]
meal = re.sub(r"(?<!^)(?=[A-Z])", " ", meal)
positions = np.array(list(rg["layout"].values()))
unique_vals = np.unique(positions[:, 1])
height = row_height * len(unique_vals)
graph_height = height * 0.9
graph_surface = pygame.Surface(
(graph_width, graph_height), flags=pygame.SRCALPHA
)
self.vis.draw_recipe_image(
graph_surface, rg, graph_width, graph_height, icon_size
)
if idx == 0:
anchors = {"centerx": "centerx", "top": "top"}
else:
anchors = {
"centerx": "centerx",
"top_target": last_recipes_labels[idx - 1],
}
container = pygame_gui.elements.UIPanel(
relative_rect=pygame.Rect(
(0, 0),
(
container_width,
height,
),
),
object_id="#graph_container",
manager=self.manager,
container=main_container,
anchors=anchors,
)
rect = pygame.Rect(
(0, 0),
(self.window_width / 5, height),
)
label = pygame_gui.elements.UILabel(
text=f"translations.{meal}",
relative_rect=rect,
manager=self.manager,
container=container,
object_id="#recipe_name",
anchors={"centery": "centery", "left": "left"},
)
rect = graph_surface.get_rect()
rect.right = 0
graph_image = pygame_gui.elements.UIImage(
relative_rect=rect,
image_surface=graph_surface,
manager=self.manager,
object_id="#recipe_graph",
container=container,
anchors={"centery": "centery", "right": "right"},
)
last_recipes_labels.append(container)
self.scroll_space_recipes.set_scrollable_area_dimensions(
(self.scroll_width * 0.95, container_height)
)
def setup_tutorial(self):
answer = requests.post(
f"{self.request_url}/connect_to_tutorial/{self.participant_id}"
)
if answer.status_code == 200:
answer = requests.post(
f"{self.request_url}/get_game_connection/{self.participant_id}"
)
if answer.status_code == 200:
answer_json = answer.json()
self.player_info = answer_json["player_info"]["0"]
print("TUTORIAL PLAYER INFO", self.player_info)
self.level_info = answer_json["level_info"]
self.player_info = {self.player_info["player_id"]: self.player_info}
else:
log.warning("Could not connect to tutorial.")
else:
self.menu_state = MenuStates.Start
log.warning("Could not create tutorial.")
def get_game_connection(self):
if self.menu_state == MenuStates.ControllerTutorial:
self.setup_tutorial()
self.key_sets = self.setup_player_keys(["0"], 1, False)
self.vis.create_player_colors(1)
else:
answer = requests.post(
f"{self.request_url}/get_game_connection/{self.participant_id}"
)
if answer.status_code == 200:
answer_json = answer.json()
self.player_info = answer_json["player_info"]
print("GAME PLAYER INFO", self.player_info)
self.level_info = answer_json["level_info"]
self.last_level = self.level_info["last_level"]
else:
log.warning("COULD NOT GET GAME CONNECTION")
self.menu_state = MenuStates.Start
self.number_players = -1
if self.split_players:
assert (
self.number_humans_to_be_added > 1
), "Not enough players for key configuration."
num_key_set = 2 if self.multiple_keysets else 1
self.key_sets = self.setup_player_keys(
list(self.player_info.keys()),
min(self.number_humans_to_be_added, num_key_set),
self.split_players,
)
self.player_ids = list(self.player_info.keys())
def create_and_connect_bot(self, player_id, player_info):
player_hash = player_info["player_hash"]
print(
f'--general_plus="agent_websocket:{player_info["websocket_url"]};player_hash:{player_hash};agent_id:{player_id}"'
)
if self.USE_AAAMBOS_AGENT:
sub = Popen(
" ".join(
[
"exec",
"aaambos",
"run",
"--arch_config",
str(ROOT_DIR / "configs" / "agents" / "arch_config.yml"),
"--run_config",
str(ROOT_DIR / "configs" / "agents" / "run_config.yml"),
f'--general_plus="agent_websocket:{player_info["websocket_url"]};player_hash:{player_hash};agent_id:{player_id}"',
f"--instance={player_hash}",
]
),
shell=True,
)
else:
sub = Popen(
" ".join(
[
"python",
str(ROOT_DIR / "configs" / "agents" / "random_agent.py"),
f'--uri {player_info["websocket_url"]}',
f"--player_hash {player_hash}",
f"--player_id {player_id}",
]
),
shell=True,
)
self.sub_processes.append(sub)
def connect_websockets(self):
for p, (player_id, player_info) in enumerate(self.player_info.items()):
if p < self.number_humans_to_be_added:
# add player websockets
websocket = connect(player_info["websocket_url"])
message_dict = {
"type": PlayerRequestType.READY.value,
"player_hash": player_info["player_hash"],
}
ws_message = WebsocketMessage(**message_dict)
websocket.send(ws_message.json())
assert (
json.loads(websocket.recv())["status"] == 200
), "not accepted player"
self.websockets[player_id] = websocket
else:
# create bots and add bot websockets
self.create_and_connect_bot(player_id, player_info)
if p == 0:
self.state_player_id = player_id
def setup_game(self):
self.connect_websockets()
self.vis.create_player_colors(self.level_info["number_players"])
self.kitchen_width, self.kitchen_height = self.level_info["kitchen_size"]
def stop_game_on_server(self, reason: str) -> None:
log.debug(f"Stopping game: {reason}")
if not self.CONNECT_WITH_STUDY_SERVER:
answer = requests.post(
f"{self.request_url}/manage/stop_env/",
json={
"manager_id": self.manager_id,
"env_id": self.current_env_id,
"reason": reason,
},
)
if answer.status_code != 200:
log.warning(
"Could not send level done.",
answer.status_code,
answer.json()["detail"],
)
def send_tutorial_finished(self):
requests.post(
f"{self.request_url}/disconnect_from_tutorial/{self.participant_id}",
)
def finished_button_press(self):
if not self.CONNECT_WITH_STUDY_SERVER:
self.stop_game_on_server("finished_button_pressed")
self.menu_state = MenuStates.PostGame
log.debug("Pressed finished button")
self.update_screen_elements()
def fullscreen_button_press(self):
self.fullscreen = not self.fullscreen
self.set_window_size()
self.init_ui_elements()
self.set_game_size()
self.update_screen_elements()
def reset_gui_values(self):
self.currently_controlled_player_idx = 0
self.number_humans_to_be_added = 1
self.number_bots_to_be_added = 0
self.number_players = (
self.number_humans_to_be_added + self.number_bots_to_be_added
)
self.split_players = False
self.multiple_keysets = False
def update_selection_elements(self):
if self.number_humans_to_be_added == 1:
self.remove_human_button.disable()
self.multiple_keysets_button.hide()
self.split_players_button.hide()
else:
self.remove_human_button.enable()
if self.number_humans_to_be_added < 2:
self.multiple_keysets_button.hide()
else:
self.multiple_keysets_button.show()
if self.number_humans_to_be_added > 1 and self.multiple_keysets:
self.split_players_button.show()
else:
self.split_players_button.hide()
if self.number_bots_to_be_added == 0:
self.remove_bot_button.disable()
else:
self.remove_bot_button.enable()
self.number_players = (
self.number_humans_to_be_added + self.number_bots_to_be_added
)
text = (
"translations.2_players"
if self.multiple_keysets
else "translations.1_player"
)
self.multiple_keysets_button.set_text(text)
self.added_players_label.set_text(
"translations.humans_to_be_added",
text_kwargs={"number": str(self.number_humans_to_be_added)},
)
self.added_bots_label.set_text(
"translations.bots_to_be_added",
text_kwargs={"number": str(self.number_bots_to_be_added)},
)
text = (
"translations.split_players_yes"
if self.split_players
else "translations.split_players_no"
)
self.split_players_button.set_text(text)
if self.number_players == 0:
self.start_button.disable()
else:
self.start_button.enable()
def send_action(self, action: Action):
"""Sends an action to the game environment.
Args:
action: The action to be sent. Contains the player, action type and move direction if action is a movement.
"""
if isinstance(action.action_data, np.ndarray):
action.action_data = [
float(action.action_data[0]),
float(action.action_data[1]),
]
message_dict = {
"type": PlayerRequestType.ACTION.value,
"action": action,
"player_hash": self.player_info[action.player]["player_hash"],
}
ws_message = WebsocketMessage(**message_dict)
self.websockets[action.player].send(ws_message.json())
self.websockets[action.player].recv()
def request_state(self):
message_dict = {
"type": PlayerRequestType.GET_STATE.value,
"action": None,
"player_hash": self.player_info[self.state_player_id]["player_hash"],
}
ws_message = WebsocketMessage(**message_dict)
self.websockets[self.state_player_id].send(ws_message.json())
state = json.loads(self.websockets[self.state_player_id].recv())
return state
def disconnect_websockets(self):
for sub in self.sub_processes:
try:
if self.USE_AAAMBOS_AGENT:
pgrp = os.getpgid(sub.pid)
os.killpg(pgrp, signal.SIGINT)
subprocess.run(
"kill $(ps aux | grep 'aaambos' | awk '{print $2}')", shell=True
)
else:
sub.kill()
except ProcessLookupError:
pass
self.sub_processes = []
for websocket in self.websockets.values():
websocket.close()
def play_bell_sound(self):
bell_path = str(ROOT_DIR / "pygame_2d_vis" / "sync_bell.wav")
mixer.init()
mixer.music.load(bell_path)
mixer.music.set_volume(0.9)
mixer.music.play()
log.log(logging.INFO, "Started game, played bell sound")
def start_study(self):
answer = requests.post(
f"{self.request_url}/start_study/{self.participant_id}/{self.number_humans_to_be_added}"
)
if answer.status_code == 200:
self.last_level = False
self.get_game_connection()
else:
self.menu_state = MenuStates.Start
print(
"COULD NOT START STUDY; Response:",
answer.status_code,
answer.json()["detail"],
)
def send_level_done(self):
answer = requests.post(f"{self.request_url}/level_done/{self.participant_id}")
if answer.status_code != 200:
log.warning(
"Could not send level done.",
answer.status_code,
answer.json()["detail"],
)
def button_continue_postgame_pressed(self):
if self.CONNECT_WITH_STUDY_SERVER:
if not self.last_level:
self.get_game_connection()
else:
# self.current_layout_idx += 1
self.menu_state = MenuStates.Start
return
# self.create_env_on_game_server(tutorial=False)
# if self.current_layout_idx == len(self.layout_file_paths) - 1:
# self.last_level = True
# else:
# log.debug(f"LEVEL: {self.layout_file_paths[self.current_layout_idx]}")
self.menu_state = MenuStates.PreGame
def manage_button_event(self, button: pygame_gui.core.UIElement | None = None):
if button == self.quit_button:
if self.fullscreen:
self.fullscreen_button_press()
self.running = False
self.disconnect_websockets()
self.stop_game_on_server("Quit button")
self.menu_state = MenuStates.Start
log.debug("Pressed quit button")
return
elif button == self.fullscreen_button:
self.fullscreen_button_press()
log.debug("Pressed fullscreen button")
return
# Filter by shown screen page
match self.menu_state:
############################################
case MenuStates.Start:
match button:
case self.start_button:
if not (
self.number_humans_to_be_added
+ self.number_bots_to_be_added
):
pass
else:
self.menu_state = MenuStates.ControllerTutorial
if self.CONNECT_WITH_STUDY_SERVER:
self.get_game_connection()
else:
self.create_env_on_game_server(tutorial=True)
self.setup_game()
case self.add_human_player_button:
self.number_humans_to_be_added += 1
case self.add_bot_button:
self.number_bots_to_be_added += 1
case self.remove_human_button:
self.number_humans_to_be_added = max(
0, self.number_humans_to_be_added - 1
)
case self.remove_bot_button:
self.number_bots_to_be_added = max(
0, self.number_bots_to_be_added - 1
)
case self.multiple_keysets_button:
self.multiple_keysets = not self.multiple_keysets
self.split_players = False
case self.split_players_button:
self.split_players = not self.split_players
############################################
case MenuStates.ControllerTutorial:
self.exit_tutorial()
if self.CONNECT_WITH_STUDY_SERVER:
self.start_study()
else:
self.create_env_on_game_server(tutorial=False)
############################################
case MenuStates.PreGame:
if button == self.continue_button:
self.setup_game()
self.set_game_size()
self.menu_state = MenuStates.Game
############################################
case MenuStates.PostGame:
if button in [
self.finish_study_button,
self.next_game_button,
self.continue_button,
]:
if self.last_level:
self.menu_state = MenuStates.End
else:
self.button_continue_postgame_pressed()
############################################
def start_pygame(self):
"""Starts pygame and the gui loop. Each frame the game state is visualized and keyboard inputs are read."""
log.debug(f"Starting pygame gui at {self.FPS} fps")
pygame.font.init()
self.comic_sans = pygame.font.SysFont("Comic Sans MS", 30)
pygame.display.set_caption("Cooperative Cuisine")
clock = pygame.time.Clock()
self.reset_window_size()
self.init_ui_elements()
self.reset_window_size()
self.reset_gui_values()
self.update_screen_elements()
# Game loop
self.running = True
# This dict can be left as-is, since pygame will generate a
# pygame.JOYDEVICEADDED event for every joystick connected
# at the start of the program.
self.joysticks = {}
while self.running:
try:
self.time_delta = clock.tick(self.FPS) / 1000
# PROCESSING EVENTS
for event in pygame.event.get():
if event.type == pygame.QUIT:
self.disconnect_websockets()
self.running = False
# connect joystick
if (
pygame.joystick.get_count() > 0
and event.type == pygame.JOYDEVICEADDED
):
# This event will be generated when the program starts for every
# joystick, filling up the list without needing to create them manually.
joy = pygame.joystick.Joystick(event.device_index)
self.joysticks[joy.get_instance_id()] = joy
print(f"Joystick {joy.get_instance_id()} connected")
# disconnect joystick
if event.type == pygame.JOYDEVICEREMOVED:
del self.joysticks[event.instance_id]
print(f"Joystick {event.instance_id} disconnected")
print("Number of joysticks:", pygame.joystick.get_count())
# Press enter key or controller start button instead of mouse button press
if (
event.type == pygame.JOYBUTTONDOWN
and any(
[
self.joysticks and self.joysticks[i].get_button(7)
for i in range(len(self.joysticks))
]
)
or (
event.type == pygame.KEYDOWN
and event.key == pygame.K_RETURN
)
):
if self.menu_state == MenuStates.Start:
self.manage_button_event(self.start_button)
self.update_screen_elements()
elif self.menu_state in [
MenuStates.ControllerTutorial,
MenuStates.PreGame,
MenuStates.PostGame,
]:
self.manage_button_event(self.continue_button)
self.update_screen_elements()
if event.type == pygame_gui.UI_BUTTON_PRESSED:
button = event.ui_element
self.manage_button_event(button)
if button in [
self.start_button,
self.continue_button,
self.finish_study_button,
self.next_game_button,
]:
self.update_screen_elements()
elif self.menu_state == MenuStates.Start:
self.update_selection_elements()
if event.type in [
pygame.KEYDOWN,
pygame.KEYUP,
] and self.menu_state in [
MenuStates.Game,
MenuStates.ControllerTutorial,
]:
self.handle_key_event(event)
if event.type in [
pygame.JOYBUTTONDOWN,
pygame.JOYBUTTONUP,
] and self.menu_state in [
MenuStates.Game,
MenuStates.ControllerTutorial,
]:
self.handle_joy_stick_event(event, joysticks=self.joysticks)
self.manager.process_events(event)
# DRAWING
self.draw_main_window()
except (KeyboardInterrupt, SystemExit):
self.running = False
self.disconnect_websockets()
if not self.CONNECT_WITH_STUDY_SERVER:
self.stop_game_on_server("Program exited.")
if self.fullscreen_button:
self.fullscreen_button_press()
self.disconnect_websockets()
if not self.CONNECT_WITH_STUDY_SERVER:
self.stop_game_on_server("Program exited.")
if self.fullscreen:
self.fullscreen_button_press()
pygame.quit()
sys.exit()
def exit_tutorial(self):
self.disconnect_websockets()
self.menu_state = MenuStates.PreGame
if self.CONNECT_WITH_STUDY_SERVER:
self.send_tutorial_finished()
else:
self.stop_game_on_server("tutorial_finished")
def main(
study_url: str,
study_port: int,
game_url: str,
game_port: int,
manager_ids: list[str],
CONNECT_WITH_STUDY_SERVER=False,
USE_AAAMBOS_AGENT=False,
debug=False,
):
setup_logging()
gui = PyGameGUI(
study_host=study_url,
study_port=study_port,
game_host=game_url,
game_port=game_port,
manager_ids=manager_ids,
CONNECT_WITH_STUDY_SERVER=CONNECT_WITH_STUDY_SERVER,
USE_AAAMBOS_AGENT=USE_AAAMBOS_AGENT,
debug=debug,
)
gui.start_pygame()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="Cooperative Cuisine 2D PyGame Visualization",
description="PyGameGUI: a PyGame 2D Visualization window.",
epilog="For further information, see https://scs.pages.ub.uni-bielefeld.de/cocosy/overcooked-simulator/overcooked_simulator.html",
)
url_and_port_arguments(parser)
disable_websocket_logging_arguments(parser)
add_list_of_manager_ids_arguments(parser)
add_gui_arguments(parser)
args = parser.parse_args()
main(
args.study_url,
args.study_port,
args.game_url,
args.game_port,
args.manager_ids,
CONNECT_WITH_STUDY_SERVER=True,
debug=args.do_study,
)