Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test Shared Subscription #587

Merged
merged 31 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7357838
Test Shared Subscription
alfred2g Dec 19, 2023
dab7753
Fix warning unsused variable
alfred2g Dec 19, 2023
f3e1354
Fix unused variables
alfred2g Dec 19, 2023
c2f736b
clang format
alfred2g Dec 19, 2023
8e2e721
clang format
alfred2g Dec 19, 2023
fc9fae8
clang format
alfred2g Dec 19, 2023
589b21f
Add testcase to cmakefile
alfred2g Dec 19, 2023
363c2c5
Fix client connection
alfred2g Dec 19, 2023
cd71d8e
add client id for each connection
alfred2g Dec 20, 2023
0ea4c7a
Fix client id
alfred2g Dec 20, 2023
af856e5
Fix promise
alfred2g Dec 20, 2023
5bfcbba
print future set
alfred2g Dec 20, 2023
f579921
print to stderr
alfred2g Dec 20, 2023
64adf47
add print at start
alfred2g Dec 20, 2023
1ba04ec
add debug prints
alfred2g Dec 20, 2023
1d009c9
add more debug prints
alfred2g Dec 20, 2023
ff2c27d
add debug prints
alfred2g Dec 20, 2023
79a6c7a
fix topic path
alfred2g Dec 20, 2023
3d245f5
commenting futures
alfred2g Dec 20, 2023
4f6f7bc
send correct subscription variable
alfred2g Dec 20, 2023
652ba1d
add futures waiting without setting
alfred2g Dec 20, 2023
c8ccf90
add unsubscribe from topic
alfred2g Dec 20, 2023
2677e34
fix assumptions that clients receive messages equally
alfred2g Dec 20, 2023
80aaf44
add atomic computations
alfred2g Dec 20, 2023
27a1f14
fix compare and exchange
alfred2g Dec 20, 2023
a25d9cf
fix test rule
alfred2g Dec 20, 2023
9f0a6e7
Fix wrong assert for unsubscribe
alfred2g Dec 20, 2023
1df5c49
Cleanup code
alfred2g Dec 20, 2023
60e2adc
try to use one callback lambda
alfred2g Dec 21, 2023
c75539b
Remove commented code
alfred2g Dec 21, 2023
685b5cd
clang format
alfred2g Dec 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ if(NOT BYO_CRYPTO)
add_net_test_case(Mqtt5InterruptUnsub)
add_net_test_case(Mqtt5InterruptPublishQoS1)
add_net_test_case(Mqtt5OperationStatisticsSimple)
add_net_test_case(Mqtt5SharedSubscriptionTest)

# Mqtt5-to-3 Adapter
add_test_case(Mqtt5to3AdapterNewConnectionMin)
Expand Down
242 changes: 235 additions & 7 deletions tests/Mqtt5ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ static void s_setupConnectionLifeCycle(
{
mqtt5Options.WithClientConnectionSuccessCallback(
[&connectionPromise, clientName](const OnConnectionSuccessEventData &) {
printf("[MQTT5]%s Connection Success.", clientName);
printf("[MQTT5]%s Connection Success.\n", clientName);
connectionPromise.set_value(true);
});

mqtt5Options.WithClientConnectionFailureCallback(
[&connectionPromise, clientName](const OnConnectionFailureEventData &eventData) {
printf("[MQTT5]%s Connection failed with error : %s", clientName, aws_error_debug_str(eventData.errorCode));
connectionPromise.set_value(false);
});
mqtt5Options.WithClientConnectionFailureCallback([&connectionPromise,
clientName](const OnConnectionFailureEventData &eventData) {
printf("[MQTT5]%s Connection failed with error : %s\n", clientName, aws_error_debug_str(eventData.errorCode));
connectionPromise.set_value(false);
});

mqtt5Options.WithClientStoppedCallback([&stoppedPromise, clientName](const OnStoppedEventData &) {
printf("[MQTT5]%s Stopped", clientName);
printf("[MQTT5]%s Stopped\n", clientName);
stoppedPromise.set_value();
});
}
Expand Down Expand Up @@ -1914,7 +1914,9 @@ static int s_TestMqtt5WillTest(Aws::Crt::Allocator *allocator, void *)
/* Subscribe to test topic */
Mqtt5::Subscription subscription(TEST_TOPIC, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
std::shared_ptr<Mqtt5::SubscribePacket> subscribe = std::make_shared<Mqtt5::SubscribePacket>(allocator);

subscribe->WithSubscription(std::move(subscription));

std::promise<void> subscribed;
ASSERT_TRUE(subscriber->Subscribe(
subscribe, [&subscribed](int, std::shared_ptr<Mqtt5::SubAckPacket>) { subscribed.set_value(); }));
Expand All @@ -1936,6 +1938,232 @@ static int s_TestMqtt5WillTest(Aws::Crt::Allocator *allocator, void *)
}
AWS_TEST_CASE(Mqtt5WillTest, s_TestMqtt5WillTest)

/*
* Shared Subscription test
*/
static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, void *)
{
fprintf(stderr, "starting s_TestMqtt5SharedSubscriptionTest ===========\n");
Mqtt5TestEnvVars mqtt5TestVars(allocator, MQTT5CONNECT_IOT_CORE);
if (!mqtt5TestVars)
{
printf("Environment Variables are not set for the test, skip the test");
return AWS_OP_SKIP;
}

ApiHandle apiHandle(allocator);

String currentUUID = Aws::Crt::UUID().ToString();

const String TEST_TOPIC = "test/MQTT5_Binding_CPP_" + currentUUID;
const String sharedTopicFilter = "$share/crttest/test/MQTT5_Binding_CPP_" + currentUUID;

const int MESSAGE_NUMBER = 10;
int client1_messages = 0;
int client2_messages = 0;

std::vector<int> receivedMessages;
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
receivedMessages.push_back(0);
}

