Skip to content

Commit

Permalink
Check that all are reassembled before closing (#128)
Browse files Browse the repository at this point in the history
* Check that all fragments are reassembled before closing
change WPC close order, add a check to fragment list.
Adapt gw-example to do clean exit
* Handle fragment only in dispatch task
remove garbage collect from polling task, call periodically garbage collect from dispatch
garbage collect still called on frag received for compatibility
  • Loading branch information
StephaneTriomphe authored Jan 10, 2025
1 parent 7fddd04 commit a40af2f
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 79 deletions.
46 changes: 38 additions & 8 deletions example/linux/gw-example/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include <getopt.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand Down Expand Up @@ -43,6 +44,10 @@ static pthread_t m_thread_publish;
static pthread_mutex_t m_pub_queue_mutex;
static pthread_cond_t m_pub_queue_not_empty_cond = PTHREAD_COND_INITIALIZER;

static char topic_all_requests[16 + sizeof(m_gateway_id)]; //"gw-request/+/<gateway_id/#"

static volatile bool running = true;

// Statically allocated but could be mallocated
typedef struct
{
Expand All @@ -69,6 +74,11 @@ static bool m_pub_queue_empty = true;

static MQTTClient m_client = NULL;

static void signal_handler(int signum)
{
running = false;
}

static bool MQTT_publish(char * topic, uint8_t * payload, size_t payload_size, bool retained)
{
message_to_publish_t * message_p;
Expand Down Expand Up @@ -242,7 +252,6 @@ static bool reconnect(uint32_t timeout_s)
int rc;
size_t proto_size;
char topic_status[sizeof(TOPIC_EVENT_PREFIX) + sizeof(m_gateway_id) + 1];
char topic_all_requests[16 + sizeof(m_gateway_id)]; //"gw-request/+/<gateway_id/#"
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_willOptions will_options = MQTTClient_willOptions_initializer;
MQTTClient_SSLOptions ssl_options = MQTTClient_SSLOptions_initializer;
Expand Down Expand Up @@ -289,11 +298,6 @@ static bool reconnect(uint32_t timeout_s)
TOPIC_EVENT_PREFIX,
m_gateway_id);

snprintf(topic_all_requests,
sizeof(topic_all_requests),
"gw-request/+/%s/#",
m_gateway_id);

while (timeout_s > 0)
{
if ((rc = MQTTClient_connect(m_client, &conn_opts)) == MQTTCLIENT_SUCCESS)
Expand Down Expand Up @@ -331,6 +335,17 @@ static bool reconnect(uint32_t timeout_s)
return true;
}

static bool mqtt_unsubscribe_topics(void)
{
if (MQTTClient_unsubscribe(m_client, topic_all_requests) != MQTTCLIENT_SUCCESS)
{
LOGE("Failed to unsubscribe from topic %s\n", topic_all_requests);
return false;
}
LOGI("Successfully unsubscribed from topic %s\n", topic_all_requests);
return true;
}

static void on_mqtt_connection_lost(void *context, char *cause)
{
LOGE("Connection lost\n");
Expand All @@ -345,6 +360,8 @@ static bool MQTT_connect(uint32_t timeout_s,

MQTTClient_init_options global_init_options = MQTTClient_init_options_initializer;
global_init_options.do_openssl_init = true;

snprintf(topic_all_requests, sizeof(topic_all_requests), "gw-request/+/%s/#", m_gateway_id);

MQTTClient_global_init(&global_init_options);

Expand Down Expand Up @@ -434,6 +451,9 @@ int main(int argc, char * argv[])
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);

signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);

// Parse the arguments
static struct option long_options[]
= { { "baudrate", required_argument, 0, 'b' },
Expand Down Expand Up @@ -540,10 +560,20 @@ int main(int argc, char * argv[])

LOGI("Starting gw with id %s on host %s\n", gateway_id, mqtt_host);

for (;;)
while (running)
{
sleep(2);
}

LOGE("End of program\n");
LOGI("Clean exit requested\n");
mqtt_unsubscribe_topics();
WPC_Proto_close();
if (MQTTClient_disconnect(m_client, 10000) != MQTTCLIENT_SUCCESS)
{
LOGE("MQTT failed to disconnect\n");
}
LOGI("MQTT disconnected\n");
MQTTClient_destroy(&m_client);
pthread_mutex_destroy(&m_pub_queue_mutex);
LOGI("Clean exit completed\n");
}
7 changes: 6 additions & 1 deletion lib/api/wpc_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@
#include <stddef.h>
#include <stdint.h>

/**
* Max default delay to keep incomplete fragmented packet inside our buffers
*/
#define FRAGMENT_MAX_DURATION_S 45

/**
* Max possible overhead estimation for wp_GenericMessage
* compared to specific single message. Should be added to
* size of single message to estimate the max sized occupied
* by the full proto encoded message */
* by the full proto encoded message
*/
#define WPC_PROTO_GENERIC_MESSAGE_OVERHEAD 20

/*
Expand Down
91 changes: 74 additions & 17 deletions lib/platform/linux/platform.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* See file LICENSE for full license details.
*
*/
#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>
#include <pthread.h>
Expand All @@ -14,21 +15,32 @@
#define MAX_LOG_LEVEL INFO_LOG_LEVEL
#include "logger.h"
#include "platform.h"
#include "reassembly.h"
#include "wpc_proto.h"

// Maximum number of indication to be retrieved from a single poll
#define MAX_NUMBER_INDICATION 30

// Polling interval to check for indication
#define POLLING_INTERVAL_MS 20

// Wakeup timeout for dispatch thread, mainly for garbage collection of fragments
#define DISPATCH_WAKEUP_TIMEOUT_MS 5000

// Mutex for sending, ie serial access
static pthread_mutex_t sending_mutex;

// This thread is used to poll for indication
static pthread_t thread_polling;

// Set to false to stop polling thread execution
static bool m_polling_thread_running;
typedef enum {
POLLING_THREAD_RUN,
POLLING_THREAD_STOP,
POLLING_THREAD_STOP_REQUESTED
} polling_thread_state_t;

// Request to handle polling thread state
static polling_thread_state_t m_polling_thread_state_request = POLLING_THREAD_STOP;

// This thread is used to dispatch indication
static pthread_t thread_dispatch;
Expand Down Expand Up @@ -85,13 +97,20 @@ static pthread_cond_t m_queue_not_empty_cond = PTHREAD_COND_INITIALIZER;
*/
static void * dispatch_indication(void * unused)
{
struct timespec ts;

pthread_mutex_lock(&m_queue_mutex);
while (m_dispatch_thread_running)
{
if (m_queue_empty)
{
// Queue is empty, wait
pthread_cond_wait(&m_queue_not_empty_cond, &m_queue_mutex);
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += DISPATCH_WAKEUP_TIMEOUT_MS ; // 5 second timeout
pthread_cond_timedwait(&m_queue_not_empty_cond, &m_queue_mutex, &ts);

// Force a garbage collect (to be sure it's called even if no frag are received)
reassembly_garbage_collect();

// Check if we wake up but nothing in queue
if (m_queue_empty)
Expand Down Expand Up @@ -170,10 +189,29 @@ static void * poll_for_indication(void * unused)
// Initially wait for 500ms before any polling
uint32_t wait_before_next_polling_ms = 500;

while (m_polling_thread_running)
m_polling_thread_state_request = POLLING_THREAD_RUN;

while (m_polling_thread_state_request != POLLING_THREAD_STOP)
{
usleep(wait_before_next_polling_ms * 1000);

if(m_polling_thread_state_request == POLLING_THREAD_STOP_REQUESTED)
{
if (!m_queue_empty)
{
// Dispatch did not process all indications. Just wait for it to complete.
wait_before_next_polling_ms = POLLING_INTERVAL_MS;
continue;
}

if (reassembly_is_queue_empty())
{
LOGI("Reassembly queue is empty, exiting polling thread\n");
m_polling_thread_state_request = POLLING_THREAD_STOP;
break;
}
}

// Get the number of free buffers in the indication queue
// Note: No need to lock the queue as only m_ind_queue_read can be updated
// and could still be modified when we release the lock after computing
Expand Down Expand Up @@ -207,16 +245,26 @@ static void * poll_for_indication(void * unused)
free_buffer_room = m_ind_queue_read - m_ind_queue_write;
}
}

max_num_indication = free_buffer_room > MAX_NUMBER_INDICATION ?
MAX_NUMBER_INDICATION :
free_buffer_room;

if (m_polling_thread_state_request == POLLING_THREAD_STOP_REQUESTED)
{
// In case we are about to stop, let's poll only one by one to have more chance to
// finish uncomplete fragmented packet and not start to receive a new one
max_num_indication = 1;
LOGD("Poll for one more fragment to empty reassembly queue\n");
}
else
{
// Let's read max indications that can fit in the queue
max_num_indication = MIN(MAX_NUMBER_INDICATION, free_buffer_room);
}

LOGD("Poll for %d indications\n", max_num_indication);

get_ind_res = m_get_indication_f(max_num_indication, onIndicationReceivedLocked);

if (get_ind_res == 1)
if ((get_ind_res == 1)
&& (m_polling_thread_state_request != POLLING_THREAD_STOP_REQUESTED))
{
// Still pending indication, only wait 1 ms to give a chance
// to other threads but not more to have better throughput
Expand All @@ -226,6 +274,7 @@ static void * poll_for_indication(void * unused)
{
// In case of error or if no more indication, just wait
// the POLLING INTERVAL to avoid polling all the time
// In case of stop request, wait for to give time to push data received
wait_before_next_polling_ms = POLLING_INTERVAL_MS;
}
}
Expand All @@ -242,7 +291,14 @@ bool Platform_lock_request()
{
// It must never happen but add a check and
// return to avoid a deadlock
LOGE("Mutex already locked %d\n", res);
if (res == EINVAL)
{
LOGW("Mutex no longer exists (destroyed)\n");
}
else
{
LOGE("Mutex lock failed %d\n", res);
}
return false;
}
return true;
Expand Down Expand Up @@ -320,7 +376,6 @@ bool Platform_init(Platform_get_indication_f get_indication_f,
goto error2;
}

m_polling_thread_running = true;
// Start a thread to poll for indication
if (pthread_create(&thread_polling, NULL, poll_for_indication, NULL) != 0)
{
Expand Down Expand Up @@ -352,21 +407,23 @@ void Platform_close()
{
void * res;
pthread_t cur_thread = pthread_self();
// Signal our dispatch thread to stop
m_dispatch_thread_running = false;
// Signal condition to wakeup thread
pthread_cond_signal(&m_queue_not_empty_cond);

// Signal our polling thread to stop
// No need to signal it as it will wakeup periodically
m_polling_thread_running = false;
m_polling_thread_state_request = POLLING_THREAD_STOP_REQUESTED;

// Wait for both tread to finish
// Wait for polling tread to finish
if (cur_thread != thread_polling)
{
pthread_join(thread_polling, &res);
}

// Signal our dispatch thread to stop
m_dispatch_thread_running = false;
// Signal condition to wakeup thread
pthread_cond_signal(&m_queue_not_empty_cond);

// Wait for dispatch tread to finish
if (cur_thread != thread_dispatch)
{
pthread_join(thread_dispatch, &res);
Expand Down
22 changes: 3 additions & 19 deletions lib/wpc/dsap.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@ typedef struct
bool busy;
} packet_with_indication_t;

// Minimum period between two consecutive garbage collects of
// uncomplete fragments
// GC is anyway synchronous with received fragment so period between 2 GC
// could be much bigger if not fragments are received for a while
#define MIN_GARBAGE_COLLECT_PERIOD_S 5

// Max timeout in seconds for uncomplete fragmented packet to be discarded
// from rx queue.
static uint32_t m_fragment_max_duration_s = 0;

// Static buffer used to reassemble messages. Allocated statically
// to not have it allocated on stack dynamically. Could also be allocated
// dynamically with platform malloc, but as there is only one needed, static
Expand Down Expand Up @@ -318,7 +308,6 @@ void dsap_data_tx_indication_handler(dsap_data_tx_ind_pl_t * payload)
void dsap_data_rx_frag_indication_handler(dsap_data_rx_frag_ind_pl_t * payload,
unsigned long long timestamp_ms_epoch)
{
static unsigned long long last_gc_ts_ms = 0;
reassembly_fragment_t frag;
size_t full_size;
app_qos_e qos;
Expand Down Expand Up @@ -394,13 +383,8 @@ void dsap_data_rx_frag_indication_handler(dsap_data_rx_frag_ind_pl_t * payload,

// Do GC synchronously to avoid races as all fragment related actions happens on same thread
// and no need for an another scheduling method to add in Platform
if (m_fragment_max_duration_s > 0 &&
Platform_get_timestamp_ms_monotonic() - last_gc_ts_ms > (MIN_GARBAGE_COLLECT_PERIOD_S * 1000))
{
// Time for a new GC
reassembly_garbage_collect(m_fragment_max_duration_s);
last_gc_ts_ms = Platform_get_timestamp_ms_monotonic();
}
reassembly_garbage_collect();

}

void dsap_data_rx_indication_handler(dsap_data_rx_ind_pl_t * payload,
Expand Down Expand Up @@ -500,7 +484,7 @@ bool dsap_unregister_for_data()

bool dsap_set_max_fragment_duration(unsigned int fragment_max_duration_s)
{
m_fragment_max_duration_s = fragment_max_duration_s;
reassembly_set_max_fragment_duration(fragment_max_duration_s);
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/wpc/include/dsap.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ bool dsap_unregister_for_data();
#endif

/**
* \brief Set maximum duration to keep fragment in our buffer until packet is ful
* \brief Set maximum duration to keep fragment in our buffer until packet is full
* \param fragment_max_duration_s
* Maximum time in s to keep fragments from incomplete packets inside our buffers
*/
Expand Down
Loading

0 comments on commit a40af2f

Please sign in to comment.