"""
Implements the movement of the players. Checks collisions and pushes other players around.

For efficiency, we tried to do everything with numpy arrays and functions.
"""
from datetime import timedelta, datetime
from typing import Tuple
import numpy as np
import numpy.typing as npt
from scipy.spatial import distance_matrix
from cooperative_cuisine.counters import Counter
from cooperative_cuisine.hooks import Hooks, PLAYERS_COLLIDE

from cooperative_cuisine.player import Player, PlayerConfig
class Movement:
    """Does the movement of the players."""

    world_borders_lower: npt.NDArray[float] = None
    """World borders lower bounds."""
    world_borders_upper: npt.NDArray[float] = None
    """World borders upper bounds."""

    def __init__(
        self,
        counter_positions: npt.NDArray[float],
        player_config: PlayerConfig,
        world_borders: npt.NDArray[float],
        hook: Hooks,
    ):
        """Constructor of Movement.

        Args:
            counter_positions: Positions of all counters in an environment. Needs to be updated if the counters
                position changes.
            player_config: Player configuration; `radius`, `interaction_range` and `speed_units_per_seconds`
                are read from it.
            world_borders: The world border arrays. For easier numpy comparison with player position. Column 0 is
                used as the lower bounds and column 1 as the upper bounds.
            hook: Hook manager. Register callbacks and create hook points with additional kwargs.
        """
        self.counter_positions: npt.NDArray[float] = counter_positions
        """Positions of all counters in an environment. Needs to be updated if the counters position changes."""

        self.player_radius: float = player_config.radius
        """The radius of a player (indicating its size and collision sphere). Relative to one grid cell, e.g., `0.4`."""

        self.player_interaction_range: float = player_config.interaction_range
        """The range of how far a player can interact with the closest counter."""

        self.player_movement_speed: float | int = player_config.speed_units_per_seconds
        """How many grid cells a player can move in a second."""

        self.world_borders: npt.NDArray[float] = world_borders
        """The world border arrays. For easier numpy comparison with player position. The outer dimension needs to be
        equal to the number of players."""

        # Start with border arrays for a single player; callers re-run
        # `set_collision_arrays` when the number of players changes.
        self.set_collision_arrays(1)

        self.hook: Hooks = hook
        """Hook manager. Register callbacks and create hook points with additional kwargs."""

    def set_collision_arrays(self, number_players: int):
        """Sets collision arrays for the given number of players.

        The lower (column 0) and upper (column 1) world borders are repeated once per player so that they can be
        compared element-wise with the player position array.

        Args:
            number_players (int): The number of players.
        """
        self.world_borders_lower = self.world_borders[np.newaxis, :, 0].repeat(
            number_players, axis=0
        )
        self.world_borders_upper = self.world_borders[np.newaxis, :, 1].repeat(
            number_players, axis=0
        )

    def get_counter_collisions(
        self, player_positions: npt.NDArray[float]
    ) -> Tuple[npt.NDArray[bool], npt.NDArray[int], npt.NDArray[float]]:
        """Checks for every player whether it overlaps with any counter.

        Args:
            player_positions: 2D numpy array containing the positions of the players.

        Returns:
            collided: 1D numpy array indicating whether each player has collided with a counter.
            relevant_axes: 1D numpy array indicating the relevant axis for each player's collision.
            nearest_counter_to_player: 2D numpy array with the difference vector between each player and its
                nearest counter (player position minus counter position).
        """
        # Pairwise offsets, indexed as [player, counter, axis].
        counter_diff_vecs = (
            player_positions[:, np.newaxis, :]
            - self.counter_positions[np.newaxis, :, :]
        )
        abs_diff_vecs = np.abs(counter_diff_vecs)

        # The nearest counter is selected by the largest per-axis offset (Chebyshev distance).
        counter_distances = np.max(abs_diff_vecs, axis=2)
        closest_counter_positions = self.counter_positions[
            np.argmin(counter_distances, axis=1)
        ]
        nearest_counter_to_player = player_positions - closest_counter_positions
        relevant_axes = np.abs(nearest_counter_to_player).argmax(axis=1)

        # Euclidean distance to an axis-aligned square with half-extent 0.5 around each counter position:
        # offsets inside the square are clamped to zero before taking the norm.
        distances = np.linalg.norm(np.maximum(abs_diff_vecs - 0.5, 0.0), axis=2)
        collided = np.any(distances < self.player_radius, axis=1)

        return collided, relevant_axes, nearest_counter_to_player

    def get_player_push(
        self, player_positions: npt.NDArray[float]
    ) -> npt.NDArray[float]:
        """Calculates the push vectors that colliding players exert on each other.

        Two players collide if their distance is smaller than two player radii. If any collision is found, the
        `PLAYERS_COLLIDE` hook is called with the boolean collision matrix.

        Args:
            player_positions (numpy.ndarray): An array of shape (n, 2) representing the positions of the players.

        Returns:
            Array (push_vectors) of shape (n, 2) with the total push vector for each player, i.e., the sum of the
            vectors pointing from every colliding player to that player. Zero for players without a collision.
        """
        distances_players_after = distance_matrix(
            player_positions, player_positions
        )
        collisions = distances_players_after < 2 * self.player_radius

        # Entry [i, j] is the vector from player i to player j.
        player_diff_vecs = -(
            player_positions[:, np.newaxis, :] - player_positions[np.newaxis, :, :]
        )

        # A player never collides with itself.
        eye_idxs = np.eye(len(player_positions), len(player_positions), dtype=bool)
        collisions[eye_idxs] = False

        if np.any(collisions):
            self.hook(
                PLAYERS_COLLIDE,
                collisions=collisions,
            )

        # Only colliding pairs contribute to the push.
        player_diff_vecs[~collisions] = 0
        push_vectors = np.sum(player_diff_vecs, axis=0)
        return push_vectors

    def perform_movement(
        self,
        duration: timedelta,
        env_time: datetime,
        players: dict[str, Player],
        counters: list[Counter],
    ) -> None:
        """Moves the players in the direction of their current movement.

        If a player collides with a counter through this movement, it is not moved. Before giving up, the movement
        is retried along the x axis only and then along the y axis only, which lets players slide along counters
        when they move diagonally against them. This feels a bit smoother.

        Collisions with other players are detected and the involved players are pushed out of the way. Positions are
        finally clipped to the world borders. Afterwards every player is turned and its nearest counter in
        interaction range is updated; a running interaction is stopped if that counter changed.

        Args:
            duration: The duration for how long the movement to perform.
            env_time: The current time of the environment.
            players: The players in the environment.
            counters: The counters in the environment. Indexed in the same order as `counter_positions`.
        """
        if not players:
            # Nothing to move; the array code below requires at least one (n, 2) position row.
            return

        d_time = duration.total_seconds()
        step_length = self.player_movement_speed * d_time

        player_positions: npt.NDArray[float] = np.array(
            [p.pos for p in players.values()], dtype=float
        )
        # Players whose movement has expired stand still.
        player_movement_vectors: npt.NDArray[float] = np.array(
            [
                p.current_movement if env_time <= p.movement_until else [0, 0]
                for p in players.values()
            ],
            dtype=float,
        )

        targeted_positions = player_positions + player_movement_vectors * step_length

        # Collisions between players
        force_factor = 1.2
        push_vectors = self.get_player_push(targeted_positions)
        updated_movement = (force_factor * push_vectors) + player_movement_vectors
        new_targeted_positions = player_positions + updated_movement * step_length

        # same again to prevent squeezing into other players
        push_vectors2 = self.get_player_push(new_targeted_positions)
        updated_movement = (force_factor * push_vectors2) + updated_movement
        new_targeted_positions = player_positions + updated_movement * step_length

        # check if players collided with counters through movement or through being pushed
        collided, _, _ = self.get_counter_collisions(new_targeted_positions)

        # If collided, check if the players could still move along the axis, starting with x.
        # This leads to players being able to slide along counters, which feels a lot nicer.
        projected_x = updated_movement.copy()
        projected_x[collided, 1] = 0
        new_targeted_positions[collided] = (
            player_positions[collided] + projected_x[collided] * step_length
        )

        # checking collisions again, and now try the y axis for those still colliding
        collided, _, _ = self.get_counter_collisions(new_targeted_positions)
        projected_y = updated_movement.copy()
        projected_y[collided, 0] = 0
        new_targeted_positions[collided] = (
            player_positions[collided] + projected_y[collided] * step_length
        )
        new_positions = new_targeted_positions

        # Check collisions with counters a final time, now absolute with no sliding possible.
        # Players should never be able to enter counters this way.
        collided, _, _ = self.get_counter_collisions(new_positions)
        new_positions[collided] = player_positions[collided]

        # Collisions of players with world borders
        new_positions = np.clip(
            new_positions,
            self.world_borders_lower + self.player_radius,
            self.world_borders_upper - self.player_radius,
        )

        for idx, p in enumerate(players.values()):
            if np.any(new_positions[idx] != player_positions[idx]):
                p.pos = new_positions[idx]

            p.turn(player_movement_vectors[idx])

            facing_distances = np.linalg.norm(
                p.facing_point - self.counter_positions, axis=1
            )
            closest_counter = counters[facing_distances.argmin()]
            p.current_nearest_counter = (
                closest_counter
                if facing_distances.min() <= self.player_interaction_range
                else None
            )
            if p.last_interacted_counter != p.current_nearest_counter and p.interacting:
                p.perform_interact_stop()