Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't erase elements if multiple subscriptions use them #425

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
a989d5f
[memory][event] Don't erase elements if multiple subsciptions use them
kilativ-dotcom Oct 15, 2024
28247a4
[memory][event] Replace list with bool flag
kilativ-dotcom Oct 15, 2024
3ceb3bd
[memory][event] Reuse bool flag during recursion
kilativ-dotcom Oct 16, 2024
57b1f6a
[memory][event] Add check for emission_manager absence
kilativ-dotcom Oct 16, 2024
5c4804e
[memory][event][refactor] Remove space after &
kilativ-dotcom Oct 16, 2024
dcb5483
[memory][event][refactor] Remove space after &
kilativ-dotcom Oct 16, 2024
e6c443d
[tests] Add pending for initiation condition generation
kilativ-dotcom Oct 29, 2024
ea5b29b
[tests] Rename for more clear understanding
kilativ-dotcom Oct 29, 2024
7a8f4c4
[tests][event] Use pending for arc assignment
kilativ-dotcom Oct 30, 2024
c5d8743
[tests] Don't shadow local variable
kilativ-dotcom Oct 30, 2024
9a97df2
[events][tests] Erase incident nodes without event callbacks
kilativ-dotcom Nov 15, 2024
bacd431
[event][tests] Don't erase elements that were erased
kilativ-dotcom Nov 29, 2024
969e37d
[event] Release read monitor before source and target processing
kilativ-dotcom Nov 29, 2024
b6c6c04
[tests][event] Reorder monitor acquire
kilativ-dotcom Dec 6, 2024
a069701
[memory][core][events] Fix syncing removing subscription and emmiting…
NikitaZotov Jan 11, 2025
172ddc8
[memory][core][events][tests] Fix checking timeouts for processes in …
NikitaZotov Jan 11, 2025
ad6d130
[memory][core][events][tests] Leave TODO about providing causal consi…
NikitaZotov Jan 11, 2025
b375f96
[memory][core][events][tests] Duplicate TODO in tests
NikitaZotov Jan 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sc-memory/sc-core/include/sc-core/sc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ typedef sc_uint16 sc_type;
typedef sc_uint16 sc_states;

# define SC_STATE_REQUEST_ERASURE 0x1
# define SC_STATE_IS_ERASABLE 0x200
# define SC_STATE_IS_UNDER_ERASURE 0x200
# define SC_STATE_ELEMENT_EXIST 0x2

// results
Expand Down
4 changes: 2 additions & 2 deletions sc-memory/sc-core/src/sc-store/sc-event/sc_event_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ sc_result sc_event_notify_element_deleted(sc_addr addr);
* @param subscription_addr sc-addr of element that emitting event
* @param event_type_addr Emitting event type
* @param connector_addr A sc-address of added/removed sc-connector (just for specified events)
* @param edge_type A sc-type of added/removed sc-connector (just for specified events)
* @param connector_type A sc-type of added/removed sc-connector (just for specified events)
* @param other_addr A sc-address of the second sc-element of sc-connector. If \p subscription_addr is a source, then \p
* other_addr is a target. If \p subscription_addr is a target, then \p other_addr is a source.
* @param callback A pointer function that is executed after the execution of a function that was called on the
Expand All @@ -63,7 +63,7 @@ sc_result sc_event_emit(
sc_addr subscription_addr,
sc_event_type event_type_addr,
sc_addr connector_addr,
sc_type edge_type,
sc_type connector_type,
sc_addr other_addr,
sc_event_do_after_callback callback,
sc_addr event_addr);
Expand Down
113 changes: 93 additions & 20 deletions sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@

#include "sc_event_queue.h"

#include <unistd.h>

#include "sc-core/sc_event_subscription.h"
#include "sc_event_private.h"

#include "sc-store/sc_storage.h"
#include "sc-store/sc_storage_private.h"
#include "sc_memory_private.h"
#include "sc-core/sc_memory.h"
#include "sc-core/sc_keynodes.h"

#include "sc-core/sc-base/sc_allocator.h"

