From baedfa6c1eec6f11fbd1727f76104dda1b20e840 Mon Sep 17 00:00:00 2001 From: percyjw-2 <joris.wachsmuth@gmx.de> Date: Mon, 25 Jul 2022 00:24:55 +0200 Subject: [PATCH] Documented ConnectionManagerTCPHandler --- .gitlab-ci.yml | 2 +- doc/conf.py | 4 + swarm/ConnectionManager.py | 90 ++----------------- swarm/ConnectionManagerTCPHandler.py | 125 +++++++++++++++++++++++++++ 4 files changed, 135 insertions(+), 86 deletions(-) create mode 100644 swarm/ConnectionManagerTCPHandler.py diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index bcba794..cc61c26 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -38,7 +38,7 @@ run: pages: script: - source venv/bin/activate - - pip install sphinx sphinx-rtd-theme + - pip install sphinx sphinx-rtd-theme sphinx-autoapi - cd doc - make html - mv _build/html/ ../public/ diff --git a/doc/conf.py b/doc/conf.py index 2b8c23c..9cd10a9 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -31,8 +31,12 @@ release = '0.0.1' # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ + 'autoapi.extension' ] +autoapi_type = 'python' +autoapi_dirs = ['../swarm'] + # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] diff --git a/swarm/ConnectionManager.py b/swarm/ConnectionManager.py index d6ff41e..6b62ee9 100644 --- a/swarm/ConnectionManager.py +++ b/swarm/ConnectionManager.py @@ -5,7 +5,9 @@ import threading from enum import Enum import re from functools import partial -from typing import List, Callable, Optional, Union, Tuple +from typing import List, Callable, Optional, Tuple + +from swarm.ConnectionManagerTCPHandler import ConnectionManagerTCPHandler try: from time import time_ns @@ -34,86 +36,6 @@ class InvalidIPString(Exception): pass -class NotInContextManagerMode(Exception): - pass - - -class ConnectionManagerTCPHandler(socketserver.BaseRequestHandler): - def __init__(self, connection_manager, *args, **kwargs): - self.connection_manager = connection_manager - super().__init__(*args, **kwargs) - - # match statement is not available in Python 3.6 :( - def announce(self, launch_time: str, addr: str): - address_parsed = _string_to_ip_and_port(addr) - self.connection_manager.sockets[address_parsed] = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.connection_manager.sockets[address_parsed].connect(address_parsed) - announced_launch_time = int(launch_time) - updated_launch_time = False - while announced_launch_time in self.connection_manager.connectedIPs.values(): - updated_launch_time = True - announced_launch_time = announced_launch_time + 1 - self.connection_manager.connectedIPs[address_parsed] = announced_launch_time - updated_time = "" - if updated_launch_time: - updated_time = "," + str(announced_launch_time) - self.send_message(str(self.connection_manager.creation_time) + updated_time) - - def update_launch_time(self, launch_time, addr): - address_parsed = _string_to_ip_and_port(addr) - self.connection_manager.connectedIPs[address_parsed] = int(launch_time) - - def heartbeat(self): - self.send_message(StandardMessages.ACKNOWLEDGED.value) - - def get_addresses(self): - address_string =\ - ",".join([addr[0] + ":" + str(addr[1]) for addr in self.connection_manager.connectedIPs.keys()]) - self.send_message(address_string) - - def get_master(self): - master_addr = self.connection_manager.master_addr - addr_str = master_addr[0] + ":" + str(master_addr[1]) - self.send_message(addr_str) - - def default_case(self, message): - for func in self.connection_manager.listeners: - return_msg = func(message) - if return_msg is not None: - self.send_message(return_msg) - - def handle(self): - while not self.connection_manager.stop_socketserver: - msg_recvd = "" - try: - msg_recvd = str(self.request.recv(self.connection_manager._buffer_size), "utf-8").lower() - except ConnectionError: - pass - if not msg_recvd: - break - msg_split = msg_recvd.split(":") - cmd = msg_split[0] - msg = ":".join(msg_split[1:len(msg_split)]) - msg_args = msg.split(",") - - if cmd == StandardMessages.ANNOUNCE.value: - self.announce(*msg_args) - elif cmd == StandardMessages.UPDATE_LAUNCH.value: - self.update_launch_time(*msg_args) - elif cmd == StandardMessages.HEARTBEAT.value: - self.heartbeat() - elif cmd == StandardMessages.GET_MASTER.value: - self.get_master() - elif cmd == StandardMessages.GET_ADDRESSES.value: - self.get_addresses() - else: - self.default_case(msg_recvd) - - def send_message(self, message: Union[str, int]): - message = str(message).encode("utf-8") - self.request.sendall(message) - - def _string_to_ip_and_port(message: str) -> Tuple[str, int]: valid_ipv4 = re.compile(r"^(\d?\d?\d.){3}\d?\d?\d:(\d?){4}\d$") valid_ipv6 = re.compile(r"^([a-f\d:]+:+)+[a-f\d]+:(\d?){4}\d$") @@ -155,9 +77,6 @@ class ConnectionManager: self.listeners = [] def __enter__(self): - if self._ip_list is None: - raise NotInContextManagerMode("An IP List needs to be provided to the Constructor to use this class " - "with the 'with' keyword.") self.connect(self._ip_list) return self @@ -263,7 +182,8 @@ class ConnectionManager: self.sockets.clear() if self.socketServerThread is not None: self.stop_socketserver = True - self.socketServer.shutdown() + if self.socketServer is not None: + self.socketServer.shutdown() def send_message(self, message: str, address: Tuple[str, int]): if self.sockets.get(address) is None: diff --git a/swarm/ConnectionManagerTCPHandler.py b/swarm/ConnectionManagerTCPHandler.py new file mode 100644 index 0000000..8605a21 --- /dev/null +++ b/swarm/ConnectionManagerTCPHandler.py @@ -0,0 +1,125 @@ +import socket +import socketserver +from typing import Union + +from swarm import _string_to_ip_and_port, StandardMessages + + +class ConnectionManagerTCPHandler(socketserver.BaseRequestHandler): + """ + Handles Connections from the ConnectionManager. + Is created by the ConnectionManager during the launch of the socketserver. + """ + def __init__(self, connection_manager, *args, **kwargs): + self.connection_manager = connection_manager + super().__init__(*args, **kwargs) + + # match statement is not available in Python 3.6 :( + def announce(self, launch_time: str, addr: str): + """ + Handles new connections. + 1. Establishes new socket to client + 2. Checks if launch time is unique + 3. If not a new launch time is generated + 4. Own launch time and if generated the updated launch time is sent back to the client + + :param launch_time: time when the connecting client launched + :param addr: address of the socketserver of the connecting client + """ + address_parsed = _string_to_ip_and_port(addr) + self.connection_manager.sockets[address_parsed] = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.connection_manager.sockets[address_parsed].connect(address_parsed) + announced_launch_time = int(launch_time) + updated_launch_time = False + while announced_launch_time in self.connection_manager.connectedIPs.values(): + updated_launch_time = True + announced_launch_time = announced_launch_time + 1 + self.connection_manager.connectedIPs[address_parsed] = announced_launch_time + updated_time = "" + if updated_launch_time: + updated_time = "," + str(announced_launch_time) + self.send_message(str(self.connection_manager.creation_time) + updated_time) + + def update_launch_time(self, launch_time, addr): + """ + If a client got an updated launch time it announces its new one to all connected clients. + Here this time is getting updated + + :param launch_time: updated time when the connecting client started + :param addr: address of the connecting client + """ + address_parsed = _string_to_ip_and_port(addr) + self.connection_manager.connectedIPs[address_parsed] = int(launch_time) + + def heartbeat(self): + """ + A client always needs to answer the heartbeat message, or it will be removed from the network + """ + self.send_message(StandardMessages.ACKNOWLEDGED.value) + + def get_addresses(self): + """ + Sends all addresses of the connected clients to the requester + """ + address_string =\ + ",".join([addr[0] + ":" + str(addr[1]) for addr in self.connection_manager.connectedIPs.keys()]) + self.send_message(address_string) + + def get_master(self): + """ + Sends back current master of the network + """ + master_addr = self.connection_manager.master_addr + addr_str = master_addr[0] + ":" + str(master_addr[1]) + self.send_message(addr_str) + + def default_case(self, message): + """ + If a message is not part of the Standard Messages it will be sent to all custom message handlers that are + listening for new messages. If a handler returns a string it will be sent to the client. + + :param message: complete received message from client + """ + for func in self.connection_manager.listeners: + return_msg = func(message) + if return_msg is not None: + self.send_message(return_msg) + + def handle(self): + """ + Handles all incoming messages and parses them from following schema: [Command:arg,arg,...] + """ + while not self.connection_manager.stop_socketserver: + msg_recvd = "" + try: + msg_recvd = str(self.request.recv(self.connection_manager._buffer_size), "utf-8").lower() + except ConnectionError: + pass + if not msg_recvd: + break + msg_split = msg_recvd.split(":") + cmd = msg_split[0] + msg = ":".join(msg_split[1:len(msg_split)]) + msg_args = msg.split(",") + + if cmd == StandardMessages.ANNOUNCE.value: + self.announce(*msg_args) + elif cmd == StandardMessages.UPDATE_LAUNCH.value: + self.update_launch_time(*msg_args) + elif cmd == StandardMessages.HEARTBEAT.value: + self.heartbeat() + elif cmd == StandardMessages.GET_MASTER.value: + self.get_master() + elif cmd == StandardMessages.GET_ADDRESSES.value: + self.get_addresses() + else: + self.default_case(msg_recvd) + + def send_message(self, message: Union[str, int]): + """ + Parses provided message to bytes and sends the message back to the client + + :param message: message that will be parsed and sent + """ + message = str(message).encode("utf-8") + self.request.sendall(message) -- GitLab