Add shutdown_socket() function as a follow-up to #505 and #506

Open. Wants to merge 27 commits into base: main.
Showing changes from 5 commits.

Commits (27):
0af50a5 Add shutdown_socket() (Jakio815, Dec 20, 2024)
e5129c5 Use shutdown_socket() in rti_remote.c and federate.c (Jakio815, Dec 20, 2024)
1b00ed3 Fix comments. (Jakio815, Dec 20, 2024)
de2fbde Fix port type && change to print_log on shutdown failures && Refactor… (Jakio815, Dec 21, 2024)
9ea9f1d Refactor close_outbound_socket(). (Jakio815, Dec 21, 2024)
d9a67a2 Merge branch 'refactor-only-comm-type' into shutdown (Jakio815, Dec 21, 2024)
996896c Add comment to shutdown_socket function. (Jakio815, Dec 21, 2024)
e3b0ea1 Add commnets. (Jakio815, Dec 21, 2024)
30a9446 Properly replace close() to shutdown_socket() (Jakio815, Dec 21, 2024)
276d7b4 Minor fix. (Jakio815, Dec 21, 2024)
93adf2e Minor fix. (Jakio815, Dec 21, 2024)
d27c743 Properly close federate's socket connected to the RTI. (Jakio815, Dec 21, 2024)
ef3345b Minor fix. (Jakio815, Dec 21, 2024)
311205d Merge branch 'main' of github.com:lf-lang/reactor-c into shutdown (Jakio815, Jan 8, 2025)
c0d59db Fix merge error. (Jakio815, Jan 13, 2025)
96b146a Fix rti_socket check. (Jakio815, Jan 16, 2025)
e93f266 Add socket == -1 checking. (Jakio815, Jan 22, 2025)
0f89b4c Remove returning -1 on shutdown fail. (Jakio815, Jan 24, 2025)
aa6fe33 Merge branch 'shutdown' of github.com:lf-lang/reactor-c into shutdown (Jakio815, Jan 24, 2025)
0c7be7f Merge branch 'main' of github.com:lf-lang/reactor-c into shutdown (Jakio815, Jan 24, 2025)
62354ab Formatting. (Jakio815, Jan 24, 2025)
6b8bd91 Merge branch 'main' of github.com:lf-lang/reactor-c into shutdown (Jakio815, Jan 24, 2025)
1edd1e3 Fix comments. (Jakio815, Jan 24, 2025)
8334783 Fix to return -1 on failure of tagged_message from federate. (Jakio815, Jan 25, 2025)
048a5b3 Fix missing comment. (Jakio815, Jan 25, 2025)
5d34858 Revert close_inbound_socket to read false. (Jakio815, Jan 25, 2025)
7e7d912 Merge branch 'main' of github.com:lf-lang/reactor-c into shutdown (Jakio815, Jan 25, 2025)
51 changes: 7 additions & 44 deletions core/federated/RTI/rti_remote.c
@@ -871,14 +871,7 @@ static void handle_federate_failed(federate_info_t* my_fed) {
// Indicate that there will no further events from this federate.
my_fed->enclave.next_event = FOREVER_TAG;

// According to this: https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket,
// the close should happen when receiving a 0 length message from the other end.
// Here, we just signal the other side that no further writes to the socket are
// forthcoming, which should result in the other end getting a zero-length reception.
shutdown(my_fed->socket, SHUT_RDWR);

// We can now safely close the socket.
close(my_fed->socket); // from unistd.h
shutdown_socket(&my_fed->socket, false);

// Check downstream federates to see whether they should now be granted a TAG.
// To handle cycles, need to create a boolean array to keep
@@ -917,21 +910,7 @@ static void handle_federate_resign(federate_info_t* my_fed) {
// Indicate that there will no further events from this federate.
my_fed->enclave.next_event = FOREVER_TAG;

// According to this: https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket,
// the close should happen when receiving a 0 length message from the other end.
// Here, we just signal the other side that no further writes to the socket are
// forthcoming, which should result in the other end getting a zero-length reception.
shutdown(my_fed->socket, SHUT_WR);

// Wait for the federate to send an EOF or a socket error to occur.
// Discard any incoming bytes. Normally, this read should return 0 because
// the federate is resigning and should itself invoke shutdown.
unsigned char buffer[10];
while (read(my_fed->socket, buffer, 10) > 0)
;

// We can now safely close the socket.
close(my_fed->socket); // from unistd.h
shutdown_socket(&my_fed->socket, true);

// Check downstream federates to see whether they should now be granted a TAG.
// To handle cycles, need to create a boolean array to keep
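The lines removed from handle_federate_resign follow a half-close sequence: shut down the write side, discard incoming bytes until EOF or an error, then close. Below is a minimal, self-contained sketch of that sequence. The name `half_close_and_drain` is hypothetical and chosen for illustration only; it is not a function in reactor-c.

```c
#include <assert.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper (illustration only, not part of reactor-c).
// Half-closes `fd`, discards incoming bytes until the peer's EOF or a
// socket error, then closes `fd`. Returns the number of bytes discarded.
static long half_close_and_drain(int fd) {
  assert(fd >= 0);
  long discarded = 0;

  // SHUT_WR sends a FIN: the peer sees EOF after consuming buffered data.
  // A failure here (for example ENOTCONN) is ignored in this sketch.
  (void)shutdown(fd, SHUT_WR);

  // Read until the peer half-closes too (read returns 0) or an error
  // occurs (read returns -1). This blocks while the peer stays open.
  unsigned char buffer[32];
  ssize_t n;
  while ((n = read(fd, buffer, sizeof(buffer))) > 0) {
    discarded += (long)n;
  }

  close(fd);
  return discarded;
}
```

Note that the drain loop blocks until the peer shuts down its own write side or the connection fails, so this pattern only suits cases where the peer is expected to close as well, as in the resign case here.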
@@ -1030,9 +1009,7 @@ void send_reject(int* socket_id, unsigned char error_code) {
lf_print_warning("RTI failed to write MSG_TYPE_REJECT message on the socket.");
}
// Close the socket.
Member commented:
Minor: how about adding more information for this call? For example, "Close the socket without reading until EOF."

Collaborator Author replied:
Thank you, I fixed it as suggested.

shutdown(*socket_id, SHUT_RDWR);
close(*socket_id);
*socket_id = -1;
shutdown_socket(socket_id, false);
LF_MUTEX_UNLOCK(&rti_mutex);
}

@@ -1420,9 +1397,7 @@ void lf_connect_to_federates(int socket_descriptor) {
if (!authenticate_federate(&socket_id)) {
lf_print_warning("RTI failed to authenticate the incoming federate.");
// Close the socket.
Member commented:
Ditto.

Collaborator Author replied:
Thank you, I fixed it as suggested.

shutdown(socket_id, SHUT_RDWR);
close(socket_id);
socket_id = -1;
shutdown_socket(&socket_id, false);
// Ignore the federate that failed authentication.
i--;
continue;
@@ -1490,8 +1465,7 @@ void* respond_to_erroneous_connections(void* nothing) {
lf_print_warning("RTI failed to write FEDERATION_ID_DOES_NOT_MATCH to erroneous incoming connection.");
}
// Close the socket.
Member commented:
Here as well.

Collaborator Author replied:
Thank you, I fixed it as suggested.

shutdown(socket_id, SHUT_RDWR);
close(socket_id);
shutdown_socket(&socket_id, false);
}
return NULL;
}
@@ -1554,21 +1528,10 @@ void wait_for_federates(int socket_descriptor) {
// Shutdown and close the socket that is listening for incoming connections
// so that the accept() call in respond_to_erroneous_connections returns.
// That thread should then check rti->all_federates_exited and it should exit.
if (shutdown(socket_descriptor, SHUT_RDWR)) {
LF_PRINT_LOG("On shut down TCP socket, received reply: %s", strerror(errno));
}
// NOTE: In all common TCP/IP stacks, there is a time period,
// typically between 30 and 120 seconds, called the TIME_WAIT period,
// before the port is released after this close. This is because
// the OS is preventing another program from accidentally receiving
// duplicated packets intended for this program.
close(socket_descriptor);
shutdown_socket(&socket_descriptor, false);

if (rti_remote->socket_descriptor_UDP > 0) {
if (shutdown(rti_remote->socket_descriptor_UDP, SHUT_RDWR)) {
LF_PRINT_LOG("On shut down UDP socket, received reply: %s", strerror(errno));
}
close(rti_remote->socket_descriptor_UDP);
shutdown_socket(&rti_remote->socket_descriptor_UDP, false);
}
}

61 changes: 19 additions & 42 deletions core/federated/federate.c
@@ -403,28 +403,15 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen

/**
* Close the socket that receives incoming messages from the
* specified federate ID. This function should be called when a read
* of incoming socket fails or when an EOF is received.
* It can also be called when the receiving end wants to stop communication,
* in which case, flag should be 1.
* specified federate ID.
*
* @param fed_id The ID of the peer federate sending messages to this
* federate.
* @param flag 0 if an EOF was received, -1 if a socket error occurred, 1 otherwise.
*/
static void close_inbound_socket(int fed_id, int flag) {
static void close_inbound_socket(int fed_id) {
LF_MUTEX_LOCK(&socket_mutex);
if (_fed.sockets_for_inbound_p2p_connections[fed_id] >= 0) {
if (flag >= 0) {
if (flag > 0) {
shutdown(_fed.sockets_for_inbound_p2p_connections[fed_id], SHUT_RDWR);
} else {
// Have received EOF from the other end. Send EOF to the other end.
shutdown(_fed.sockets_for_inbound_p2p_connections[fed_id], SHUT_WR);
}
}
close(_fed.sockets_for_inbound_p2p_connections[fed_id]);
_fed.sockets_for_inbound_p2p_connections[fed_id] = -1;
shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false);
}
LF_MUTEX_UNLOCK(&socket_mutex);
}
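The simplified close_inbound_socket relies on two things: the check for a non-negative descriptor under the mutex, and the slot being reset to -1 after closing. Together they make repeated calls harmless, which matters because lf_terminate_execution calls it for every federate ID. Here is a minimal sketch of that pattern, assuming a plain pthread mutex. All `sketch_` names are hypothetical stand-ins, not reactor-c identifiers.

```c
#include <assert.h>
#include <pthread.h>
#include <sys/socket.h>
#include <unistd.h>

#define SKETCH_NUM_PEERS 4 // Hypothetical table size for illustration.

// Hypothetical descriptor table; -1 marks a closed or unused slot.
static int sketch_inbound[SKETCH_NUM_PEERS] = {-1, -1, -1, -1};
static pthread_mutex_t sketch_mutex = PTHREAD_MUTEX_INITIALIZER;

// Closes the inbound socket for `peer` if it is still open.
// Returns 1 if a socket was closed, 0 if the slot was already closed.
static int sketch_close_inbound(int peer) {
  assert(peer >= 0 && peer < SKETCH_NUM_PEERS);
  int closed = 0;
  pthread_mutex_lock(&sketch_mutex);
  if (sketch_inbound[peer] >= 0) {
    // Stop traffic in both directions without reading until EOF.
    (void)shutdown(sketch_inbound[peer], SHUT_RDWR);
    close(sketch_inbound[peer]);
    // Mark the slot closed so that a second call is a no-op.
    sketch_inbound[peer] = -1;
    closed = 1;
  }
  pthread_mutex_unlock(&sketch_mutex);
  return closed;
}
```

Calling `sketch_close_inbound` twice for the same slot closes the descriptor once. The second call finds -1 and does nothing, so the same descriptor number is never closed twice.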
@@ -665,7 +652,7 @@ static int handle_tagged_message(int* socket, int fed_id) {
env->current_tag.time - start_time, env->current_tag.microstep, intended_tag.time - start_time,
intended_tag.microstep);
// Close socket, reading any incoming data and discarding it.
close_inbound_socket(fed_id, 1);
close_inbound_socket(fed_id);
} else {
// Need to use intended_tag here, not actual_tag, so that STP violations are detected.
// It will become actual_tag (that is when the reactions will be invoked).
@@ -827,33 +814,22 @@ static void* listen_to_federates(void* _args) {
* if _lf_normal_termination is true and otherwise proceeds without the lock.
* @param fed_id The ID of the peer federate receiving messages from this
* federate, or -1 if the RTI (centralized coordination).
* @param flag 0 if the socket has received EOF, 1 if not, -1 if abnormal termination.
*/
static void close_outbound_socket(int fed_id, int flag) {
static void close_outbound_socket(int fed_id) {
assert(fed_id >= 0 && fed_id < NUMBER_OF_FEDERATES);
// Close outbound connections, in case they have not closed themselves.
// This will result in EOF being sent to the remote federate, except for
// abnormal termination, in which case it will just close the socket.
if (_lf_normal_termination) {
LF_MUTEX_LOCK(&lf_outbound_socket_mutex);
}
if (_fed.sockets_for_outbound_p2p_connections[fed_id] >= 0) {
// Close the socket by sending a FIN packet indicating that no further writes
// are expected. Then read until we get an EOF indication.
if (flag >= 0) {
// SHUT_WR indicates no further outgoing messages.
shutdown(_fed.sockets_for_outbound_p2p_connections[fed_id], SHUT_WR);
if (flag > 0) {
// Have not received EOF yet. read until we get an EOF or error indication.
// This compensates for delayed ACKs and disabling of Nagles algorithm
// by delaying exiting until the shutdown is complete.
unsigned char message[32];
while (read(_fed.sockets_for_outbound_p2p_connections[fed_id], &message, 32) > 0)
;
}
if (_fed.sockets_for_outbound_p2p_connections[fed_id] >= 0) {
// Close the socket by sending a FIN packet indicating that no further writes
// are expected. Then read until we get an EOF indication.
shutdown_socket(&_fed.sockets_for_outbound_p2p_connections[fed_id], true);
}
close(_fed.sockets_for_outbound_p2p_connections[fed_id]);
_fed.sockets_for_outbound_p2p_connections[fed_id] = -1;
}
if (_lf_normal_termination) {
LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex);
} else {
shutdown_socket(&_fed.sockets_for_outbound_p2p_connections[fed_id], false);
}
}

@@ -1653,7 +1629,7 @@ void lf_terminate_execution(environment_t* env) {
LF_PRINT_DEBUG("Closing incoming P2P sockets.");
// Close any incoming P2P sockets that are still open.
for (int i = 0; i < NUMBER_OF_FEDERATES; i++) {
close_inbound_socket(i, 1);
close_inbound_socket(i);
// Ignore errors. Mark the socket closed.
_fed.sockets_for_inbound_p2p_connections[i] = -1;
}
@@ -1667,8 +1643,7 @@ void lf_terminate_execution(environment_t* env) {
// Close outbound connections, in case they have not closed themselves.
// This will result in EOF being sent to the remote federate, except for
// abnormal termination, in which case it will just close the socket.
int flag = _lf_normal_termination ? 1 : -1;
close_outbound_socket(i, flag);
close_outbound_socket(i);
}

LF_PRINT_DEBUG("Waiting for inbound p2p socket listener threads.");
@@ -1943,9 +1918,11 @@ void lf_connect_to_rti(const char* hostname, int port) {

void lf_create_server(int specified_port) {
assert(specified_port <= UINT16_MAX && specified_port >= 0);
if (create_TCP_server(specified_port, &_fed.server_socket, (uint16_t*)&_fed.server_port, false)) {
uint16_t port;
if (create_TCP_server(specified_port, &_fed.server_socket, &port, false)) {
Jakio815 marked this conversation as resolved.
lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno));
};
_fed.server_port = (int)port;
LF_PRINT_LOG("Server for communicating with other federates started using port %d.", _fed.server_port);

// Send the server port number to the RTI
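The port-type change in lf_create_server replaces a pointer cast with a correctly typed temporary. Assuming `_fed.server_port` is an `int` (as the `(int)port` cast suggests), passing `(uint16_t*)&_fed.server_port` would let the callee write only two bytes of a wider object, and which two bytes depends on the platform's byte order. A minimal sketch of the safer pattern follows. `sketch_report_port` and `sketch_store_port` are hypothetical stand-ins, not reactor-c functions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical stand-in for a function that reports the port it ended up
// using through a uint16_t out-parameter.
static int sketch_report_port(uint16_t requested, uint16_t* final_port) {
  *final_port = requested;
  return 0;
}

// Stores the reported port in an int field through a uint16_t temporary,
// rather than casting the int's address to uint16_t*.
static int sketch_store_port(uint16_t requested, int* port_field) {
  assert(port_field != NULL);
  uint16_t port = 0;
  if (sketch_report_port(requested, &port) != 0) {
    return -1;
  }
  // Widening conversion: the whole int is assigned, not just two bytes.
  *port_field = (int)port;
  return 0;
}
```

With the temporary, the destination field always ends up holding exactly the reported port, whatever value it held before and whatever the byte order.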
49 changes: 40 additions & 9 deletions core/federated/network/socket_common.c
@@ -155,7 +155,7 @@ static int create_server(uint16_t port, int* final_socket, uint16_t* final_port,
return -1;
}
set_socket_timeout_option(socket_descriptor, &timeout_time);
int used_port = set_socket_bind_option(socket_descriptor, port, increment_port_on_retry);
uint16_t used_port = set_socket_bind_option(socket_descriptor, port, increment_port_on_retry);
Jakio815 marked this conversation as resolved.
if (sock_type == 0) {
// Enable listening for socket connections.
// The second argument is the maximum number of queued socket requests,
@@ -313,10 +313,7 @@ int read_from_socket_close_on_error(int* socket, size_t num_bytes, unsigned char
// Read failed.
// Socket has probably been closed from the other side.
// Shut down and close the socket from this side.
shutdown(*socket, SHUT_RDWR);
close(*socket);
// Mark the socket closed.
*socket = -1;
shutdown_socket(socket, false);
return -1;
}
return 0;
@@ -383,10 +380,7 @@ int write_to_socket_close_on_error(int* socket, size_t num_bytes, unsigned char*
// Write failed.
// Socket has probably been closed from the other side.
// Shut down and close the socket from this side.
shutdown(*socket, SHUT_RDWR);
close(*socket);
// Mark the socket closed.
*socket = -1;
shutdown_socket(socket, false);
}
return result;
}
@@ -410,3 +404,40 @@ void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char*
}
}
}

int shutdown_socket(int* socket, bool read_before_closing) {
if (!read_before_closing) {
if (shutdown(*socket, SHUT_RDWR)) {
lf_print_log("On shutdown socket, received reply: %s", strerror(errno));
return -1;
}
} else {
// Signal the other side that no further writes are expected by sending a FIN packet.
// This indicates the write direction is closed. For more details, refer to:
// https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket
if (shutdown(*socket, SHUT_WR)) {
lf_print_log("Failed to shutdown socket: %s", strerror(errno));
return -1;
}

// Wait for the other side to send an EOF or encounter a socket error.
// Discard any incoming bytes. Normally, this read should return 0, indicating the peer has also closed the
// connection.
// This compensates for delayed ACKs and scenarios where Nagle's algorithm is disabled, ensuring the shutdown
// completes gracefully.
unsigned char buffer[10];
while (read(*socket, buffer, 10) > 0)
;
}
// NOTE: In all common TCP/IP stacks, there is a time period,
// typically between 30 and 120 seconds, called the TIME_WAIT period,
// before the port is released after this close. This is because
// the OS is preventing another program from accidentally receiving
// duplicated packets intended for this program.
if (close(*socket)) {
lf_print_log("Error while closing socket: %s\n", strerror(errno));
return -1;
}
*socket = -1;
return 0;
}
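In the version shown in this diff, a failing shutdown() returns early, so the descriptor is neither closed nor marked -1. The later commit messages "Add socket == -1 checking." and "Remove returning -1 on shutdown fail." suggest the function was hardened afterward. The sketch below shows one way such a variant could look. It is an assumption made for illustration, not the code from those commits, and it uses `fprintf` in place of the project's logging functions.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

// Sketch only: a hypothetical hardened variant, not the PR's final code.
static int shutdown_socket_sketch(int* socket, bool read_before_closing) {
  assert(socket != NULL);
  if (*socket < 0) {
    return -1; // Already closed or never opened; nothing to do.
  }
  int how = read_before_closing ? SHUT_WR : SHUT_RDWR;
  if (shutdown(*socket, how) != 0) {
    // Log and fall through so the descriptor is still closed below.
    fprintf(stderr, "shutdown failed: %s\n", strerror(errno));
  } else if (read_before_closing) {
    // Discard incoming bytes until the peer's EOF or a socket error.
    unsigned char buffer[10];
    while (read(*socket, buffer, sizeof(buffer)) > 0)
      ;
  }
  int result = close(*socket);
  if (result != 0) {
    fprintf(stderr, "close failed: %s\n", strerror(errno));
  }
  // Mark the descriptor closed even if close() reported an error, so the
  // caller does not reuse or close the same descriptor number again.
  *socket = -1;
  return result == 0 ? 0 : -1;
}
```

In this variant a shutdown() failure such as ENOTCONN on a socket that never connected does not leak the descriptor: close() still runs and the caller's variable is still set to -1. Whether a shutdown() failure should also be reflected in the return value is a design choice that this sketch leaves out.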
9 changes: 9 additions & 0 deletions include/core/federated/network/socket_common.h
@@ -240,4 +240,13 @@ int write_to_socket_close_on_error(int* socket, size_t num_bytes, unsigned char*
void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex,
char* format, ...);

/**
* @brief Gracefully shuts down and closes a socket, optionally reading until EOF.
*
* @param socket Pointer to the socket descriptor to shutdown and close.
* @param read_before_closing If true, read until EOF before closing the socket.
* @return int Returns 0 on success, -1 on failure (errno will indicate the error).
*/
int shutdown_socket(int* socket, bool read_before_closing);

#endif /* SOCKET_COMMON_H */