Expand Down Expand Up @@ -64,22 +67,22 @@
void _sc_event_emission_pool_worker(sc_pointer data, sc_pointer user_data)
{
sc_event * event = (sc_event *)data;
sc_event_emission_manager * queue = user_data;
sc_event_emission_manager * manager = user_data;

sc_event_subscription * event_subscription = event->event_subscription;
if (event_subscription == null_ptr)
goto destroy;

sc_monitor_acquire_read(&queue->destroy_monitor);
sc_monitor_acquire_read(&manager->destroy_monitor);

if (queue->running == SC_FALSE)
if (manager->running == SC_FALSE)
goto end;

sc_monitor_acquire_read(&event_subscription->monitor);
if (sc_event_subscription_is_deletable(event_subscription))
{
sc_monitor_release_read(&event_subscription->monitor);
goto end;
goto destroy;
}

sc_event_callback callback = event_subscription->callback;
Expand All @@ -95,12 +98,40 @@

sc_storage_end_new_process();

if (SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_connector_addr)
|| SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_incoming_arc_addr)
|| SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_outgoing_arc_addr)
|| SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_edge_addr)
|| SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr))
{
sc_monitor_acquire_write(&manager->emitted_erase_events_monitor);
sc_addr key = SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr)
? event->event_addr
: event->connector_addr;
sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get(
manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key)));
if (count != 0)
{
--count;
if (count == 0)
sc_hash_table_remove(manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key)));
else
sc_hash_table_insert(
manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key)), GUINT_TO_POINTER(count));
}
sc_monitor_release_write(&manager->emitted_erase_events_monitor);
}

sc_monitor_release_read(&event_subscription->monitor);

end:
sc_monitor_release_read(&queue->destroy_monitor);
sc_monitor_release_read(&manager->destroy_monitor);
destroy:
{
sc_monitor_acquire_write(&manager->emitted_erase_events_monitor);
--manager->current_emitted_events_count;
sc_monitor_release_write(&manager->emitted_erase_events_monitor);

if (event->callback != null_ptr)
{
sc_memory_context * ctx = sc_memory_context_new_ext(event->user_addr);
Expand All @@ -112,6 +143,16 @@
}
}

guint emitted_events_hash_func(gconstpointer pointer)
{
return GPOINTER_TO_UINT(pointer);
}

gboolean emitted_events_equal_func(gconstpointer a, gconstpointer b)
{
return (a == b);
}

void sc_event_emission_manager_initialize(sc_event_emission_manager ** manager, sc_memory_params const * params)
{
*manager = sc_mem_new(sc_event_emission_manager, 1);
Expand All @@ -131,9 +172,12 @@
}

(*manager)->running = SC_TRUE;
sc_monitor_init(&(*manager)->destroy_monitor);

sc_monitor_init(&(*manager)->pool_monitor);
(*manager)->current_emitted_events_count = 0;
(*manager)->emitted_erase_events =
sc_hash_table_init(emitted_events_hash_func, emitted_events_equal_func, null_ptr, null_ptr);
sc_monitor_init(&(*manager)->emitted_erase_events_monitor);
(*manager)->thread_pool = g_thread_pool_new(
_sc_event_emission_pool_worker,
*manager,
Expand All @@ -147,32 +191,35 @@
if (manager == null_ptr)
return;

sc_bool is_running = SC_FALSE;

sc_monitor_acquire_read(&manager->destroy_monitor);
is_running = manager->running;
sc_monitor_release_read(&manager->destroy_monitor);

if (is_running)
{
sc_monitor_acquire_write(&manager->destroy_monitor);
if (manager->running)
manager->running = SC_FALSE;
sc_monitor_release_write(&manager->destroy_monitor);
}
}