Aws::Iot::Mqtt5ClientBuilder *subscribe_builder =
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
mqtt5TestVars.m_certificate_path_string.c_str(),
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(subscribe_builder);

std::promise<void> client1_received;
auto onMessage_client1 = [&](const PublishReceivedEventData &eventData) -> int {
String topic = eventData.publishPacket->getTopic();
fprintf(stderr, "========= packet 1 received %s\n", topic.c_str());
if (topic == TEST_TOPIC)
{
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
++receivedMessages[message_int];
client1_messages++;
if (client1_messages == 5)
{
fprintf(stderr, "client 1 future set ======\n");
client1_received.set_value();
}
}
return 0;
};
subscribe_builder->WithPublishReceivedCallback(onMessage_client1);

Aws::Iot::Mqtt5ClientBuilder *subscribe_builder2 =
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
mqtt5TestVars.m_certificate_path_string.c_str(),
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(subscribe_builder2);

std::promise<void> client2_received;
auto onMessage_client2 = [&](const PublishReceivedEventData &eventData) -> int {
String topic = eventData.publishPacket->getTopic();
fprintf(stderr, "========= packet 2 received %s\n", topic.c_str());
if (topic == TEST_TOPIC)
{
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
++receivedMessages[message_int];
client2_messages++;
if (client2_messages == 5)
{
fprintf(stderr, " client 2 future set=======\n");
client2_received.set_value();
}
}
return 0;
};
subscribe_builder2->WithPublishReceivedCallback(onMessage_client2);

Aws::Iot::Mqtt5ClientBuilder *publish_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
mqtt5TestVars.m_certificate_path_string.c_str(),
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(publish_builder);

std::promise<bool> connectionPromise;
std::promise<void> stoppedPromise;

std::promise<bool> connectionPromise2;
std::promise<void> stoppedPromise2;

std::promise<bool> connectionPromise3;
std::promise<void> stoppedPromise3;

std::shared_ptr<Aws::Crt::Mqtt5::ConnectPacket> packetConnect = std::make_shared<Aws::Crt::Mqtt5::ConnectPacket>();

/* first subscriber */
packetConnect->WithClientId("s_TestMqtt5SharedSubscriptionTest" + Aws::Crt::UUID().ToString());
subscribe_builder->WithConnectOptions(packetConnect);
s_setupConnectionLifeCycle(subscribe_builder, connectionPromise, stoppedPromise, "Subscriber 1");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Client = subscribe_builder->Build();
ASSERT_TRUE(mqtt5Client);
ASSERT_TRUE(mqtt5Client->Start());

/* second subscriber */
packetConnect->WithClientId("s_TestMqtt5SharedSubscriptionTest" + Aws::Crt::UUID().ToString());
subscribe_builder2->WithConnectOptions(packetConnect);
s_setupConnectionLifeCycle(subscribe_builder2, connectionPromise2, stoppedPromise2, "Subscriber 2");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Client2 = subscribe_builder2->Build();
ASSERT_TRUE(mqtt5Client2);
ASSERT_TRUE(mqtt5Client2->Start());

