Skip to content

Commit

Permalink
Cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred2g committed Dec 20, 2023
1 parent 9f0a6e7 commit 1df5c49
Showing 1 changed file with 13 additions and 27 deletions.
40 changes: 13 additions & 27 deletions tests/Mqtt5ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1944,7 +1944,6 @@ AWS_TEST_CASE(Mqtt5WillTest, s_TestMqtt5WillTest)
*/
static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, void *)
{
fprintf(stderr, "starting s_TestMqtt5SharedSubscriptionTest ===========\n");
Mqtt5TestEnvVars mqtt5TestVars(allocator, MQTT5CONNECT_IOT_CORE);
if (!mqtt5TestVars)
{
Expand Down Expand Up @@ -1981,7 +1980,6 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
std::promise<void> client_received;
auto onMessage_client1 = [&](const PublishReceivedEventData &eventData) -> int {
String topic = eventData.publishPacket->getTopic();
fprintf(stderr, "========= packet 1 received %s\n", topic.c_str());
if (topic == TEST_TOPIC)
{
ByteCursor payload = eventData.publishPacket->getPayload();
Expand All @@ -1997,9 +1995,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
client_messages++;
exchanged = client_messages.compare_exchange_strong(tested, desired);
if (exchanged == true)
// if (client_messages == 10)
{
fprintf(stderr, "client 1 future set ======\n");
client_received.set_value();
}
}
Expand All @@ -2017,7 +2013,6 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi

auto onMessage_client2 = [&](const PublishReceivedEventData &eventData) -> int {
String topic = eventData.publishPacket->getTopic();
fprintf(stderr, "========= packet 2 received %s\n", topic.c_str());
if (topic == TEST_TOPIC)
{
ByteCursor payload = eventData.publishPacket->getPayload();
Expand All @@ -2033,9 +2028,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
client_messages++;
exchanged = client_messages.compare_exchange_strong(tested, desired);
if (exchanged == true)
// if (client_messages == 10)
{
fprintf(stderr, " client 2 future set=======\n");
client_received.set_value();
}
}
Expand All @@ -2062,39 +2055,38 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
std::shared_ptr<Aws::Crt::Mqtt5::ConnectPacket> packetConnect = std::make_shared<Aws::Crt::Mqtt5::ConnectPacket>();

/* first subscriber */
/* set a different client id so we are not disconnected from the server */
packetConnect->WithClientId("s_TestMqtt5SharedSubscriptionTest" + Aws::Crt::UUID().ToString());
subscribe_builder->WithConnectOptions(packetConnect);
s_setupConnectionLifeCycle(subscribe_builder, connectionPromise, stoppedPromise, "Subscriber 1");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Client = subscribe_builder->Build();
ASSERT_TRUE(mqtt5Client);
ASSERT_TRUE(mqtt5Client->Start());

/* second subscriber */
/* set a different client id so we are not disconnected from the server */
packetConnect->WithClientId("s_TestMqtt5SharedSubscriptionTest" + Aws::Crt::UUID().ToString());
subscribe_builder2->WithConnectOptions(packetConnect);
s_setupConnectionLifeCycle(subscribe_builder2, connectionPromise2, stoppedPromise2, "Subscriber 2");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Client2 = subscribe_builder2->Build();
ASSERT_TRUE(mqtt5Client2);
ASSERT_TRUE(mqtt5Client2->Start());

/* publisher */
/* set a different client id so we are not disconnected from the server */
packetConnect->WithClientId("s_TestMqtt5SharedSubscriptionTest" + Aws::Crt::UUID().ToString());
publish_builder->WithConnectOptions(packetConnect);
s_setupConnectionLifeCycle(publish_builder, connectionPromise3, stoppedPromise3, "Publisher");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Publisher = publish_builder->Build();
ASSERT_TRUE(mqtt5Publisher);

/* Connect all clients */
ASSERT_TRUE(mqtt5Client->Start());
ASSERT_TRUE(mqtt5Client2->Start());
ASSERT_TRUE(mqtt5Publisher->Start());

/* Wait for all clents to connect */
fprintf(stderr, "waiting for connections =========\n");
ASSERT_TRUE(connectionPromise.get_future().get());
ASSERT_TRUE(connectionPromise2.get_future().get());
ASSERT_TRUE(connectionPromise3.get_future().get());
fprintf(stderr, "all connections started =========\n");

connectionPromise = std::promise<bool>();
connectionPromise2 = std::promise<bool>();
connectionPromise3 = std::promise<bool>();

/* Subscribe to test topic */
Mqtt5::Subscription subscription(sharedTopicFilter, Mqtt5::QOS::AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
Expand All @@ -2119,8 +2111,6 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
ASSERT_TRUE(mqtt5Client2->Subscribe(subscribe2, onSubAck));
suback.get_future().wait();

suback = std::promise<void>();

/* Publish message 10 to test topic */
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
Expand All @@ -2130,11 +2120,10 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
ASSERT_TRUE(mqtt5Publisher->Publish(publish));
}

fprintf(stderr, "all packets sent =========\n");
/* Wait for all packets to be received on both clients */
client_received.get_future().wait();
// client2_received.get_future().wait();
fprintf(stderr, "all packets received =========\n");

/* Unsubscribe from the topic from both clients*/
Vector<String> unsubList;
unsubList.push_back(TEST_TOPIC);
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client1 =
Expand All @@ -2147,20 +2136,18 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
unsubscribe_client2->WithTopicFilters(unsubList);
ASSERT_TRUE(mqtt5Client->Unsubscribe(unsubscribe_client2));

client_received = std::promise<void>();
/* make sure all messages are received */
ASSERT_INT_EQUALS(MESSAGE_NUMBER + 1, client_messages); /* We are adding one at the end, so 10 messages received */

/* makes sure messages are distrubuted evenly between the two clients*/
ASSERT_INT_EQUALS(11, client_messages); /* We are adding one at the end, so 10 messages received */
/* makes sure both clients received at least one message */
ASSERT_TRUE(client1_received);
ASSERT_TRUE(client2_received);

/* make sure all messages are received */
/* make sure all messages are received with no duplicates*/
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
ASSERT_TRUE(receivedMessages[i] > 0);
}
fprintf(stderr, "all packets checked =========\n");

/* Stop all clients */
ASSERT_TRUE(mqtt5Client->Stop());
ASSERT_TRUE(mqtt5Client2->Stop());
Expand All @@ -2170,7 +2157,6 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
stoppedPromise.get_future().get();
stoppedPromise2.get_future().get();
stoppedPromise3.get_future().get();
fprintf(stderr, "all connections stopped =========\n");

delete subscribe_builder;
delete subscribe_builder2;
Expand Down

0 comments on commit 1df5c49

Please sign in to comment.