diff --git a/TODO.rst b/TODO.rst index 6157072..2a28651 100644 --- a/TODO.rst +++ b/TODO.rst @@ -1,6 +1,8 @@ TODO ---- +- Add timeouts to chain.Client. + - Dogtag stats. - Implement fast/full sync and read/write pauses. diff --git a/wade/chain.py b/wade/chain.py index 4c27f45..a73b834 100644 --- a/wade/chain.py +++ b/wade/chain.py @@ -685,7 +685,7 @@ class Client(object): def __init__(self, conf, timeout): self._conf = conf - self._chorus_client = chorus.Client(self._conf['nodes'], timeout) + self._chorus_client = chorus.Client(self._conf['nodes']) self._peer_ids = self._conf['nodes'].keys() def close(self): diff --git a/wade/chorus.py b/wade/chorus.py index 32c608a..b029970 100644 --- a/wade/chorus.py +++ b/wade/chorus.py @@ -8,18 +8,13 @@ import logging import socket import threading +import Queue from functools import partial from collections import namedtuple -from select import select +from contextlib import contextmanager import pyuv import msgpack -from posix_ipc import BusyError -from posix_ipc import Semaphore -from posix_ipc import O_CREX - -from circular_buffer import CircularBuffer -from circular_buffer import CircularBufferError OK = 'ok' @@ -27,11 +22,6 @@ CALL_LIMIT = 'call_limit' PEER_DISCONNECT = 'peer_disconnect' TIMEOUT = 'timeout' -RECV_BYTES = 1024 * 64 -SEND_BYTES = 1024 * 64 -OUTGOING_PEER_BUFFER_SIZE = 1024 ** 2 # 1 MB, controls max request size -SOCKET_CREATION_TIMEOUT = 1 -SELECT_TIMEOUT = 1 # timeout for select.select class NodeError(Exception): @@ -330,294 +320,67 @@ def _unwind_outgoing(self, outgoing): outgoing.handle.close() -""" Begin client code """ - - -class ClientError(Exception): - pass - - -class Peer(object): - def __init__(self, peer_id, socket): - self.peer_id = peer_id - self.socket = socket - self.pending_requests = {} # req_id -> ValueEvent - self.incoming_buffer = msgpack.Unpacker() - self.outgoing_buffer = CircularBuffer(OUTGOING_PEER_BUFFER_SIZE) - self.lock = threading.Lock() - - def close(self): - try: - self.lock.release() - except threading.ThreadError: - pass # lock may not have been acquired. - self.socket.close() - - def add_request(self, req_id, request_bytes, value_event): - """Add ValueEvent to pending requests and write request_bytes to - outgoing_buffer""" - with self.lock: - self.pending_requests[req_id] = value_event - self.outgoing_buffer.write(request_bytes) - - def handle_response(self, response): - """Extract req_id from response and check if there is a pending request - for the req_id. If so set the value. - """ - with self.lock: - req_id, status, message = response - if req_id in self.pending_requests: # request may have timed out - self.pending_requests[req_id].set((status, message)) - - def remove_request(self, req_id): - with self.lock: - del self.pending_requests[req_id] - - -class ValueEventTimeout(Exception): - def __init__(self, timeout): - super(ValueEventTimeout, self).__init__( - 'Attempt to wait for ValueEvent timed out after %s s' % timeout - ) - - -class ValueEvent(object): - """Provides behavior similar to threading.Event. However, this - allows associating a value when "setting" (or notifying) the Event object. - - The ValueEvent object is used to communicate between two threads when one - thread needs a value that is computed by another. The reading thread waits - on the value (but does not set it), and the writing thread sets the value - then triggers the event. 
In other words, there is exactly one write to the
-    value from one thread, and one read from a different thread, and the
-    ValueEvent object guarantees they don't overlap so long as the get and set
-    methods are used by the reading and writing thread, respectively.
-
-    Additionally, Python's threading.Event object provides a timeout feature
-    like ours but introduces unacceptable delays. For that reason we use
-    posix_ipc.Semaphore to mimic threading.Event without the performance
-    penalty.
-
-    http://stackoverflow.com/questions/21779183/python-eventwait-with-timeout-gives-delay
-    """
-    def __init__(self, name=None):
-        self._value = None
-        self._semaphore = Semaphore(name, flags=O_CREX)
-
-    def __del__(self):
-        # If unlink isn't explicitly called the OS will *not* release the
-        # semaphore object, even if the program crashes. We may want to spawn
-        # a new process to manage them or give the semaphores known names when
-        # creating them so they can be reclaimed on restart.
-        self._semaphore.unlink()
-
-    def get(self, timeout):
-        try:
-            self._semaphore.acquire(timeout)
-        except BusyError:
-            raise ValueEventTimeout(timeout)
-        return self._value
-
-    def set(self, value):
-        self._value = value
-        self._semaphore.release()
-

 class Client(object):
-    """Multithreaded client implementation.
-    """
-
-    def __init__(self, conf, timeout):
+    def __init__(self, conf, per_peer_pool_size=4):
         self._conf = conf
-        self._timeout = timeout
-        self._unpacker = msgpack.Unpacker()
-        self._packer = msgpack.Packer()
+        self._recv_bytes = 64 * 1024
+        self._per_peer_pool_size = per_peer_pool_size
+        self._next_req_id = 1
+        self._next_req_id_lock = threading.Lock()
+
+        # Per-peer pools of lazily created connections. Each pool slot is
+        # a one-element list so that a (re)connected socket can be swapped
+        # in place.
+        self._sockets = {}
+        for peer_id in self._conf.keys():
+            self._sockets[peer_id] = Queue.Queue(self._per_peer_pool_size)
+            for i in xrange(self._per_peer_pool_size):
+                self._sockets[peer_id].put([None])
+
+    @contextmanager
+    def _get_peer_connection(self, peer_id):
+        # fixme: handle timeout
+        entry = self._sockets[peer_id].get(block=True)
+        try:
+            sock = entry[0]
+            if sock is None:
+                # Connect lazily: on first use of a slot, and again after
+                # a broken socket has been evicted.
+                sock = socket.create_connection(self._conf[peer_id])
+                entry[0] = sock

-        # connection variables
-        self._peers = {}  # peer_id -> Peer
-        self._sock_to_peer = {}  # socket.connection -> Peer
-        self._peers_lock = threading.Lock()  # for _peers and _sock_to_peers
+            try:
+                yield sock
+            except socket.error as e:
+                logging.warning(
+                    "disconnect from peer %s due to socket error: %s",
+                    peer_id,
+                    e,
+                )
+                # Evict the broken socket and re-raise so the caller sees
+                # the failure instead of a silent None result.
+                sock.close()
+                entry[0] = None
+                raise

-        # request / response variables
-        self._req_count = 0
-        self._req_count_lock = threading.Lock()
+        finally:
+            # Always return the slot to the pool, even on error, so the
+            # pool doesn't leak capacity.
+            self._sockets[peer_id].put(entry)

-        # For reuse of ValueEvent objects by a thread.
-        self._threadlocal = threading.local()
+    def reqrep(self, peer_id, message):
+        """Grabs a connection from the pool and makes a synchronous
+        request/reply call.
- self._patch_client_for_gevent() + """ - self._bg_thread = threading.Thread( - target=self._process_requests_in_background - ) - self._bg_thread.setDaemon(True) - self._bg_thread.start() + with self._next_req_id_lock: + req_id = self._next_req_id + self._next_req_id += 1 - def _patch_client_for_gevent(self): - try: - import gevent - import gevent.monkey - except ImportError: - gevent_enabled = False - else: - gevent_enabled = bool(gevent.monkey.saved) - - if gevent_enabled: - self._Timeout = gevent.Timeout - self._sleep = gevent.sleep - self._get_value_event = lambda: gevent.event.AsyncResult() - else: - self._Timeout = ValueEventTimeout - self._sleep = lambda _: None - self._get_value_event = self._ensure_value_event - - def _ensure_value_event(self): - """ - A ValueEvent object is used by one thread at a time so it can be - safely reused instead of creating a new one for each call to reqrep. - Creating a ValueEvent is expensive, see it's docstring. - """ - ev = getattr(self._threadlocal, 'event', None) - if ev is None: - ev = ValueEvent() - self._threadlocal.event = ev - return ev - - def _bg_clean_up_peer(self, peer): - """Remove peer and associated sockets from _peers and _sock_to_peer. - Also close the peer. - """ - with self._peers_lock: - del self._peers[peer.peer_id] - del self._sock_to_peer[peer.socket] - peer.close() - peer = None - - def _bg_select_peers(self, timeout=SELECT_TIMEOUT): - """Similar to select.select, but instead of returning sockets this - returns the associated Peer objects. - """ - with self._peers_lock: - socks = self._sock_to_peer.keys() - - if not socks: - self._sleep(0) - return [], [], [] - - readable, writable, exceptional = select(socks, socks, socks, timeout) - with self._peers_lock: - readable = [self._sock_to_peer[s] for s in readable] - writable = [self._sock_to_peer[s] for s in writable] - exceptional = [self._sock_to_peer[s] for s in exceptional] - return readable, writable, exceptional - - def _process_requests_in_background(self): - """Executes forever, handles sending and receiving messages from peers. - Meant to be called in a background thread (see constructor). - - This checks for peers with readable data (added by reqrep) and feeds the - data from its socket into the peer's incoming_buffer until a message can - be read. The request id is read from the message and if a pending - request exists the associated ValueEvent object is "set" with the - message contents. - - For each writable peer, this function reads data from the peer's - outgoing buffer, if it's not empty, and sends it along on the peer's - socket. - - This loop also handles client disconnects / errors. - """ - while True: - readable, writable, exceptional = self._bg_select_peers() - - for peer in readable: - data = peer.socket.recv(RECV_BYTES) - if data: - peer.incoming_buffer.feed(data) - try: - response = peer.incoming_buffer.unpack() - except msgpack.OutOfData: - continue - peer.handle_response(response) - else: - self._bg_clean_up_peer(peer) - if peer in writable: - writable.remove(peer) - if peer in exceptional: - exceptional.remove(peer) - - for peer in writable: - # single-reader configuration means we can safely unlock between - # peeking and committing. 
-                with peer.lock:
-                    next_bytes = peer.outgoing_buffer.peek(SEND_BYTES)
-                if not next_bytes:
-                    continue
-
-                sent_bytes = peer.socket.send(next_bytes)
-                if sent_bytes == 0:
-                    self._bg_clean_up_peer(peer)
-                    if peer in exceptional:
-                        exceptional.remove(peer)
-                    continue
-
-                with peer.lock:
-                    peer.outgoing_buffer.commit_read(sent_bytes)
-
-            for peer in exceptional:
-                self._bg_clean_up_peer(peer)
-
-    def _ensure_connection(self, peer_id, timeout=SOCKET_CREATION_TIMEOUT):
-        """Connects to the peer's socket and creates a Peer object.
-
-        @return: Peer
-        """
-        with self._peers_lock:
-            if peer_id not in self._peers:
-                sock = socket.create_connection(self._conf[peer_id],
-                                                timeout=timeout)
-                peer = Peer(peer_id, sock)
-                self._peers[peer_id] = peer
-                self._sock_to_peer[sock] = peer
-
-            return self._peers[peer_id]
-
-    def _inc_req_count(self):
-        with self._req_count_lock:
-            curr = self._req_count
-            self._req_count += 1
-            return curr
+        with self._get_peer_connection(peer_id) as sock:
+            # Socket errors propagate out of this block so that
+            # _get_peer_connection can evict the broken socket from
+            # the pool.

-    def close(self):
-        for peer in self._peers.values():
-            self._bg_clean_up_peer(peer)
-
-    def reqrep(self, peer_id, message, timeout=False):
-        """The message is serialized and added to the peer's outgoing buffer
-        with an associated request id (add_request). The request id is also used
-        to associate a ValueEvent (or AsyncResult) object with the call in the
-        peer's pending_requests dict such that the background thread,
-        _process_requests_in_background, can "respond" to the reqrep call by
-        setting the ValueEvent object.
-
-        @param timeout, float. In seconds, optional if set on client
-        initialization. Specify None for no timeout.
-        """
-        if timeout is False and self._timeout is False:
-            raise ClientError('must specify timeout value')
-        elif timeout is False:
-            timeout = self._timeout
+            packer = msgpack.Packer()
+            sock.sendall(packer.pack([req_id, message]))

-        req_id = self._inc_req_count()
-        peer = self._ensure_connection(peer_id)
-        value_event = self._get_value_event()
-        outgoing_bytes = self._packer.pack([req_id, message])
+            unpacker = msgpack.Unpacker()
+            while True:
+                data = sock.recv(self._recv_bytes)
+                if not data:
+                    # An empty read means the peer closed the connection;
+                    # without this check the loop would spin forever.
+                    raise socket.error('connection closed by peer %s' % peer_id)
+                unpacker.feed(data)
+                for payload in unpacker:
+                    # Unpack into a fresh name so we don't clobber our
+                    # outgoing req_id; the connection is held exclusively,
+                    # so the response id matches the request id.
+                    resp_req_id, status, message = payload
+                    return status, message

-        peer.add_request(req_id, outgoing_bytes, value_event)
-        try:
-            status, message = value_event.get(timeout)
-        except self._Timeout:
-            status = TIMEOUT
-            message = 'request timed out after %s s' % (timeout)
-        peer.remove_request(req_id)
-        return status, message
+    def close(self):
+        # Drain each pool and close any sockets that were opened. This
+        # blocks until in-flight requests return their connections.
+        for pool in self._sockets.values():
+            for _ in xrange(self._per_peer_pool_size):
+                entry = pool.get(block=True)
+                if entry[0] is not None:
+                    entry[0].close()
diff --git a/wade/circular_buffer.py b/wade/circular_buffer.py
deleted file mode 100644
index 360752a..0000000
--- a/wade/circular_buffer.py
+++ /dev/null
@@ -1,108 +0,0 @@
-
-
-class CircularBufferError(Exception):
-    pass
-
-
-class CircularBuffer(object):
-    """Unlike traditional circular buffers, this allows reading and writing
-    multiple values at a time. Additionally, this object provides peeking and
-    commiting reads. See test_circular_buffer.py for examples.
- - The interface is almost identical to the following code but unlike it our - implementation efficiently frees up space when commiting reads: - http://c.learncodethehardway.org/book/ex44.html - """ - def __init__(self, capacity): - self._capacity = capacity - self._length = self._capacity + 1 - self._data = bytearray([0] * self._length) - self._start = 0 - self._end = 0 - - def __len__(self): - return self.available_data() - - def __repr__(self): - return 'CircularBuffer(%s, length=%d, free=%d, capacity=%d)' % \ - (self.peek_all(), - self.available_data(), - self.available_space(), - self._capacity) - - def _read(self, amount, commit): - """Read up to amount and return a list. May return less data than - requested when amount > available_data(). It is the caller's - responsibility to check the length of the returned result list. - - @return: bytearray - """ - if amount <= 0: - raise CircularBufferError('Must request a positive amount of data') - if not self.available_data(): - return bytearray() - - amount = min(amount, self.available_data()) - read_end = self._start + amount - - if read_end < self._length: - ret = self._data[self._start:read_end] - else: - ret = self._data[self._start:] + self._data[:(read_end - self._length)] - - if commit: - self.commit_read(amount) - - return ret - - def available_data(self): - return (self._end - self._start) % self._length - - def available_space(self): - return self._capacity - self.available_data() - - def commit_read(self, amount): - self._start = (self._start + amount) % self._length - - def commit_write(self, amount): - self._end = (self._end + amount) % self._length - - def read(self, amount): - return self._read(amount, commit=True) - - def read_all(self): - if not self.available_data(): - return bytearray() - return self.read(self.available_data()) - - def peek(self, amount): - return self._read(amount, commit=False) - - def peek_all(self): - if not self.available_data(): - return bytearray() - return self.peek(self.available_data()) - - def write(self, data): - """Writes a string or bytes into the buffer if it fits. - - @param data, str or byte - """ - amount = len(data) - if amount > self.available_space(): - raise CircularBufferError( - 'Not enough space: %d requested, %d available' % \ - (amount, self.available_space())) - - write_end = self._end + amount - - if write_end < self._length: # if no wrap around - self._data[self._end:write_end] = data - else: # if wrap around - partition = self._length - self._end - end_block = data[:partition] - start_block = data[partition:] - self._data[self._end:] = end_block # write at end of buffer - self._data[:len(start_block)] = start_block # write leftover at beginning - - self.commit_write(amount)
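
For context, a round trip through the pooled client looks roughly like the
following. This is an illustrative sketch, not code from the patch: the peer
configuration and the message payload are made up, though the conf shape
(peer_id -> address tuple, as consumed by socket.create_connection) and the
[req_id, message] / [req_id, status, message] framing match the code above.

    from wade import chorus

    # conf maps peer_id -> (host, port); addresses are handed directly to
    # socket.create_connection when a pool slot first connects.
    conf = {0: ('localhost', 12345)}
    client = chorus.Client(conf, per_peer_pool_size=4)

    # reqrep serializes [req_id, message] with msgpack, sends it on a
    # pooled connection, and blocks until [req_id, status, message]
    # comes back. Socket errors propagate to the caller.
    status, message = client.reqrep(0, {'cmd': 'get', 'key': 'x'})
    if status == chorus.OK:
        print message

    client.close()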
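
The new client still carries a "fixme: handle timeout" marker, and TODO.rst
now tracks timeouts for chain.Client. One possible shape for that follow-up
is sketched below. It is not part of this patch: the timeout parameter and
the ClientTimeout exception are hypothetical names, and the method body is a
timeout-aware variant of _get_peer_connection as it stands above.

    import Queue
    import socket
    from contextlib import contextmanager

    class ClientTimeout(Exception):
        pass

    @contextmanager
    def _get_peer_connection(self, peer_id, timeout=None):
        # Sketch only: bound the wait for a free pool slot.
        try:
            entry = self._sockets[peer_id].get(block=True, timeout=timeout)
        except Queue.Empty:
            raise ClientTimeout('no free connection to peer %s' % peer_id)
        try:
            sock = entry[0]
            if sock is None:
                # create_connection accepts a connect timeout directly.
                sock = socket.create_connection(self._conf[peer_id], timeout)
                entry[0] = sock
            # Bound each subsequent send/recv. socket.timeout is a
            # subclass of socket.error, so the eviction path below covers
            # slow peers as well as disconnected ones.
            sock.settimeout(timeout)
            try:
                yield sock
            except socket.error:
                sock.close()
                entry[0] = None
                raise
        finally:
            self._sockets[peer_id].put(entry)

With this shape, chain.Client could forward the timeout it already accepts in
its constructor down to each reqrep call, which would close out the TODO
entry at the top of the patch.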