Newer
Older

Florian Schröder
committed
"""The player contains the logic which method to call on counters and items for a pick action:
* If the player **holds nothing**, it **picks up** the content from the counter.
* If the **item** the player **holds** can be **dropped** on the counter it will do so.
* If the counter is not a sink or plate dispenser, it checks if it **can combine the content** on the counter **with the
holding object**. If so, it picks up the content and combines it on its hands.

Florian Schröder
committed
"""
import dataclasses
from collections import deque
from datetime import datetime, timedelta
from typing import Optional

import numpy as np
import numpy.typing as npt

from cooperative_cuisine.counters import Counter
from cooperative_cuisine.hooks import (
    Hooks,
    PLAYER_START_INTERACT,
    PLAYER_END_INTERACT,
)
from cooperative_cuisine.items import Item, ItemType
from cooperative_cuisine.state_representation import PlayerState

Florian Schröder
committed
@dataclasses.dataclass
class PlayerConfig:
"""Configure the player attributes in the `environment.yml`."""
radius: float = 0.4
"""The size of the player. The size of a counter is 1"""
speed_units_per_seconds: float | int = 8

Florian Schröder
committed
"""The move distance/speed of the player per action call."""
interaction_range: float = 1.6
"""How far player can interact with counters."""
restricted_view: bool = False
"""Whether or not the player can see the entire map at once or just a view frustrum."""
view_angle: int | None = None
"""Angle of the players view if restricted."""
view_range: float | None = None
"""Range of the players view if restricted. In grid units."""

