Skip to content

Commit

Permalink
Fix merge issues
Browse files Browse the repository at this point in the history
  • Loading branch information
ChadliaJerad committed Jan 31, 2025
1 parent c022317 commit 57ba24d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 48 deletions.
1 change: 1 addition & 0 deletions core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void invalidate_min_delays() {
node->flags = 0; // All flags cleared because they get set lazily.
}
free(rti_common->min_delays);
rti_common->min_delays = NULL;
}
}

Expand Down
86 changes: 40 additions & 46 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ static void notify_grant_delayed(federate_info_t* fed, tag_t tag, bool is_provis
*/
static int get_num_absent_upstream_transients(federate_info_t* fed) {
int num_absent_upstream_transients = 0;
for (int j = 0; j < fed->enclave.num_upstream; j++) {
federate_info_t* upstream = GET_FED_INFO(fed->enclave.upstream[j]);
for (int j = 0; j < fed->enclave.num_immediate_upstreams; j++) {
federate_info_t* upstream = GET_FED_INFO(fed->enclave.immediate_upstreams[j]);
// Ignore this enclave if it no longer connected.
if ((upstream->enclave.state == NOT_CONNECTED) && (upstream->is_transient)) {
num_absent_upstream_transients++;
Expand Down Expand Up @@ -275,8 +275,8 @@ static void notify_federate_disconnected(federate_info_t* fed) {
// Notify downstream federates. Need to hold the mutex lock to do this.
if (fed->is_transient) {
LF_MUTEX_LOCK(&rti_mutex);
for (int j = 0; j < fed->enclave.num_downstream; j++) {
federate_info_t* downstream = GET_FED_INFO(fed->enclave.downstream[j]);
for (int j = 0; j < fed->enclave.num_immediate_downstreams; j++) {
federate_info_t* downstream = GET_FED_INFO(fed->enclave.immediate_downstreams[j]);
// Ignore this enclave if it no longer connected.
if (downstream->enclave.state != NOT_CONNECTED) {
// Notify the downstream enclave.
Expand Down Expand Up @@ -582,9 +582,9 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
// message from the socket, and return.
federate_info_t* fed = GET_FED_INFO(federate_id);
interval_t delay = NEVER;
for (int i = 0; i < fed->enclave.num_upstream; i++) {
if (fed->enclave.upstream[i] == sending_federate->enclave.id) {
delay = fed->enclave.upstream_delay[i];
for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) {
if (fed->enclave.immediate_upstreams[i] == sending_federate->enclave.id) {
delay = fed->enclave.immediate_upstream_delays[i];
break;
}
}
Expand Down Expand Up @@ -996,8 +996,8 @@ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_
// Notify my_fed of any upstream transient federates that are connected.
// This has to occur before sending the start tag so that my_fed does not begin executing thinking that these
// upstream federates are not connected.
for (int i = 0; i < my_fed->enclave.num_upstream; i++) {
federate_info_t* fed = GET_FED_INFO(my_fed->enclave.upstream[i]);
for (int i = 0; i < my_fed->enclave.num_immediate_upstreams; i++) {
federate_info_t* fed = GET_FED_INFO(my_fed->enclave.immediate_upstreams[i]);
if (fed->is_transient && fed->enclave.state == GRANTED) {
send_upstream_connected_locked(my_fed, fed);
}
Expand Down Expand Up @@ -1029,8 +1029,8 @@ static void send_start_tag_locked(federate_info_t* my_fed, instant_t federation_

// If this is a transient federate, notify its downstream federates that it is now connected.
if (my_fed->is_transient) {
for (int i = 0; i < my_fed->enclave.num_downstream; i++) {
send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.downstream[i]), my_fed);
for (int i = 0; i < my_fed->enclave.num_immediate_downstreams; i++) {
send_upstream_connected_locked(GET_FED_INFO(my_fed->enclave.immediate_downstreams[i]), my_fed);
}
}
}
Expand Down Expand Up @@ -1122,8 +1122,8 @@ void handle_timestamp(federate_info_t* my_fed) {
}

// Condition 4. Iterate over the downstream federates
for (int j = 0; j < my_fed->enclave.num_downstream; j++) {
federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.downstream[j]);
for (int j = 0; j < my_fed->enclave.num_immediate_downstreams; j++) {
federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.immediate_downstreams[j]);

// Get the max over the TAG of the downstreams
if (lf_tag_compare(downstream->enclave.last_granted, my_fed->effective_start_tag) >= 0) {
Expand All @@ -1144,8 +1144,8 @@ void handle_timestamp(federate_info_t* my_fed) {
// because the effective_start_tag is sent while still holding the mutex.

// Iterate over the messages from the upstream federates
for (int j = 0; j < my_fed->enclave.num_upstream; j++) {
federate_info_t* upstream = GET_FED_INFO(my_fed->enclave.upstream[j]);
for (int j = 0; j < my_fed->enclave.num_immediate_upstreams; j++) {
federate_info_t* upstream = GET_FED_INFO(my_fed->enclave.immediate_upstreams[j]);

size_t queue_size = pqueue_tag_size(upstream->in_transit_message_tags);
if (queue_size != 0) {
Expand All @@ -1163,8 +1163,8 @@ void handle_timestamp(federate_info_t* my_fed) {
// FIXME: Should this be higher-than or equal to?
// FIXME: Also, won't the grant simply be lost?
// If the joining federate doesn't send anything, the downstream federate won't issue another NET.
for (int j = 0; j < my_fed->enclave.num_downstream; j++) {
federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.downstream[j]);
for (int j = 0; j < my_fed->enclave.num_immediate_downstreams; j++) {
federate_info_t* downstream = GET_FED_INFO(my_fed->enclave.immediate_downstreams[j]);

// Ignore this federate if it has resigned.
if (downstream->enclave.state == NOT_CONNECTED) {
Expand All @@ -1189,11 +1189,8 @@ void handle_timestamp(federate_info_t* my_fed) {

// Whenver a transient joins, invalidate all federates, so that all min_delays_upstream
// get re-computed.
// FIXME: Needs to be optimized to only invalidate those affected by the transient
for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) {
federate_info_t* fed = GET_FED_INFO(i);
invalidate_min_delays_upstream(&(fed->enclave));
}
// FIXME: Maybe optimize it to only invalidate those affected by the transient
invalidate_min_delays();

LF_MUTEX_UNLOCK(&rti_mutex);
}
Expand Down Expand Up @@ -1639,12 +1636,6 @@ static int32_t receive_and_check_fed_id_message(int* socket_id) {
send_reject(socket_id, FEDERATE_ID_OUT_OF_RANGE);
return -1;
} else {
// Find out if it is a new connection or a hot swap.
// Reject if:
// - duplicate of a connected persistent federate
// - or hot_swap is already in progress (Only 1 hot swap at a time!), for that
// particular federate
// - or it is a hot swap, but it is not the execution phase yet
// Find out if it is a new connection or a hot swap.
// Reject if:
// - duplicate of a connected persistent federate
Expand Down Expand Up @@ -1839,22 +1830,22 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) {
// Now, compare the previous and the new neighberhood structure
// Start with the number of upstreams and downstreams
bool reject = false;
if ((fed->enclave.num_upstream != temp_fed->enclave.num_upstream) ||
(fed->enclave.num_downstream != temp_fed->enclave.num_downstream)) {
if ((fed->enclave.num_immediate_upstreams != temp_fed->enclave.num_immediate_upstreams) ||
(fed->enclave.num_immediate_downstreams != temp_fed->enclave.num_immediate_downstreams)) {
reject = true;
} else {
// Then check all upstreams and their delays
for (int i = 0; i < fed->enclave.num_upstream; i++) {
if ((fed->enclave.upstream[i] != temp_fed->enclave.upstream[i]) ||
(fed->enclave.upstream_delay[i] != temp_fed->enclave.upstream_delay[i])) {
for (int i = 0; i < fed->enclave.num_immediate_upstreams; i++) {
if ((fed->enclave.immediate_upstreams[i] != temp_fed->enclave.immediate_upstreams[i]) ||
(fed->enclave.immediate_upstream_delays[i] != temp_fed->enclave.immediate_upstream_delays[i])) {
reject = true;
break;
}
}
if (!reject) {
// Finally, check all downstream federates
for (int i = 0; i < fed->enclave.num_downstream; i++) {
if (fed->enclave.downstream[i] != temp_fed->enclave.downstream[i]) {
for (int i = 0; i < fed->enclave.num_immediate_downstreams; i++) {
if (fed->enclave.immediate_downstreams[i] != temp_fed->enclave.immediate_downstreams[i]) {
reject = true;
break;
}
Expand Down Expand Up @@ -2322,7 +2313,10 @@ void reset_transient_federate(federate_info_t* fed) {
fed->server_port = -1;
fed->requested_stop = false;
fed->effective_start_tag = NEVER_TAG;
// invalidate_all_min_delays();
// Whenver a transient resigns or leaves, invalidate all federates, so that all min_delays_upstream
// get re-computed.
// FIXME: Maybe optimize it to only invalidate those affected by the transient
invalidate_min_delays();
}

int32_t start_rti_server(uint16_t port) {
Expand Down Expand Up @@ -2356,8 +2350,8 @@ int32_t start_rti_server(uint16_t port) {
static int set_has_upstream_transient_federates_parameter_and_check() {
for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) {
federate_info_t* fed = GET_FED_INFO(i);
for (int j = 0; j < fed->enclave.num_upstream; j++) {
federate_info_t* upstream_fed = GET_FED_INFO(fed->enclave.upstream[j]);
for (int j = 0; j < fed->enclave.num_immediate_upstreams; j++) {
federate_info_t* upstream_fed = GET_FED_INFO(fed->enclave.immediate_upstreams[j]);
if (upstream_fed->is_transient) {
fed->has_upstream_transient_federates = true;
break;
Expand Down Expand Up @@ -2514,14 +2508,14 @@ void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number
for (uint16_t i = 0; i < number_of_scheduling_nodes; i++) {
// FIXME: Gives error freeing memory not allocated!!!!
scheduling_node_t* node = scheduling_nodes[i];
if (node->immediate_upstreams != NULL) {
free(node->immediate_upstreams);
free(node->immediate_upstream_delays);
}
if (node->immediate_downstreams != NULL) {
free(node->immediate_downstreams);
}
free(node);
if (node->immediate_upstreams != NULL) {
free(node->immediate_upstreams);
free(node->immediate_upstream_delays);
}
if (node->immediate_downstreams != NULL) {
free(node->immediate_downstreams);
}
free(node);
}
free(scheduling_nodes);
}
Expand Down
4 changes: 2 additions & 2 deletions include/core/federated/network/net_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -653,14 +653,14 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* A message the informs a downstream federate that a federate upstream of it
* is connected. The next 2 bytes are the federate ID of the upstream federate.
*/
#define MSG_TYPE_UPSTREAM_CONNECTED 26
#define MSG_TYPE_UPSTREAM_CONNECTED 27
#define MSG_TYPE_UPSTREAM_CONNECTED_LENGTH (1 + sizeof(uint16_t))

/**
* A message the informs a downstream federate that a federate upstream of it
* is no longer connected. The next 2 bytes are the federate ID of the upstream federate.
*/
#define MSG_TYPE_UPSTREAM_DISCONNECTED 27
#define MSG_TYPE_UPSTREAM_DISCONNECTED 28
#define MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH (1 + sizeof(uint16_t))

/**
Expand Down

0 comments on commit 57ba24d

Please sign in to comment.