void sc_event_emission_manager_shutdown(sc_event_emission_manager * manager)
{
if (manager == null_ptr)
return;

// Acquire write lock once for all operations
sc_monitor_acquire_write(&manager->destroy_monitor);
sc_monitor_acquire_write(&manager->pool_monitor);
if (manager->thread_pool)
{
g_thread_pool_free(manager->thread_pool, SC_FALSE, SC_TRUE);
manager->thread_pool = null_ptr;
}

// Wait for current events to finish with a timeout
sc_int32 const MAX_WAIT_ITERATIONS = 1000;
sc_int32 wait_iterations = 0;
while (manager->current_emitted_events_count > 0 && wait_iterations < MAX_WAIT_ITERATIONS)
{
sc_monitor_release_write(&manager->destroy_monitor);
usleep(10);
sc_monitor_acquire_write(&manager->destroy_monitor);
wait_iterations++;

Check warning on line 220 in sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c

View check run for this annotation

Codecov / codecov/patch

sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c#L217-L220

Added lines #L217 - L220 were not covered by tests
}

while (!sc_queue_empty(&manager->deletable_events_subscriptions))
{
sc_event_subscription * event_subscription = sc_queue_pop(&manager->deletable_events_subscriptions);
Expand All @@ -185,6 +232,7 @@

sc_monitor_destroy(&manager->pool_monitor);
sc_monitor_destroy(&manager->destroy_monitor);
sc_hash_table_destroy(manager->emitted_erase_events);
sc_mem_free(manager);
}

Expand All @@ -198,13 +246,38 @@
sc_event_do_after_callback callback,
sc_addr event_addr)
{
if (manager == null_ptr)
return;

sc_event * event =
_sc_event_new(event_subscription, user_addr, connector_addr, connector_type, other_addr, callback, event_addr);

sc_monitor_acquire_write(&manager->pool_monitor);
if (manager->thread_pool == null_ptr)
{
sc_monitor_release_write(&manager->pool_monitor);
return;

Check warning on line 256 in sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c

View check run for this annotation

Codecov / codecov/patch

sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.c#L255-L256

Added lines #L255 - L256 were not covered by tests
}

if (SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_connector_addr)
|| SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_incoming_arc_addr)
|| SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_outgoing_arc_addr)
|| SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_edge_addr)
|| SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr))
{
sc_monitor_acquire_write(&manager->emitted_erase_events_monitor);
sc_addr key_addr = SC_ADDR_IS_EQUAL(event_subscription->event_type_addr, sc_event_before_erase_element_addr)
? event_subscription->subscription_addr
: connector_addr;
sc_uint32 count = (sc_uint32)(sc_uint64)sc_hash_table_get(
manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key_addr)));
++count;
sc_hash_table_insert(
manager->emitted_erase_events, GUINT_TO_POINTER(SC_ADDR_LOCAL_TO_INT(key_addr)), GUINT_TO_POINTER(count));
sc_monitor_release_write(&manager->emitted_erase_events_monitor);
}

sc_monitor_acquire_write(&manager->emitted_erase_events_monitor);
++manager->current_emitted_events_count;
sc_monitor_release_write(&manager->emitted_erase_events_monitor);

g_thread_pool_push(manager->thread_pool, event, null_ptr);
sc_monitor_release_write(&manager->pool_monitor);
}
5 changes: 5 additions & 0 deletions sc-memory/sc-core/src/sc-store/sc-event/sc_event_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ typedef struct
sc_monitor destroy_monitor; ///< Monitor for synchronizing access to the destruction process.
GThreadPool * thread_pool; ///< Thread pool used for worker threads processing events.
sc_monitor pool_monitor; ///< Monitor for synchronizing access to the thread pool.
sc_uint32 current_emitted_events_count; ///< Current count of emitted events.
sc_hash_table * emitted_erase_events; ///< Table that stores amount of active event subscriptions that were initiated
///< due to erasure of sc-element which sc-addr is stored as a key.
sc_monitor emitted_erase_events_monitor; ///< Monitor for synchronizing current_emitted_events_count and
///< emitted_erase_events.
} sc_event_emission_manager;

