From c4767eaad4110d259add34e8827ab2f07b38a0e3 Mon Sep 17 00:00:00 2001 From: paulhamer-noaa <81647525+paulhamer-noaa@users.noreply.github.com> Date: Tue, 4 Feb 2025 11:17:17 -0700 Subject: [PATCH 1/3] Update request_id in properties for RPC --- python/idsse_common/idsse/common/rabbitmq_utils.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 8887829c..2555060c 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -359,7 +359,8 @@ def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None: # send request to external RMQ service, providing the queue where it should respond properties = BasicProperties(content_type='application/json', - correlation_id=request_id, + #correlation_id=request_id, + headers={'rpc': request_id}, reply_to=self._queue.name) # add future to dict where callback can retrieve it and set result @@ -376,7 +377,8 @@ def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None: # block until callback runs (we'll know when the future's result has been changed) return request_future.result(timeout=self._timeout) except TimeoutError: - logger.warning('Timed out waiting for response. correlation_id: %s', request_id) + # logger.warning('Timed out waiting for response. correlation_id: %s', request_id) + logger.warning('Timed out waiting for response. rpc request_id: %s', request_id) self._pending_requests.pop(request_id) # stop tracking request Future return None except Exception as exc: # pylint: disable=broad-exception-caught @@ -415,7 +417,8 @@ def _response_callback( method.routing_key, properties.content_type, str(body, encoding='utf-8')) # remove future from pending list. we will update result shortly - request_future = self._pending_requests.pop(properties.correlation_id) + # request_future = self._pending_requests.pop(properties.correlation_id) + request_future = self._pending_requests.pop(properties.headers['rpc']) # messages sent through RabbitMQ Direct reply-to are auto acked is_direct_reply = str(method.routing_key).startswith(DIRECT_REPLY_QUEUE) From 35ea7d3a07525e440b8d5052e84f8a6422c974dc Mon Sep 17 00:00:00 2001 From: paulhamer-noaa <81647525+paulhamer-noaa@users.noreply.github.com> Date: Tue, 4 Feb 2025 11:26:12 -0700 Subject: [PATCH 2/3] Updated test with properties headers to hold UUID for response confirmation. --- python/idsse_common/test/test_rabbitmq_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/idsse_common/test/test_rabbitmq_utils.py b/python/idsse_common/test/test_rabbitmq_utils.py index 5effb30c..68319a1a 100644 --- a/python/idsse_common/test/test_rabbitmq_utils.py +++ b/python/idsse_common/test/test_rabbitmq_utils.py @@ -296,9 +296,9 @@ def test_send_request_works_without_calling_start(rpc_thread: Rpc, def mock_blocking_publish(*_args, **_kwargs): # build mock message from imaginary external service method = Method('', 123) - props = BasicProperties(content_type='application/json', correlation_id=EXAMPLE_UUID) + #props = BasicProperties(content_type='application/json', correlation_id=EXAMPLE_UUID) + props = BasicProperties(content_type='application/json', headers={'rpc': EXAMPLE_UUID}) body = bytes(json.dumps(example_message), encoding='utf-8') - rpc_thread._response_callback(mock_channel, method, props, body) monkeypatch.setattr('idsse.common.rabbitmq_utils._blocking_publish', From 6e2ba1ba21d2a7f903594a8b979aae574f814c53 Mon Sep 17 00:00:00 2001 From: paulhamer-noaa <81647525+paulhamer-noaa@users.noreply.github.com> Date: Tue, 4 Feb 2025 14:31:16 -0700 Subject: [PATCH 3/3] Remove old code --- python/idsse_common/idsse/common/rabbitmq_utils.py | 2 -- python/idsse_common/test/test_rabbitmq_utils.py | 1 - 2 files changed, 3 deletions(-) diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 2555060c..f0eb8b97 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -359,7 +359,6 @@ def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None: # send request to external RMQ service, providing the queue where it should respond properties = BasicProperties(content_type='application/json', - #correlation_id=request_id, headers={'rpc': request_id}, reply_to=self._queue.name) @@ -417,7 +416,6 @@ def _response_callback( method.routing_key, properties.content_type, str(body, encoding='utf-8')) # remove future from pending list. we will update result shortly - # request_future = self._pending_requests.pop(properties.correlation_id) request_future = self._pending_requests.pop(properties.headers['rpc']) # messages sent through RabbitMQ Direct reply-to are auto acked diff --git a/python/idsse_common/test/test_rabbitmq_utils.py b/python/idsse_common/test/test_rabbitmq_utils.py index 68319a1a..c1163db7 100644 --- a/python/idsse_common/test/test_rabbitmq_utils.py +++ b/python/idsse_common/test/test_rabbitmq_utils.py @@ -296,7 +296,6 @@ def test_send_request_works_without_calling_start(rpc_thread: Rpc, def mock_blocking_publish(*_args, **_kwargs): # build mock message from imaginary external service method = Method('', 123) - #props = BasicProperties(content_type='application/json', correlation_id=EXAMPLE_UUID) props = BasicProperties(content_type='application/json', headers={'rpc': EXAMPLE_UUID}) body = bytes(json.dumps(example_message), encoding='utf-8') rpc_thread._response_callback(mock_channel, method, props, body)