From a989d5f6a6088b141cdaefb1fe2fd0a6ea45f44a Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Tue, 15 Oct 2024 18:03:35 +0300 Subject: [PATCH 01/18] [memory][event] Don't erase elements if multiple subsciptions use them --- sc-memory/sc-core/include/sc-core/sc_types.h | 4 +- .../src/sc-store/sc-event/sc_event_private.h | 4 +- .../src/sc-store/sc-event/sc_event_queue.c | 56 +++ .../src/sc-store/sc-event/sc_event_queue.h | 2 + .../src/sc-store/sc_event_subscription.c | 2 +- sc-memory/sc-core/src/sc-store/sc_storage.c | 366 ++++++++++-------- .../sc-core/src/sc_memory_context_manager.c | 2 +- .../sc-core/src/sc_memory_context_manager.h | 5 +- .../sc-memory/units/events/test_sc_event.cpp | 49 +++ 9 files changed, 328 insertions(+), 162 deletions(-) diff --git a/sc-memory/sc-core/include/sc-core/sc_types.h b/sc-memory/sc-core/include/sc-core/sc_types.h index d820bb7ee..4b6d0dca6 100644 --- a/sc-memory/sc-core/include/sc-core/sc_types.h +++ b/sc-memory/sc-core/include/sc-core/sc_types.h @@ -111,7 +111,7 @@ struct _sc_addr * and get them back from int */ # define SC_ADDR_LOCAL_TO_INT(addr) (sc_uint32)(((addr).seg << 16) | ((addr).offset & 0xffff)) -# define SC_ADDR_LOCAL_OFFSET_FROM_INT(v) (sc_uint16)((v) & 0x0000ffff) +# define SC_ADDR_LOCAL_OFFSET_FROM_INT(v) (sc_uint16)((v)&0x0000ffff) # define SC_ADDR_LOCAL_SEG_FROM_INT(v) SC_ADDR_LOCAL_OFFSET_FROM_INT(v >> 16) # define SC_ADDR_LOCAL_FROM_INT(hash, addr) \ addr.seg = SC_ADDR_LOCAL_SEG_FROM_INT(hash); \ @@ -231,7 +231,7 @@ typedef sc_uint16 sc_type; typedef sc_uint16 sc_states; # define SC_STATE_REQUEST_ERASURE 0x1 -# define SC_STATE_IS_ERASABLE 0x200 +# define SC_STATE_IS_UNDER_ERASURE 0x200 # define SC_STATE_ELEMENT_EXIST 0x2 // results diff --git a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_private.h b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_private.h index 2f86ba349..f2c0e260e 100644 --- a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_private.h +++ b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_private.h @@ -50,7 +50,7 @@ sc_result sc_event_notify_element_deleted(sc_addr addr); * @param subscription_addr sc-addr of element that emitting event * @param event_type_addr Emitting event type * @param connector_addr A sc-address of added/removed sc-connector (just for specified events) - * @param edge_type A sc-type of added/removed sc-connector (just for specified events) + * @param connector_type A sc-type of added/removed sc-connector (just for specified events) * @param other_addr A sc-address of the second sc-element of sc-connector. If \p subscription_addr is a source, then \p * other_addr is a target. If \p subscription_addr is a target, then \p other_addr is a source. * @param callback A pointer function that is executed after the execution of a function that was called on the @@ -63,7 +63,7 @@ sc_result sc_event_emit( sc_addr subscription_addr, sc_event_type event_type_addr, sc_addr connector_addr, - sc_type edge_type, + sc_type connector_type, sc_addr other_addr, sc_event_do_after_callback callback, sc_addr event_addr); diff --git a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c index 9972ac462..600f55ee3 100644 --- a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c +++ b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c @@ -13,6 +13,7 @@ #include "sc-store/sc_storage_private.h" #include "sc_memory_private.h" #include "sc-core/sc_memory.h" +#include "sc-core/sc_keynodes.h" #include "sc-core/sc-base/sc_allocator.h" @@ -95,6 +96,31 @@ void _sc_event_emission_pool_worker(sc_pointer data, sc_pointer user_data) sc_storage_end_new_process(); + if (SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_connector_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_incoming_arc_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_outgoing_arc_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_edge_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr)) + { + sc_monitor_acquire_write(&queue->pool_monitor); + sc_uint32 * count = (sc_uint32 *)sc_hash_table_get( + queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(event->connector_addr))); + if (count != null_ptr) + { + --(*count); + if (*count == 0) + { + sc_hash_table_remove( + queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(event->connector_addr))); + sc_mem_free(count); + } + else + sc_hash_table_insert( + queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(event->connector_addr)), count); + } + sc_monitor_release_write(&queue->pool_monitor); + } + sc_monitor_release_read(&event_subscription->monitor); end: @@ -112,6 +138,16 @@ void _sc_event_emission_pool_worker(sc_pointer data, sc_pointer user_data) } } +guint emitted_events_hash_func(gconstpointer pointer) +{ + return GPOINTER_TO_UINT(pointer); +} + +gboolean emitted_events_equal_func(gconstpointer a, gconstpointer b) +{ + return (a == b); +} + void sc_event_emission_manager_initialize(sc_event_emission_manager ** manager, sc_memory_params const * params) { *manager = sc_mem_new(sc_event_emission_manager, 1); @@ -134,6 +170,8 @@ void sc_event_emission_manager_initialize(sc_event_emission_manager ** manager, sc_monitor_init(&(*manager)->destroy_monitor); sc_monitor_init(&(*manager)->pool_monitor); + (*manager)->emitted_erase_events = + sc_hash_table_init(emitted_events_hash_func, emitted_events_equal_func, null_ptr, null_ptr); (*manager)->thread_pool = g_thread_pool_new( _sc_event_emission_pool_worker, *manager, @@ -185,6 +223,7 @@ void sc_event_emission_manager_shutdown(sc_event_emission_manager * manager) sc_monitor_destroy(&manager->pool_monitor); sc_monitor_destroy(&manager->destroy_monitor); + sc_hash_table_destroy(manager->emitted_erase_events); sc_mem_free(manager); } @@ -205,6 +244,23 @@ void _sc_event_emission_manager_add( _sc_event_new(event_subscription, user_addr, connector_addr, connector_type, other_addr, callback, event_addr); sc_monitor_acquire_write(&manager->pool_monitor); + if (SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_connector_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_incoming_arc_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_outgoing_arc_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_edge_addr) + || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr)) + { + sc_uint32 * count = (sc_uint32 *)sc_hash_table_get( + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr))); + if (count == null_ptr) + { + count = sc_mem_new(sc_uint32, 1); + *count = 0; + } + ++(*count); + sc_hash_table_insert(manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr)), count); + } + g_thread_pool_push(manager->thread_pool, event, null_ptr); sc_monitor_release_write(&manager->pool_monitor); } diff --git a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h index a7ed9a4d3..11aee6d4a 100644 --- a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h +++ b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h @@ -32,6 +32,8 @@ typedef struct sc_monitor destroy_monitor; ///< Monitor for synchronizing access to the destruction process. GThreadPool * thread_pool; ///< Thread pool used for worker threads processing events. sc_monitor pool_monitor; ///< Monitor for synchronizing access to the thread pool. + sc_hash_table * emitted_erase_events; ///< Table that stores amount of active event subscriptions that were initiated + ///< due to erasure of sc-element which sc-addr is stored as a key } sc_event_emission_manager; /*! Function that initializes an sc-event emission manager. diff --git a/sc-memory/sc-core/src/sc-store/sc_event_subscription.c b/sc-memory/sc-core/src/sc-store/sc_event_subscription.c index 75cbbfa92..d72392e0d 100644 --- a/sc-memory/sc-core/src/sc-store/sc_event_subscription.c +++ b/sc-memory/sc-core/src/sc-store/sc_event_subscription.c @@ -304,7 +304,7 @@ sc_result sc_event_emit( if (_sc_memory_context_are_events_pending(ctx)) { - _sc_memory_context_pend_event(ctx, event_type_addr, subscription_addr, connector_addr, connector_type, other_addr); + _sc_memory_context_pend_event(ctx, subscription_addr, event_type_addr, connector_addr, connector_type, other_addr); return SC_RESULT_OK; } diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index 36c7594bd..5694e31ca 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -669,198 +669,256 @@ sc_result _sc_storage_element_erase(sc_addr addr) return result; } -sc_result sc_storage_element_erase(sc_memory_context const * ctx, sc_addr addr) +sc_result _sc_storage_element_erase_with_base_element( + sc_memory_context const * ctx, + sc_addr connector_chain_begin_addr, + sc_addr addr, + sc_hash_table * processed_connectors, + sc_list * connector_lists_required_for_erase_events, + sc_list * elements_that_can_be_erased) { sc_result result; - sc_element * el = null_ptr; + sc_monitor * monitor = sc_monitor_table_get_monitor_for_addr(&storage->addr_monitors_table, addr); + sc_monitor_acquire_write(monitor); result = sc_storage_get_element_by_addr(addr, &el); if (result != SC_RESULT_OK) - goto error; + { + sc_monitor_release_write(monitor); + return result; + } - sc_hash_table * cache_table = sc_hash_table_init(g_direct_hash, g_direct_equal, null_ptr, null_ptr); + sc_type const type = el->flags.type; + sc_addr const begin_addr = el->arc.begin; + sc_addr const end_addr = el->arc.end; - sc_queue iter_queue; - sc_queue_init(&iter_queue); - sc_pointer p_addr = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr)); - sc_queue_push(&iter_queue, p_addr); + sc_result erase_incoming_connector_result = SC_RESULT_NO; + sc_result erase_outgoing_connector_result = SC_RESULT_NO; + sc_result erase_incoming_arc_result = SC_RESULT_NO; + sc_result erase_outgoing_arc_result = SC_RESULT_NO; + sc_result erase_element_result = SC_RESULT_NO; + sc_bool there_are_active_erase_events_with_addr = SC_FALSE; - sc_queue addrs_with_not_emitted_erase_events; - sc_queue_init(&addrs_with_not_emitted_erase_events); - while (!sc_queue_empty(&iter_queue)) + if ((el->flags.states & SC_STATE_IS_UNDER_ERASURE) != SC_STATE_IS_UNDER_ERASURE) { - p_addr = sc_queue_pop(&iter_queue); - - sc_addr element_addr; - element_addr.seg = SC_ADDR_LOCAL_SEG_FROM_INT((sc_pointer_to_sc_addr_hash)p_addr); - element_addr.offset = SC_ADDR_LOCAL_OFFSET_FROM_INT((sc_pointer_to_sc_addr_hash)p_addr); - - sc_monitor * monitor = sc_monitor_table_get_monitor_for_addr(&storage->addr_monitors_table, element_addr); - sc_monitor_acquire_read(monitor); - result = sc_storage_get_element_by_addr(element_addr, &el); - if (result != SC_RESULT_OK) + el->flags.states |= SC_STATE_IS_UNDER_ERASURE; + if ((type & sc_type_connector_mask) != 0) { - sc_monitor_release_read(monitor); - continue; + erase_incoming_connector_result = sc_event_emit( + ctx, + begin_addr, + sc_event_before_erase_connector_addr, + addr, + type, + end_addr, + sc_storage_element_erase, + connector_chain_begin_addr); + erase_outgoing_connector_result = sc_event_emit( + ctx, + end_addr, + sc_event_before_erase_connector_addr, + addr, + type, + begin_addr, + sc_storage_element_erase, + connector_chain_begin_addr); } - sc_type const type = el->flags.type; - sc_addr const begin_addr = el->arc.begin; - sc_addr const end_addr = el->arc.end; - - sc_result erase_incoming_connector_result = SC_RESULT_NO; - sc_result erase_outgoing_connector_result = SC_RESULT_NO; - sc_result erase_incoming_arc_result = SC_RESULT_NO; - sc_result erase_outgoing_arc_result = SC_RESULT_NO; - sc_result erase_element_result = SC_RESULT_NO; - - if ((el->flags.states & SC_STATE_IS_ERASABLE) != SC_STATE_IS_ERASABLE) + if (sc_type_has_subtype(type, sc_type_common_edge)) { - if ((type & sc_type_connector_mask) != 0) - { - erase_incoming_connector_result = sc_event_emit( - ctx, - begin_addr, - sc_event_before_erase_connector_addr, - element_addr, - type, - end_addr, - sc_storage_element_erase, - element_addr); - erase_outgoing_connector_result = sc_event_emit( - ctx, - end_addr, - sc_event_before_erase_connector_addr, - element_addr, - type, - begin_addr, - sc_storage_element_erase, - element_addr); - } - - if (sc_type_has_subtype(type, sc_type_common_edge)) - { - erase_incoming_arc_result = sc_event_emit( - ctx, - begin_addr, - sc_event_before_erase_edge_addr, - element_addr, - type, - end_addr, - sc_storage_element_erase, - element_addr); - erase_outgoing_arc_result = sc_event_emit( - ctx, - end_addr, - sc_event_before_erase_edge_addr, - element_addr, - type, - begin_addr, - sc_storage_element_erase, - element_addr); - } - else if (sc_type_has_subtype_in_mask(type, sc_type_arc_mask)) - { - erase_outgoing_arc_result = sc_event_emit( - ctx, - begin_addr, - sc_event_before_erase_outgoing_arc_addr, - element_addr, - type, - end_addr, - sc_storage_element_erase, - element_addr); - erase_incoming_arc_result = sc_event_emit( - ctx, - end_addr, - sc_event_before_erase_incoming_arc_addr, - element_addr, - type, - begin_addr, - sc_storage_element_erase, - element_addr); - } - - erase_element_result = sc_event_emit( + erase_incoming_arc_result = sc_event_emit( ctx, - element_addr, - sc_event_before_erase_element_addr, - SC_ADDR_EMPTY, - 0, - SC_ADDR_EMPTY, + begin_addr, + sc_event_before_erase_edge_addr, + addr, + type, + end_addr, sc_storage_element_erase, - element_addr); - - el->flags.states |= SC_STATE_IS_ERASABLE; + connector_chain_begin_addr); + erase_outgoing_arc_result = sc_event_emit( + ctx, + end_addr, + sc_event_before_erase_edge_addr, + addr, + type, + begin_addr, + sc_storage_element_erase, + connector_chain_begin_addr); + } + else if (sc_type_has_subtype_in_mask(type, sc_type_arc_mask)) + { + erase_outgoing_arc_result = sc_event_emit( + ctx, + begin_addr, + sc_event_before_erase_outgoing_arc_addr, + addr, + type, + end_addr, + sc_storage_element_erase, + connector_chain_begin_addr); + erase_incoming_arc_result = sc_event_emit( + ctx, + end_addr, + sc_event_before_erase_incoming_arc_addr, + addr, + type, + begin_addr, + sc_storage_element_erase, + connector_chain_begin_addr); } - if (erase_incoming_connector_result == SC_RESULT_OK || erase_outgoing_connector_result == SC_RESULT_OK - || erase_incoming_arc_result == SC_RESULT_OK || erase_outgoing_arc_result == SC_RESULT_OK - || erase_element_result == SC_RESULT_OK) + erase_element_result = sc_event_emit( + ctx, + addr, + sc_event_before_erase_element_addr, + SC_ADDR_EMPTY, + 0, + SC_ADDR_EMPTY, + sc_storage_element_erase, + connector_chain_begin_addr); + } + else + { + sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); + sc_monitor_acquire_read(&emission_manager->pool_monitor); + if (sc_hash_table_get(emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))) + != null_ptr) { - sc_monitor_release_read(monitor); - continue; + there_are_active_erase_events_with_addr = SC_TRUE; } + sc_monitor_release_read(&emission_manager->pool_monitor); + } - sc_queue_push(&addrs_with_not_emitted_erase_events, p_addr); + if (erase_incoming_connector_result == SC_RESULT_OK || erase_outgoing_connector_result == SC_RESULT_OK + || erase_incoming_arc_result == SC_RESULT_OK || erase_outgoing_arc_result == SC_RESULT_OK + || erase_element_result == SC_RESULT_OK || there_are_active_erase_events_with_addr) + { + sc_list * end_of_erased_connectors_chain; + sc_list_init(&end_of_erased_connectors_chain); + sc_list_push_back(end_of_erased_connectors_chain, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); + sc_list_push_back(connector_lists_required_for_erase_events, end_of_erased_connectors_chain); + sc_monitor_release_write(monitor); + return SC_RESULT_OK; + } + sc_addr connector_addr = el->first_out_arc; + while (SC_ADDR_IS_NOT_EMPTY(connector_addr)) + { + sc_pointer p_addr = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr)); - sc_addr connector_addr = el->first_out_arc; - while (SC_ADDR_IS_NOT_EMPTY(connector_addr)) + sc_element * connector = sc_hash_table_get(processed_connectors, p_addr); + if (connector == null_ptr) { - p_addr = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr)); - - sc_element * connector = sc_hash_table_get(cache_table, p_addr); - if (connector == null_ptr) + result = sc_storage_get_element_by_addr(connector_addr, &connector); + if (result != SC_RESULT_OK) + break; + + sc_hash_table_insert(processed_connectors, p_addr, connector); + sc_list * connectors_with_emitted_erase_events; + sc_list_init(&connectors_with_emitted_erase_events); + _sc_storage_element_erase_with_base_element( + ctx, + connector_chain_begin_addr, + connector_addr, + processed_connectors, + connectors_with_emitted_erase_events, + elements_that_can_be_erased); + if (connectors_with_emitted_erase_events->size == 0) + sc_list_destroy(connectors_with_emitted_erase_events); + else { - result = sc_storage_get_element_by_addr(connector_addr, &connector); - if (result != SC_RESULT_OK) - break; - - sc_hash_table_insert(cache_table, p_addr, connector); - sc_queue_push(&iter_queue, p_addr); + sc_iterator * lists_it = sc_list_iterator(connectors_with_emitted_erase_events); + while (sc_iterator_next(lists_it)) + { + sc_list * connectors_list = (sc_list *)sc_iterator_get(lists_it); + sc_list_push_back(connector_lists_required_for_erase_events, connectors_list); + } + sc_iterator_destroy(lists_it); } - - connector_addr = connector->arc.next_begin_out_arc; } - connector_addr = el->first_in_arc; - while (SC_ADDR_IS_NOT_EMPTY(connector_addr)) - { - p_addr = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr)); + connector_addr = connector->arc.next_begin_out_arc; + } - sc_element * connector = sc_hash_table_get(cache_table, p_addr); - if (connector == null_ptr) - { - result = sc_storage_get_element_by_addr(connector_addr, &connector); - if (result != SC_RESULT_OK) - break; + connector_addr = el->first_in_arc; + while (SC_ADDR_IS_NOT_EMPTY(connector_addr)) + { + sc_pointer p_addr = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr)); - sc_hash_table_insert(cache_table, p_addr, connector); - sc_queue_push(&iter_queue, p_addr); + sc_element * connector = sc_hash_table_get(processed_connectors, p_addr); + if (connector == null_ptr) + { + result = sc_storage_get_element_by_addr(connector_addr, &connector); + if (result != SC_RESULT_OK) + break; + + sc_hash_table_insert(processed_connectors, p_addr, connector); + sc_list * connectors_with_emitted_erase_events; + sc_list_init(&connectors_with_emitted_erase_events); + _sc_storage_element_erase_with_base_element( + ctx, + connector_chain_begin_addr, + connector_addr, + processed_connectors, + connectors_with_emitted_erase_events, + elements_that_can_be_erased); + if (connectors_with_emitted_erase_events->size == 0) + sc_list_destroy(connectors_with_emitted_erase_events); + else + { + sc_iterator * lists_it = sc_list_iterator(connectors_with_emitted_erase_events); + while (sc_iterator_next(lists_it)) + { + sc_list * connectors_list = (sc_list *)sc_iterator_get(lists_it); + sc_list_push_back(connector_lists_required_for_erase_events, connectors_list); + } + sc_iterator_destroy(lists_it); } - - connector_addr = connector->arc.next_end_in_arc; } - sc_monitor_release_read(monitor); + connector_addr = connector->arc.next_end_in_arc; } - sc_queue_destroy(&iter_queue); - sc_hash_table_destroy(cache_table); - - while (!sc_queue_empty(&addrs_with_not_emitted_erase_events)) + sc_monitor_release_write(monitor); + if (connector_lists_required_for_erase_events->size == 0) + { + sc_list_push_back(elements_that_can_be_erased, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); + } + else { - sc_addr_hash addr_int = (sc_pointer_to_sc_addr_hash)sc_queue_pop(&addrs_with_not_emitted_erase_events); - addr.seg = SC_ADDR_LOCAL_SEG_FROM_INT(addr_int); - addr.offset = SC_ADDR_LOCAL_OFFSET_FROM_INT(addr_int); + sc_iterator * lists_it = sc_list_iterator(connector_lists_required_for_erase_events); + while (sc_iterator_next(lists_it)) + { + sc_list * connectors_list = (sc_list *)sc_iterator_get(lists_it); + sc_list_push_back(connectors_list, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); + } + sc_iterator_destroy(lists_it); + } + return SC_RESULT_OK; +} + +sc_result sc_storage_element_erase(sc_memory_context const * ctx, sc_addr addr) +{ + sc_hash_table * connectors_added_to_queue = sc_hash_table_init(g_direct_hash, g_direct_equal, null_ptr, null_ptr); + sc_list * connectors_lists_used_for_erase_events; + sc_list_init(&connectors_lists_used_for_erase_events); + sc_list * elements_that_can_be_erased; + sc_list_init(&elements_that_can_be_erased); + + sc_result result = _sc_storage_element_erase_with_base_element( + ctx, addr, addr, connectors_added_to_queue, connectors_lists_used_for_erase_events, elements_that_can_be_erased); + sc_hash_table_destroy(connectors_added_to_queue); + + sc_iterator * elements_it = sc_list_iterator(elements_that_can_be_erased); + while (sc_iterator_next(elements_it)) + { + sc_addr_hash addr_hash = (sc_pointer_to_sc_addr_hash)sc_iterator_get(elements_it); + SC_ADDR_LOCAL_FROM_INT(addr_hash, addr); _sc_storage_element_erase(addr); } + sc_iterator_destroy(elements_it); + sc_list_destroy(elements_that_can_be_erased); - sc_queue_destroy(&addrs_with_not_emitted_erase_events); - - result = SC_RESULT_OK; -error: return result; } diff --git a/sc-memory/sc-core/src/sc_memory_context_manager.c b/sc-memory/sc-core/src/sc_memory_context_manager.c index 3991e8e4d..e3dc90b96 100644 --- a/sc-memory/sc-core/src/sc_memory_context_manager.c +++ b/sc-memory/sc-core/src/sc_memory_context_manager.c @@ -232,8 +232,8 @@ sc_bool _sc_memory_context_are_events_pending(sc_memory_context const * ctx) void _sc_memory_context_pend_event( sc_memory_context const * ctx, - sc_event_type event_type_addr, sc_addr subscription_addr, + sc_event_type event_type_addr, sc_addr connector_addr, sc_type connector_type, sc_addr other_addr) diff --git a/sc-memory/sc-core/src/sc_memory_context_manager.h b/sc-memory/sc-core/src/sc_memory_context_manager.h index 4113e9094..18696166f 100644 --- a/sc-memory/sc-core/src/sc_memory_context_manager.h +++ b/sc-memory/sc-core/src/sc_memory_context_manager.h @@ -13,6 +13,7 @@ #include "sc-core/sc-base/sc_monitor.h" #include "sc-store/sc-base/sc_message.h" +#include "sc-store/sc-event/sc_event_queue.h" typedef struct _sc_memory_context_manager sc_memory_context_manager; typedef struct _sc_event_emit_params sc_event_emit_params; @@ -104,8 +105,8 @@ void _sc_memory_context_pending_end(sc_memory_context * ctx); /*! Function that adds an event to the pending events list in a sc-memory context. * @param ctx Pointer to the sc-memory context to which the event is added. - * @param type Type of the event to be added. * @param subscription_addr sc_addr representing the sc-element associated with the event. + * @param event_type_addr Type of the event to be added. * @param connector_addr sc-address representing the sc-connector associated with the event. * @param connector_type sc-type representing the sc-connector associated with the event. * @param other_addr sc-address representing the other sc-element associated with the event. @@ -113,8 +114,8 @@ void _sc_memory_context_pending_end(sc_memory_context * ctx); */ void _sc_memory_context_pend_event( sc_memory_context const * ctx, - sc_event_type event_type_addr, sc_addr subscription_addr, + sc_event_type event_type_addr, sc_addr connector_addr, sc_type connector_type, sc_addr other_addr); diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp index 91920a4da..18d41daea 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp @@ -1367,3 +1367,52 @@ TEST_F(ScEventTest, BlockEventsGuardAndEmitAfter) std::this_thread::sleep_for(std::chrono::milliseconds(10)); EXPECT_TRUE(isCalled); } + +TEST_F(ScEventTest, TwoSubscriptionsForOneArcErasure) +{ + ScAddr nodeAddr1 = m_ctx->GenerateNode(ScType::ConstNode); + bool isDelayedCalled = false; + auto delayedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeAddr1, + [&isDelayedCalled, this](auto const & event) + { + isDelayedCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + EXPECT_TRUE(m_ctx->IsElement(event.GetArc())); + auto const & [sourceAddr, targetAddr] = m_ctx->GetConnectorIncidentElements(event.GetArc()); + EXPECT_TRUE(m_ctx->IsElement(sourceAddr)); + EXPECT_TRUE(m_ctx->IsElement(targetAddr)); + auto const & [source2Addr, target2Addr] = m_ctx->GetConnectorIncidentElements(targetAddr); + EXPECT_TRUE(m_ctx->IsElement(source2Addr)); + EXPECT_TRUE(m_ctx->IsElement(target2Addr)); + auto const & [source3Addr, target3Addr] = m_ctx->GetConnectorIncidentElements(target2Addr); + EXPECT_TRUE(m_ctx->IsElement(source3Addr)); + EXPECT_TRUE(m_ctx->IsElement(target3Addr)); + auto const & [source4Addr, target4Addr] = m_ctx->GetConnectorIncidentElements(target3Addr); + EXPECT_TRUE(m_ctx->IsElement(source4Addr)); + EXPECT_TRUE(m_ctx->IsElement(target4Addr)); + EXPECT_TRUE(m_ctx->GetElementType(target4Addr).IsNode()); + }); + bool isInstantCalled = false; + auto instantSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeAddr1, + [&isInstantCalled](auto const &) + { + isInstantCalled = true; + }); + + ScAddr const nodeAddr2 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr const nodeAddr3 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr const nodeAddr4 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr const nodeAddr5 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr const & arcAddr1 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr3, nodeAddr2); + ScAddr const & arcAddr2 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr4, arcAddr1); + ScAddr const & arcAddr3 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr5, arcAddr2); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr1, arcAddr3); + m_ctx->EraseElement(nodeAddr2); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + EXPECT_TRUE(isInstantCalled); + EXPECT_TRUE(isDelayedCalled); +} From 28247a4accab3ddcec363c814dfbb52dd430de04 Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Tue, 15 Oct 2024 18:18:32 +0300 Subject: [PATCH 02/18] [memory][event] Replace list with bool flag --- sc-memory/sc-core/src/sc-store/sc_storage.c | 62 ++++----------------- 1 file changed, 10 insertions(+), 52 deletions(-) diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index 5694e31ca..a2bf77bc1 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -674,7 +674,7 @@ sc_result _sc_storage_element_erase_with_base_element( sc_addr connector_chain_begin_addr, sc_addr addr, sc_hash_table * processed_connectors, - sc_list * connector_lists_required_for_erase_events, + sc_bool * does_branch_have_emitted_events, sc_list * elements_that_can_be_erased) { sc_result result; @@ -793,10 +793,7 @@ sc_result _sc_storage_element_erase_with_base_element( || erase_incoming_arc_result == SC_RESULT_OK || erase_outgoing_arc_result == SC_RESULT_OK || erase_element_result == SC_RESULT_OK || there_are_active_erase_events_with_addr) { - sc_list * end_of_erased_connectors_chain; - sc_list_init(&end_of_erased_connectors_chain); - sc_list_push_back(end_of_erased_connectors_chain, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); - sc_list_push_back(connector_lists_required_for_erase_events, end_of_erased_connectors_chain); + *does_branch_have_emitted_events = SC_TRUE; sc_monitor_release_write(monitor); return SC_RESULT_OK; } @@ -813,27 +810,15 @@ sc_result _sc_storage_element_erase_with_base_element( break; sc_hash_table_insert(processed_connectors, p_addr, connector); - sc_list * connectors_with_emitted_erase_events; - sc_list_init(&connectors_with_emitted_erase_events); + sc_bool does_subbranch_have_emitted_events; _sc_storage_element_erase_with_base_element( ctx, connector_chain_begin_addr, connector_addr, processed_connectors, - connectors_with_emitted_erase_events, + &does_subbranch_have_emitted_events, elements_that_can_be_erased); - if (connectors_with_emitted_erase_events->size == 0) - sc_list_destroy(connectors_with_emitted_erase_events); - else - { - sc_iterator * lists_it = sc_list_iterator(connectors_with_emitted_erase_events); - while (sc_iterator_next(lists_it)) - { - sc_list * connectors_list = (sc_list *)sc_iterator_get(lists_it); - sc_list_push_back(connector_lists_required_for_erase_events, connectors_list); - } - sc_iterator_destroy(lists_it); - } + *does_branch_have_emitted_events |= does_subbranch_have_emitted_events; } connector_addr = connector->arc.next_begin_out_arc; @@ -852,60 +837,33 @@ sc_result _sc_storage_element_erase_with_base_element( break; sc_hash_table_insert(processed_connectors, p_addr, connector); - sc_list * connectors_with_emitted_erase_events; - sc_list_init(&connectors_with_emitted_erase_events); + sc_bool does_subbranch_have_emitted_events; _sc_storage_element_erase_with_base_element( ctx, connector_chain_begin_addr, connector_addr, processed_connectors, - connectors_with_emitted_erase_events, + &does_subbranch_have_emitted_events, elements_that_can_be_erased); - if (connectors_with_emitted_erase_events->size == 0) - sc_list_destroy(connectors_with_emitted_erase_events); - else - { - sc_iterator * lists_it = sc_list_iterator(connectors_with_emitted_erase_events); - while (sc_iterator_next(lists_it)) - { - sc_list * connectors_list = (sc_list *)sc_iterator_get(lists_it); - sc_list_push_back(connector_lists_required_for_erase_events, connectors_list); - } - sc_iterator_destroy(lists_it); - } + *does_branch_have_emitted_events |= does_subbranch_have_emitted_events; } connector_addr = connector->arc.next_end_in_arc; } sc_monitor_release_write(monitor); - if (connector_lists_required_for_erase_events->size == 0) - { - sc_list_push_back(elements_that_can_be_erased, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); - } - else - { - sc_iterator * lists_it = sc_list_iterator(connector_lists_required_for_erase_events); - while (sc_iterator_next(lists_it)) - { - sc_list * connectors_list = (sc_list *)sc_iterator_get(lists_it); - sc_list_push_back(connectors_list, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); - } - sc_iterator_destroy(lists_it); - } return SC_RESULT_OK; } sc_result sc_storage_element_erase(sc_memory_context const * ctx, sc_addr addr) { sc_hash_table * connectors_added_to_queue = sc_hash_table_init(g_direct_hash, g_direct_equal, null_ptr, null_ptr); - sc_list * connectors_lists_used_for_erase_events; - sc_list_init(&connectors_lists_used_for_erase_events); + sc_bool does_branch_have_emitted_events = SC_FALSE; sc_list * elements_that_can_be_erased; sc_list_init(&elements_that_can_be_erased); sc_result result = _sc_storage_element_erase_with_base_element( - ctx, addr, addr, connectors_added_to_queue, connectors_lists_used_for_erase_events, elements_that_can_be_erased); + ctx, addr, addr, connectors_added_to_queue, &does_branch_have_emitted_events, elements_that_can_be_erased); sc_hash_table_destroy(connectors_added_to_queue); From 3ceb3bd6bad4c7d0519092aea9d6f2766d8af563 Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Wed, 16 Oct 2024 08:33:14 +0300 Subject: [PATCH 03/18] [memory][event] Reuse bool flag during recursion --- sc-memory/sc-core/include/sc-core/sc_types.h | 2 +- sc-memory/sc-core/src/sc-store/sc_storage.c | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sc-memory/sc-core/include/sc-core/sc_types.h b/sc-memory/sc-core/include/sc-core/sc_types.h index 4b6d0dca6..e837bceaa 100644 --- a/sc-memory/sc-core/include/sc-core/sc_types.h +++ b/sc-memory/sc-core/include/sc-core/sc_types.h @@ -111,7 +111,7 @@ struct _sc_addr * and get them back from int */ # define SC_ADDR_LOCAL_TO_INT(addr) (sc_uint32)(((addr).seg << 16) | ((addr).offset & 0xffff)) -# define SC_ADDR_LOCAL_OFFSET_FROM_INT(v) (sc_uint16)((v)&0x0000ffff) +# define SC_ADDR_LOCAL_OFFSET_FROM_INT(v) (sc_uint16)((v) & 0x0000ffff) # define SC_ADDR_LOCAL_SEG_FROM_INT(v) SC_ADDR_LOCAL_OFFSET_FROM_INT(v >> 16) # define SC_ADDR_LOCAL_FROM_INT(hash, addr) \ addr.seg = SC_ADDR_LOCAL_SEG_FROM_INT(hash); \ diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index a2bf77bc1..f46bb8144 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -810,15 +810,13 @@ sc_result _sc_storage_element_erase_with_base_element( break; sc_hash_table_insert(processed_connectors, p_addr, connector); - sc_bool does_subbranch_have_emitted_events; _sc_storage_element_erase_with_base_element( ctx, connector_chain_begin_addr, connector_addr, processed_connectors, - &does_subbranch_have_emitted_events, + does_branch_have_emitted_events, elements_that_can_be_erased); - *does_branch_have_emitted_events |= does_subbranch_have_emitted_events; } connector_addr = connector->arc.next_begin_out_arc; @@ -837,21 +835,22 @@ sc_result _sc_storage_element_erase_with_base_element( break; sc_hash_table_insert(processed_connectors, p_addr, connector); - sc_bool does_subbranch_have_emitted_events; _sc_storage_element_erase_with_base_element( ctx, connector_chain_begin_addr, connector_addr, processed_connectors, - &does_subbranch_have_emitted_events, + does_branch_have_emitted_events, elements_that_can_be_erased); - *does_branch_have_emitted_events |= does_subbranch_have_emitted_events; } connector_addr = connector->arc.next_end_in_arc; } sc_monitor_release_write(monitor); + + if (!*does_branch_have_emitted_events) + sc_list_push_back(elements_that_can_be_erased, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); return SC_RESULT_OK; } From 57b1f6a469a18e8613e60e378e8c95797e35879d Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Wed, 16 Oct 2024 09:22:04 +0300 Subject: [PATCH 04/18] [memory][event] Add check for emission_manager absence --- sc-memory/sc-core/src/sc-store/sc_storage.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index f46bb8144..26bf1b173 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -780,13 +780,16 @@ sc_result _sc_storage_element_erase_with_base_element( else { sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); - sc_monitor_acquire_read(&emission_manager->pool_monitor); - if (sc_hash_table_get(emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))) - != null_ptr) + if (emission_manager != null_ptr) { - there_are_active_erase_events_with_addr = SC_TRUE; + sc_monitor_acquire_read(& emission_manager->pool_monitor); + if (sc_hash_table_get(emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))) + != null_ptr) + { + there_are_active_erase_events_with_addr = SC_TRUE; + } + sc_monitor_release_read(& emission_manager->pool_monitor); } - sc_monitor_release_read(&emission_manager->pool_monitor); } if (erase_incoming_connector_result == SC_RESULT_OK || erase_outgoing_connector_result == SC_RESULT_OK From 5c4804e5f9a53ad5a38ec62a8251bcb5bb6030c9 Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Wed, 16 Oct 2024 09:34:33 +0300 Subject: [PATCH 05/18] [memory][event][refactor] Remove space after & --- sc-memory/sc-core/src/sc-store/sc_storage.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index 26bf1b173..42e44e055 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -782,7 +782,7 @@ sc_result _sc_storage_element_erase_with_base_element( sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); if (emission_manager != null_ptr) { - sc_monitor_acquire_read(& emission_manager->pool_monitor); + sc_monitor_acquire_read(&emission_manager->pool_monitor); if (sc_hash_table_get(emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))) != null_ptr) { From dcb5483e17a8f1e920ce7b93eec7528b51f924d3 Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Wed, 16 Oct 2024 09:37:34 +0300 Subject: [PATCH 06/18] [memory][event][refactor] Remove space after & --- sc-memory/sc-core/src/sc-store/sc_storage.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index 42e44e055..7ad020633 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -788,7 +788,7 @@ sc_result _sc_storage_element_erase_with_base_element( { there_are_active_erase_events_with_addr = SC_TRUE; } - sc_monitor_release_read(& emission_manager->pool_monitor); + sc_monitor_release_read(&emission_manager->pool_monitor); } } From e6c443d881fcea73914a283564970991de40e1cd Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Tue, 29 Oct 2024 19:28:30 +0300 Subject: [PATCH 07/18] [tests] Add pending for initiation condition generation --- .../sc-memory/units/agents/test_sc_specified_agents.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp b/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp index 1ed2a8a2d..4e4fa6a55 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp @@ -1940,8 +1940,11 @@ TEST_F(ScSpecifiedAgentTest, ATestSpecifiedAgentGeneratingIncomingArcHasFullSpec ScAddr const & testSetAddr = m_ctx->SearchElementBySystemIdentifier("test_set"); ScAddr const & testOtherSetAddr = m_ctx->SearchElementBySystemIdentifier("test_other_set"); ScAddr const & testRelation = m_ctx->SearchElementBySystemIdentifier("test_relation"); - ScAddr const & edgeAddr = m_ctx->GenerateConnector(ScType::ConstPermPosArc, testOtherSetAddr, testSetAddr); - m_ctx->GenerateConnector(ScType::ConstPermPosArc, testRelation, edgeAddr); + { + ScMemoryContextEventsPendingGuard guard(* m_ctx); + ScAddr const & edgeAddr = m_ctx->GenerateConnector(ScType::ConstPermPosArc, testOtherSetAddr, testSetAddr); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, testRelation, edgeAddr); + } EXPECT_TRUE(ATestSpecifiedAgent::msWaiter.Wait()); From ea5b29b288b9f8fc44d34734a48df41d7a111e04 Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Tue, 29 Oct 2024 19:31:58 +0300 Subject: [PATCH 08/18] [tests] Rename for more clear understanding --- .../sc-memory/units/events/test_sc_event.cpp | 22 ++++++++++--------- .../sc-memory/units/events/test_sc_wait.cpp | 4 ++-- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp index 18d41daea..48132fa2b 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp @@ -1371,13 +1371,14 @@ TEST_F(ScEventTest, BlockEventsGuardAndEmitAfter) TEST_F(ScEventTest, TwoSubscriptionsForOneArcErasure) { ScAddr nodeAddr1 = m_ctx->GenerateNode(ScType::ConstNode); - bool isDelayedCalled = false; - auto delayedSubscription = + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription>( nodeAddr1, - [&isDelayedCalled, this](auto const & event) + [&isLongExecutedSubscriptionCalled, this](auto const & event) { - isDelayedCalled = true; + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; std::this_thread::sleep_for(std::chrono::milliseconds(10)); EXPECT_TRUE(m_ctx->IsElement(event.GetArc())); auto const & [sourceAddr, targetAddr] = m_ctx->GetConnectorIncidentElements(event.GetArc()); @@ -1394,13 +1395,14 @@ TEST_F(ScEventTest, TwoSubscriptionsForOneArcErasure) EXPECT_TRUE(m_ctx->IsElement(target4Addr)); EXPECT_TRUE(m_ctx->GetElementType(target4Addr).IsNode()); }); - bool isInstantCalled = false; - auto instantSubscription = + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = m_ctx->CreateElementaryEventSubscription>( nodeAddr1, - [&isInstantCalled](auto const &) + [&isShortExecutedSubscriptionCalled](auto const &) { - isInstantCalled = true; + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; }); ScAddr const nodeAddr2 = m_ctx->GenerateNode(ScType::ConstNode); @@ -1413,6 +1415,6 @@ TEST_F(ScEventTest, TwoSubscriptionsForOneArcErasure) m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr1, arcAddr3); m_ctx->EraseElement(nodeAddr2); std::this_thread::sleep_for(std::chrono::milliseconds(20)); - EXPECT_TRUE(isInstantCalled); - EXPECT_TRUE(isDelayedCalled); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); } diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_wait.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_wait.cpp index 8b99f0dae..8566e7495 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_wait.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_wait.cpp @@ -239,7 +239,7 @@ TEST_F(ScWaiterTest, InvalidWaitersWithCondition) m_ctx->CreateConditionWaiter(nodeAddr, {}), utils::ExceptionInvalidParams); } -TEST_F(ScWaiterTest, InvalidEventsFotWaiters) +TEST_F(ScWaiterTest, InvalidEventsForWaiters) { ScAddr nodeAddr = m_ctx->GenerateNode(ScType::ConstNode); ScAddr eventClassAddr; @@ -256,7 +256,7 @@ TEST_F(ScWaiterTest, InvalidEventsFotWaiters) EXPECT_THROW(m_ctx->CreateEventWaiter(eventClassAddr, nodeAddr, {}), utils::ExceptionInvalidParams); } -TEST_F(ScWaiterTest, InvalidEventsFotWaitersWithConditions) +TEST_F(ScWaiterTest, InvalidEventsForWaitersWithConditions) { ScAddr nodeAddr = m_ctx->GenerateNode(ScType::ConstNode); ScAddr eventClassAddr; From 7a8f4c4048eff1644c4b14d7da57562d36e1b255 Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Wed, 30 Oct 2024 09:24:40 +0300 Subject: [PATCH 09/18] [tests][event] Use pending for arc assignment --- .../tests/sc-memory/units/events/test_sc_event.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp index 48132fa2b..6d11a674e 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp @@ -105,9 +105,13 @@ bool TestEventSubscriptionGenerateConnector(ScAgentContext * ctx) { nodeAddr2 = ctx->GenerateNode(ScType::ConstNode); EXPECT_TRUE(nodeAddr2.IsValid()); - - arcAddr = ctx->GenerateConnector(eventConnectorType, nodeAddr2, nodeAddr1); - EXPECT_TRUE(arcAddr.IsValid()); + { + //sometimes OnEvent is called before arcAddr is assigned to generated connector, so pending is used to assure that + // arcAddr is assigned before it is used in OnEvent + ScMemoryContextEventsPendingGuard guard(*ctx); + arcAddr = ctx->GenerateConnector(eventConnectorType, nodeAddr2, nodeAddr1); + EXPECT_TRUE(arcAddr.IsValid()); + } }; bool isDone = false; From c5d87434e56dd5e079330a9785e4f2a074c28243 Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Wed, 30 Oct 2024 09:30:58 +0300 Subject: [PATCH 10/18] [tests] Don't shadow local variable --- .../tests/sc-memory/units/common/test_sc_memory_context.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sc-memory/sc-memory/tests/sc-memory/units/common/test_sc_memory_context.cpp b/sc-memory/sc-memory/tests/sc-memory/units/common/test_sc_memory_context.cpp index 70d64367c..0b656c436 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/common/test_sc_memory_context.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/common/test_sc_memory_context.cpp @@ -541,7 +541,7 @@ TEST_F( ScAddr arcAddr; std::atomic_bool isChecked = false; { - auto eventSubscription = + auto eventSubscriptionForErasure = m_ctx->CreateElementaryEventSubscription>( usersSetAddr, [this, &userContext, &isChecked](ScEventBeforeEraseOutgoingArc const &) @@ -1629,7 +1629,7 @@ TEST_F( ScAddr usersSetEdgeAddr; std::atomic_bool isChecked = false; { - auto eventSubscription = + auto eventSubscriptionForErasure = m_ctx->CreateElementaryEventSubscription>( usersSetAddr, [&](ScEventBeforeEraseOutgoingArc const &) From 9a97df2c3a5136995fbb9254110c62618a852df9 Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Fri, 15 Nov 2024 15:47:26 +0300 Subject: [PATCH 11/18] [events][tests] Erase incident nodes without event callbacks --- .../src/sc-store/sc-event/sc_event_queue.c | 38 ++- sc-memory/sc-core/src/sc-store/sc_storage.c | 128 ++++++-- .../units/agents/test_sc_specified_agents.cpp | 6 +- .../sc-memory/units/events/test_sc_event.cpp | 295 +++++++++++++++++- 4 files changed, 409 insertions(+), 58 deletions(-) diff --git a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c index 600f55ee3..f78d9c0a3 100644 --- a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c +++ b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c @@ -103,20 +103,19 @@ void _sc_event_emission_pool_worker(sc_pointer data, sc_pointer user_data) || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr)) { sc_monitor_acquire_write(&queue->pool_monitor); - sc_uint32 * count = (sc_uint32 *)sc_hash_table_get( - queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(event->connector_addr))); - if (count != null_ptr) + sc_addr key = SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr) + ? event->event_addr + : event->connector_addr; + sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( + queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key))); + if (count != 0) { - --(*count); - if (*count == 0) - { - sc_hash_table_remove( - queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(event->connector_addr))); - sc_mem_free(count); - } + --count; + if (count == 0) + sc_hash_table_remove(queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key))); else sc_hash_table_insert( - queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(event->connector_addr)), count); + queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key)), GUINT_TO_POINTER(count)); } sc_monitor_release_write(&queue->pool_monitor); } @@ -250,15 +249,14 @@ void _sc_event_emission_manager_add( || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_edge_addr) || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr)) { - sc_uint32 * count = (sc_uint32 *)sc_hash_table_get( - manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr))); - if (count == null_ptr) - { - count = sc_mem_new(sc_uint32, 1); - *count = 0; - } - ++(*count); - sc_hash_table_insert(manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(connector_addr)), count); + sc_addr key = SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr) + ? event_subscription->subscription_addr + : connector_addr; + sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key))); + ++count; + sc_hash_table_insert( + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key)), GUINT_TO_POINTER(count)); } g_thread_pool_push(manager->thread_pool, event, null_ptr); diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index 7ad020633..d287b4125 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -669,13 +669,43 @@ sc_result _sc_storage_element_erase(sc_addr addr) return result; } -sc_result _sc_storage_element_erase_with_base_element( +void _sc_storage_cache_node_under_erasure_without_erase_events( + sc_addr addr, + sc_hash_table * incident_nodes_under_erasure) +{ + sc_pointer key = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr)); + if (sc_hash_table_get(incident_nodes_under_erasure, key) != null_ptr) + return; + + sc_monitor * monitor = sc_monitor_table_get_monitor_for_addr(&storage->addr_monitors_table, addr); + sc_monitor_acquire_read(monitor); + sc_element * element; + sc_result result = sc_storage_get_element_by_addr(addr, &element); + if (result == SC_RESULT_OK && sc_type_is_node(element->flags.type) + && (element->flags.states & SC_STATE_IS_UNDER_ERASURE) == SC_STATE_IS_UNDER_ERASURE) + { + sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); + if (emission_manager != null_ptr) + { + sc_monitor_acquire_read(&emission_manager->pool_monitor); + sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( + emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))); + if (count == 0) + sc_hash_table_insert(incident_nodes_under_erasure, key, element); + sc_monitor_release_read(&emission_manager->pool_monitor); + } + } + sc_monitor_release_read(monitor); +} + +sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( sc_memory_context const * ctx, sc_addr connector_chain_begin_addr, sc_addr addr, sc_hash_table * processed_connectors, sc_bool * does_branch_have_emitted_events, - sc_list * elements_that_can_be_erased) + sc_list * elements_that_can_be_erased, + sc_hash_table * incident_nodes_under_erasure) { sc_result result; sc_element * el = null_ptr; @@ -687,6 +717,14 @@ sc_result _sc_storage_element_erase_with_base_element( sc_monitor_release_write(monitor); return result; } + // if element wasn't erased before then erase events should be emitted, otherwise there should be a check for started + // and not finished erase events callbacks + sc_bool const was_element_erased_before = (el->flags.states & SC_STATE_IS_UNDER_ERASURE) == SC_STATE_IS_UNDER_ERASURE; + if (!was_element_erased_before) + el->flags.states |= SC_STATE_IS_UNDER_ERASURE; + sc_monitor_release_write(monitor); + + sc_monitor_acquire_read(monitor); sc_type const type = el->flags.type; sc_addr const begin_addr = el->arc.begin; @@ -699,9 +737,8 @@ sc_result _sc_storage_element_erase_with_base_element( sc_result erase_element_result = SC_RESULT_NO; sc_bool there_are_active_erase_events_with_addr = SC_FALSE; - if ((el->flags.states & SC_STATE_IS_UNDER_ERASURE) != SC_STATE_IS_UNDER_ERASURE) + if (!was_element_erased_before) { - el->flags.states |= SC_STATE_IS_UNDER_ERASURE; if ((type & sc_type_connector_mask) != 0) { erase_incoming_connector_result = sc_event_emit( @@ -783,8 +820,9 @@ sc_result _sc_storage_element_erase_with_base_element( if (emission_manager != null_ptr) { sc_monitor_acquire_read(&emission_manager->pool_monitor); - if (sc_hash_table_get(emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))) - != null_ptr) + sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( + emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))); + if (count != 0) { there_are_active_erase_events_with_addr = SC_TRUE; } @@ -797,8 +835,6 @@ sc_result _sc_storage_element_erase_with_base_element( || erase_element_result == SC_RESULT_OK || there_are_active_erase_events_with_addr) { *does_branch_have_emitted_events = SC_TRUE; - sc_monitor_release_write(monitor); - return SC_RESULT_OK; } sc_addr connector_addr = el->first_out_arc; while (SC_ADDR_IS_NOT_EMPTY(connector_addr)) @@ -813,13 +849,14 @@ sc_result _sc_storage_element_erase_with_base_element( break; sc_hash_table_insert(processed_connectors, p_addr, connector); - _sc_storage_element_erase_with_base_element( + _sc_storage_element_erase_with_incoming_outgoing_connectors( ctx, connector_chain_begin_addr, connector_addr, processed_connectors, does_branch_have_emitted_events, - elements_that_can_be_erased); + elements_that_can_be_erased, + incident_nodes_under_erasure); } connector_addr = connector->arc.next_begin_out_arc; @@ -838,47 +875,96 @@ sc_result _sc_storage_element_erase_with_base_element( break; sc_hash_table_insert(processed_connectors, p_addr, connector); - _sc_storage_element_erase_with_base_element( + _sc_storage_element_erase_with_incoming_outgoing_connectors( ctx, connector_chain_begin_addr, connector_addr, processed_connectors, does_branch_have_emitted_events, - elements_that_can_be_erased); + elements_that_can_be_erased, + incident_nodes_under_erasure); } connector_addr = connector->arc.next_end_in_arc; } + sc_monitor_release_read(monitor); - sc_monitor_release_write(monitor); + // if addr is connector and its source/target is node that is under erasure and does not have emitted erase events + // then cache source/target to try to erase them with their incoming/outgoing connectors + if ((type & sc_type_connector_mask) != 0) + { + if (SC_ADDR_IS_NOT_EQUAL(connector_chain_begin_addr, begin_addr)) + _sc_storage_cache_node_under_erasure_without_erase_events(begin_addr, incident_nodes_under_erasure); + if (SC_ADDR_IS_NOT_EQUAL(connector_chain_begin_addr, end_addr)) + _sc_storage_cache_node_under_erasure_without_erase_events(end_addr, incident_nodes_under_erasure); + } if (!*does_branch_have_emitted_events) sc_list_push_back(elements_that_can_be_erased, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); return SC_RESULT_OK; } -sc_result sc_storage_element_erase(sc_memory_context const * ctx, sc_addr addr) +sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors_and_hanging_nodes( + sc_memory_context const * ctx, + sc_addr erased_element_addr, + sc_addr reason_of_erasure_addr, + sc_hash_table * connectors_added_to_queue) { - sc_hash_table * connectors_added_to_queue = sc_hash_table_init(g_direct_hash, g_direct_equal, null_ptr, null_ptr); + sc_element * element; + sc_storage_get_element_by_addr(erased_element_addr, &element); + sc_hash_table_insert(connectors_added_to_queue, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(erased_element_addr)), element); + + sc_hash_table * incident_nodes_under_erasure = sc_hash_table_init(g_direct_hash, g_direct_equal, null_ptr, null_ptr); sc_bool does_branch_have_emitted_events = SC_FALSE; sc_list * elements_that_can_be_erased; sc_list_init(&elements_that_can_be_erased); - sc_result result = _sc_storage_element_erase_with_base_element( - ctx, addr, addr, connectors_added_to_queue, &does_branch_have_emitted_events, elements_that_can_be_erased); - - sc_hash_table_destroy(connectors_added_to_queue); + sc_result result = _sc_storage_element_erase_with_incoming_outgoing_connectors( + ctx, + reason_of_erasure_addr, + erased_element_addr, + connectors_added_to_queue, + &does_branch_have_emitted_events, + elements_that_can_be_erased, + incident_nodes_under_erasure); sc_iterator * elements_it = sc_list_iterator(elements_that_can_be_erased); + sc_addr from_hash_addr; while (sc_iterator_next(elements_it)) { sc_addr_hash addr_hash = (sc_pointer_to_sc_addr_hash)sc_iterator_get(elements_it); - SC_ADDR_LOCAL_FROM_INT(addr_hash, addr); - _sc_storage_element_erase(addr); + SC_ADDR_LOCAL_FROM_INT(addr_hash, from_hash_addr); + _sc_storage_element_erase(from_hash_addr); } sc_iterator_destroy(elements_it); sc_list_destroy(elements_that_can_be_erased); + if (!does_branch_have_emitted_events) + { + sc_hash_table_iterator nodes_to_erase_iterator; + sc_hash_table_iterator_init(&nodes_to_erase_iterator, incident_nodes_under_erasure); + sc_pointer key, value; + while (sc_hash_table_iterator_next(&nodes_to_erase_iterator, &key, &value)) + { + if (sc_hash_table_get(connectors_added_to_queue, key) == null_ptr) + { + sc_addr node_that_was_erased_addr; + SC_ADDR_LOCAL_FROM_INT((sc_pointer_to_sc_addr_hash)key, node_that_was_erased_addr); + _sc_storage_element_erase_with_incoming_outgoing_connectors_and_hanging_nodes( + ctx, node_that_was_erased_addr, reason_of_erasure_addr, connectors_added_to_queue); + } + } + } + + sc_hash_table_destroy(incident_nodes_under_erasure); + return result; +} +sc_result sc_storage_element_erase(sc_memory_context const * ctx, sc_addr addr) +{ + sc_hash_table * connectors_added_to_queue = sc_hash_table_init(g_direct_hash, g_direct_equal, null_ptr, null_ptr); + sc_result result = _sc_storage_element_erase_with_incoming_outgoing_connectors_and_hanging_nodes( + ctx, addr, addr, connectors_added_to_queue); + sc_hash_table_destroy(connectors_added_to_queue); return result; } diff --git a/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp b/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp index 4e4fa6a55..e7ebd8134 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/agents/test_sc_specified_agents.cpp @@ -1941,9 +1941,9 @@ TEST_F(ScSpecifiedAgentTest, ATestSpecifiedAgentGeneratingIncomingArcHasFullSpec ScAddr const & testOtherSetAddr = m_ctx->SearchElementBySystemIdentifier("test_other_set"); ScAddr const & testRelation = m_ctx->SearchElementBySystemIdentifier("test_relation"); { - ScMemoryContextEventsPendingGuard guard(* m_ctx); - ScAddr const & edgeAddr = m_ctx->GenerateConnector(ScType::ConstPermPosArc, testOtherSetAddr, testSetAddr); - m_ctx->GenerateConnector(ScType::ConstPermPosArc, testRelation, edgeAddr); + ScMemoryContextEventsPendingGuard guard(*m_ctx); + ScAddr const & arcAddr = m_ctx->GenerateConnector(ScType::ConstPermPosArc, testOtherSetAddr, testSetAddr); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, testRelation, arcAddr); } EXPECT_TRUE(ATestSpecifiedAgent::msWaiter.Wait()); diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp index 6d11a674e..971ea837c 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp @@ -106,8 +106,8 @@ bool TestEventSubscriptionGenerateConnector(ScAgentContext * ctx) nodeAddr2 = ctx->GenerateNode(ScType::ConstNode); EXPECT_TRUE(nodeAddr2.IsValid()); { - //sometimes OnEvent is called before arcAddr is assigned to generated connector, so pending is used to assure that - // arcAddr is assigned before it is used in OnEvent + // sometimes OnEvent is called before arcAddr is assigned to generated connector, so pending is used to assure + // that arcAddr is assigned before it is used in OnEvent ScMemoryContextEventsPendingGuard guard(*ctx); arcAddr = ctx->GenerateConnector(eventConnectorType, nodeAddr2, nodeAddr1); EXPECT_TRUE(arcAddr.IsValid()); @@ -1374,6 +1374,7 @@ TEST_F(ScEventTest, BlockEventsGuardAndEmitAfter) TEST_F(ScEventTest, TwoSubscriptionsForOneArcErasure) { + int const sleepTime = 20; ScAddr nodeAddr1 = m_ctx->GenerateNode(ScType::ConstNode); bool isLongExecutedSubscriptionCalled = false; auto longExecutedSubscription = @@ -1383,21 +1384,21 @@ TEST_F(ScEventTest, TwoSubscriptionsForOneArcErasure) { EXPECT_FALSE(isLongExecutedSubscriptionCalled); isLongExecutedSubscriptionCalled = true; - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); EXPECT_TRUE(m_ctx->IsElement(event.GetArc())); auto const & [sourceAddr, targetAddr] = m_ctx->GetConnectorIncidentElements(event.GetArc()); EXPECT_TRUE(m_ctx->IsElement(sourceAddr)); EXPECT_TRUE(m_ctx->IsElement(targetAddr)); - auto const & [source2Addr, target2Addr] = m_ctx->GetConnectorIncidentElements(targetAddr); - EXPECT_TRUE(m_ctx->IsElement(source2Addr)); - EXPECT_TRUE(m_ctx->IsElement(target2Addr)); - auto const & [source3Addr, target3Addr] = m_ctx->GetConnectorIncidentElements(target2Addr); - EXPECT_TRUE(m_ctx->IsElement(source3Addr)); - EXPECT_TRUE(m_ctx->IsElement(target3Addr)); - auto const & [source4Addr, target4Addr] = m_ctx->GetConnectorIncidentElements(target3Addr); - EXPECT_TRUE(m_ctx->IsElement(source4Addr)); - EXPECT_TRUE(m_ctx->IsElement(target4Addr)); - EXPECT_TRUE(m_ctx->GetElementType(target4Addr).IsNode()); + auto const & [sourceAddr2, targetAddr2] = m_ctx->GetConnectorIncidentElements(targetAddr); + EXPECT_TRUE(m_ctx->IsElement(sourceAddr2)); + EXPECT_TRUE(m_ctx->IsElement(targetAddr2)); + auto const & [sourceAddr3, targetAddr3] = m_ctx->GetConnectorIncidentElements(targetAddr2); + EXPECT_TRUE(m_ctx->IsElement(sourceAddr3)); + EXPECT_TRUE(m_ctx->IsElement(targetAddr3)); + auto const & [sourceAddr4, targetAddr4] = m_ctx->GetConnectorIncidentElements(targetAddr3); + EXPECT_TRUE(m_ctx->IsElement(sourceAddr4)); + EXPECT_TRUE(m_ctx->IsElement(targetAddr4)); + EXPECT_TRUE(m_ctx->GetElementType(targetAddr4).IsNode()); }); bool isShortExecutedSubscriptionCalled = false; auto shortExecutedSubscription = @@ -1417,8 +1418,274 @@ TEST_F(ScEventTest, TwoSubscriptionsForOneArcErasure) ScAddr const & arcAddr2 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr4, arcAddr1); ScAddr const & arcAddr3 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr5, arcAddr2); m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr1, arcAddr3); + m_ctx->EraseElement(nodeAddr2); - std::this_thread::sleep_for(std::chrono::milliseconds(20)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, TwoSubscriptionsForNodeErasure) +{ + int const sleepTime = 20; + ScAddr nodeWithoutSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeWithTwoSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithTwoSubscriptionsAddr, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithTwoSubscriptionsAddr, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeWithTwoSubscriptionsAddr); + m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); + SC_LOG_INFO("checking nodeWithoutSubscriptionsAddr first"); + EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + SC_LOG_INFO("checking nodeWithTwoSubscriptionsAddr first"); + EXPECT_TRUE(m_ctx->IsElement(nodeWithTwoSubscriptionsAddr)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + SC_LOG_INFO("checking nodeWithTwoSubscriptionsAddr second"); + EXPECT_FALSE(m_ctx->IsElement(nodeWithTwoSubscriptionsAddr)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeErasingFirstAndNodeFinishEarlier) +{ + int const sleepTime = 20; + ScAddr nodeWithoutSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeWithSubscriptionAddr = m_ctx->GenerateNode(ScType::ConstNode); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeWithSubscriptionAddr, nodeWithoutSubscriptionsAddr); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithSubscriptionAddr, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeWithSubscriptionAddr, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeWithSubscriptionAddr); + m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 4)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeErasingFirstAndNodeFinishLater) +{ + int const sleepTime = 20; + ScAddr nodeWithoutSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeWithSubscriptionAddr = m_ctx->GenerateNode(ScType::ConstNode); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeWithSubscriptionAddr, nodeWithoutSubscriptionsAddr); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeWithSubscriptionAddr, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithSubscriptionAddr, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeWithSubscriptionAddr); + m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeErasingSecondAndNodeFinishEarlier) +{ + int const sleepTime = 20; + ScAddr nodeWithoutSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeWithSubscriptionAddr = m_ctx->GenerateNode(ScType::ConstNode); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeWithSubscriptionAddr, nodeWithoutSubscriptionsAddr); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithSubscriptionAddr, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeWithSubscriptionAddr, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); + m_ctx->EraseElement(nodeWithSubscriptionAddr); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 4)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + EXPECT_TRUE(isShortExecutedSubscriptionCalled); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} + +TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeErasingSecondAndNodeFinishLater) +{ + int const sleepTime = 20; + ScAddr nodeWithoutSubscriptionsAddr = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeWithSubscriptionAddr = m_ctx->GenerateNode(ScType::ConstNode); + m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeWithSubscriptionAddr, nodeWithoutSubscriptionsAddr); + bool isShortExecutedSubscriptionCalled = false; + auto shortExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeWithSubscriptionAddr, + [&isShortExecutedSubscriptionCalled](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionCalled); + isShortExecutedSubscriptionCalled = true; + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription( + nodeWithSubscriptionAddr, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); + m_ctx->EraseElement(nodeWithSubscriptionAddr); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 4)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); + EXPECT_TRUE(m_ctx->IsElement(nodeWithSubscriptionAddr)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeWithSubscriptionAddr)); EXPECT_TRUE(isShortExecutedSubscriptionCalled); EXPECT_TRUE(isLongExecutedSubscriptionCalled); } + +TEST_F(ScEventTest, SubscriptionForNodeAndTwoConnectorsErasureWithNodeFinishEarlier) +{ + int const sleepTime = 20; + ScAddr nodeAddr1 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeAddr2 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeAddr3 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeAddr4 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr nodeAddr5 = m_ctx->GenerateNode(ScType::ConstNode); + ScAddr arc1 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr1, nodeAddr2); + ScAddr arc2 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr3, nodeAddr4); + ScAddr arc3 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, arc2, arc1); + ScAddr arc4 = m_ctx->GenerateConnector(ScType::ConstPermPosArc, nodeAddr5, arc2); + + bool isShortExecutedSubscriptionForArc1Called = false; + auto shortExecutedSubscriptionForArc1 = m_ctx->CreateElementaryEventSubscription( + nodeAddr1, + [&isShortExecutedSubscriptionForArc1Called](auto const &) + { + EXPECT_FALSE(isShortExecutedSubscriptionForArc1Called); + isShortExecutedSubscriptionForArc1Called = true; + }); + bool isMediumExecutedSubscriptionForArc4Called = false; + auto mediumExecutedSubscriptionForArc4 = + m_ctx->CreateElementaryEventSubscription>( + nodeAddr5, + [&isMediumExecutedSubscriptionForArc4Called, &sleepTime](auto const &) + { + EXPECT_FALSE(isMediumExecutedSubscriptionForArc4Called); + isMediumExecutedSubscriptionForArc4Called = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); + }); + bool isLongExecutedSubscriptionCalled = false; + auto longExecutedSubscription = + m_ctx->CreateElementaryEventSubscription>( + nodeAddr1, + [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + { + EXPECT_FALSE(isLongExecutedSubscriptionCalled); + isLongExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + }); + + m_ctx->EraseElement(nodeAddr1); + m_ctx->EraseElement(nodeAddr2); + m_ctx->EraseElement(nodeAddr4); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr3)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr5)); + EXPECT_TRUE(m_ctx->IsElement(arc1)); + EXPECT_TRUE(m_ctx->IsElement(arc2)); + EXPECT_FALSE(m_ctx->IsElement(arc3)); + EXPECT_TRUE(m_ctx->IsElement(arc4)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 3)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr3)); + EXPECT_FALSE(m_ctx->IsElement(nodeAddr4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr5)); + EXPECT_TRUE(m_ctx->IsElement(arc1)); + EXPECT_FALSE(m_ctx->IsElement(arc2)); + EXPECT_FALSE(m_ctx->IsElement(arc3)); + EXPECT_FALSE(m_ctx->IsElement(arc4)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + EXPECT_FALSE(m_ctx->IsElement(nodeAddr1)); + EXPECT_FALSE(m_ctx->IsElement(nodeAddr2)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr3)); + EXPECT_FALSE(m_ctx->IsElement(nodeAddr4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr5)); + EXPECT_FALSE(m_ctx->IsElement(arc1)); + EXPECT_FALSE(m_ctx->IsElement(arc2)); + EXPECT_FALSE(m_ctx->IsElement(arc3)); + EXPECT_FALSE(m_ctx->IsElement(arc4)); + + EXPECT_TRUE(isShortExecutedSubscriptionForArc1Called); + EXPECT_TRUE(isMediumExecutedSubscriptionForArc4Called); + EXPECT_TRUE(isLongExecutedSubscriptionCalled); +} From bacd431300a98c7f792ac1a61867170b257c1d07 Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Fri, 29 Nov 2024 14:23:12 +0300 Subject: [PATCH 12/18] [event][tests] Don't erase elements that were erased --- sc-memory/sc-core/src/sc-store/sc_storage.c | 37 +++++++++++-------- .../sc-memory/units/events/test_sc_event.cpp | 3 -- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index d287b4125..6b7a43d8b 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -669,20 +669,23 @@ sc_result _sc_storage_element_erase(sc_addr addr) return result; } -void _sc_storage_cache_node_under_erasure_without_erase_events( +void _sc_storage_cache_elements_under_erasure_without_erase_events( sc_addr addr, - sc_hash_table * incident_nodes_under_erasure) + sc_hash_table * incident_elements_under_erasure, + sc_hash_table * processed_elements) { sc_pointer key = GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr)); - if (sc_hash_table_get(incident_nodes_under_erasure, key) != null_ptr) + if (sc_hash_table_get(incident_elements_under_erasure, key) != null_ptr) + return; + if (sc_hash_table_get(processed_elements, key) != null_ptr) return; sc_monitor * monitor = sc_monitor_table_get_monitor_for_addr(&storage->addr_monitors_table, addr); sc_monitor_acquire_read(monitor); sc_element * element; sc_result result = sc_storage_get_element_by_addr(addr, &element); - if (result == SC_RESULT_OK && sc_type_is_node(element->flags.type) - && (element->flags.states & SC_STATE_IS_UNDER_ERASURE) == SC_STATE_IS_UNDER_ERASURE) + if (result == SC_RESULT_OK && (element->flags.states & SC_STATE_IS_UNDER_ERASURE) == SC_STATE_IS_UNDER_ERASURE + && (element->flags.states & SC_STATE_REQUEST_ERASURE) == 0) { sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); if (emission_manager != null_ptr) @@ -691,7 +694,7 @@ void _sc_storage_cache_node_under_erasure_without_erase_events( sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))); if (count == 0) - sc_hash_table_insert(incident_nodes_under_erasure, key, element); + sc_hash_table_insert(incident_elements_under_erasure, key, element); sc_monitor_release_read(&emission_manager->pool_monitor); } } @@ -725,6 +728,11 @@ sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( sc_monitor_release_write(monitor); sc_monitor_acquire_read(monitor); + if ((el->flags.states & SC_STATE_REQUEST_ERASURE) == SC_STATE_REQUEST_ERASURE) + { + sc_monitor_release_read(monitor); + return SC_RESULT_OK; + } sc_type const type = el->flags.type; sc_addr const begin_addr = el->arc.begin; @@ -739,7 +747,7 @@ sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( if (!was_element_erased_before) { - if ((type & sc_type_connector_mask) != 0) + if (sc_type_is_connector(type)) { erase_incoming_connector_result = sc_event_emit( ctx, @@ -823,9 +831,7 @@ sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( emission_manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(addr))); if (count != 0) - { there_are_active_erase_events_with_addr = SC_TRUE; - } sc_monitor_release_read(&emission_manager->pool_monitor); } } @@ -833,9 +839,8 @@ sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( if (erase_incoming_connector_result == SC_RESULT_OK || erase_outgoing_connector_result == SC_RESULT_OK || erase_incoming_arc_result == SC_RESULT_OK || erase_outgoing_arc_result == SC_RESULT_OK || erase_element_result == SC_RESULT_OK || there_are_active_erase_events_with_addr) - { *does_branch_have_emitted_events = SC_TRUE; - } + sc_addr connector_addr = el->first_out_arc; while (SC_ADDR_IS_NOT_EMPTY(connector_addr)) { @@ -887,17 +892,19 @@ sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( connector_addr = connector->arc.next_end_in_arc; } - sc_monitor_release_read(monitor); // if addr is connector and its source/target is node that is under erasure and does not have emitted erase events // then cache source/target to try to erase them with their incoming/outgoing connectors - if ((type & sc_type_connector_mask) != 0) + if (sc_type_is_connector(type)) { if (SC_ADDR_IS_NOT_EQUAL(connector_chain_begin_addr, begin_addr)) - _sc_storage_cache_node_under_erasure_without_erase_events(begin_addr, incident_nodes_under_erasure); + _sc_storage_cache_elements_under_erasure_without_erase_events( + begin_addr, incident_nodes_under_erasure, processed_connectors); if (SC_ADDR_IS_NOT_EQUAL(connector_chain_begin_addr, end_addr)) - _sc_storage_cache_node_under_erasure_without_erase_events(end_addr, incident_nodes_under_erasure); + _sc_storage_cache_elements_under_erasure_without_erase_events( + end_addr, incident_nodes_under_erasure, processed_connectors); } + sc_monitor_release_read(monitor); if (!*does_branch_have_emitted_events) sc_list_push_back(elements_that_can_be_erased, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp index 971ea837c..0f432e0d2 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp @@ -1451,12 +1451,9 @@ TEST_F(ScEventTest, TwoSubscriptionsForNodeErasure) m_ctx->EraseElement(nodeWithTwoSubscriptionsAddr); m_ctx->EraseElement(nodeWithoutSubscriptionsAddr); std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); - SC_LOG_INFO("checking nodeWithoutSubscriptionsAddr first"); EXPECT_FALSE(m_ctx->IsElement(nodeWithoutSubscriptionsAddr)); - SC_LOG_INFO("checking nodeWithTwoSubscriptionsAddr first"); EXPECT_TRUE(m_ctx->IsElement(nodeWithTwoSubscriptionsAddr)); std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); - SC_LOG_INFO("checking nodeWithTwoSubscriptionsAddr second"); EXPECT_FALSE(m_ctx->IsElement(nodeWithTwoSubscriptionsAddr)); EXPECT_TRUE(isShortExecutedSubscriptionCalled); EXPECT_TRUE(isLongExecutedSubscriptionCalled); From 969e37d154749e5211338e66af57197c20ccf11f Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Fri, 29 Nov 2024 15:40:28 +0300 Subject: [PATCH 13/18] [event] Release read monitor before source and target processing --- sc-memory/sc-core/src/sc-store/sc_storage.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index 6b7a43d8b..47813b72e 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -892,6 +892,7 @@ sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( connector_addr = connector->arc.next_end_in_arc; } + sc_monitor_release_read(monitor); // if addr is connector and its source/target is node that is under erasure and does not have emitted erase events // then cache source/target to try to erase them with their incoming/outgoing connectors @@ -904,7 +905,6 @@ sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( _sc_storage_cache_elements_under_erasure_without_erase_events( end_addr, incident_nodes_under_erasure, processed_connectors); } - sc_monitor_release_read(monitor); if (!*does_branch_have_emitted_events) sc_list_push_back(elements_that_can_be_erased, (sc_addr_hash_to_sc_pointer)SC_ADDR_LOCAL_TO_INT(addr)); From b6c6c041da58db5a918ea0075d26df3bc7408308 Mon Sep 17 00:00:00 2001 From: kilativ-dotcom Date: Fri, 6 Dec 2024 13:20:58 +0300 Subject: [PATCH 14/18] [tests][event] Reorder monitor acquire --- .../sc-core/src/sc-store/sc_event_subscription.c | 12 +++++------- sc-memory/sc-core/src/sc-store/sc_storage.c | 4 ++-- .../tests/sc-memory/units/events/test_sc_event.cpp | 7 +++++++ 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/sc-memory/sc-core/src/sc-store/sc_event_subscription.c b/sc-memory/sc-core/src/sc-store/sc_event_subscription.c index d72392e0d..80b34dbd8 100644 --- a/sc-memory/sc-core/src/sc-store/sc_event_subscription.c +++ b/sc-memory/sc-core/src/sc-store/sc_event_subscription.c @@ -92,14 +92,13 @@ sc_result _sc_event_subscription_manager_remove( if (manager == null_ptr) return SC_RESULT_NO; - sc_monitor_acquire_write(&manager->events_table_monitor); element_events_list = (sc_hash_table_list *)sc_hash_table_get(manager->events_table, TABLE_KEY(event_subscription->subscription_addr)); if (element_events_list == null_ptr) - goto error; + return SC_RESULT_ERROR_INVALID_PARAMS; if (manager->events_table == null_ptr) - goto error; + return SC_RESULT_ERROR_INVALID_PARAMS; // remove event_subscription from list of events for specified sc-element element_events_list = sc_hash_table_list_remove(element_events_list, (sc_const_pointer)event_subscription); @@ -109,11 +108,7 @@ sc_result _sc_event_subscription_manager_remove( sc_hash_table_insert( manager->events_table, TABLE_KEY(event_subscription->subscription_addr), (sc_pointer)element_events_list); - sc_monitor_release_write(&manager->events_table_monitor); return SC_RESULT_OK; -error: - sc_monitor_release_write(&manager->events_table_monitor); - return SC_RESULT_ERROR_INVALID_PARAMS; } void sc_event_subscription_manager_initialize(sc_event_subscription_manager ** manager) @@ -207,10 +202,12 @@ sc_result sc_event_subscription_destroy(sc_event_subscription * event_subscripti sc_event_subscription_manager * subscription_manager = sc_storage_get_event_subscription_manager(); sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); + sc_monitor_acquire_write(&subscription_manager->events_table_monitor); sc_monitor_acquire_write(&event_subscription->monitor); if (_sc_event_subscription_manager_remove(subscription_manager, event_subscription) != SC_RESULT_OK) { sc_monitor_release_write(&event_subscription->monitor); + sc_monitor_release_write(&subscription_manager->events_table_monitor); return SC_RESULT_ERROR; } @@ -234,6 +231,7 @@ sc_result sc_event_subscription_destroy(sc_event_subscription * event_subscripti sc_monitor_release_write(&emission_manager->pool_monitor); } sc_monitor_release_write(&event_subscription->monitor); + sc_monitor_release_write(&subscription_manager->events_table_monitor); return SC_RESULT_OK; } diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index 47813b72e..79901afd8 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -685,7 +685,7 @@ void _sc_storage_cache_elements_under_erasure_without_erase_events( sc_element * element; sc_result result = sc_storage_get_element_by_addr(addr, &element); if (result == SC_RESULT_OK && (element->flags.states & SC_STATE_IS_UNDER_ERASURE) == SC_STATE_IS_UNDER_ERASURE - && (element->flags.states & SC_STATE_REQUEST_ERASURE) == 0) + && (element->flags.states & SC_STATE_REQUEST_ERASURE) != SC_STATE_REQUEST_ERASURE) { sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); if (emission_manager != null_ptr) @@ -896,7 +896,7 @@ sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( // if addr is connector and its source/target is node that is under erasure and does not have emitted erase events // then cache source/target to try to erase them with their incoming/outgoing connectors - if (sc_type_is_connector(type)) + if (sc_type_is_connector(type) && !*does_branch_have_emitted_events) { if (SC_ADDR_IS_NOT_EQUAL(connector_chain_begin_addr, begin_addr)) _sc_storage_cache_elements_under_erasure_without_erase_events( diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp index 0f432e0d2..a68a9e97b 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp @@ -1408,6 +1408,7 @@ TEST_F(ScEventTest, TwoSubscriptionsForOneArcErasure) { EXPECT_FALSE(isShortExecutedSubscriptionCalled); isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); }); ScAddr const nodeAddr2 = m_ctx->GenerateNode(ScType::ConstNode); @@ -1437,6 +1438,7 @@ TEST_F(ScEventTest, TwoSubscriptionsForNodeErasure) { EXPECT_FALSE(isShortExecutedSubscriptionCalled); isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); }); bool isLongExecutedSubscriptionCalled = false; auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription( @@ -1472,6 +1474,7 @@ TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeEra { EXPECT_FALSE(isShortExecutedSubscriptionCalled); isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); }); bool isLongExecutedSubscriptionCalled = false; auto longExecutedSubscription = @@ -1510,6 +1513,7 @@ TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeEra { EXPECT_FALSE(isShortExecutedSubscriptionCalled); isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); }); bool isLongExecutedSubscriptionCalled = false; auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription( @@ -1546,6 +1550,7 @@ TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeEra { EXPECT_FALSE(isShortExecutedSubscriptionCalled); isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); }); bool isLongExecutedSubscriptionCalled = false; auto longExecutedSubscription = @@ -1584,6 +1589,7 @@ TEST_F(ScEventTest, SubscriptionForNodeAndConnectorsErasureWithSubscribedNodeEra { EXPECT_FALSE(isShortExecutedSubscriptionCalled); isShortExecutedSubscriptionCalled = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); }); bool isLongExecutedSubscriptionCalled = false; auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription( @@ -1626,6 +1632,7 @@ TEST_F(ScEventTest, SubscriptionForNodeAndTwoConnectorsErasureWithNodeFinishEarl { EXPECT_FALSE(isShortExecutedSubscriptionForArc1Called); isShortExecutedSubscriptionForArc1Called = true; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); }); bool isMediumExecutedSubscriptionForArc4Called = false; auto mediumExecutedSubscriptionForArc4 = From a0697015746b6938a80dc8f8eb64e6ab25ea34e4 Mon Sep 17 00:00:00 2001 From: NikitaZotov Date: Sat, 11 Jan 2025 15:00:00 +0300 Subject: [PATCH 15/18] [memory][core][events] Fix syncing removing subscription and emmiting event linked to subscription --- .../src/sc-store/sc-event/sc_event_queue.c | 79 ++++++++++++------- .../src/sc-store/sc-event/sc_event_queue.h | 5 +- .../src/sc-store/sc_event_subscription.c | 51 ++++++------ 3 files changed, 78 insertions(+), 57 deletions(-) diff --git a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c index f78d9c0a3..04c4fa835 100644 --- a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c +++ b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c @@ -6,6 +6,8 @@ #include "sc_event_queue.h" +#include + #include "sc-core/sc_event_subscription.h" #include "sc_event_private.h" @@ -65,22 +67,22 @@ void _sc_event_emission_pool_worker_data_destroy(sc_event * data) void _sc_event_emission_pool_worker(sc_pointer data, sc_pointer user_data) { sc_event * event = (sc_event *)data; - sc_event_emission_manager * queue = user_data; + sc_event_emission_manager * manager = user_data; sc_event_subscription * event_subscription = event->event_subscription; if (event_subscription == null_ptr) goto destroy; - sc_monitor_acquire_read(&queue->destroy_monitor); + sc_monitor_acquire_read(&manager->destroy_monitor); - if (queue->running == SC_FALSE) + if (manager->running == SC_FALSE) goto end; sc_monitor_acquire_read(&event_subscription->monitor); if (sc_event_subscription_is_deletable(event_subscription)) { sc_monitor_release_read(&event_subscription->monitor); - goto end; + goto destroy; } sc_event_callback callback = event_subscription->callback; @@ -102,30 +104,34 @@ void _sc_event_emission_pool_worker(sc_pointer data, sc_pointer user_data) || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_edge_addr) || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr)) { - sc_monitor_acquire_write(&queue->pool_monitor); + sc_monitor_acquire_write(&manager->emitted_erase_events_monitor); sc_addr key = SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr) ? event->event_addr : event->connector_addr; sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( - queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key))); + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key))); if (count != 0) { --count; if (count == 0) - sc_hash_table_remove(queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key))); + sc_hash_table_remove(manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key))); else sc_hash_table_insert( - queue->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key)), GUINT_TO_POINTER(count)); + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key)), GUINT_TO_POINTER(count)); } - sc_monitor_release_write(&queue->pool_monitor); + sc_monitor_release_write(&manager->emitted_erase_events_monitor); } sc_monitor_release_read(&event_subscription->monitor); end: - sc_monitor_release_read(&queue->destroy_monitor); + sc_monitor_release_read(&manager->destroy_monitor); destroy: { + sc_monitor_acquire_write(&manager->emitted_erase_events_monitor); + --manager->current_emitted_events_count; + sc_monitor_release_write(&manager->emitted_erase_events_monitor); + if (event->callback != null_ptr) { sc_memory_context * ctx = sc_memory_context_new_ext(event->user_addr); @@ -166,11 +172,12 @@ void sc_event_emission_manager_initialize(sc_event_emission_manager ** manager, } (*manager)->running = SC_TRUE; - sc_monitor_init(&(*manager)->destroy_monitor); sc_monitor_init(&(*manager)->pool_monitor); + (*manager)->current_emitted_events_count = 0; (*manager)->emitted_erase_events = sc_hash_table_init(emitted_events_hash_func, emitted_events_equal_func, null_ptr, null_ptr); + sc_monitor_init(&(*manager)->emitted_erase_events_monitor); (*manager)->thread_pool = g_thread_pool_new( _sc_event_emission_pool_worker, *manager, @@ -184,18 +191,8 @@ void sc_event_emission_manager_stop(sc_event_emission_manager * manager) if (manager == null_ptr) return; - sc_bool is_running = SC_FALSE; - - sc_monitor_acquire_read(&manager->destroy_monitor); - is_running = manager->running; - sc_monitor_release_read(&manager->destroy_monitor); - - if (is_running) - { - sc_monitor_acquire_write(&manager->destroy_monitor); + if (manager->running) manager->running = SC_FALSE; - sc_monitor_release_write(&manager->destroy_monitor); - } } void sc_event_emission_manager_shutdown(sc_event_emission_manager * manager) @@ -203,6 +200,8 @@ void sc_event_emission_manager_shutdown(sc_event_emission_manager * manager) if (manager == null_ptr) return; + // Acquire write lock once for all operations + sc_monitor_acquire_write(&manager->destroy_monitor); sc_monitor_acquire_write(&manager->pool_monitor); if (manager->thread_pool) { @@ -210,6 +209,17 @@ void sc_event_emission_manager_shutdown(sc_event_emission_manager * manager) manager->thread_pool = null_ptr; } + // Wait for current events to finish with a timeout + sc_int32 const MAX_WAIT_ITERATIONS = 1000; + sc_int32 wait_iterations = 0; + while (manager->current_emitted_events_count > 0 && wait_iterations < MAX_WAIT_ITERATIONS) + { + sc_monitor_release_write(&manager->destroy_monitor); + usleep(10); + sc_monitor_acquire_write(&manager->destroy_monitor); + wait_iterations++; + } + while (!sc_queue_empty(&manager->deletable_events_subscriptions)) { sc_event_subscription * event_subscription = sc_queue_pop(&manager->deletable_events_subscriptions); @@ -236,29 +246,38 @@ void _sc_event_emission_manager_add( sc_event_do_after_callback callback, sc_addr event_addr) { - if (manager == null_ptr) - return; - sc_event * event = _sc_event_new(event_subscription, user_addr, connector_addr, connector_type, other_addr, callback, event_addr); sc_monitor_acquire_write(&manager->pool_monitor); + if (manager->thread_pool == null_ptr) + { + sc_monitor_release_write(&manager->pool_monitor); + return; + } + if (SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_connector_addr) || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_incoming_arc_addr) || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_outgoing_arc_addr) || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_edge_addr) || SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr)) { - sc_addr key = SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr) - ? event_subscription->subscription_addr - : connector_addr; + sc_monitor_acquire_write(&manager->emitted_erase_events_monitor); + sc_addr key_addr = SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr) + ? event_subscription->subscription_addr + : connector_addr; sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get( - manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key))); + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key_addr))); ++count; sc_hash_table_insert( - manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key)), GUINT_TO_POINTER(count)); + manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key_addr)), GUINT_TO_POINTER(count)); + sc_monitor_release_write(&manager->emitted_erase_events_monitor); } + sc_monitor_acquire_write(&manager->emitted_erase_events_monitor); + ++manager->current_emitted_events_count; + sc_monitor_release_write(&manager->emitted_erase_events_monitor); + g_thread_pool_push(manager->thread_pool, event, null_ptr); sc_monitor_release_write(&manager->pool_monitor); } diff --git a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h index 11aee6d4a..e7613e315 100644 --- a/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h +++ b/sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h @@ -32,8 +32,11 @@ typedef struct sc_monitor destroy_monitor; ///< Monitor for synchronizing access to the destruction process. GThreadPool * thread_pool; ///< Thread pool used for worker threads processing events. sc_monitor pool_monitor; ///< Monitor for synchronizing access to the thread pool. + sc_uint32 current_emitted_events_count; ///< Current count of emitted events. sc_hash_table * emitted_erase_events; ///< Table that stores amount of active event subscriptions that were initiated - ///< due to erasure of sc-element which sc-addr is stored as a key + ///< due to erasure of sc-element which sc-addr is stored as a key. + sc_monitor emitted_erase_events_monitor; ///< Monitor for synchronizing current_emitted_events_count and + ///< emitted_erase_events. } sc_event_emission_manager; /*! Function that initializes an sc-event emission manager. diff --git a/sc-memory/sc-core/src/sc-store/sc_event_subscription.c b/sc-memory/sc-core/src/sc-store/sc_event_subscription.c index 80b34dbd8..acd748182 100644 --- a/sc-memory/sc-core/src/sc-store/sc_event_subscription.c +++ b/sc-memory/sc-core/src/sc-store/sc_event_subscription.c @@ -51,6 +51,7 @@ sc_result _sc_event_subscription_manager_add( sc_event_subscription_manager * manager, sc_event_subscription * event_subscription) { + sc_result result = SC_RESULT_OK; sc_hash_table_list * element_events_list = null_ptr; // the first, if table doesn't exist, then return error @@ -58,23 +59,22 @@ sc_result _sc_event_subscription_manager_add( return SC_RESULT_NO; sc_monitor_acquire_write(&manager->events_table_monitor); - if (manager->events_table == null_ptr) { - sc_monitor_release_write(&manager->events_table_monitor); - return SC_RESULT_NO; + result = SC_RESULT_OK; + goto result; } - // if there are no events for specified sc-element, then generate new events list + // if there are no events for specified sc-element, then create new events list element_events_list = (sc_hash_table_list *)sc_hash_table_get(manager->events_table, TABLE_KEY(event_subscription->subscription_addr)); element_events_list = sc_hash_table_list_append(element_events_list, (sc_pointer)event_subscription); sc_hash_table_insert( manager->events_table, TABLE_KEY(event_subscription->subscription_addr), (sc_pointer)element_events_list); +result: sc_monitor_release_write(&manager->events_table_monitor); - - return SC_RESULT_OK; + return result; } /*! Removes the specified sc-event_subscription from the registration manager's events table. @@ -86,19 +86,21 @@ sc_result _sc_event_subscription_manager_remove( sc_event_subscription_manager * manager, sc_event_subscription * event_subscription) { + sc_result result = SC_RESULT_OK; sc_hash_table_list * element_events_list = null_ptr; // the first, if table doesn't exist, then return error if (manager == null_ptr) return SC_RESULT_NO; + sc_monitor_acquire_write(&manager->events_table_monitor); element_events_list = (sc_hash_table_list *)sc_hash_table_get(manager->events_table, TABLE_KEY(event_subscription->subscription_addr)); if (element_events_list == null_ptr) - return SC_RESULT_ERROR_INVALID_PARAMS; - - if (manager->events_table == null_ptr) - return SC_RESULT_ERROR_INVALID_PARAMS; + { + result = SC_RESULT_ERROR_INVALID_PARAMS; + goto result; + } // remove event_subscription from list of events for specified sc-element element_events_list = sc_hash_table_list_remove(element_events_list, (sc_const_pointer)event_subscription); @@ -108,7 +110,9 @@ sc_result _sc_event_subscription_manager_remove( sc_hash_table_insert( manager->events_table, TABLE_KEY(event_subscription->subscription_addr), (sc_pointer)element_events_list); - return SC_RESULT_OK; +result: + sc_monitor_release_write(&manager->events_table_monitor); + return result; } void sc_event_subscription_manager_initialize(sc_event_subscription_manager ** manager) @@ -200,14 +204,17 @@ sc_result sc_event_subscription_destroy(sc_event_subscription * event_subscripti return SC_RESULT_NO; sc_event_subscription_manager * subscription_manager = sc_storage_get_event_subscription_manager(); + if (subscription_manager == null_ptr) + return SC_RESULT_NO; + sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager(); + if (subscription_manager == null_ptr) + return SC_RESULT_NO; - sc_monitor_acquire_write(&subscription_manager->events_table_monitor); sc_monitor_acquire_write(&event_subscription->monitor); if (_sc_event_subscription_manager_remove(subscription_manager, event_subscription) != SC_RESULT_OK) { sc_monitor_release_write(&event_subscription->monitor); - sc_monitor_release_write(&subscription_manager->events_table_monitor); return SC_RESULT_ERROR; } @@ -231,7 +238,6 @@ sc_result sc_event_subscription_destroy(sc_event_subscription * event_subscripti sc_monitor_release_write(&emission_manager->pool_monitor); } sc_monitor_release_write(&event_subscription->monitor); - sc_monitor_release_write(&subscription_manager->events_table_monitor); return SC_RESULT_OK; } @@ -248,16 +254,11 @@ sc_result sc_event_notify_element_deleted(sc_addr element) if (subscription_manager == null_ptr || subscription_manager->events_table == null_ptr) goto result; - // TODO(NikitaZotov): Implement monitor for `subscription_manager` to synchronize its freeing. // lookup for all registered to specified sc-element events sc_monitor_acquire_write(&subscription_manager->events_table_monitor); - if (subscription_manager != null_ptr) - { - element_events_list = - (sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(element)); - if (element_events_list != null_ptr) - sc_hash_table_remove(subscription_manager->events_table, TABLE_KEY(element)); - } + element_events_list = (sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(element)); + if (element_events_list != null_ptr) + sc_hash_table_remove(subscription_manager->events_table, TABLE_KEY(element)); if (element_events_list != null_ptr) { @@ -331,12 +332,10 @@ sc_result sc_event_emit_impl( if (subscription_manager == null_ptr || subscription_manager->events_table == null_ptr) goto result; - // TODO(NikitaZotov): Implement monitor for `subscription_manager` to synchronize its freeing. // lookup for all registered to specified sc-element events sc_monitor_acquire_read(&subscription_manager->events_table_monitor); - if (subscription_manager != null_ptr) - element_events_list = - (sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(subscription_addr)); + element_events_list = + (sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(subscription_addr)); while (element_events_list != null_ptr) { From 172ddc8b195eb9ad29e723aa66c029f231b6350b Mon Sep 17 00:00:00 2001 From: NikitaZotov Date: Sat, 11 Jan 2025 15:01:41 +0300 Subject: [PATCH 16/18] [memory][core][events][tests] Fix checking timeouts for processes in a state of race --- .../sc-memory/units/events/test_sc_event.cpp | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp index a68a9e97b..51693dba7 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp @@ -1628,8 +1628,13 @@ TEST_F(ScEventTest, SubscriptionForNodeAndTwoConnectorsErasureWithNodeFinishEarl bool isShortExecutedSubscriptionForArc1Called = false; auto shortExecutedSubscriptionForArc1 = m_ctx->CreateElementaryEventSubscription( nodeAddr1, - [&isShortExecutedSubscriptionForArc1Called](auto const &) + [&](auto const &) { + EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); + EXPECT_TRUE(m_ctx->IsElement(arc1)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); + EXPECT_TRUE(m_ctx->IsElement(arc3)); + EXPECT_FALSE(isShortExecutedSubscriptionForArc1Called); isShortExecutedSubscriptionForArc1Called = true; std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1638,8 +1643,13 @@ TEST_F(ScEventTest, SubscriptionForNodeAndTwoConnectorsErasureWithNodeFinishEarl auto mediumExecutedSubscriptionForArc4 = m_ctx->CreateElementaryEventSubscription>( nodeAddr5, - [&isMediumExecutedSubscriptionForArc4Called, &sleepTime](auto const &) + [&](auto const &) { + EXPECT_TRUE(m_ctx->IsElement(arc2)); + EXPECT_TRUE(m_ctx->IsElement(arc4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr4)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr5)); + EXPECT_FALSE(isMediumExecutedSubscriptionForArc4Called); isMediumExecutedSubscriptionForArc4Called = true; std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); @@ -1648,8 +1658,13 @@ TEST_F(ScEventTest, SubscriptionForNodeAndTwoConnectorsErasureWithNodeFinishEarl auto longExecutedSubscription = m_ctx->CreateElementaryEventSubscription>( nodeAddr1, - [&isLongExecutedSubscriptionCalled, &sleepTime](auto const & event) + [&](auto const &) { + EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); + EXPECT_TRUE(m_ctx->IsElement(arc1)); + EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); + EXPECT_TRUE(m_ctx->IsElement(arc3)); + EXPECT_FALSE(isLongExecutedSubscriptionCalled); isLongExecutedSubscriptionCalled = true; std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); @@ -1658,6 +1673,7 @@ TEST_F(ScEventTest, SubscriptionForNodeAndTwoConnectorsErasureWithNodeFinishEarl m_ctx->EraseElement(nodeAddr1); m_ctx->EraseElement(nodeAddr2); m_ctx->EraseElement(nodeAddr4); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 4)); EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); @@ -1668,7 +1684,8 @@ TEST_F(ScEventTest, SubscriptionForNodeAndTwoConnectorsErasureWithNodeFinishEarl EXPECT_TRUE(m_ctx->IsElement(arc2)); EXPECT_FALSE(m_ctx->IsElement(arc3)); EXPECT_TRUE(m_ctx->IsElement(arc4)); - std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 3)); + + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime / 2)); EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); EXPECT_TRUE(m_ctx->IsElement(nodeAddr3)); @@ -1678,6 +1695,7 @@ TEST_F(ScEventTest, SubscriptionForNodeAndTwoConnectorsErasureWithNodeFinishEarl EXPECT_FALSE(m_ctx->IsElement(arc2)); EXPECT_FALSE(m_ctx->IsElement(arc3)); EXPECT_FALSE(m_ctx->IsElement(arc4)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); EXPECT_FALSE(m_ctx->IsElement(nodeAddr1)); EXPECT_FALSE(m_ctx->IsElement(nodeAddr2)); From ad6d130bb13f0112e7647a7cb15b9e26070c0671 Mon Sep 17 00:00:00 2001 From: NikitaZotov Date: Sat, 11 Jan 2025 15:15:50 +0300 Subject: [PATCH 17/18] [memory][core][events][tests] Leave TODO about providing causal consistency for agents responding to sc-events of erasing sc-elements --- sc-memory/sc-core/src/sc-store/sc_storage.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sc-memory/sc-core/src/sc-store/sc_storage.c b/sc-memory/sc-core/src/sc-store/sc_storage.c index 79901afd8..5be785d96 100644 --- a/sc-memory/sc-core/src/sc-store/sc_storage.c +++ b/sc-memory/sc-core/src/sc-store/sc_storage.c @@ -898,6 +898,11 @@ sc_result _sc_storage_element_erase_with_incoming_outgoing_connectors( // then cache source/target to try to erase them with their incoming/outgoing connectors if (sc_type_is_connector(type) && !*does_branch_have_emitted_events) { + // TODO(NikitaZotov): Provide causal consistency for agents responding to sc-events of erasing sc-elements occurring + // within the same semantic neighbourhood. It should be that 1) agents, reacted to sc-event of erasing sc-elements, + // know about this sc-element until the end of their existence, 2) the agent, that reacted to sc-event of erasing + // sc-element, can view the entire semantic neighbourhood of this sc-element, even if some of sc-connectors in this + // neighbourhood is erased by another agent. if (SC_ADDR_IS_NOT_EQUAL(connector_chain_begin_addr, begin_addr)) _sc_storage_cache_elements_under_erasure_without_erase_events( begin_addr, incident_nodes_under_erasure, processed_connectors); From b375f960822f2d92931b9dcd066073d984b2d0d7 Mon Sep 17 00:00:00 2001 From: NikitaZotov Date: Sat, 11 Jan 2025 15:49:27 +0300 Subject: [PATCH 18/18] [memory][core][events][tests] Duplicate TODO in tests --- .../tests/sc-memory/units/events/test_sc_event.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp index 51693dba7..235cef6d1 100644 --- a/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp +++ b/sc-memory/sc-memory/tests/sc-memory/units/events/test_sc_event.cpp @@ -1663,7 +1663,13 @@ TEST_F(ScEventTest, SubscriptionForNodeAndTwoConnectorsErasureWithNodeFinishEarl EXPECT_TRUE(m_ctx->IsElement(nodeAddr1)); EXPECT_TRUE(m_ctx->IsElement(arc1)); EXPECT_TRUE(m_ctx->IsElement(nodeAddr2)); - EXPECT_TRUE(m_ctx->IsElement(arc3)); + + // TODO(NikitaZotov): Provide causal consistency for agents responding to sc-events of erasing sc-elements + // occurring within the same semantic neighbourhood. It should be that 1) agents, reacted to sc-event of + // erasing sc-elements, know about this sc-element until the end of their existence, 2) the agent, that + // reacted to sc-event of erasing sc-element, can view the entire semantic neighbourhood of this sc-element, + // even if some of sc-connectors in this neighbourhood is erased by another agent. + // EXPECT_TRUE(m_ctx->IsElement(arc3)); EXPECT_FALSE(isLongExecutedSubscriptionCalled); isLongExecutedSubscriptionCalled = true;