Skip to content

Commit

Permalink
split implementations of functions out to eventloop.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
philoinovsky committed Jun 26, 2021
1 parent 77c094e commit c5fa9ae
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 99 deletions.
1 change: 1 addition & 0 deletions build/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ lib boost_python
import.cpp
exec.cpp
object/function_doc_signature.cpp
eventloop.cpp
: # requirements
<link>static:<define>BOOST_PYTHON_STATIC_LIB
<define>BOOST_PYTHON_SOURCE
Expand Down
116 changes: 18 additions & 98 deletions include/boost/python/eventloop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@
# ifndef EVENT_LOOP_PY2021_H_
# define EVENT_LOOP_PY2021_H_

#include <mutex>
#include <functional>
#include <unordered_map>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/python.hpp>

namespace a = boost::asio;
Expand All @@ -32,40 +29,8 @@ class EventLoop
std::unordered_map<int, std::unique_ptr<a::posix::stream_descriptor>> _descriptor_map;
std::chrono::steady_clock::time_point _created_time;

void _add_reader_or_writer(int fd, py::object f, int key)
{
// add descriptor
if (_descriptor_map.find(key) == _descriptor_map.end())
{
_descriptor_map.emplace(key,
std::move(std::make_unique<a::posix::stream_descriptor>(_strand.context(), fd))
);
}

_descriptor_map.find(key)->second->async_wait(a::posix::descriptor::wait_type::wait_read,
a::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec)
{
// move descriptor
auto iter = loop->_descriptor_map.find(key);
if (iter != loop->_descriptor_map.end())
{
iter->second->release();
loop->_descriptor_map.erase(iter);
}
loop->call_soon(f);
}));
return;
}

void _remove_reader_or_writer(int key)
{
auto iter = _descriptor_map.find(key);
if (iter != _descriptor_map.end())
{
iter->second->release();
_descriptor_map.erase(iter);
}
}
void _add_reader_or_writer(int fd, py::object f, int key);
void _remove_reader_or_writer(int key);

public:
EventLoop(a::io_context& ctx):
Expand All @@ -83,98 +48,53 @@ class EventLoop
}

// TODO: implement this
void call_soon_thread_safe(py::object f)
{
return;
}
inline void call_soon_thread_safe(py::object f) {};

// Schedule callback to be called after the given delay number of seconds
// TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback.
void call_later(double delay, py::object f)
{
// add timer
_id_to_timer_map.emplace(_timer_id,
std::move(std::make_unique<a::steady_timer>(_strand.context(),
std::chrono::steady_clock::now() + std::chrono::nanoseconds(int64_t(delay * 1e9))))
);

_id_to_timer_map.find(_timer_id)->second->async_wait(
// remove timer
a::bind_executor(_strand, [id=_timer_id, f, loop=this] (const boost::system::error_code& ec)
{
loop->_id_to_timer_map.erase(id);
loop->call_soon(f);
}));
_timer_id++;
}
void call_later(double delay, py::object f);

void call_at(double when, py::object f)
{
double diff = when - time();
if (diff > 0)
return call_later(diff, f);
return call_soon(f);
}
void call_at(double when, py::object f);

double time()
inline double time()
{
auto now = std::chrono::steady_clock::now();
std::chrono::duration<double> diff = now - _created_time;
return diff.count();
return static_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - _created_time).count();
}

// week 2 ......start......

void add_reader(int fd, py::object f)
inline void add_reader(int fd, py::object f)
{
_add_reader_or_writer(fd, f, fd * 2);
}

void remove_reader(int fd)
inline void remove_reader(int fd)
{
_remove_reader_or_writer(fd * 2);
}

void add_writer(int fd, py::object f)
inline void add_writer(int fd, py::object f)
{
_add_reader_or_writer(fd, f, fd * 2 + 1);
}

void remove_writer(int fd)
inline void remove_writer(int fd)
{
_remove_reader_or_writer(fd * 2 + 1);
}


void sock_recv()
{

}

void sock_recv_into()
{

}

void sock_sendall()
{
void sock_recv(py::object sock, int bytes);

}
void sock_recv_into(py::object sock, py::object buffer);

void sock_connect()
{
void sock_sendall(py::object sock, py::object data);

}

void sock_accept()
{
void sock_connect(py::object sock, py::object address);

}

void sock_sendfile()
{

}
void sock_accept(py::object sock);

void sock_sendfile(py::object sock, py::object file, int offset = 0, int count = 0, bool fallback = true);

// week 2 ......end......

Expand Down
111 changes: 111 additions & 0 deletions src/eventloop.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright Pan Yue 2021.
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

// TODO:
// 1. posix::stream_descriptor need windows version
// 2. call_* need return async.Handle

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/python.hpp>

namespace a = boost::asio;
namespace c = std::chrono;
namespace py = boost::python;

namespace boost { namespace python { namespace eventloop {

void EventLoop::_add_reader_or_writer(int fd, py::object f, int key)
{
// add descriptor
if (_descriptor_map.find(key) == _descriptor_map.end())
{
_descriptor_map.emplace(key,
std::move(std::make_unique<a::posix::stream_descriptor>(_strand.context(), fd))
);
}

_descriptor_map.find(key)->second->async_wait(a::posix::descriptor::wait_type::wait_read,
a::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec)
{
// move descriptor
auto iter = loop->_descriptor_map.find(key);
if (iter != loop->_descriptor_map.end())
{
iter->second->release();
loop->_descriptor_map.erase(iter);
}
loop->call_soon(f);
}));
return;
}

void EventLoop::_remove_reader_or_writer(int key)
{
auto iter = _descriptor_map.find(key);
if (iter != _descriptor_map.end())
{
iter->second->release();
_descriptor_map.erase(iter);
}
}

void EventLoop::call_later(double delay, py::object f)
{
// add timer
_id_to_timer_map.emplace(_timer_id,
std::move(std::make_unique<a::steady_timer>(_strand.context(),
std::chrono::steady_clock::now() + std::chrono::nanoseconds(int64_t(delay * 1e9))))
);

_id_to_timer_map.find(_timer_id)->second->async_wait(
// remove timer
a::bind_executor(_strand, [id=_timer_id, f, loop=this] (const boost::system::error_code& ec)
{
loop->_id_to_timer_map.erase(id);
loop->call_soon(f);
}));
_timer_id++;
}

void EventLoop::call_at(double when, py::object f)
{
double diff = when - time();
if (diff > 0)
return call_later(diff, f);
return call_soon(f);
}

void EventLoop::sock_recv(py::object sock, int bytes)
{

}

void EventLoop::sock_recv_into(py::object sock, py::object buffer)
{

}

void EventLoop::sock_sendall(py::object sock, py::object data)
{

}

void EventLoop::sock_connect(py::object sock, py::object address)
{

}

void EventLoop::sock_accept(py::object sock)
{

}

void EventLoop::sock_sendfile(py::object sock, py::object file, int offset, int count, bool fallback)
{

}

}}}
3 changes: 2 additions & 1 deletion src/fabscript
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ bpl = library('boost_python' + root.py_suffix,
'wrapper.cpp',
'import.cpp',
'exec.cpp',
'object/function_doc_signature.cpp'],
'object/function_doc_signature.cpp',
'eventloop.cpp'],
dependencies=root.config,
features=features + define('BOOST_PYTHON_SOURCE'))

Expand Down

0 comments on commit c5fa9ae

Please sign in to comment.