/*! Function that initializes an sc-event emission manager.
Expand Down
51 changes: 24 additions & 27 deletions sc-memory/sc-core/src/sc-store/sc_event_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,30 @@
sc_event_subscription_manager * manager,
sc_event_subscription * event_subscription)
{
sc_result result = SC_RESULT_OK;
sc_hash_table_list * element_events_list = null_ptr;

// the first, if table doesn't exist, then return error
if (manager == null_ptr)
return SC_RESULT_NO;

sc_monitor_acquire_write(&manager->events_table_monitor);

if (manager->events_table == null_ptr)
{
sc_monitor_release_write(&manager->events_table_monitor);
return SC_RESULT_NO;
result = SC_RESULT_OK;
goto result;

Check warning on line 65 in sc-memory/sc-core/src/sc-store/sc_event_subscription.c

View check run for this annotation

Codecov / codecov/patch

sc-memory/sc-core/src/sc-store/sc_event_subscription.c#L64-L65

Added lines #L64 - L65 were not covered by tests
}

// if there are no events for specified sc-element, then generate new events list
// if there are no events for specified sc-element, then create new events list
element_events_list =
(sc_hash_table_list *)sc_hash_table_get(manager->events_table, TABLE_KEY(event_subscription->subscription_addr));
element_events_list = sc_hash_table_list_append(element_events_list, (sc_pointer)event_subscription);
sc_hash_table_insert(
manager->events_table, TABLE_KEY(event_subscription->subscription_addr), (sc_pointer)element_events_list);

result:
sc_monitor_release_write(&manager->events_table_monitor);

return SC_RESULT_OK;
return result;
}

/*! Removes the specified sc-event_subscription from the registration manager's events table.
Expand All @@ -86,6 +86,7 @@
sc_event_subscription_manager * manager,
sc_event_subscription * event_subscription)
{
sc_result result = SC_RESULT_OK;
sc_hash_table_list * element_events_list = null_ptr;

// the first, if table doesn't exist, then return error
Expand All @@ -96,10 +97,10 @@
element_events_list =
(sc_hash_table_list *)sc_hash_table_get(manager->events_table, TABLE_KEY(event_subscription->subscription_addr));
if (element_events_list == null_ptr)
goto error;

if (manager->events_table == null_ptr)
goto error;
{
result = SC_RESULT_ERROR_INVALID_PARAMS;
goto result;
}

// remove event_subscription from list of events for specified sc-element
element_events_list = sc_hash_table_list_remove(element_events_list, (sc_const_pointer)event_subscription);
Expand All @@ -109,11 +110,9 @@
sc_hash_table_insert(
manager->events_table, TABLE_KEY(event_subscription->subscription_addr), (sc_pointer)element_events_list);

result:
sc_monitor_release_write(&manager->events_table_monitor);
return SC_RESULT_OK;
error:
sc_monitor_release_write(&manager->events_table_monitor);
return SC_RESULT_ERROR_INVALID_PARAMS;
return result;
}

void sc_event_subscription_manager_initialize(sc_event_subscription_manager ** manager)
Expand Down Expand Up @@ -205,7 +204,12 @@
return SC_RESULT_NO;

sc_event_subscription_manager * subscription_manager = sc_storage_get_event_subscription_manager();
if (subscription_manager == null_ptr)
return SC_RESULT_NO;

sc_event_emission_manager * emission_manager = sc_storage_get_event_emission_manager();
if (subscription_manager == null_ptr)
return SC_RESULT_NO;

Check warning on line 212 in sc-memory/sc-core/src/sc-store/sc_event_subscription.c

View check run for this annotation

Codecov / codecov/patch

sc-memory/sc-core/src/sc-store/sc_event_subscription.c#L212

Added line #L212 was not covered by tests

sc_monitor_acquire_write(&event_subscription->monitor);
if (_sc_event_subscription_manager_remove(subscription_manager, event_subscription) != SC_RESULT_OK)
Expand Down Expand Up @@ -250,16 +254,11 @@
if (subscription_manager == null_ptr || subscription_manager->events_table == null_ptr)
goto result;

// TODO(NikitaZotov): Implement monitor for `subscription_manager` to synchronize its freeing.
// lookup for all registered to specified sc-element events
sc_monitor_acquire_write(&subscription_manager->events_table_monitor);
if (subscription_manager != null_ptr)
{
element_events_list =
(sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(element));
if (element_events_list != null_ptr)
sc_hash_table_remove(subscription_manager->events_table, TABLE_KEY(element));
}
element_events_list = (sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(element));
if (element_events_list != null_ptr)
sc_hash_table_remove(subscription_manager->events_table, TABLE_KEY(element));

if (element_events_list != null_ptr)
{
Expand Down Expand Up @@ -304,7 +303,7 @@

if (_sc_memory_context_are_events_pending(ctx))
{
_sc_memory_context_pend_event(ctx, event_type_addr, subscription_addr, connector_addr, connector_type, other_addr);
_sc_memory_context_pend_event(ctx, subscription_addr, event_type_addr, connector_addr, connector_type, other_addr);
return SC_RESULT_OK;
}

Expand Down Expand Up @@ -333,12 +332,10 @@
if (subscription_manager == null_ptr || subscription_manager->events_table == null_ptr)
goto result;

// TODO(NikitaZotov): Implement monitor for `subscription_manager` to synchronize its freeing.
// lookup for all registered to specified sc-element events
sc_monitor_acquire_read(&subscription_manager->events_table_monitor);
if (subscription_manager != null_ptr)
element_events_list =
(sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(subscription_addr));
element_events_list =
(sc_hash_table_list *)sc_hash_table_get(subscription_manager->events_table, TABLE_KEY(subscription_addr));

while (element_events_list != null_ptr)
{
Expand Down
Loading
Loading