Skip to content

Commit

Permalink
✨ Implement websocket per message compression extension
Browse files Browse the repository at this point in the history
  • Loading branch information
Thalhammer committed Apr 13, 2024
1 parent 44cf466 commit 05faa6e
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 170 deletions.
3 changes: 1 addition & 2 deletions .github/scripts/check-autobahn-result.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
foreach($data as $k=>$v) {
$ok = true;
if($v["behavior"] == "UNIMPLEMENTED" || $v["behaviorClose"] == "UNIMPLEMENTED") {
if(explode(".", $k)[0] == "12" || explode(".", $k)[0] == "13") continue;
else $ok = false;
$ok = false;
} else if($v["behavior"] == "NON-STRICT" || $v["behaviorClose"] == "NON-STRICT") {
if(!in_array($k, $non_strict)) $ok = false;
} else if($v["behavior"] == "INFORMATIONAL" || $v["behaviorClose"] == "INFORMATIONAL") {
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/autobahn-ws.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ on:
- ".github/actions/**"
- ".github/workflows/autobahn-ws.yml"
env:
CXX: /usr/bin/clang++-12
CC: /usr/bin/clang-12
CXX: /usr/bin/clang++-15
CC: /usr/bin/clang-15
jobs:
autobahn-ws:
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- name: Configure
Expand Down
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ endif()
project(AsyncppCURL)

find_package(Threads REQUIRED)
find_package(ZLIB REQUIRED)

include(cmake/GetCURL.cmake)

Expand Down Expand Up @@ -36,7 +37,7 @@ add_library(
${CMAKE_CURRENT_SOURCE_DIR}/src/curl/webclient.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/curl/websocket.cpp)
target_link_libraries(asyncpp_curl PUBLIC asyncpp)
target_link_libraries(asyncpp_curl PUBLIC CURL::libcurl Threads::Threads)
target_link_libraries(asyncpp_curl PUBLIC CURL::libcurl ZLIB::ZLIB Threads::Threads)
target_include_directories(asyncpp_curl
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_features(asyncpp_curl PUBLIC cxx_std_20)
Expand Down
1 change: 1 addition & 0 deletions include/asyncpp/curl/websocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace asyncpp::curl {
ping,
pong,

compressed = 0x40,
fin = 0x80,
};
using header_map = std::multimap<std::string, std::string, case_insensitive_less>;
Expand Down
56 changes: 0 additions & 56 deletions src/curl/tcp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,44 +93,24 @@ namespace asyncpp::curl {
auto recv_cb = std::exchange(m_recv_handler, {});
m_handle.reset();
m_is_connected = false;
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
m_executor.push([cb = std::move(cb), recv_cb = std::move(recv_cb), send_cb = std::move(send_cb)]() {
if (recv_cb) recv_cb(true);
if (send_cb) send_cb(true);
if (cb) cb();
});
#else
lck.unlock();
if (recv_cb) recv_cb(true);
if (send_cb) send_cb(true);
if (cb) cb();
#endif
}

void tcp_client::send(const void* buffer, size_t size, std::function<void(size_t)> cb) {
std::unique_lock lck{m_mtx};
if (m_send_handler) throw std::logic_error("Send already in progress");
if (!m_is_connected) {
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb)]() { cb(0); });
#else
lck.unlock();
return cb(0);
#endif
}
// Try inline send
auto res = m_handle.send(buffer, size);
if (res != -1) {
if (m_handle.is_verbose()) printf("* curl::tcp_client.send finished inline res=%td\n", res);
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb), res]() { cb(res); });
#else
lck.unlock();
return cb(res);
#endif
}
// No data available, set the callback and unpause the transfer (adds the connections to poll).
m_send_handler = [this, buffer, size, cb](bool cancel) {
Expand All @@ -153,25 +133,13 @@ namespace asyncpp::curl {
std::unique_lock lck{m_mtx};
if (m_send_handler) throw std::logic_error("Send already in progress");
if (!m_is_connected) {
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb)]() { cb(0); });
#else
lck.unlock();
return cb(0);
#endif
}
// Try inline send
auto res = m_handle.send(buffer, size);
if (res == 0 || static_cast<size_t>(res) == size) {
if (m_handle.is_verbose()) printf("* curl::tcp_client.send_all finished inline res=%td\n", res);
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb), res]() { cb(res); });
#else
lck.unlock();
return cb(res);
#endif
}
auto u8ptr = reinterpret_cast<const uint8_t*>(buffer);
size_t sent{(res < 0) ? 0u : static_cast<size_t>(res)};
Expand All @@ -197,25 +165,13 @@ namespace asyncpp::curl {
std::unique_lock lck{m_mtx};
if (m_recv_handler) throw std::logic_error("Read already in progress");
if (!m_is_connected) {
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb)]() { cb(0); });
#else
lck.unlock();
return cb(0);
#endif
}
// Try inline recv
auto res = m_handle.recv(buffer, size);
if (res != -1) {
if (m_handle.is_verbose()) printf("* curl::tcp_client.recv finished inline res=%td\n", res);
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb), res]() { cb(res); });
#else
lck.unlock();
return cb(res);
#endif
}
m_recv_handler = [this, buffer, size, cb](bool cancel) {
if (cancel) {
Expand All @@ -235,25 +191,13 @@ namespace asyncpp::curl {
std::unique_lock lck{m_mtx};
if (m_recv_handler) throw std::logic_error("Read already in progress");
if (!m_is_connected) {
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb)]() { cb(0); });
#else
lck.unlock();
return cb(0);
#endif
}
// Try inline send
auto res = m_handle.recv(buffer, size);
if (res == 0 || static_cast<size_t>(res) == size) {
if (m_handle.is_verbose()) printf("* curl::tcp_client.recv_all finished inline res=%td\n", res);
// Clang 10 seems to be unhappy about resuming inside await_suspend, so push the callback instead
#if defined(__clang__) && __clang_major__ < 12
return m_executor.push([cb = std::move(cb), res]() { cb(res); });
#else
lck.unlock();
return cb(res);
#endif
}
auto u8ptr = reinterpret_cast<uint8_t*>(buffer);
size_t read{(res < 0) ? 0u : static_cast<size_t>(res)};
Expand Down
Loading

0 comments on commit 05faa6e

Please sign in to comment.