diff --git a/.github/scripts/check-autobahn-result.php b/.github/scripts/check-autobahn-result.php index 237bc02..0dd4cfa 100644 --- a/.github/scripts/check-autobahn-result.php +++ b/.github/scripts/check-autobahn-result.php @@ -19,8 +19,7 @@ foreach($data as $k=>$v) { $ok = true; if($v["behavior"] == "UNIMPLEMENTED" || $v["behaviorClose"] == "UNIMPLEMENTED") { - if(explode(".", $k)[0] == "12" || explode(".", $k)[0] == "13") continue; - else $ok = false; + $ok = false; } else if($v["behavior"] == "NON-STRICT" || $v["behaviorClose"] == "NON-STRICT") { if(!in_array($k, $non_strict)) $ok = false; } else if($v["behavior"] == "INFORMATIONAL" || $v["behaviorClose"] == "INFORMATIONAL") { diff --git a/.github/workflows/autobahn-ws.yml b/.github/workflows/autobahn-ws.yml index 7ab8e54..779a468 100644 --- a/.github/workflows/autobahn-ws.yml +++ b/.github/workflows/autobahn-ws.yml @@ -14,11 +14,11 @@ on: - ".github/actions/**" - ".github/workflows/autobahn-ws.yml" env: - CXX: /usr/bin/clang++-12 - CC: /usr/bin/clang-12 + CXX: /usr/bin/clang++-15 + CC: /usr/bin/clang-15 jobs: autobahn-ws: - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 - name: Configure diff --git a/CMakeLists.txt b/CMakeLists.txt index a73bd70..443f2b8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,6 +8,7 @@ endif() project(AsyncppCURL) find_package(Threads REQUIRED) +find_package(ZLIB REQUIRED) include(cmake/GetCURL.cmake) @@ -36,7 +37,7 @@ add_library( ${CMAKE_CURRENT_SOURCE_DIR}/src/curl/webclient.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/curl/websocket.cpp) target_link_libraries(asyncpp_curl PUBLIC asyncpp) -target_link_libraries(asyncpp_curl PUBLIC CURL::libcurl Threads::Threads) +target_link_libraries(asyncpp_curl PUBLIC CURL::libcurl ZLIB::ZLIB Threads::Threads) target_include_directories(asyncpp_curl PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) target_compile_features(asyncpp_curl PUBLIC cxx_std_20) diff --git a/include/asyncpp/curl/websocket.h b/include/asyncpp/curl/websocket.h index addf317..aa5e9b9 100644 --- a/include/asyncpp/curl/websocket.h +++ b/include/asyncpp/curl/websocket.h @@ -24,6 +24,7 @@ namespace asyncpp::curl { ping, pong, + compressed = 0x40, fin = 0x80, }; using header_map = std::multimap; diff --git a/src/curl/tcp_client.cpp b/src/curl/tcp_client.cpp index 49d94d3..618ec71 100644 --- a/src/curl/tcp_client.cpp +++ b/src/curl/tcp_client.cpp @@ -93,44 +93,24 @@ namespace asyncpp::curl { auto recv_cb = std::exchange(m_recv_handler, {}); m_handle.reset(); m_is_connected = false; -// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead -#if defined(__clang__) && __clang_major__ < 12 m_executor.push([cb = std::move(cb), recv_cb = std::move(recv_cb), send_cb = std::move(send_cb)]() { if (recv_cb) recv_cb(true); if (send_cb) send_cb(true); if (cb) cb(); }); -#else - lck.unlock(); - if (recv_cb) recv_cb(true); - if (send_cb) send_cb(true); - if (cb) cb(); -#endif } void tcp_client::send(const void* buffer, size_t size, std::function cb) { std::unique_lock lck{m_mtx}; if (m_send_handler) throw std::logic_error("Send already in progress"); if (!m_is_connected) { -// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead -#if defined(__clang__) && __clang_major__ < 12 return m_executor.push([cb = std::move(cb)]() { cb(0); }); -#else - lck.unlock(); - return cb(0); -#endif } // Try inline send auto res = m_handle.send(buffer, size); if (res != -1) { if (m_handle.is_verbose()) printf("* curl::tcp_client.send finished inline res=%td\n", res); -// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead -#if defined(__clang__) && __clang_major__ < 12 return m_executor.push([cb = std::move(cb), res]() { cb(res); }); -#else - lck.unlock(); - return cb(res); -#endif } // No data available, set the callback and unpause the transfer (adds the connections to poll). m_send_handler = [this, buffer, size, cb](bool cancel) { @@ -153,25 +133,13 @@ namespace asyncpp::curl { std::unique_lock lck{m_mtx}; if (m_send_handler) throw std::logic_error("Send already in progress"); if (!m_is_connected) { -// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead -#if defined(__clang__) && __clang_major__ < 12 return m_executor.push([cb = std::move(cb)]() { cb(0); }); -#else - lck.unlock(); - return cb(0); -#endif } // Try inline send auto res = m_handle.send(buffer, size); if (res == 0 || static_cast(res) == size) { if (m_handle.is_verbose()) printf("* curl::tcp_client.send_all finished inline res=%td\n", res); -// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead -#if defined(__clang__) && __clang_major__ < 12 return m_executor.push([cb = std::move(cb), res]() { cb(res); }); -#else - lck.unlock(); - return cb(res); -#endif } auto u8ptr = reinterpret_cast(buffer); size_t sent{(res < 0) ? 0u : static_cast(res)}; @@ -197,25 +165,13 @@ namespace asyncpp::curl { std::unique_lock lck{m_mtx}; if (m_recv_handler) throw std::logic_error("Read already in progress"); if (!m_is_connected) { -// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead -#if defined(__clang__) && __clang_major__ < 12 return m_executor.push([cb = std::move(cb)]() { cb(0); }); -#else - lck.unlock(); - return cb(0); -#endif } // Try inline recv auto res = m_handle.recv(buffer, size); if (res != -1) { if (m_handle.is_verbose()) printf("* curl::tcp_client.recv finished inline res=%td\n", res); -// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead -#if defined(__clang__) && __clang_major__ < 12 return m_executor.push([cb = std::move(cb), res]() { cb(res); }); -#else - lck.unlock(); - return cb(res); -#endif } m_recv_handler = [this, buffer, size, cb](bool cancel) { if (cancel) { @@ -235,25 +191,13 @@ namespace asyncpp::curl { std::unique_lock lck{m_mtx}; if (m_recv_handler) throw std::logic_error("Read already in progress"); if (!m_is_connected) { -// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead -#if defined(__clang__) && __clang_major__ < 12 return m_executor.push([cb = std::move(cb)]() { cb(0); }); -#else - lck.unlock(); - return cb(0); -#endif } // Try inline send auto res = m_handle.recv(buffer, size); if (res == 0 || static_cast(res) == size) { if (m_handle.is_verbose()) printf("* curl::tcp_client.recv_all finished inline res=%td\n", res); -// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead -#if defined(__clang__) && __clang_major__ < 12 return m_executor.push([cb = std::move(cb), res]() { cb(res); }); -#else - lck.unlock(); - return cb(res); -#endif } auto u8ptr = reinterpret_cast(buffer); size_t read{(res < 0) ? 0u : static_cast(res)}; diff --git a/src/curl/websocket.cpp b/src/curl/websocket.cpp index 3149e7c..10b5caf 100644 --- a/src/curl/websocket.cpp +++ b/src/curl/websocket.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -14,9 +15,9 @@ #include #include -#include #include #include +#include namespace { static const std::string ws_magic_string = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; @@ -70,10 +71,17 @@ namespace { } // namespace namespace asyncpp::curl { - namespace detail { + namespace detail { struct websocket_state { websocket_state(executor& e) noexcept : client(e) {} + ~websocket_state() { + if (pmce_enabled) { + inflateEnd(&pmce_inflate); + deflateEnd(&pmce_deflate); + pmce_enabled = false; + } + } mutable thread_safe_refcount m_refcount{0}; std::recursive_mutex mtx; @@ -95,7 +103,8 @@ namespace asyncpp::curl { // Async management async_launch_scope async_scope{}; - opcode fragment_code = opcode::continuation; + bool fragment_was_compressed{}; + websocket::opcode fragment_code = websocket::opcode::continuation; std::string fragment_payload; size_t fragment_validate_offset = 0; @@ -104,6 +113,17 @@ namespace asyncpp::curl { std::atomic utf8_mode{utf8_validator::mode::pedantic}; + // PMCE state + bool pmce_enabled{}; + z_stream pmce_inflate{}; + z_stream pmce_deflate{}; + bool pmce_client_no_context_takeover{}; + bool inflate_some(const std::string& in, std::string& out); + bool deflate_some(const websocket::buffer in, std::string& out); + + static task handshake_task(ref state); + static task send_task(ref state); + // Parser state size_t parser_wanted_size{2}; std::string parser_data; @@ -153,6 +173,7 @@ namespace asyncpp::curl { m_state->request_headers.emplace("Connection", "Upgrade"); m_state->request_headers.emplace("Upgrade", "websocket"); m_state->request_headers.emplace("Sec-WebSocket-Version", "13"); + m_state->request_headers.emplace("Sec-WebSocket-Extensions", "permessage-deflate; client_max_window_bits"); std::uniform_int_distribution dist((std::numeric_limits::min)(), (std::numeric_limits::max)()); std::string nonce; nonce.resize(16); @@ -220,6 +241,40 @@ namespace asyncpp::curl { m_state->send_frame(op, data, std::move(cb)); } + bool detail::websocket_state::inflate_some(const std::string& in, std::string& out) { + pmce_inflate.next_in = const_cast(reinterpret_cast(in.data())); + pmce_inflate.avail_in = in.size(); + size_t out_offset = 0; + while (pmce_inflate.avail_in != 0) { + out_offset = out.size(); + out.resize(std::max(in.size() * 2, out.size() * 2)); + pmce_inflate.next_out = reinterpret_cast(out.data() + out_offset); + pmce_inflate.avail_out = out.size() - out_offset; + + auto res = inflate(&pmce_inflate, Z_NO_FLUSH); + if (res != Z_OK) return false; + } + out.resize(out.size() - pmce_inflate.avail_out); + return true; + } + + bool detail::websocket_state::deflate_some(const websocket::buffer in, std::string& out) { + pmce_deflate.next_in = const_cast(reinterpret_cast(in.data())); + pmce_deflate.avail_in = in.size(); + size_t out_offset = 0; + while (pmce_deflate.avail_in != 0) { + out_offset = out.size(); + out.resize(std::max(in.size() * 2, out.size() * 2)); + pmce_deflate.next_out = reinterpret_cast(out.data() + out_offset); + pmce_deflate.avail_out = out.size() - out_offset; + + auto res = deflate(&pmce_deflate, pmce_client_no_context_takeover ? Z_FULL_FLUSH : Z_SYNC_FLUSH); + if (res != Z_OK) return false; + } + out.resize(out.size() - pmce_deflate.avail_out); + return true; + } + detail::websocket_state::cs detail::websocket_state::state_transition(cs newstate) { auto old = con_state.exchange(newstate); constexpr const char* states[] = {"init", "connect", "handshake", "open", "client_close", "server_close", "closed"}; @@ -228,93 +283,100 @@ namespace asyncpp::curl { return old; } - void detail::websocket_state::raw_on_connect(int code) { - std::unique_lock lck{mtx}; - state_transition(cs::handshake); - ref state(this); - if (code != 0) { - if (auto cb = on_open; cb) cb(code); - return; - } - - async_scope.launch([](ref state) -> task { - std::string buffer; - try { - // clang-format off - std::string request = - "GET " + state->last_url.path_query() + " HTTP/1.1\r\n" - "Host: " + state->last_url.host() + (state->last_url.port() > 0 ? (":" + std::to_string(state->last_url.port())) : "") + "\r\n"; - // clang-format on - for (auto& e : state->request_headers) { - request += e.first + ": " + e.second + "\r\n"; - } - request += "\r\n"; - co_await state->client.send_all(request.data(), request.size()); - - std::string header; - bool header_ok = co_await read_until(state->client, "\r\n\r\n", header, buffer); - if (!header_ok) throw exception(CURLE_RECV_ERROR); - auto lines = string_split(header, std::string_view("\r\n")); - if (lines.empty()) throw exception(CURLE_WEIRD_SERVER_REPLY); - auto status_line = lines[0]; - size_t pos = status_line.find(' '); - if (pos == std::string::npos) throw exception(CURLE_WEIRD_SERVER_REPLY); - //http_version = status_line.substr(0, pos); - status_line = status_line.substr(status_line.find_first_not_of(' ', pos)); - pos = status_line.find(' '); - if (pos == std::string::npos) throw exception(CURLE_WEIRD_SERVER_REPLY); - auto status_code = std::stoul(std::string(status_line.substr(0, pos))); - if (status_code < 100 || status_code > 600) throw exception(CURLE_WEIRD_SERVER_REPLY); - status_line = status_line.substr(status_line.find_first_not_of(' ', pos)); - - lines.erase(lines.begin()); - for (auto e : lines) { - auto parts = string_split(e, std::string_view(":")); - if (parts.size() != 2) continue; - string_trim(parts[0]); - string_trim(parts[1]); - state->response_headers.emplace(parts[0], parts[1]); - } - - auto accept_it = state->response_headers.find("sec-websocket-accept"); - if (status_code != 101 || accept_it == state->response_headers.end() || state->handshake_nonce != accept_it->second) - throw exception(CURLE_HTTP_RETURNED_ERROR); - } catch (const exception& e) { - if (auto cb = state->on_open; cb) cb(e.code()); - co_return; - } catch (...) { - if (auto cb = state->on_open; cb) cb(CURLE_FUNCTION_NOT_FOUND); - co_return; + task detail::websocket_state::handshake_task(ref state) { + std::string buffer; + try { + // clang-format off + std::string request = + "GET " + state->last_url.path_query() + " HTTP/1.1\r\n" + "Host: " + state->last_url.host() + (state->last_url.port() > 0 ? (":" + std::to_string(state->last_url.port())) : "") + "\r\n"; + // clang-format on + for (auto& e : state->request_headers) { + request += e.first + ": " + e.second + "\r\n"; } - if (auto cb = state->on_open; cb) cb(0); - state->state_transition(cs::open); - state->parser_data = std::move(buffer); - // Dummy call to data event to process frames that might have been read together with the handshake - state->raw_on_data_available(false); - state->client.set_on_data_available([parent = state.get()](bool cancelled) { return parent->raw_on_data_available(cancelled); }); - state->client.pause_receive(false); - }(state)); - send_should_exit = false; - async_scope.launch([](ref state) -> task { - while (!state->send_should_exit) { - while (true) { - auto element = state->send_queue.pop(); - if (!element) { - co_await state->send_event.wait(&state->client.get_executor()); - state->send_event.reset(); - break; + request += "\r\n"; + co_await state->client.send_all(request.data(), request.size()); + + std::string header; + bool header_ok = co_await read_until(state->client, "\r\n\r\n", header, buffer); + if (!header_ok) throw exception(CURLE_RECV_ERROR); + auto lines = string_split(header, std::string_view("\r\n")); + if (lines.empty()) throw exception(CURLE_WEIRD_SERVER_REPLY); + auto status_line = lines[0]; + size_t pos = status_line.find(' '); + if (pos == std::string::npos) throw exception(CURLE_WEIRD_SERVER_REPLY); + //http_version = status_line.substr(0, pos); + status_line = status_line.substr(status_line.find_first_not_of(' ', pos)); + pos = status_line.find(' '); + if (pos == std::string::npos) throw exception(CURLE_WEIRD_SERVER_REPLY); + auto status_code = std::stoul(std::string(status_line.substr(0, pos))); + if (status_code < 100 || status_code > 600) throw exception(CURLE_WEIRD_SERVER_REPLY); + status_line = status_line.substr(status_line.find_first_not_of(' ', pos)); + + lines.erase(lines.begin()); + for (auto e : lines) { + auto parts = string_split(e, std::string_view(":")); + if (parts.size() != 2) continue; + string_trim(parts[0]); + string_trim(parts[1]); + state->response_headers.emplace(parts[0], parts[1]); + } + + auto accept_it = state->response_headers.find("sec-websocket-accept"); + if (status_code != 101 || accept_it == state->response_headers.end() || state->handshake_nonce != accept_it->second) + throw exception(CURLE_HTTP_RETURNED_ERROR); + + if (auto it = state->response_headers.find("Sec-WebSocket-Extensions"); it != state->response_headers.end()) { + for (auto& e : string_split(it->second, std::string_view(","))) { + if (!e.starts_with("permessage-deflate")) continue; + int deflate_window = 15; + if (auto pos = e.find("client_max_window_bits="); pos != std::string::npos) { + deflate_window = std::stoi(e.substr(pos + 23)); + deflate_window = std::max(std::min(deflate_window, 15), 8); } - auto res = co_await state->client.send_all(element->first.data(), element->first.size()); - if (element->second) element->second(res == element->first.size()); + if (auto pos = e.find("client_no_context_takeover"); pos != std::string::npos) state->pmce_client_no_context_takeover = true; + auto res = inflateInit2(&state->pmce_inflate, -15); + if (res != Z_OK) throw exception(CURLE_OUT_OF_MEMORY); + res = deflateInit2(&state->pmce_deflate, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -deflate_window, 8, Z_DEFAULT_STRATEGY); + if (res != Z_OK) throw exception(CURLE_OUT_OF_MEMORY); + state->pmce_enabled = true; } } + } catch (const exception& e) { + if (auto cb = state->on_open; cb) cb(e.code()); + co_return; + } catch (...) { + if (auto cb = state->on_open; cb) cb(CURLE_FUNCTION_NOT_FOUND); + co_return; + } + if (auto cb = state->on_open; cb) cb(0); + state->state_transition(cs::open); + state->parser_data = std::move(buffer); + // Dummy call to data event to process frames that might have been read together with the handshake + state->raw_on_data_available(false); + state->client.set_on_data_available([parent = state.get()](bool cancelled) { return parent->raw_on_data_available(cancelled); }); + state->client.pause_receive(false); + } + + task detail::websocket_state::send_task(ref state) { + while (!state->send_should_exit) { while (true) { - auto e = state->send_queue.pop(); - if (!e) break; - if (e->second) e->second(false); + auto element = state->send_queue.pop(); + if (!element) { + co_await state->send_event.wait(&state->client.get_executor()); + state->send_event.reset(); + break; + } + auto res = co_await state->client.send_all(element->first.data(), element->first.size()); + if (auto cb = std::move(element->second)) cb(res == element->first.size()); } - if (state->client.is_connected()) co_await state->client.disconnect(); - }(state)); + } + while (true) { + auto e = state->send_queue.pop(); + if (!e) break; + if (e->second) e->second(false); + } + if (state->client.is_connected()) co_await state->client.disconnect(); } void detail::websocket_state::raw_on_disconnect() { @@ -323,6 +385,20 @@ namespace asyncpp::curl { if (auto cb = on_close; cb) cb(1006, "Connection lost"); } + void detail::websocket_state::raw_on_connect(int code) { + std::unique_lock lck{mtx}; + state_transition(cs::handshake); + ref state(this); + if (code != 0) { + if (auto cb = on_open; cb) cb(code); + return; + } + + async_scope.invoke(&websocket_state::handshake_task, state); + send_should_exit = false; + async_scope.invoke(&websocket_state::send_task, state); + } + tcp_client::callback_result detail::websocket_state::raw_on_data_available(bool cancelled) { // Note: parser_data is never used outside this piece of code and raw_on_data_available is never called concurrently // since it runs on the curl executor, so we can afford to not lock here. @@ -358,7 +434,10 @@ namespace asyncpp::curl { } parser_wanted_size = (std::max)(parser_wanted_size, header_len + payload_len); if (parser_data.size() < parser_wanted_size) continue; - uint32_t mask = is_masked ? get_be(parser_data.data() + header_len - 4) : 0; + if (is_masked) { + close(1002, "Protocol error"); + break; + } handle_frame(code, parser_data.substr(header_len, payload_len)); parser_wanted_size = 2; parser_data.erase(parser_data.begin(), parser_data.begin() + header_len + payload_len); @@ -370,18 +449,35 @@ namespace asyncpp::curl { std::unique_lock lck{mtx}; if (auto s = con_state.load(); s == cs::closed || s == cs::server_close) return; - // RSV Bits should be zero - if (static_cast(code) & 0x70) return this->close(1002, "Protocol error"); - - switch (clean_op(code)) { - case opcode::continuation: this->handle_continuation_frame(code, std::move(payload)); break; - case opcode::text: this->handle_text_frame(code, std::move(payload)); break; - case opcode::binary: this->handle_binary_frame(code, std::move(payload)); break; - case opcode::close: this->handle_close_frame(code, std::move(payload)); break; - case opcode::ping: this->handle_ping_frame(code, std::move(payload)); break; - case opcode::pong: this->handle_pong_frame(code, std::move(payload)); break; - default: close(1002, "Protocol error"); break; - } + auto clean_code = clean_op(code); + if (clean_code == opcode::close || clean_code == opcode::ping || clean_code == opcode::pong) { + // RSV Bits should be zero + if (static_cast(code) & 0x70) return this->close(1002, "Protocol error"); + switch (clean_code) { + case opcode::close: this->handle_close_frame(code, std::move(payload)); break; + case opcode::ping: this->handle_ping_frame(code, std::move(payload)); break; + case opcode::pong: this->handle_pong_frame(code, std::move(payload)); break; + default: break; + } + } else if (clean_code == opcode::continuation || clean_code == opcode::binary || clean_code == opcode::text) { + if (static_cast(code) & 0x30 || (!pmce_enabled && (static_cast(code) & 0x70))) // + return this->close(1002, "Protocol error"); + const bool is_compressed = clean_code == opcode::continuation ? fragment_was_compressed : ((code & opcode::compressed) == opcode::compressed); + if (pmce_enabled && is_compressed) { + std::string decompressed; + if (!payload.empty() && !inflate_some(payload, decompressed)) return this->close(1002, "Protocol error"); + if ((code & opcode::fin) == opcode::fin && !inflate_some(std::string("\x00\x00\xff\xff", 4), decompressed)) + return this->close(1002, "Protocol error"); + payload = std::move(decompressed); + } + switch (clean_code) { + case opcode::continuation: this->handle_continuation_frame(code, std::move(payload)); break; + case opcode::binary: this->handle_binary_frame(code, std::move(payload)); break; + case opcode::text: this->handle_text_frame(code, std::move(payload)); break; + default: break; + } + } else + return close(1002, "Protocol error"); } void detail::websocket_state::handle_close_frame(opcode op, std::string payload) { @@ -434,6 +530,7 @@ namespace asyncpp::curl { if (!(op & opcode::fin)) { // Fragment start fragment_code = opcode::text; + fragment_was_compressed = (op & opcode::compressed) == opcode::compressed; fragment_payload = std::move(payload); std::string_view sv{fragment_payload}; auto last_valid = sv.cbegin(); @@ -451,6 +548,7 @@ namespace asyncpp::curl { if (!(op & opcode::fin)) { // Fragment start fragment_code = opcode::binary; + fragment_was_compressed = (op & opcode::compressed) == opcode::compressed; fragment_payload = std::move(payload); fragment_validate_offset = 0; } else { // Unfragmented message @@ -512,11 +610,20 @@ namespace asyncpp::curl { send_frame(opcode::fin | opcode::close, as_bytes(payload), std::move(cb)); } - void detail::websocket_state::send_frame(opcode op, const websocket::buffer buf, std::function cb) { + void detail::websocket_state::send_frame(opcode op, websocket::buffer buf, std::function cb) { if (clean_op(op) == opcode::close && con_state == cs::open) { // If we are the first to close, call on_close on disconnect/fin reply state_transition(cs::client_close); } + + std::string compressed; + if (pmce_enabled && (clean_op(op) == opcode::binary || clean_op(op) == opcode::text)) { + if (!deflate_some(buf, compressed)) throw std::runtime_error("failed to compress"); + if (compressed.size() > 4) compressed.resize(compressed.size() - 4); + buf = std::as_bytes(std::span(compressed)); + op = op | opcode::compressed; + } + std::pair> data; data.second = cb; size_t header_len = 2 + 4; diff --git a/test/util.cpp b/test/util.cpp index f90bf0d..9a2c516 100644 --- a/test/util.cpp +++ b/test/util.cpp @@ -126,7 +126,6 @@ namespace { TEST(ASYNCPP_CURL, Utf8CheckNormal) { constexpr size_t size = sizeof(test_cases) / sizeof(test_cases[0]); - bool all_ok = true; for (size_t i = 0; i < size; i++) { auto& t = test_cases[i]; auto res = utf8_validator{}(std::get<0>(t), utf8_validator::mode::normal); @@ -138,7 +137,6 @@ TEST(ASYNCPP_CURL, Utf8CheckNormal) { TEST(ASYNCPP_CURL, Utf8CheckStrict) { constexpr size_t size = sizeof(test_cases) / sizeof(test_cases[0]); - bool all_ok = true; for (size_t i = 0; i < size; i++) { auto& t = test_cases[i]; auto res = utf8_validator{}(std::get<0>(t), utf8_validator::mode::strict); @@ -150,7 +148,6 @@ TEST(ASYNCPP_CURL, Utf8CheckStrict) { TEST(ASYNCPP_CURL, Utf8CheckPedantic) { constexpr size_t size = sizeof(test_cases) / sizeof(test_cases[0]); - bool all_ok = true; for (size_t i = 0; i < size; i++) { auto& t = test_cases[i]; auto res = utf8_validator{}(std::get<0>(t), utf8_validator::mode::pedantic); @@ -162,7 +159,6 @@ TEST(ASYNCPP_CURL, Utf8CheckPedantic) { TEST(ASYNCPP_CURL, Utf8CheckExtreme) { constexpr size_t size = sizeof(test_cases) / sizeof(test_cases[0]); - bool all_ok = true; for (size_t i = 0; i < size; i++) { auto& t = test_cases[i]; auto res = utf8_validator{}(std::get<0>(t), utf8_validator::mode::extreme); diff --git a/test/ws-autobahn.cpp b/test/ws-autobahn.cpp index 6ec8f25..a58f257 100644 --- a/test/ws-autobahn.cpp +++ b/test/ws-autobahn.cpp @@ -1,9 +1,4 @@ -#include -#include -#include -#include #include -#include #include #include @@ -63,7 +58,7 @@ task async_main(int argc, const char** argv) { socket.set_on_open([i, ncases](int code) { std::cout << "Test case " << i << "/" << ncases << " ... " << std::flush; }); socket.set_on_message([&socket](websocket::buffer data, bool binary) { socket.send(data, binary, [](bool) {}); }); promise res; - socket.set_on_close([i, ncases, res, main_dp](uint16_t code, std::string_view reason) mutable { + socket.set_on_close([res, main_dp](uint16_t code, std::string_view reason) mutable { std::cout << "finished (code=" << code << ", reason=\"" << reason << "\")" << std::endl; main_dp->push([res]() mutable { res.fulfill({}); }); });