From 7db2452238caece2d3a91e6cbed75324edccea7d Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Wed, 7 Aug 2024 15:29:27 -0700 Subject: [PATCH] Connection shutdown with buffered data (#482) --- include/aws/http/private/h1_connection.h | 10 +- source/h1_connection.c | 77 +++++++- tests/CMakeLists.txt | 4 + tests/test_h1_client.c | 214 +++++++++++++++++++++++ 4 files changed, 295 insertions(+), 10 deletions(-) diff --git a/include/aws/http/private/h1_connection.h b/include/aws/http/private/h1_connection.h index bff0695b..08a5f27a 100644 --- a/include/aws/http/private/h1_connection.h +++ b/include/aws/http/private/h1_connection.h @@ -15,6 +15,12 @@ # pragma warning(disable : 4214) /* nonstandard extension used: bit field types other than int */ #endif +enum aws_h1_connection_read_state { + AWS_CONNECTION_READ_OPEN, + AWS_CONNECTION_READ_SHUTTING_DOWN, + AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE, +}; + struct aws_h1_connection { struct aws_http_connection base; @@ -96,8 +102,10 @@ struct aws_h1_connection { uint64_t outgoing_stream_timestamp_ns; uint64_t incoming_stream_timestamp_ns; + int pending_shutdown_error_code; + enum aws_h1_connection_read_state read_state; + /* True when read and/or writing has stopped, whether due to errors or normal channel shutdown. */ - bool is_reading_stopped : 1; bool is_writing_stopped : 1; /* If true, the connection has upgraded to another protocol. diff --git a/source/h1_connection.c b/source/h1_connection.c index 29aa82fd..ab532c7d 100644 --- a/source/h1_connection.c +++ b/source/h1_connection.c @@ -142,7 +142,17 @@ static void s_stop( AWS_ASSERT(stop_reading || stop_writing || schedule_shutdown); /* You are required to stop at least 1 thing */ if (stop_reading) { - connection->thread_data.is_reading_stopped = true; + if (connection->thread_data.read_state == AWS_CONNECTION_READ_OPEN) { + connection->thread_data.read_state = AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE; + } else if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUTTING_DOWN) { + /* Shutdown after pending */ + if (connection->thread_data.pending_shutdown_error_code != 0) { + error_code = connection->thread_data.pending_shutdown_error_code; + } + connection->thread_data.read_state = AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE; + aws_channel_slot_on_handler_shutdown_complete( + connection->base.channel_slot, AWS_CHANNEL_DIR_READ, error_code, false); + } } if (stop_writing) { @@ -167,6 +177,7 @@ static void s_stop( aws_error_name(error_code)); aws_channel_shutdown(connection->base.channel_slot->channel, error_code); + if (stop_reading) { /* Increase the window size after shutdown starts, to prevent deadlock when data still pending in the TLS * handler. */ @@ -324,7 +335,7 @@ static size_t s_calculate_stream_mode_desired_connection_window(struct aws_h1_co static int s_update_connection_window(struct aws_h1_connection *connection) { AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel)); - if (connection->thread_data.is_reading_stopped) { + if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE) { return AWS_OP_SUCCESS; } @@ -778,7 +789,7 @@ static void s_set_incoming_stream_ptr( static void s_client_update_incoming_stream_ptr(struct aws_h1_connection *connection) { struct aws_linked_list *list = &connection->thread_data.stream_list; struct aws_h1_stream *desired; - if (connection->thread_data.is_reading_stopped) { + if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE) { desired = NULL; } else if (aws_linked_list_empty(list)) { desired = NULL; @@ -1663,7 +1674,7 @@ static void s_handler_installed(struct aws_channel_handler *handler, struct aws_ static int s_try_process_next_midchannel_read_message(struct aws_h1_connection *connection, bool *out_stop_processing) { AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel)); AWS_ASSERT(connection->thread_data.has_switched_protocols); - AWS_ASSERT(!connection->thread_data.is_reading_stopped); + AWS_ASSERT(connection->thread_data.read_state != AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE); AWS_ASSERT(!aws_linked_list_empty(&connection->thread_data.read_buffer.messages)); *out_stop_processing = false; @@ -1839,7 +1850,7 @@ static int s_handler_process_read_message( AWS_LOGF_TRACE( AWS_LS_HTTP_CONNECTION, "id=%p: Incoming message of size %zu.", (void *)&connection->base, message_size); - if (connection->thread_data.is_reading_stopped) { + if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE) { /* Read has stopped, ignore the data, shutdown the channel incase it has not started yet. */ aws_mem_release(message->allocator, message); /* Release the message as we return success. */ s_shutdown_due_to_error(connection, AWS_ERROR_HTTP_CONNECTION_CLOSED); @@ -1868,7 +1879,7 @@ static int s_handler_process_read_message( } void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *connection) { - + int error_code = 0; /* Protect against this function being called recursively. */ if (connection->thread_data.is_processing_read_messages) { return; @@ -1877,7 +1888,7 @@ void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *conne /* Process queued messages */ while (!aws_linked_list_empty(&connection->thread_data.read_buffer.messages)) { - if (connection->thread_data.is_reading_stopped) { + if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE) { AWS_LOGF_ERROR( AWS_LS_HTTP_CONNECTION, "id=%p: Cannot process message because connection is shutting down.", @@ -1908,6 +1919,13 @@ void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *conne } } + if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUTTING_DOWN && + connection->thread_data.read_buffer.pending_bytes == 0) { + /* Done processing the pending buffer. */ + aws_raise_error(connection->thread_data.pending_shutdown_error_code); + goto shutdown; + } + /* Increment connection window, if necessary */ if (s_update_connection_window(connection)) { goto shutdown; @@ -1917,7 +1935,17 @@ void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *conne return; shutdown: - s_shutdown_due_to_error(connection, aws_last_error()); + error_code = aws_last_error(); + if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUTTING_DOWN && + connection->thread_data.pending_shutdown_error_code != 0) { + error_code = connection->thread_data.pending_shutdown_error_code; + } + if (error_code == 0) { + /* Graceful shutdown, don't stop writing yet. */ + s_stop(connection, true /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, error_code); + } else { + s_shutdown_due_to_error(connection, aws_last_error()); + } } /* Try to process the next queued aws_io_message as normal HTTP data for an aws_http_stream. @@ -1925,7 +1953,7 @@ void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *conne static int s_try_process_next_stream_read_message(struct aws_h1_connection *connection, bool *out_stop_processing) { AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel)); AWS_ASSERT(!connection->thread_data.has_switched_protocols); - AWS_ASSERT(!connection->thread_data.is_reading_stopped); + AWS_ASSERT(connection->thread_data.read_state != AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE); AWS_ASSERT(!aws_linked_list_empty(&connection->thread_data.read_buffer.messages)); *out_stop_processing = false; @@ -2122,6 +2150,31 @@ static int s_handler_increment_read_window( return AWS_OP_SUCCESS; } +static void s_initialize_read_delay_shutdown(struct aws_h1_connection *connection, int error_code) { + + AWS_LOGF_DEBUG( + AWS_LS_HTTP_CONNECTION, + "id=%p: Connection still have pending data to be delivered during shutdown. Wait until downstream " + "reads the data.", + (void *)&connection->base); + + AWS_LOGF_TRACE( + AWS_LS_HTTP_CONNECTION, + "id=%p: Current window stats: connection=%zu, stream=%" PRIu64 " buffer=%zu/%zu", + (void *)&connection->base, + connection->thread_data.connection_window, + connection->thread_data.incoming_stream ? connection->thread_data.incoming_stream->thread_data.stream_window + : 0, + connection->thread_data.read_buffer.pending_bytes, + connection->thread_data.read_buffer.capacity); + + /* Still have data buffered in connection, wait for it to be processed */ + connection->thread_data.read_state = AWS_CONNECTION_READ_SHUTTING_DOWN; + connection->thread_data.pending_shutdown_error_code = error_code; + /* Try to process messages in queue */ + aws_h1_connection_try_process_read_messages(connection); +} + static int s_handler_shutdown( struct aws_channel_handler *handler, struct aws_channel_slot *slot, @@ -2142,6 +2195,12 @@ static int s_handler_shutdown( if (dir == AWS_CHANNEL_DIR_READ) { /* This call ensures that no further streams will be created or worked on. */ + if (!free_scarce_resources_immediately && connection->thread_data.read_state == AWS_CONNECTION_READ_OPEN && + connection->thread_data.read_buffer.pending_bytes > 0) { + s_initialize_read_delay_shutdown(connection, error_code); + /* Return success, and wait for the buffered data to be processed to propagate the shutdown. */ + return AWS_OP_SUCCESS; + } s_stop(connection, true /*stop_reading*/, false /*stop_writing*/, false /*schedule_shutdown*/, error_code); } else /* dir == AWS_CHANNEL_DIR_WRITE */ { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6cd95462..dcd242d9 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -148,6 +148,10 @@ add_test_case(h1_client_stream_cancel) add_test_case(h1_client_response_close_connection_before_request_finishes) add_test_case(h1_client_response_first_byte_timeout_connection) add_test_case(h1_client_response_first_byte_timeout_request_override) +add_test_case(h1_client_connection_close_before_request_finishes_with_buffer) +add_test_case(h1_client_connection_close_before_request_finishes_with_buffer_incomplete_response) +add_test_case(h1_client_connection_close_before_request_finishes_with_buffer_force_shutdown) +add_test_case(h1_client_connection_close_before_request_finishes_with_buffer_stream_cancel) add_test_case(strutil_trim_http_whitespace) add_test_case(strutil_is_http_token) diff --git a/tests/test_h1_client.c b/tests/test_h1_client.c index 8c32a13c..9cbe7f79 100644 --- a/tests/test_h1_client.c +++ b/tests/test_h1_client.c @@ -4519,3 +4519,217 @@ H1_CLIENT_TEST_CASE(h1_client_response_first_byte_timeout_request_override) { ASSERT_SUCCESS(s_tester_clean_up(&tester)); return AWS_OP_SUCCESS; } + +/** + * Once upon a time, when the connection received data from the channel, we buffer the data in the connection level, + and + * then send it to the stream. + * When stream has no window left to receive the data from connection, the data will be kept in the buffer. + * If the connection starts to shutdown before stream opens its window, the buffered data will be throw away because + of + * shutdown process. + * But, the connection actually received the full response, which is an unexpected behavior for the + * stream to report connection close with error. + */ +H1_CLIENT_TEST_CASE(h1_client_connection_close_before_request_finishes_with_buffer) { + (void)ctx; + struct tester_options tester_opts = { + .manual_window_management = true, + .initial_stream_window_size = 5, + .read_buffer_capacity = SIZE_MAX, + }; + struct tester tester; + ASSERT_SUCCESS(s_tester_init_ex(&tester, allocator, &tester_opts)); + + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("GET"))); + ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt"))); + + struct client_stream_tester stream_tester; + ASSERT_SUCCESS(s_stream_tester_init(&stream_tester, &tester, request)); + + /* send head of request */ + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + + /* Ensure the request can be destroyed after request is sent */ + aws_http_message_destroy(request); + + /* send close connection response */ + ASSERT_SUCCESS(testing_channel_push_read_str( + &tester.testing_channel, + "HTTP/1.1 200 OK\r\n" + "Content-Length: 9\r\n" + "\r\n" + "Call Momo")); + + /* All the response data has been processed, buffered in the connection level. */ + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + + /* Some handler starts the shutdown process, while the stream still has 0 window left. (eg: socket reads EOF, TLS + * reads graceful shutdown) */ + aws_channel_shutdown(tester.testing_channel.channel, AWS_ERROR_UNKNOWN); + testing_channel_drain_queued_tasks(&tester.testing_channel); + /* We should not complete the stream, since the window for the stream is still 0. */ + ASSERT_FALSE(stream_tester.complete); + + /* Updated the window after shutdown happens */ + aws_http_stream_update_window(stream_tester.stream, 5); + /* Wait for channel to finish shutdown */ + testing_channel_drain_queued_tasks(&tester.testing_channel); + + /* check result */ + ASSERT_TRUE(stream_tester.complete); + ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code); + ASSERT_INT_EQUALS(200, stream_tester.response_status); + ASSERT_UINT_EQUALS(1, aws_http_headers_count(stream_tester.response_headers)); + ASSERT_SUCCESS(s_check_header(stream_tester.response_headers, 0, "Content-Length", "9")); + ASSERT_TRUE(aws_byte_buf_eq_c_str(&stream_tester.response_body, "Call Momo")); + + /* clean up */ + client_stream_tester_clean_up(&stream_tester); + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + return AWS_OP_SUCCESS; +} + +H1_CLIENT_TEST_CASE(h1_client_connection_close_before_request_finishes_with_buffer_incomplete_response) { + (void)ctx; + struct tester_options tester_opts = { + .manual_window_management = true, + .initial_stream_window_size = 5, + .read_buffer_capacity = SIZE_MAX, + }; + struct tester tester; + ASSERT_SUCCESS(s_tester_init_ex(&tester, allocator, &tester_opts)); + + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("GET"))); + ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt"))); + + struct client_stream_tester stream_tester; + ASSERT_SUCCESS(s_stream_tester_init(&stream_tester, &tester, request)); + + /* send head of request */ + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + + /* Ensure the request can be destroyed after request is sent */ + aws_http_message_destroy(request); + + /* Send the incomplete response. */ + ASSERT_SUCCESS(testing_channel_push_read_str( + &tester.testing_channel, + "HTTP/1.1 200 OK\r\n" + "Content-Length: 9\r\n" + "\r\n" + "Call Mo")); + + /* All the response data has been processed, buffered in the connection level. */ + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + + /* Some handler starts the shutdown process, while the stream still has 0 window left. (eg: socket reads EOF, TLS + * reads graceful shutdown) */ + aws_channel_shutdown(tester.testing_channel.channel, AWS_ERROR_UNIMPLEMENTED); + testing_channel_drain_queued_tasks(&tester.testing_channel); + /* We should not complete the stream, since the window for the stream is still 0. */ + ASSERT_FALSE(stream_tester.complete); + + /* Updated the window after shutdown happens */ + aws_http_stream_update_window(stream_tester.stream, 5); + /* Wait for channel to finish shutdown */ + testing_channel_drain_queued_tasks(&tester.testing_channel); + + /* check result */ + ASSERT_TRUE(stream_tester.complete); + /* Fail */ + ASSERT_INT_EQUALS(AWS_ERROR_UNIMPLEMENTED, stream_tester.on_complete_error_code); + ASSERT_INT_EQUALS(200, stream_tester.response_status); + ASSERT_UINT_EQUALS(1, aws_http_headers_count(stream_tester.response_headers)); + ASSERT_SUCCESS(s_check_header(stream_tester.response_headers, 0, "Content-Length", "9")); + /* Incomplete response received */ + ASSERT_TRUE(aws_byte_buf_eq_c_str(&stream_tester.response_body, "Call Mo")); + + /* clean up */ + client_stream_tester_clean_up(&stream_tester); + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + return AWS_OP_SUCCESS; +} + +static int s_h1_client_connection_close_before_request_finishes_with_buffer_force_shutdown_helper( + struct aws_allocator *allocator, + bool connection_close) { + struct tester_options tester_opts = { + .manual_window_management = true, + .initial_stream_window_size = 5, + .read_buffer_capacity = SIZE_MAX, + }; + struct tester tester; + ASSERT_SUCCESS(s_tester_init_ex(&tester, allocator, &tester_opts)); + + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("GET"))); + ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt"))); + + struct client_stream_tester stream_tester; + ASSERT_SUCCESS(s_stream_tester_init(&stream_tester, &tester, request)); + + /* send head of request */ + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + + /* Ensure the request can be destroyed after request is sent */ + aws_http_message_destroy(request); + + /* Send the incomplete response. */ + ASSERT_SUCCESS(testing_channel_push_read_str( + &tester.testing_channel, + "HTTP/1.1 200 OK\r\n" + "Content-Length: 9\r\n" + "\r\n" + "Call Mo")); + + /* All the response data has been processed, buffered in the connection level. */ + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + + /* Some handler starts the shutdown process, while the stream still has 0 window left. (eg: socket reads EOF, TLS + * reads graceful shutdown) */ + aws_channel_shutdown(tester.testing_channel.channel, AWS_ERROR_UNIMPLEMENTED); + testing_channel_drain_queued_tasks(&tester.testing_channel); + /* We should not complete the stream, since the window for the stream is still 0. */ + ASSERT_FALSE(stream_tester.complete); + + /* Don't update the window, just cancel the stream should also complete the stream. */ + if (connection_close) { + aws_http_connection_close(tester.connection); + } else { + /* Stream cancel error will be override as the connection shutdown happens before cancel. */ + aws_http_stream_cancel(stream_tester.stream, AWS_ERROR_UNKNOWN); + } + /* Wait for channel to finish shutdown */ + testing_channel_drain_queued_tasks(&tester.testing_channel); + + /* check result */ + ASSERT_TRUE(stream_tester.complete); + /* Fail */ + ASSERT_INT_EQUALS(AWS_ERROR_UNIMPLEMENTED, stream_tester.on_complete_error_code); + ASSERT_INT_EQUALS(200, stream_tester.response_status); + ASSERT_UINT_EQUALS(1, aws_http_headers_count(stream_tester.response_headers)); + ASSERT_SUCCESS(s_check_header(stream_tester.response_headers, 0, "Content-Length", "9")); + /* Incomplete response received */ + ASSERT_TRUE(aws_byte_buf_eq_c_str(&stream_tester.response_body, "Call ")); + + /* clean up */ + client_stream_tester_clean_up(&stream_tester); + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + return AWS_OP_SUCCESS; +} + +H1_CLIENT_TEST_CASE(h1_client_connection_close_before_request_finishes_with_buffer_force_shutdown) { + (void)ctx; + return s_h1_client_connection_close_before_request_finishes_with_buffer_force_shutdown_helper(allocator, true); +} + +H1_CLIENT_TEST_CASE(h1_client_connection_close_before_request_finishes_with_buffer_stream_cancel) { + (void)ctx; + return s_h1_client_connection_close_before_request_finishes_with_buffer_force_shutdown_helper(allocator, false); +}