Skip to content
Snippets Groups Projects
Commit 85601ce4 authored by Florian Schröder's avatar Florian Schröder
Browse files

Add optional parameters to control websocket timing

The create and connect methods within the websocket extension now include optional parameters for sleep_between_recv and timeout_loop. These parameters, passed down to the recv_messages method, provide fine-grained control over the timing behavior of the websocket communication, improving flexibility for various use cases.
parent ec9d2196
No related branches found
No related tags found
No related merge requests found
Pipeline #45323 passed
......@@ -31,24 +31,25 @@ class WebsocketExtension(Extension):
def get_extension_setup(cls) -> Type[ExtensionSetup]:
return WebsocketExtensionSetup
async def create(self, ref: str, url: str, callback: Callable[[str, Any], Awaitable], port: int):
await websockets.serve(partial(self.recv_messages, callback=callback, ref=ref, active_websockets=self.active_websockets), url, port)
async def create(self, ref: str, url: str, callback: Callable[[str, Any], Awaitable], port: int, sleep_between_recv: float = 0.05, timeout_loop: float = 0.1):
await websockets.serve(partial(self.recv_messages, callback=callback, ref=ref, active_websockets=self.active_websockets, sleep_between_recv=sleep_between_recv, timeout_loop=timeout_loop), url, port)
async def connect(self, ref: str, uri: str, callback: Callable[[str, Any], Awaitable]):
async def connect(self, ref: str, uri: str, callback: Callable[[str, Any], Awaitable], sleep_between_recv: float = 0.05, timeout_loop: float = 0.1):
# sleep_between_recv in seconds
websocket = await websockets.connect(uri)
asyncio.create_task(self.recv_messages(websocket=websocket, callback=callback, ref=ref, active_websockets=self.active_websockets))
asyncio.create_task(self.recv_messages(websocket=websocket, callback=callback, ref=ref, active_websockets=self.active_websockets, sleep_between_recv=sleep_between_recv, timeout_loop=timeout_loop))
@staticmethod
async def recv_messages(websocket, callback: Callable[[str, Any], Awaitable], ref: str, active_websockets):
async def recv_messages(websocket, callback: Callable[[str, Any], Awaitable], ref: str, active_websockets, sleep_between_recv: float, timeout_loop: float):
active_websockets[ref] = websocket
try:
while True:
try:
async with timeout(0.1):
async with timeout(timeout_loop):
message = await websocket.recv()
await callback(ref, message)
except (CancelledError, TimeoutError):
await asyncio.sleep(0.05)
await asyncio.sleep(sleep_between_recv)
except Exception as e:
traceback.print_exception(e)
finally:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment