Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quickly adding Unix Domain Socket support... #379

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ else ()
add_executable(helloworld helloworld.cpp)
target_link_libraries(helloworld ${Boost_LIBRARIES})
target_link_libraries(helloworld ${CMAKE_THREAD_LIBS_INIT})
add_executable(hellounixsocketworld hellounixsocketworld.cpp)
target_link_libraries(hellounixsocketworld ${Boost_LIBRARIES})
target_link_libraries(hellounixsocketworld ${CMAKE_THREAD_LIBS_INIT})

if (OPENSSL_FOUND)
add_executable(example_ssl ssl/example_ssl.cpp)
Expand Down
14 changes: 14 additions & 0 deletions examples/hellounixsocketworld.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "crow.h"

int main()
{
crow::LocalApp app;

CROW_ROUTE(app, "/")
([]() {
return "Hello Unix Socket world!";
});

::unlink("/tmp/local_sock"); // Remove previous binding.
app.path("/tmp/local_sock").run();
}
47 changes: 47 additions & 0 deletions include/crow/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,51 @@ namespace crow
template <typename ... Middlewares>
using App = Crow<Middlewares...>;
using SimpleApp = Crow<>;

template <typename ... Middlewares>
class LocalCrow : public Crow<Middlewares...>
{
public:
using self_t = LocalCrow;
using server_t = LocalServer<LocalCrow, UnixSocketAdaptor, Middlewares...>;

self_t& path(std::string path)
{
path_ = path;
return *this;
}

void validate()
{
Crow<Middlewares...>::validate();
}

void notify_server_start()
{
Crow<Middlewares...>::notify_server_start();
}

void run()
{
validate();
{
server_ = std::move(std::unique_ptr<server_t>(new server_t(this, path_, &middlewares_, concurrency_, nullptr)));
server_->set_tick_function(tick_interval_, tick_function_);
notify_server_start();
server_->run();
}
}

private:
std::string path_ = "/tmp/crowsock";
uint16_t concurrency_ = 1;

std::chrono::milliseconds tick_interval_;
std::function<void()> tick_function_;

std::tuple<Middlewares...> middlewares_;
std::unique_ptr<server_t> server_;
};

using LocalApp = LocalCrow<>;
}
215 changes: 215 additions & 0 deletions include/crow/http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace crow
{
using namespace boost;
using tcp = asio::ip::tcp;
using stream_protocol = asio::local::stream_protocol;

template <typename Handler, typename Adaptor = SocketAdaptor, typename ... Middlewares>
class Server
Expand Down Expand Up @@ -231,6 +232,220 @@ namespace crow

std::tuple<Middlewares...>* middlewares_;

#ifdef CROW_ENABLE_SSL
bool use_ssl_{false};
boost::asio::ssl::context ssl_context_{boost::asio::ssl::context::sslv23};
#endif
typename Adaptor::context* adaptor_ctx_;
};

template <typename Handler, typename Adaptor = UnixSocketAdaptor, typename ... Middlewares>
class LocalServer
{
public:
LocalServer(Handler* handler, std::string path, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, typename Adaptor::context* adaptor_ctx = nullptr)
: acceptor_(io_service_, stream_protocol::endpoint(path)),
signals_(io_service_, SIGINT, SIGTERM),
tick_timer_(io_service_),
handler_(handler),
concurrency_(concurrency),
socket_path_(path),
middlewares_(middlewares),
adaptor_ctx_(adaptor_ctx)
{
}

void set_tick_function(std::chrono::milliseconds d, std::function<void()> f)
{
tick_interval_ = d;
tick_function_ = f;
}

void on_tick()
{
tick_function_();
tick_timer_.expires_from_now(boost::posix_time::milliseconds(tick_interval_.count()));
tick_timer_.async_wait([this](const boost::system::error_code& ec)
{
if (ec)
return;
on_tick();
});
}

void run()
{
if (concurrency_ < 0)
concurrency_ = 1;

for(int i = 0; i < concurrency_; i++)
io_service_pool_.emplace_back(new boost::asio::io_service());
get_cached_date_str_pool_.resize(concurrency_);
timer_queue_pool_.resize(concurrency_);

std::vector<std::future<void>> v;
std::atomic<int> init_count(0);
for(uint16_t i = 0; i < concurrency_; i ++)
v.push_back(
std::async(std::launch::async, [this, i, &init_count]{

// thread local date string get function
auto last = std::chrono::steady_clock::now();

std::string date_str;
auto update_date_str = [&]
{
auto last_time_t = time(0);
tm my_tm;

#if defined(_MSC_VER) or defined(__MINGW32__)
gmtime_s(&my_tm, &last_time_t);
#else
gmtime_r(&last_time_t, &my_tm);
#endif
date_str.resize(100);
size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm);
date_str.resize(date_str_sz);
};
update_date_str();
get_cached_date_str_pool_[i] = [&]()->std::string
{
if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1))
{
last = std::chrono::steady_clock::now();
update_date_str();
}
return date_str;
};

// initializing timer queue
detail::dumb_timer_queue timer_queue;
timer_queue_pool_[i] = &timer_queue;

timer_queue.set_io_service(*io_service_pool_[i]);
boost::asio::deadline_timer timer(*io_service_pool_[i]);
timer.expires_from_now(boost::posix_time::seconds(1));

std::function<void(const boost::system::error_code& ec)> handler;
handler = [&](const boost::system::error_code& ec){
if (ec)
return;
timer_queue.process();
timer.expires_from_now(boost::posix_time::seconds(1));
timer.async_wait(handler);
};
timer.async_wait(handler);

init_count ++;
while(1)
{
try
{
if (io_service_pool_[i]->run() == 0)
{
// when io_service.run returns 0, there are no more works to do.
break;
}
} catch(std::exception& e)
{
CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what();
}
}
}));

