Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Implement Websocket per message compression extension #11

Merged
merged 3 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/scripts/check-autobahn-result.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
foreach($data as $k=>$v) {
$ok = true;
if($v["behavior"] == "UNIMPLEMENTED" || $v["behaviorClose"] == "UNIMPLEMENTED") {
if(explode(".", $k)[0] == "12" || explode(".", $k)[0] == "13") continue;
else $ok = false;
$ok = false;
} else if($v["behavior"] == "NON-STRICT" || $v["behaviorClose"] == "NON-STRICT") {
if(!in_array($k, $non_strict)) $ok = false;
} else if($v["behavior"] == "INFORMATIONAL" || $v["behaviorClose"] == "INFORMATIONAL") {
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/autobahn-ws.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ on:
- ".github/actions/**"
- ".github/workflows/autobahn-ws.yml"
env:
CXX: /usr/bin/clang++-12
CC: /usr/bin/clang-12
CXX: /usr/bin/clang++-15
CC: /usr/bin/clang-15
jobs:
autobahn-ws:
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- name: Configure
Expand Down
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ project(AsyncppCURL)

find_package(Threads REQUIRED)

include(cmake/GetZLIB.cmake)
include(cmake/GetCURL.cmake)

option(ASYNCPP_BUILD_TEST "Enable test builds" ON)
Expand Down Expand Up @@ -36,7 +37,8 @@ add_library(
${CMAKE_CURRENT_SOURCE_DIR}/src/curl/webclient.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/curl/websocket.cpp)
target_link_libraries(asyncpp_curl PUBLIC asyncpp)
target_link_libraries(asyncpp_curl PUBLIC CURL::libcurl Threads::Threads)
target_link_libraries(asyncpp_curl PUBLIC CURL::libcurl ZLIB::ZLIB
Threads::Threads)
target_include_directories(asyncpp_curl
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_features(asyncpp_curl PUBLIC cxx_std_20)
Expand Down
23 changes: 23 additions & 0 deletions cmake/GetZLIB.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
if(TARGET ZLIB::ZLIB)
message(STATUS "Using existing ZLIB target.")
else()
find_package(ZLIB)

if(NOT ZLIB_FOUND)
set(ZLIB_BUILD_EXAMPLES
OFF
CACHE INTERNAL "" FORCE)

include(FetchContent)
FetchContent_Declare(
zlib
URL https://github.com/madler/zlib/releases/download/v1.3.1/zlib-1.3.1.tar.xz
URL_HASH
SHA256=38ef96b8dfe510d42707d9c781877914792541133e1870841463bfa73f883e32
USES_TERMINAL_DOWNLOAD TRUE)
FetchContent_MakeAvailable(zlib)
set_property(TARGET zlib PROPERTY FOLDER "external")
add_library(ZLIB::ZLIB ALIAS zlibstatic)
message(STATUS "Building zlib using FetchContent")
endif()
endif()
1 change: 1 addition & 0 deletions include/asyncpp/curl/websocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace asyncpp::curl {
ping,
pong,

compressed = 0x40,
fin = 0x80,
};
using header_map = std::multimap<std::string, std::string, case_insensitive_less>;
Expand Down
56 changes: 0 additions & 56 deletions src/curl/tcp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,44 +93,24 @@ namespace asyncpp::curl {
auto recv_cb = std::exchange(m_recv_handler, {});
m_handle.reset();
m_is_connected = false;
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
m_executor.push([cb = std::move(cb), recv_cb = std::move(recv_cb), send_cb = std::move(send_cb)]() {
if (recv_cb) recv_cb(true);
if (send_cb) send_cb(true);
if (cb) cb();
});
#else
lck.unlock();
if (recv_cb) recv_cb(true);
if (send_cb) send_cb(true);
if (cb) cb();
#endif
}

void tcp_client::send(const void* buffer, size_t size, std::function<void(size_t)> cb) {
std::unique_lock lck{m_mtx};
if (m_send_handler) throw std::logic_error("Send already in progress");
if (!m_is_connected) {
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb)]() { cb(0); });
#else
lck.unlock();
return cb(0);
#endif
}
// Try inline send
auto res = m_handle.send(buffer, size);
if (res != -1) {
if (m_handle.is_verbose()) printf("* curl::tcp_client.send finished inline res=%td\n", res);
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb), res]() { cb(res); });
#else
lck.unlock();
return cb(res);
#endif
}
// No data available, set the callback and unpause the transfer (adds the connections to poll).
m_send_handler = [this, buffer, size, cb](bool cancel) {
Expand All @@ -153,25 +133,13 @@ namespace asyncpp::curl {
std::unique_lock lck{m_mtx};
if (m_send_handler) throw std::logic_error("Send already in progress");
if (!m_is_connected) {
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb)]() { cb(0); });
#else
lck.unlock();
return cb(0);
#endif
}
// Try inline send
auto res = m_handle.send(buffer, size);
if (res == 0 || static_cast<size_t>(res) == size) {
if (m_handle.is_verbose()) printf("* curl::tcp_client.send_all finished inline res=%td\n", res);
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb), res]() { cb(res); });
#else
lck.unlock();
return cb(res);
#endif
}
auto u8ptr = reinterpret_cast<const uint8_t*>(buffer);
size_t sent{(res < 0) ? 0u : static_cast<size_t>(res)};
Expand All @@ -197,25 +165,13 @@ namespace asyncpp::curl {
std::unique_lock lck{m_mtx};
if (m_recv_handler) throw std::logic_error("Read already in progress");
if (!m_is_connected) {
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb)]() { cb(0); });
#else
lck.unlock();
return cb(0);
#endif
}
// Try inline recv
auto res = m_handle.recv(buffer, size);
if (res != -1) {
if (m_handle.is_verbose()) printf("* curl::tcp_client.recv finished inline res=%td\n", res);
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb), res]() { cb(res); });
#else
lck.unlock();
return cb(res);
#endif
}
m_recv_handler = [this, buffer, size, cb](bool cancel) {
if (cancel) {
Expand All @@ -235,25 +191,13 @@ namespace asyncpp::curl {
std::unique_lock lck{m_mtx};
if (m_recv_handler) throw std::logic_error("Read already in progress");
if (!m_is_connected) {
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb)]() { cb(0); });
#else
lck.unlock();
return cb(0);
#endif
}
// Try inline send
auto res = m_handle.recv(buffer, size);
if (res == 0 || static_cast<size_t>(res) == size) {
if (m_handle.is_verbose()) printf("* curl::tcp_client.recv_all finished inline res=%td\n", res);
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb), res]() { cb(res); });
#else
lck.unlock();
return cb(res);
#endif
}
auto u8ptr = reinterpret_cast<uint8_t*>(buffer);
size_t read{(res < 0) ? 0u : static_cast<size_t>(res)};
Expand Down
Loading
Loading