diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 627c3334e..9c093c403 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 627c3334e52021aa8d5772b6ca076884610f3219 +Subproject commit 9c093c4039478c04c7e55a1825c391dd4742fd61 diff --git a/include/aws/iot/MqttRequestResponseClient.h b/include/aws/iot/MqttRequestResponseClient.h index 805db22c3..35123d6c6 100644 --- a/include/aws/iot/MqttRequestResponseClient.h +++ b/include/aws/iot/MqttRequestResponseClient.h @@ -121,12 +121,28 @@ namespace Aws /** * Default constructor */ - IncomingPublishEvent() : m_payload() { AWS_ZERO_STRUCT(m_payload); } + IncomingPublishEvent() : m_topic(), m_payload() + { + AWS_ZERO_STRUCT(m_topic); + AWS_ZERO_STRUCT(m_payload); + } + + /** + * Sets the message response topic associated with this event. The event does not own this topic. + * + * @param topic the message response topic associated with this event + * @return reference to this + */ + IncomingPublishEvent &WithTopic(Aws::Crt::ByteCursor topic) + { + m_topic = topic; + return *this; + } /** * Sets the message payload associated with this event. The event does not own this payload. * - * @param payload he message payload associated with this event + * @param payload the message payload associated with this event * @return reference to this */ IncomingPublishEvent &WithPayload(Aws::Crt::ByteCursor payload) @@ -135,6 +151,13 @@ namespace Aws return *this; } + /** + * Gets the message response topic associated with this event. + * + * @return the message response topic associated with this event + */ + Aws::Crt::ByteCursor GetTopic() const { return m_topic; } + /** * Gets the message payload associated with this event. * @@ -143,6 +166,7 @@ namespace Aws Aws::Crt::ByteCursor GetPayload() const { return m_payload; } private: + Aws::Crt::ByteCursor m_topic; Aws::Crt::ByteCursor m_payload; }; diff --git a/source/iot/MqttRequestResponseClient.cpp b/source/iot/MqttRequestResponseClient.cpp index 2bdcb960f..cb142797b 100644 --- a/source/iot/MqttRequestResponseClient.cpp +++ b/source/iot/MqttRequestResponseClient.cpp @@ -89,7 +89,10 @@ namespace Aws int error_code, void *user_data); - static void OnIncomingPublishCallback(struct aws_byte_cursor payload, void *user_data); + static void OnIncomingPublishCallback( + struct aws_byte_cursor payload, + struct aws_byte_cursor topic, + void *user_data); static void OnTerminatedCallback(void *user_data); @@ -187,7 +190,10 @@ namespace Aws } } - void StreamingOperationImpl::OnIncomingPublishCallback(struct aws_byte_cursor payload, void *user_data) + void StreamingOperationImpl::OnIncomingPublishCallback( + struct aws_byte_cursor payload, + struct aws_byte_cursor topic, + void *user_data) { auto *handle = static_cast(user_data); StreamingOperationImpl *impl = handle->m_impl.get(); @@ -198,7 +204,7 @@ namespace Aws if (!impl->m_closed && impl->m_config.incomingPublishEventHandler) { IncomingPublishEvent event; - event.WithPayload(payload); + event.WithTopic(topic).WithPayload(payload); impl->m_config.incomingPublishEventHandler(std::move(event)); } diff --git a/tests/MqttRequestResponse.cpp b/tests/MqttRequestResponse.cpp index f701d208d..0658713be 100644 --- a/tests/MqttRequestResponse.cpp +++ b/tests/MqttRequestResponse.cpp @@ -40,6 +40,12 @@ struct ResponseTracker bool complete; }; +struct TestPublishEvent +{ + Aws::Crt::String topic; + Aws::Crt::String payload; +}; + struct TestState { TestState(Aws::Crt::Allocator *allocator) : allocator(allocator) {} @@ -54,7 +60,7 @@ struct TestState Aws::Crt::Vector> responseTrackers; Aws::Crt::Vector subscriptionStatusEvents; - Aws::Crt::Vector incomingPublishEvents; + Aws::Crt::Vector incomingPublishEvents; }; static void s_waitForConnected(struct TestState *state) @@ -168,17 +174,20 @@ static void s_onIncomingPublishEvent(Aws::Iot::RequestResponse::IncomingPublishE { std::unique_lock lock(state->lock); + auto topicCursor = event.GetTopic(); + Aws::Crt::String topicAsString((const char *)topicCursor.ptr, topicCursor.len); + auto payloadCursor = event.GetPayload(); Aws::Crt::String payloadAsString((const char *)payloadCursor.ptr, payloadCursor.len); - state->incomingPublishEvents.push_back(payloadAsString); + state->incomingPublishEvents.push_back({std::move(topicAsString), std::move(payloadAsString)}); } state->signal.notify_one(); } static void s_waitForIncomingPublishWithPredicate( TestState *state, - const std::function &predicate) + const std::function &predicate) { { std::unique_lock lock(state->lock); @@ -189,7 +198,7 @@ static void s_waitForIncomingPublishWithPredicate( return std::any_of( state->incomingPublishEvents.cbegin(), state->incomingPublishEvents.cend(), - [=](const Aws::Crt::String &payload) { return predicate(payload); }); + [=](const TestPublishEvent &publishEvent) { return predicate(publishEvent); }); }); } } @@ -1077,7 +1086,9 @@ static int s_doShadowUpdatedStreamIncomingPublishTest(Aws::Crt::Allocator *alloc s_publishToProtocolClient(context, uuid, s_publishPayload, allocator); s_waitForIncomingPublishWithPredicate( - &state, [](const Aws::Crt::String &payload) { return payload == Aws::Crt::String(s_publishPayload); }); + &state, + [&uuid](const TestPublishEvent &publishEvent) + { return publishEvent.topic == uuid && publishEvent.payload == Aws::Crt::String(s_publishPayload); }); return AWS_OP_SUCCESS; }