diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index 58b9e7bf3..090214625 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -37,8 +37,7 @@ class event_loop // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. inline void call_soon(object f) { - _strand.post([f, loop=this] {f();}); - return; + _strand.post([f]{f();}); } // TODO: implement this @@ -57,22 +56,22 @@ class event_loop inline void add_reader(int fd, object f) { - _add_reader_or_writer(fd, f, fd * 2); + _async_wait_fd(fd, f, _read_key(fd)); } inline void remove_reader(int fd) { - _remove_reader_or_writer(fd * 2); + _descriptor_map.erase(_read_key(fd)); } inline void add_writer(int fd, object f) { - _add_reader_or_writer(fd, f, fd * 2 + 1); + _async_wait_fd(fd, f, _write_key(fd)); } inline void remove_writer(int fd) { - _remove_reader_or_writer(fd * 2 + 1); + _descriptor_map.erase(_write_key(fd)); } @@ -127,8 +126,21 @@ class event_loop std::unordered_map> _descriptor_map; std::chrono::steady_clock::time_point _created_time; - void _add_reader_or_writer(int fd, object f, int key); - void _remove_reader_or_writer(int key); + inline int _read_key(int fd) + { + return fd * 2; + } + + inline int _write_key(int fd) + { + return fd * 2 + 1; + } + + template + void _async_wait_fd(int fd, F f, int key); + + static void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr); + static void _sock_accept(event_loop& loop, object fut, object sock); }; }}} diff --git a/src/eventloop.cpp b/src/eventloop.cpp index 913506b46..d2141d170 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -27,7 +27,9 @@ bool _hasattr(object o, const char* name) return PyObject_HasAttrString(o.ptr(), name); } -void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr) +} + +void event_loop::_sock_connect_cb(object pymod_socket, object fut, object sock, object addr) { try { @@ -61,11 +63,10 @@ void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr) } } -void _sock_accept(event_loop& loop, object fut, object sock) +void event_loop::_sock_accept(event_loop& loop, object fut, object sock) { int fd = extract(sock.attr("fileno")()); - object conn; - object address; + object conn, address; try { object ret = sock.attr("accept")(); @@ -80,9 +81,7 @@ void _sock_accept(event_loop& loop, object fut, object sock) || PyErr_ExceptionMatches(PyExc_InterruptedError)) { PyErr_Clear(); - loop.add_reader(fd, make_function(bind( - _sock_accept, boost::ref(loop), fut, sock), - default_call_policies(), boost::mpl::vector())); + loop._async_wait_fd(fd, bind(_sock_accept, boost::ref(loop), fut, sock), loop._write_key(fd)); } else if (PyErr_ExceptionMatches(PyExc_SystemExit) || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) @@ -94,12 +93,11 @@ void _sock_accept(event_loop& loop, object fut, object sock) PyErr_Clear(); fut.attr("set_exception")(std::current_exception()); } - } -} - + } } -void event_loop::_add_reader_or_writer(int fd, object f, int key) +template +void event_loop::_async_wait_fd(int fd, F f, int key) { // add descriptor if (_descriptor_map.find(key) == _descriptor_map.end()) @@ -110,30 +108,14 @@ void event_loop::_add_reader_or_writer(int fd, object f, int key) } _descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read, - boost::asio::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec) + boost::asio::bind_executor(_strand, [this, key, f] (const boost::system::error_code& ec) { - // move descriptor - auto iter = loop->_descriptor_map.find(key); - if (iter != loop->_descriptor_map.end()) - { - iter->second->release(); - loop->_descriptor_map.erase(iter); - } - loop->call_soon(f); + _descriptor_map.erase(key); + f(); })); return; } -void event_loop::_remove_reader_or_writer(int key) -{ - auto iter = _descriptor_map.find(key); - if (iter != _descriptor_map.end()) - { - iter->second->release(); - _descriptor_map.erase(iter); - } -} - void event_loop::call_later(double delay, object f) { auto p_timer = std::make_shared(_strand.context(), @@ -156,13 +138,13 @@ object event_loop::sock_recv(object sock, size_t nbytes) int fd = extract(sock.attr("fileno")()); int fd_dup = dup(fd); object py_fut = _pymod_concurrent_future.attr("Future")(); - add_reader(fd_dup, make_function( - [py_fut, nbytes, fd=fd_dup] (object obj) { + _async_wait_fd(fd_dup, + [py_fut, nbytes, fd=fd_dup] { std::vector buffer(nbytes); read(fd, buffer.data(), nbytes); py_fut.attr("set_result")(object(handle<>(PyBytes_FromStringAndSize(buffer.data(), nbytes)))); }, - default_call_policies(), boost::mpl::vector())); + _read_key(fd)); return py_fut; } @@ -172,13 +154,13 @@ object event_loop::sock_recv_into(object sock, object buffer) int fd_dup = dup(fd); ssize_t nbytes = len(buffer); object py_fut = _pymod_concurrent_future.attr("Future")(); - add_reader(fd_dup, make_function( - [py_fut, nbytes, fd=fd_dup] (object obj) { + _async_wait_fd(fd_dup, + [py_fut, nbytes, fd=fd_dup] { std::vector buffer(nbytes); ssize_t nbytes_read = read(fd, buffer.data(), nbytes); py_fut.attr("set_result")(nbytes_read); - }, - default_call_policies(), boost::mpl::vector())); + }, + _read_key(fd)); return py_fut; } @@ -189,12 +171,12 @@ object event_loop::sock_sendall(object sock, object data) char const* py_str = extract(data.attr("decode")()); ssize_t py_str_len = len(data); object py_fut = _pymod_concurrent_future.attr("Future")(); - add_writer(fd_dup, make_function( - [py_fut, fd, py_str, py_str_len] (object obj) { + _async_wait_fd(fd_dup, + [py_fut, fd, py_str, py_str_len] { write(fd, py_str, py_str_len); py_fut.attr("set_result")(object()); - }, - default_call_policies(), boost::mpl::vector())); + }, + _write_key(fd)); return py_fut; } @@ -205,12 +187,12 @@ object event_loop::sock_connect(object sock, object address) { // TODO: _ensure_resolve } - object fut = _pymod_concurrent_future.attr("Future")(); + object py_fut = _pymod_concurrent_future.attr("Future")(); int fd = extract(sock.attr("fileno")()); try { sock.attr("connect")(address); - fut.attr("set_result")(object()); + py_fut.attr("set_result")(object()); } catch (const error_already_set& e) { @@ -218,9 +200,7 @@ object event_loop::sock_connect(object sock, object address) || PyErr_ExceptionMatches(PyExc_InterruptedError)) { PyErr_Clear(); - add_writer(dup(fd), make_function(bind( - _sock_connect_cb, _pymod_socket, fut, sock, address), - default_call_policies(), boost::mpl::vector())); + _async_wait_fd(dup(fd), bind(_sock_connect_cb, _pymod_socket, py_fut, sock, address), _write_key(fd)); } else if (PyErr_ExceptionMatches(PyExc_SystemExit) || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) @@ -230,17 +210,17 @@ object event_loop::sock_connect(object sock, object address) else { PyErr_Clear(); - fut.attr("set_exception")(std::current_exception()); + py_fut.attr("set_exception")(std::current_exception()); } } - return fut; + return py_fut; } object event_loop::sock_accept(object sock) { - object fut = _pymod_concurrent_future.attr("Future")(); - _sock_accept(*this, fut, sock); - return fut; + object py_fut = _pymod_concurrent_future.attr("Future")(); + _sock_accept(*this, py_fut, sock); + return py_fut; } // TODO: implement this @@ -263,26 +243,22 @@ object event_loop::start_tls(object transport, object protocol, object sslcontex object event_loop::getaddrinfo(object host, int port, int family, int type, int proto, int flags) { object py_fut = _pymod_concurrent_future.attr("Future")(); - call_soon(make_function( - [this, py_fut, host, port, family, type, proto, flags] (object obj) { + _strand.post( + [this, py_fut, host, port, family, type, proto, flags] { object res = _pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags); py_fut.attr("set_result")(res); - }, - default_call_policies(), - boost::mpl::vector())); + }); return py_fut; } object event_loop::getnameinfo(object sockaddr, int flags) { object py_fut = _pymod_concurrent_future.attr("Future")(); - call_soon(make_function( - [this, py_fut, sockaddr, flags] (object obj) { + _strand.post( + [this, py_fut, sockaddr, flags] { object res = _pymod_socket.attr("getnameinfo")(sockaddr, flags); py_fut.attr("set_result")(res); - }, - default_call_policies(), - boost::mpl::vector())); + }); return py_fut; }