if (tick_function_ && tick_interval_.count() > 0)
{
tick_timer_.expires_from_now(boost::posix_time::milliseconds(tick_interval_.count()));
tick_timer_.async_wait([this](const boost::system::error_code& ec)
{
if (ec)
return;
on_tick();
});
}

CROW_LOG_INFO << server_name_ << " server is listening at " << socket_path_
<< " using " << concurrency_ << " threads";
CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs.";

signals_.async_wait(
[&](const boost::system::error_code& /*error*/, int /*signal_number*/){
stop();
});

while(concurrency_ != init_count)
std::this_thread::yield();

do_accept();

std::thread([this]{
io_service_.run();
CROW_LOG_INFO << "Exiting.";
}).join();
}

void stop()
{
io_service_.stop();
for(auto& io_service:io_service_pool_)
io_service->stop();
}

private:
asio::io_service& pick_io_service()
{
// TODO load balancing
roundrobin_index_++;
if (roundrobin_index_ >= io_service_pool_.size())
roundrobin_index_ = 0;
return *io_service_pool_[roundrobin_index_];
}

void do_accept()
{
asio::io_service& is = pick_io_service();
auto p = new Connection<Adaptor, Handler, Middlewares...>(
is, handler_, server_name_, middlewares_,
get_cached_date_str_pool_[roundrobin_index_], *timer_queue_pool_[roundrobin_index_],
adaptor_ctx_);
acceptor_.async_accept(p->socket(),
[this, p, &is](boost::system::error_code ec)
{
if (!ec)
{
is.post([p]
{
p->start();
});
}
else
{
delete p;
}
do_accept();
});
}

private:
asio::io_service io_service_;
std::vector<std::unique_ptr<asio::io_service>> io_service_pool_;
std::vector<detail::dumb_timer_queue*> timer_queue_pool_;
std::vector<std::function<std::string()>> get_cached_date_str_pool_;
stream_protocol::acceptor acceptor_;
boost::asio::signal_set signals_;
boost::asio::deadline_timer tick_timer_;

Handler* handler_;
uint16_t concurrency_{1};
std::string server_name_ = "Crow/0.1";
std::string socket_path_;
unsigned int roundrobin_index_{};

std::chrono::milliseconds tick_interval_;
std::function<void()> tick_function_;

std::tuple<Middlewares...>* middlewares_;

#ifdef CROW_ENABLE_SSL
bool use_ssl_{false};
boost::asio::ssl::context ssl_context_{boost::asio::ssl::context::sslv23};
Expand Down
5 changes: 5 additions & 0 deletions include/crow/routing.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ namespace crow
res = response(404);
res.end();
}
virtual void handle_upgrade(const request&, response& res, UnixSocketAdaptor&&)
{
res = response(404);
res.end();
}
#ifdef CROW_ENABLE_SSL
virtual void handle_upgrade(const request&, response& res, SSLAdaptor&&)
{
Expand Down
49 changes: 49 additions & 0 deletions include/crow/socket_adaptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace crow
{
using namespace boost;
using tcp = asio::ip::tcp;
using stream_protocol = asio::local::stream_protocol;

struct SocketAdaptor
{
Expand Down Expand Up @@ -56,6 +57,54 @@ namespace crow

tcp::socket socket_;
};

struct UnixSocketAdaptor
{
using context = void;
UnixSocketAdaptor(boost::asio::io_service& io_service, context*)
: socket_(io_service)
{
}

boost::asio::io_service& get_io_service()
{
return socket_.get_io_service();
}

stream_protocol::socket& raw_socket()
{
return socket_;
}

stream_protocol::socket& socket()
{
return socket_;
}

stream_protocol::endpoint remote_endpoint()
{
return socket_.local_endpoint();
}

bool is_open()
{
return socket_.is_open();
}

void close()
{
boost::system::error_code ec;
socket_.close(ec);
}

template <typename F>
void start(F f)
{
f(boost::system::error_code());
}

stream_protocol::socket socket_;
};

#ifdef CROW_ENABLE_SSL
struct SSLAdaptor
Expand Down