Skip to content

Commit

Permalink
replace make_function with lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
philoinovsky committed Jul 11, 2021
1 parent 914a18d commit f6795ba
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 69 deletions.
28 changes: 20 additions & 8 deletions include/boost/python/eventloop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ class event_loop
// TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback.
inline void call_soon(object f)
{
_strand.post([f, loop=this] {f();});
return;
_strand.post([f]{f();});
}

// TODO: implement this
Expand All @@ -57,22 +56,22 @@ class event_loop

inline void add_reader(int fd, object f)
{
_add_reader_or_writer(fd, f, fd * 2);
_async_wait_fd(fd, f, _read_key(fd));
}

inline void remove_reader(int fd)
{
_remove_reader_or_writer(fd * 2);
_descriptor_map.erase(_read_key(fd));
}

inline void add_writer(int fd, object f)
{
_add_reader_or_writer(fd, f, fd * 2 + 1);
_async_wait_fd(fd, f, _write_key(fd));
}

inline void remove_writer(int fd)
{
_remove_reader_or_writer(fd * 2 + 1);
_descriptor_map.erase(_write_key(fd));
}


Expand Down Expand Up @@ -127,8 +126,21 @@ class event_loop
std::unordered_map<int, std::unique_ptr<boost::asio::posix::stream_descriptor>> _descriptor_map;
std::chrono::steady_clock::time_point _created_time;

void _add_reader_or_writer(int fd, object f, int key);
void _remove_reader_or_writer(int key);
inline int _read_key(int fd)
{
return fd * 2;
}

inline int _write_key(int fd)
{
return fd * 2 + 1;
}

template<typename F>
void _async_wait_fd(int fd, F f, int key);

static void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr);
static void _sock_accept(event_loop& loop, object fut, object sock);
};

}}}
Expand Down
98 changes: 37 additions & 61 deletions src/eventloop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ bool _hasattr(object o, const char* name)
return PyObject_HasAttrString(o.ptr(), name);
}

void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr)
}

void event_loop::_sock_connect_cb(object pymod_socket, object fut, object sock, object addr)
{
try
{
Expand Down Expand Up @@ -61,11 +63,10 @@ void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr)
}
}

void _sock_accept(event_loop& loop, object fut, object sock)
void event_loop::_sock_accept(event_loop& loop, object fut, object sock)
{
int fd = extract<int>(sock.attr("fileno")());
object conn;
object address;
object conn, address;
try
{
object ret = sock.attr("accept")();
Expand All @@ -80,9 +81,7 @@ void _sock_accept(event_loop& loop, object fut, object sock)
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
{
PyErr_Clear();
loop.add_reader(fd, make_function(bind(
_sock_accept, boost::ref(loop), fut, sock),
default_call_policies(), boost::mpl::vector<void, object>()));
loop._async_wait_fd(fd, bind(_sock_accept, boost::ref(loop), fut, sock), loop._write_key(fd));
}
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
Expand All @@ -94,12 +93,11 @@ void _sock_accept(event_loop& loop, object fut, object sock)
PyErr_Clear();
fut.attr("set_exception")(std::current_exception());
}
}
}

}
}

void event_loop::_add_reader_or_writer(int fd, object f, int key)
template<typename F>
void event_loop::_async_wait_fd(int fd, F f, int key)
{
// add descriptor
if (_descriptor_map.find(key) == _descriptor_map.end())
Expand All @@ -110,30 +108,14 @@ void event_loop::_add_reader_or_writer(int fd, object f, int key)
}

_descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read,
boost::asio::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec)
boost::asio::bind_executor(_strand, [this, key, f] (const boost::system::error_code& ec)
{
// move descriptor
auto iter = loop->_descriptor_map.find(key);
if (iter != loop->_descriptor_map.end())
{
iter->second->release();
loop->_descriptor_map.erase(iter);
}
loop->call_soon(f);
_descriptor_map.erase(key);
f();
}));
return;
}

void event_loop::_remove_reader_or_writer(int key)
{
auto iter = _descriptor_map.find(key);
if (iter != _descriptor_map.end())
{
iter->second->release();
_descriptor_map.erase(iter);
}
}