/* publisher */
packetConnect->WithClientId("s_TestMqtt5SharedSubscriptionTest" + Aws::Crt::UUID().ToString());
publish_builder->WithConnectOptions(packetConnect);
s_setupConnectionLifeCycle(publish_builder, connectionPromise3, stoppedPromise3, "Publisher");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Publisher = publish_builder->Build();
ASSERT_TRUE(mqtt5Publisher);
ASSERT_TRUE(mqtt5Publisher->Start());

/* Wait for all clents to connect */
fprintf(stderr, "waiting for connections =========\n");
ASSERT_TRUE(connectionPromise.get_future().get());
ASSERT_TRUE(connectionPromise2.get_future().get());
ASSERT_TRUE(connectionPromise3.get_future().get());
fprintf(stderr, "all connections started =========\n");

connectionPromise = std::promise<bool>();
connectionPromise2 = std::promise<bool>();
connectionPromise3 = std::promise<bool>();

/* Subscribe to test topic */
Mqtt5::Subscription subscription(sharedTopicFilter, Mqtt5::QOS::AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
std::shared_ptr<Mqtt5::SubscribePacket> subscribe = std::make_shared<Mqtt5::SubscribePacket>(allocator);
subscribe->WithSubscription(std::move(subscription));

/* Subscribe to test topic */
Mqtt5::Subscription subscription2(sharedTopicFilter, Mqtt5::QOS::AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
std::shared_ptr<Mqtt5::SubscribePacket> subscribe2 = std::make_shared<Mqtt5::SubscribePacket>(allocator);
subscribe2->WithSubscription(std::move(subscription2));

std::promise<void> suback;
auto onSubAck = [&](int, std::shared_ptr<SubAckPacket>) { suback.set_value(); };

/* subscribe first client */
ASSERT_TRUE(mqtt5Client->Subscribe(subscribe, onSubAck));
suback.get_future().wait();

suback = std::promise<void>();

/* subscribe second client */
ASSERT_TRUE(mqtt5Client2->Subscribe(subscribe2, onSubAck));
suback.get_future().wait();

suback = std::promise<void>();

/* Publish message 10 to test topic */
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
std::string payload = std::to_string(i);
std::shared_ptr<Mqtt5::PublishPacket> publish = std::make_shared<Mqtt5::PublishPacket>(
TEST_TOPIC, ByteCursorFromCString(payload.c_str()), Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
ASSERT_TRUE(mqtt5Publisher->Publish(publish));
}

fprintf(stderr, "all packets sent =========\n");
client1_received.get_future().wait();
client2_received.get_future().wait();
fprintf(stderr, "all packets received =========\n");

Vector<String> unsubList;
unsubList.clear();
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client1 = std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client1->WithTopicFilters(unsubList);
ASSERT_FALSE(mqtt5Client->Unsubscribe(unsubscribe_client1));

std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client2 = std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client2->WithTopicFilters(unsubList);
ASSERT_FALSE(mqtt5Client->Unsubscribe(unsubscribe_client2));

client1_received = std::promise<void>();
client2_received = std::promise<void>();

/* makes sure messages are distrubuted evenly between the two clients*/
ASSERT_INT_EQUALS(5, client1_messages);
ASSERT_INT_EQUALS(5, client2_messages);

/* make sure all messages are received */
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
ASSERT_TRUE(receivedMessages[i] > 0);
}
fprintf(stderr, "all packets checked =========\n");

/* Stop all clients */
ASSERT_TRUE(mqtt5Client->Stop());
ASSERT_TRUE(mqtt5Client2->Stop());
ASSERT_TRUE(mqtt5Publisher->Stop());

/* Wait for all clents to disconnect */
//stoppedPromise.get_future().get();
//stoppedPromise2.get_future().get();
//stoppedPromise3.get_future().get();
fprintf(stderr, "all connections stopped =========\n");

delete subscribe_builder;
delete subscribe_builder2;
delete publish_builder;

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(Mqtt5SharedSubscriptionTest, s_TestMqtt5SharedSubscriptionTest)

//////////////////////////////////////////////////////////
// Error Operation Tests [ErrorOp-UC]
//////////////////////////////////////////////////////////
Expand Down
Loading