Florian Schröder
committed
class Player:
    """Class representing a player in the game environment. A player consists of a name, their position and what
    the player is currently holding in the hands.

    This class handles interactions with counters and objects.
    """

    def __init__(
        self,
        name: str,
        player_config: PlayerConfig,
        hook: Hooks,
        pos: Optional[npt.NDArray[float]] = None,
    ):
        """Constructor of Player.

        Args:
            name: The name of the player.
            player_config: The player's configuration object.
            hook: Hook manager that is called when an interaction starts or ends.
            pos: The initial position of the player. Defaults to None.
        """
        self.name: str = name
        """Reference for the player"""
        self.pos: npt.NDArray[float] | None = None
        """The initial/suggested position of the player."""
        if pos is not None:
            # Copy into a float array so later vector arithmetic is well-defined.
            self.pos: npt.NDArray[float] = np.array(pos, dtype=float)

        self.holding: Optional[Item] = None
        """What item the player is holding."""

        self.player_config: PlayerConfig = player_config
        """See `PlayerConfig`."""

        self.facing_direction: npt.NDArray[float] = np.array([0, 1])
        """Current direction the player looks."""

        self.last_interacted_counter: Optional[
            Counter
        ] = None  # needed to stop progress when moved away
        """With which counter the player interacted with in the last environment step."""

        self.current_nearest_counter: Optional[Counter] = None
        """The counter to interact with."""

        self.facing_point: npt.NDArray[float] = np.zeros(2, float)
        """A point in the `facing_direction` of the player with which the closest counter is calculated with."""

        self.current_movement: npt.NDArray[float] = np.zeros(2, float)
        """The movement vector that will be used to calculate the movement in the next step call."""

        self.movement_until: datetime = datetime.min
        """The env time until the player wants to move."""

        self.interacting: bool = False
        """Is the player currently interacting with a counter."""

        self.hook: Hooks = hook
        """Hook manager. Register callbacks and create hook points with additional kwargs."""

    def set_movement(self, move_vector: npt.NDArray[float], move_until: datetime):
        """Called by the `perform_action` method. Movements will be performed (pos will be updated) in the `step`
        function of the environment.

        A running interaction is stopped as soon as a movement is set.

        Args:
            move_vector: The movement vector to store in `current_movement`.
            move_until: The env time to store in `movement_until`.
        """
        self.current_movement = move_vector
        self.movement_until = move_until
        if self.interacting:
            self.perform_interact_stop()

    def move_abs(self, new_pos: npt.NDArray[float]):
        """Overwrites the player location by the new_pos 2d-vector. Absolute movement.
        Mostly needed for resetting the player after a collision.

        Args:
            new_pos: 2D-Vector of the new player position.
        """
        self.pos = new_pos
        # The facing point is derived from the position, so it has to follow every position change.
        self.update_facing_point()

    def turn(self, direction: npt.NDArray[float]):
        """Turns the player in the given direction. Overwrites the facing_direction by a given 2d-vector.
        facing_direction is normalized to length 1. A zero vector is ignored and leaves the player unchanged.

        Args:
            direction: 2D-Vector of the direction for the player to face.
        """
        length = np.linalg.norm(direction)
        if length != 0:
            self.facing_direction = direction / length
            self.update_facing_point()

    def update_facing_point(self):
        """Update the facing point: the player position shifted along `facing_direction`."""
        # NOTE: the offset is the fixed constant 0.62; it is not derived from `player_config.radius`.
        self.facing_point = self.pos + (self.facing_direction * 0.62)

    def can_reach(self, counter: Counter) -> bool:
        """Checks whether the player can reach the counter in question. Simple check if the distance between the
        counter position and the player's facing point is not larger than the player interaction range.

        Args:
            counter: The counter, can the player reach it?

        Returns:
            True if the counter is in range of the player, False if not.
        """
        distance = np.linalg.norm(counter.pos - self.facing_point)
        # Convert the numpy boolean to a plain `bool`, as the annotation promises.
        return bool(distance <= self.player_config.interaction_range)

    def put_action(self, counter: Counter):
        """Performs the pickup-action with the counter. Handles the logic of what the player is currently holding,
        what is currently on the counter and what can be picked up or combined in hand.

        * Holding nothing: whatever `counter.pick_up` returns becomes the held item.
        * The counter accepts the held item: it is dropped off; whatever `counter.drop_off` returns becomes the
          held item.
        * Otherwise, if the counter content is not a list/deque and the held item can combine with it: the content
          is taken from the counter and combined into the held item.

        Args:
            counter: The counter to pick things up from or put things down.
        """
        if self.holding is None:
            self.holding = counter.pick_up(player=self.name)
        elif counter.can_drop_off(self.holding):
            self.holding = counter.drop_off(self.holding, player=self.name)
        elif not isinstance(
            counter.occupied_by, (list, deque)
        ) and self.holding.can_combine(counter.occupied_by):
            returned_by_counter = counter.pick_up(on_hands=False, player=self.name)
            self.holding.combine(returned_by_counter)

    def perform_interact_start(self, counter: Counter):
        """Starts an interaction with the counter. Should be called for a
        keydown event, for holding down a key on the keyboard.

        Args:
            counter: The counter to start the interaction with.
        """
        self.interacting = True
        self.last_interacted_counter = counter
        self.hook(PLAYER_START_INTERACT, player=self.name, counter=counter)

    def perform_interact_stop(self):
        """Stops an interaction with the counter. Should be called for a keyup event, for letting go of a keyboard
        key."""
        # Reset the flag first; otherwise every later `set_movement` call would fire the end hook again.
        self.interacting = False
        self.hook(
            PLAYER_END_INTERACT,
            player=self.name,
            counter=self.last_interacted_counter,
        )
        self.last_interacted_counter = None

    def progress(self, passed_time: timedelta, now: datetime):
        """Iterative progress call on the player.

        Similar to the `Counter.progress` method.
        How often it is called depends on the `env_step_frequency` or how often `step` is called.

        Does nothing unless the player is interacting with a counter. With empty hands the counter's hand-free
        interaction is progressed; with a held item of type `ItemType.Tool` the counter's tool interaction is
        progressed; any other held item does nothing.

        Args:
            passed_time: The amount of time that has passed since the last call of the method.
            now: The current env time.
        """
        counter = self.last_interacted_counter
        if not (self.interacting and counter):
            return
        # TODO only interact on counter (Sink/CuttingBoard) if hands are free configure in config?
        if not self.holding:
            counter.do_hand_free_interaction(passed_time, now, self.name)
        elif self.holding.item_info.type == ItemType.Tool:
            counter.do_tool_interaction(passed_time, self.holding)

    def __repr__(self):
        return f"Player(name:{self.name},pos:{str(self.pos)},holds:{self.holding})"

    def to_dict(self) -> PlayerState:
        """For the state representation. Only the relevant attributes are put into the dict."""
        # TODO add color to player class for vis independent player color
        nearest = self.current_nearest_counter
        return {
            "id": self.name,
            "pos": self.pos.tolist(),
            "facing_direction": self.facing_direction.tolist(),
            "holding": self.holding.to_dict() if self.holding else None,
            "current_nearest_counter_pos": nearest.pos.tolist() if nearest else None,
            "current_nearest_counter_id": nearest.uuid if nearest else None,
        }