void event_loop::call_later(double delay, object f)
{
auto p_timer = std::make_shared<boost::asio::steady_timer>(_strand.context(),
Expand All @@ -156,13 +138,13 @@ object event_loop::sock_recv(object sock, size_t nbytes)
int fd = extract<int>(sock.attr("fileno")());
int fd_dup = dup(fd);
object py_fut = _pymod_concurrent_future.attr("Future")();
add_reader(fd_dup, make_function(
[py_fut, nbytes, fd=fd_dup] (object obj) {
_async_wait_fd(fd_dup,
[py_fut, nbytes, fd=fd_dup] {
std::vector<char> buffer(nbytes);
read(fd, buffer.data(), nbytes);
py_fut.attr("set_result")(object(handle<>(PyBytes_FromStringAndSize(buffer.data(), nbytes))));
},
default_call_policies(), boost::mpl::vector<void, object>()));
_read_key(fd));
return py_fut;
}

Expand All @@ -172,13 +154,13 @@ object event_loop::sock_recv_into(object sock, object buffer)
int fd_dup = dup(fd);
ssize_t nbytes = len(buffer);
object py_fut = _pymod_concurrent_future.attr("Future")();
add_reader(fd_dup, make_function(
[py_fut, nbytes, fd=fd_dup] (object obj) {
_async_wait_fd(fd_dup,
[py_fut, nbytes, fd=fd_dup] {
std::vector<char> buffer(nbytes);
ssize_t nbytes_read = read(fd, buffer.data(), nbytes);
py_fut.attr("set_result")(nbytes_read);
},
default_call_policies(), boost::mpl::vector<void, object>()));
},
_read_key(fd));
return py_fut;
}

Expand All @@ -189,12 +171,12 @@ object event_loop::sock_sendall(object sock, object data)
char const* py_str = extract<char const*>(data.attr("decode")());
ssize_t py_str_len = len(data);
object py_fut = _pymod_concurrent_future.attr("Future")();
add_writer(fd_dup, make_function(
[py_fut, fd, py_str, py_str_len] (object obj) {
_async_wait_fd(fd_dup,
[py_fut, fd, py_str, py_str_len] {
write(fd, py_str, py_str_len);
py_fut.attr("set_result")(object());
},
default_call_policies(), boost::mpl::vector<void, object>()));
},
_write_key(fd));
return py_fut;
}

Expand All @@ -205,22 +187,20 @@ object event_loop::sock_connect(object sock, object address)
{
// TODO: _ensure_resolve
}
object fut = _pymod_concurrent_future.attr("Future")();
object py_fut = _pymod_concurrent_future.attr("Future")();
int fd = extract<int>(sock.attr("fileno")());
try
{
sock.attr("connect")(address);
fut.attr("set_result")(object());
py_fut.attr("set_result")(object());
}
catch (const error_already_set& e)
{
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
{
PyErr_Clear();
add_writer(dup(fd), make_function(bind(
_sock_connect_cb, _pymod_socket, fut, sock, address),
default_call_policies(), boost::mpl::vector<void, object>()));
_async_wait_fd(dup(fd), bind(_sock_connect_cb, _pymod_socket, py_fut, sock, address), _write_key(fd));
}
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
Expand All @@ -230,17 +210,17 @@ object event_loop::sock_connect(object sock, object address)
else
{
PyErr_Clear();
fut.attr("set_exception")(std::current_exception());
py_fut.attr("set_exception")(std::current_exception());
}
}
return fut;
return py_fut;
}

object event_loop::sock_accept(object sock)
{
object fut = _pymod_concurrent_future.attr("Future")();
_sock_accept(*this, fut, sock);
return fut;
object py_fut = _pymod_concurrent_future.attr("Future")();
_sock_accept(*this, py_fut, sock);
return py_fut;
}

// TODO: implement this
Expand All @@ -263,26 +243,22 @@ object event_loop::start_tls(object transport, object protocol, object sslcontex
object event_loop::getaddrinfo(object host, int port, int family, int type, int proto, int flags)
{
object py_fut = _pymod_concurrent_future.attr("Future")();
call_soon(make_function(
[this, py_fut, host, port, family, type, proto, flags] (object obj) {
_strand.post(
[this, py_fut, host, port, family, type, proto, flags] {
object res = _pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags);
py_fut.attr("set_result")(res);
},
default_call_policies(),
boost::mpl::vector<void, object>()));
});
return py_fut;
}

object event_loop::getnameinfo(object sockaddr, int flags)
{
object py_fut = _pymod_concurrent_future.attr("Future")();
call_soon(make_function(
[this, py_fut, sockaddr, flags] (object obj) {
_strand.post(
[this, py_fut, sockaddr, flags] {
object res = _pymod_socket.attr("getnameinfo")(sockaddr, flags);
py_fut.attr("set_result")(res);
},
default_call_policies(),
boost::mpl::vector<void, object>()));
});
return py_fut;
}

Expand Down

0 comments on commit f6795ba

Please sign in to comment.