Skip to content

Commit

Permalink
Refactor logging in NetworkRelay to use the logging module for improv…
Browse files Browse the repository at this point in the history
…ed message handling and debugging
  • Loading branch information
elikoga committed Jan 28, 2025
1 parent b142624 commit 83bf7f9
Showing 1 changed file with 25 additions and 36 deletions.
61 changes: 25 additions & 36 deletions http_network_relay/network_relay.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#!/usr/bin/env python
import asyncio
import base64
import os
import sys
import logging
import threading
import uuid
from contextlib import AbstractAsyncContextManager, AbstractContextManager
Expand All @@ -24,14 +23,7 @@
RtETCPDataMessage,
)

debug = False
if os.getenv("DEBUG") == "1":
debug = True


def eprint(*args, only_debug=False, **kwargs):
if (debug and only_debug) or (not only_debug):
print(*args, file=sys.stderr, **kwargs)
logger = logging.getLogger(__name__)


class TcpConnection(AbstractContextManager):
Expand Down Expand Up @@ -216,9 +208,9 @@ async def accept_ws_and_start_msg_loop_for_edge_agents(
start_message = EdgeAgentToRelayMessage.model_validate_json(
start_message_json_data
).inner
eprint(f"Message received from agent: {start_message}")
logger.info("Message received from agent: %s", start_message)
if not isinstance(start_message, EtRStartMessage):
eprint(f"Unknown message received from agent: {start_message}")
logger.warning("Unknown message received from agent: %s", start_message)
return

if self.CustomAgentToRelayStartMessage is not None:
Expand All @@ -232,7 +224,7 @@ async def accept_ws_and_start_msg_loop_for_edge_agents(
)
)
if not connection_id_or_falsy:
eprint(f"Agent {start_message} cannot proceed to tcp relaying")
logger.warning("Agent %s cannot proceed to tcp relaying", start_message)
# close the connection
await edge_agent_connection.close()
return
Expand All @@ -244,13 +236,13 @@ async def accept_ws_and_start_msg_loop_for_edge_agents(
if self.registered_agent_connections[connection_id].closed:
del self.registered_agent_connections[connection_id]
else:
eprint(f"Agent already registered: {connection_id}")
logger.warning("Agent already registered: %s", connection_id)
# close the connection
await edge_agent_connection.close()
return

self.registered_agent_connections[connection_id] = edge_agent_connection
eprint(f"Registered agent connection: {connection_id}")
logger.info("Registered agent connection: %s", connection_id)

msg_loop_task = asyncio.create_task(
self._msg_loop(edge_agent_connection, connection_id)
Expand All @@ -270,9 +262,9 @@ async def _msg_loop(self, edge_agent_connection: WebSocket, connection_id: str):
while True:
try:
json_data = await edge_agent_connection.receive_text()
eprint(f"Received message from agent: {json_data}", only_debug=True)
logger.debug("Received message from agent: %s", json_data)
except WebSocketDisconnect:
eprint(f"Agent disconnected: {connection_id}")
logger.warning("Agent disconnected: %s", connection_id)
del self.registered_agent_connections[connection_id]
break
message_outer = None
Expand All @@ -285,26 +277,26 @@ async def _msg_loop(self, edge_agent_connection: WebSocket, connection_id: str):
message = self.CustomAgentToRelayMessage.model_validate_json(
json_data
) # pylint: disable=E1101
eprint(f"Message received from agent: {message}", only_debug=True)
logger.debug("Message received from agent: %s", message)
if isinstance(message, EtRTCPDataMessage):
eprint(
f"Received TCP data message from agent: {message}", only_debug=True
)
logger.debug("Received TCP data message from agent: %s", message)
await self.handle_tcp_data_message(message)
elif isinstance(message, EtRConnectionResetMessage):
eprint(f"Received connection reset message from agent: {message}")
logger.info("Received connection reset message from agent: %s", message)
await self.handle_connection_reset_message(message)
elif message_outer is not None:
# this is a request-response message
# get queue
eprint(f"Got request-response message: {message}", only_debug=True)
logger.debug("Got request-response message: %s", message)
response_queue = self.initiate_connection_answer_queues.get(
message.connection_id
)
if response_queue is None:
eprint(
f"Got a message {message} for connection_id: "
f"{message.connection_id} but no corresponding waiting queue found"
logger.warning(
"Got a message %s for connection_id: %s "
"but no corresponding waiting queue found",
message,
message.connection_id,
)
continue
# put message in queue
Expand All @@ -314,8 +306,7 @@ async def _msg_loop(self, edge_agent_connection: WebSocket, connection_id: str):
self.CustomAgentToRelayMessage,
):
await self.handle_custom_agent_message(message)
else:
eprint(f"Unknown message received from agent: {message}")
logger.warning("Unknown message received from agent: %s", message)

async def _create_connection(
self,
Expand Down Expand Up @@ -373,14 +364,14 @@ async def create_connection_async(

async def handle_tcp_data_message(self, message: EtRTCPDataMessage):
if message.connection_id not in self.active_relayed_connections:
eprint(f"Unknown connection_id: {message.connection_id}")
logger.warning("Unknown connection_id: %s", message.connection_id)
return
connection = self.active_relayed_connections[message.connection_id]
connection.fill_recv(base64.b64decode(message.data_base64))

async def handle_connection_reset_message(self, message: EtRConnectionResetMessage):
if message.connection_id not in self.active_relayed_connections:
eprint(f"Unknown connection_id: {message.connection_id}")
logger.warning(f"Unknown connection_id: {message.connection_id}")
return
del self.active_relayed_connections[message.connection_id]

Expand All @@ -392,14 +383,12 @@ async def send_message_and_wait_for_answer(
await agent_connection.send_text(
RelayToEdgeAgentMessage(inner=message).model_dump_json()
)
eprint(
f"Waiting for response for connection_id: {message.connection_id}",
only_debug=True,
logger.debug(
"Waiting for response for connection_id: %s",
message.connection_id,
)
response = await response_queue.get()
eprint(
f"Got response for connection_id: {message.connection_id}", only_debug=True
)
logger.debug("Got response for connection_id: %s", message.connection_id)
del self.initiate_connection_answer_queues[message.connection_id]
return response

Expand Down

0 comments on commit 83bf7f9

Please sign in to comment.