From 7ae96fd2746dc89e79d2eba5f65f23959b99ea4a Mon Sep 17 00:00:00 2001 From: Stephane Triomphe Date: Wed, 8 Jan 2025 09:43:16 +0100 Subject: [PATCH] Handle fragment only in dispatch task remove garbage collect from polling task call periodically garbage collect garbage collect still called on frag received for compatibility --- example/linux/gw-example/main.c | 1 + lib/platform/linux/platform.c | 16 ++++-- lib/wpc/dsap.c | 22 ++------ lib/wpc/include/dsap.h | 2 +- lib/wpc/include/reassembly.h | 18 +++++-- lib/wpc/reassembly/reassembly.c | 89 ++++++++++++++++++++++----------- 6 files changed, 89 insertions(+), 59 deletions(-) diff --git a/example/linux/gw-example/main.c b/example/linux/gw-example/main.c index 5b72533..9340714 100644 --- a/example/linux/gw-example/main.c +++ b/example/linux/gw-example/main.c @@ -572,6 +572,7 @@ int main(int argc, char * argv[]) { LOGE("MQTT failed to disconnect\n"); } + LOGI("MQTT disconnected\n"); MQTTClient_destroy(&m_client); pthread_mutex_destroy(&m_pub_queue_mutex); LOGI("Clean exit completed\n"); diff --git a/lib/platform/linux/platform.c b/lib/platform/linux/platform.c index 8d3109d..6a8f6fb 100644 --- a/lib/platform/linux/platform.c +++ b/lib/platform/linux/platform.c @@ -24,6 +24,9 @@ // Polling interval to check for indication #define POLLING_INTERVAL_MS 20 +// Wakeup timeout for dispatch thread, mainly for garbage collection of fragments +#define DISPATCH_WAKEUP_TIMEOUT_MS 5000 + // Mutex for sending, ie serial access static pthread_mutex_t sending_mutex; @@ -94,13 +97,20 @@ static pthread_cond_t m_queue_not_empty_cond = PTHREAD_COND_INITIALIZER; */ static void * dispatch_indication(void * unused) { + struct timespec ts; + pthread_mutex_lock(&m_queue_mutex); while (m_dispatch_thread_running) { if (m_queue_empty) { // Queue is empty, wait - pthread_cond_wait(&m_queue_not_empty_cond, &m_queue_mutex); + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += DISPATCH_WAKEUP_TIMEOUT_MS ; // 5 second timeout + pthread_cond_timedwait(&m_queue_not_empty_cond, &m_queue_mutex, &ts); + + // Force a garbage collect call in case we don't receive any indication + reassembly_garbage_collect(); // Check if we wake up but nothing in queue if (m_queue_empty) @@ -200,10 +210,6 @@ static void * poll_for_indication(void * unused) m_polling_thread_state_request = POLLING_THREAD_STOP; break; } - else - { - reassembly_garbage_collect(FRAGMENT_MAX_DURATION_S); - } } // Get the number of free buffers in the indication queue diff --git a/lib/wpc/dsap.c b/lib/wpc/dsap.c index 712cbbc..928a991 100644 --- a/lib/wpc/dsap.c +++ b/lib/wpc/dsap.c @@ -28,16 +28,6 @@ typedef struct bool busy; } packet_with_indication_t; -// Minimum period between two consecutive garbage collects of -// uncomplete fragments -// GC is anyway synchronous with received fragment so period between 2 GC -// could be much bigger if not fragments are received for a while -#define MIN_GARBAGE_COLLECT_PERIOD_S 5 - -// Max timeout in seconds for uncomplete fragmented packet to be discarded -// from rx queue. -static uint32_t m_fragment_max_duration_s = 0; - // Static buffer used to reassemble messages. Allocated statically // to not have it allocated on stack dynamically. Could also be allocated // dynamically with platform malloc, but as there is only one needed, static @@ -318,7 +308,6 @@ void dsap_data_tx_indication_handler(dsap_data_tx_ind_pl_t * payload) void dsap_data_rx_frag_indication_handler(dsap_data_rx_frag_ind_pl_t * payload, unsigned long long timestamp_ms_epoch) { - static unsigned long long last_gc_ts_ms = 0; reassembly_fragment_t frag; size_t full_size; app_qos_e qos; @@ -394,13 +383,8 @@ void dsap_data_rx_frag_indication_handler(dsap_data_rx_frag_ind_pl_t * payload, // Do GC synchronously to avoid races as all fragment related actions happens on same thread // and no need for an another scheduling method to add in Platform - if (m_fragment_max_duration_s > 0 && - Platform_get_timestamp_ms_monotonic() - last_gc_ts_ms > (MIN_GARBAGE_COLLECT_PERIOD_S * 1000)) - { - // Time for a new GC - reassembly_garbage_collect(m_fragment_max_duration_s); - last_gc_ts_ms = Platform_get_timestamp_ms_monotonic(); - } + reassembly_garbage_collect(); + } void dsap_data_rx_indication_handler(dsap_data_rx_ind_pl_t * payload, @@ -500,7 +484,7 @@ bool dsap_unregister_for_data() bool dsap_set_max_fragment_duration(unsigned int fragment_max_duration_s) { - m_fragment_max_duration_s = fragment_max_duration_s; + reassembly_set_max_fragment_duration(fragment_max_duration_s); return true; } diff --git a/lib/wpc/include/dsap.h b/lib/wpc/include/dsap.h index 6248b29..eabd494 100644 --- a/lib/wpc/include/dsap.h +++ b/lib/wpc/include/dsap.h @@ -203,7 +203,7 @@ bool dsap_unregister_for_data(); #endif /** - * \brief Set maximum duration to keep fragment in our buffer until packet is ful + * \brief Set maximum duration to keep fragment in our buffer until packet is full * \param fragment_max_duration_s * Maximum time in s to keep fragments from incomplete packets inside our buffers */ diff --git a/lib/wpc/include/reassembly.h b/lib/wpc/include/reassembly.h index 245cfb7..ac6de65 100644 --- a/lib/wpc/include/reassembly.h +++ b/lib/wpc/include/reassembly.h @@ -22,6 +22,15 @@ typedef struct { unsigned long long timestamp; //< When was the fragment received } reassembly_fragment_t; +/** + * \brief Set maximum duration for fragment + * \param duration_s + * the maximum duration in seconds to keep fragment from incomplete packets. + * Zero equals forever + * \return Return code of the operation + */ +void reassembly_set_max_fragment_duration(unsigned int duration_s); + /** * \brief Initialize reassembly module */ @@ -29,9 +38,10 @@ void reassembly_init(); /** * \brief Check queue emptyness - * \return true if empty, false otherwise + * \return True means that queue is empty, false that queue is most probably not empty + * \note This function can be called from any task */ -bool reassembly_is_queue_empty(void); +bool reassembly_is_queue_empty(); /** * \brief Add fragment to an existing full message @@ -65,9 +75,7 @@ bool reassembly_get_full_message(uint32_t src_addr, uint16_t packet_id, uint8_t /** * \brief Clear all the uncomplete fragmented message that have no activity for \ref timeout_s * - * \param timeout_s - * Limit in second for inactivity before message being deleted */ -void reassembly_garbage_collect(uint32_t timeout_s); +void reassembly_garbage_collect(); #endif //REASSEMBLY_H__ diff --git a/lib/wpc/reassembly/reassembly.c b/lib/wpc/reassembly/reassembly.c index c4c2e0a..97ac15b 100644 --- a/lib/wpc/reassembly/reassembly.c +++ b/lib/wpc/reassembly/reassembly.c @@ -14,6 +14,9 @@ #include "platform.h" +// Minimum period between two consecutive garbage collects of uncomplete fragments +#define MIN_GARBAGE_COLLECT_PERIOD_S 5 + /* undefine the defaults */ #undef uthash_malloc #undef uthash_free @@ -52,11 +55,23 @@ typedef struct UT_hash_handle hh; } full_packet_t; -/** - * Hash containing all the fragmented packet under construction - */ +// Max timeout in seconds for uncomplete fragmented packet to be discarded +// from rx queue. +static uint32_t m_fragment_max_duration_s = 0; + +// Hash containing all the fragmented packet under construction static full_packet_t * m_packets = NULL; +// Keep track of the queue emptyness, to get info from other tasks +// True indicate that queue is empty, false indicate that queue is most probably not empty +static bool m_is_queue_empty; + + +void reassembly_set_max_fragment_duration(unsigned int fragment_max_duration_s) +{ + m_fragment_max_duration_s = fragment_max_duration_s; +} + static full_packet_t * get_packet_from_hash(uint32_t src_add, uint16_t packet_id) { full_packet_t * p; @@ -221,6 +236,8 @@ static bool reassemble_full_packet(full_packet_t * full_packet_p, uint8_t * buff // release also full packet struct form hash HASH_DEL(m_packets, full_packet_p); Platform_free(full_packet_p, sizeof(full_packet_t)); + + m_is_queue_empty = (HASH_COUNT(m_packets) == 0); return true; } @@ -229,9 +246,9 @@ void reassembly_init() // Nothing to do at the moment } -bool reassembly_is_queue_empty(void) +bool reassembly_is_queue_empty() { - return HASH_COUNT(m_packets) == 0; + return m_is_queue_empty; } bool reassembly_add_fragment(reassembly_fragment_t * frag, size_t * full_size_p) @@ -239,6 +256,9 @@ bool reassembly_add_fragment(reassembly_fragment_t * frag, size_t * full_size_p) full_packet_t *full_packet_p; *full_size_p = 0; + + // set the queue empty flag in advance, even if we fail later (corrected by garbage collection) + m_is_queue_empty = false; // Get packet or create it full_packet_p = get_packet_from_hash(frag->src_add, frag->packet_id); @@ -288,32 +308,43 @@ bool reassembly_get_full_message(uint32_t src_add, uint16_t packet_id, uint8_t * } -void reassembly_garbage_collect(uint32_t timeout_s) +void reassembly_garbage_collect() { - full_packet_t *fp, *tmp; - uint32_t messages_removed = 0; - HASH_ITER(hh, m_packets, fp, tmp) { - uint32_t last_activity = - (Platform_get_timestamp_ms_monotonic() - fp->timestamp_ms_epoch_last) / 1000; - - /* Check if message is not getting too old */ - if (last_activity > timeout_s) - { - LOGW("Fragmented message from src %u with id %u has no activity for more than %u s => delete it\n", - fp->key.src_add, fp->key.packet_id, timeout_s); - - internal_fragment_t *f, *tmp; - LL_FOREACH_SAFE(fp->head, f, tmp) { - Platform_free(f->bytes, f->size); - LL_DELETE(fp->head, f); - Platform_free(f, sizeof(internal_fragment_t)); - } + static unsigned long long last_gc_ts_ms = 0; - // release also full packet struct from hash - HASH_DEL(m_packets, fp); - Platform_free(fp, sizeof(full_packet_t)); - messages_removed ++; + if (m_fragment_max_duration_s > 0 && + Platform_get_timestamp_ms_monotonic() - last_gc_ts_ms > (MIN_GARBAGE_COLLECT_PERIOD_S * 1000)) + { + // Time for a new GC + last_gc_ts_ms = Platform_get_timestamp_ms_monotonic(); + + full_packet_t *fp, *tmp; + uint32_t messages_removed = 0; + HASH_ITER(hh, m_packets, fp, tmp) { + uint32_t last_activity = + (Platform_get_timestamp_ms_monotonic() - fp->timestamp_ms_epoch_last) / 1000; + + /* Check if message is not getting too old */ + if (last_activity > m_fragment_max_duration_s) + { + LOGW("Fragmented message from src %u with id %u has no activity for more than %u s => delete it\n", + fp->key.src_add, fp->key.packet_id, m_fragment_max_duration_s); + + internal_fragment_t *f, *tmp; + LL_FOREACH_SAFE(fp->head, f, tmp) { + Platform_free(f->bytes, f->size); + LL_DELETE(fp->head, f); + Platform_free(f, sizeof(internal_fragment_t)); + } + + // release also full packet struct from hash + HASH_DEL(m_packets, fp); + Platform_free(fp, sizeof(full_packet_t)); + messages_removed ++; + } } + LOGD("GC: %d message removed\n", messages_removed); + + m_is_queue_empty = (HASH_COUNT(m_packets) == 0); } - LOGD("GC: %d message removed\n", messages_removed); }