Add connection pooled chorus client implementation. #5

Open: wants to merge 1 commit into base: master
2 changes: 2 additions & 0 deletions TODO.rst
@@ -1,6 +1,8 @@
 TODO
 ----

+- Add timeouts to chain.Client.
+
 - Dogtag stats.

 - Implement fast/full sync and read/write pauses.
2 changes: 1 addition & 1 deletion wade/chain.py
@@ -685,7 +685,7 @@ class Client(object):

     def __init__(self, conf, timeout):
         self._conf = conf
-        self._chorus_client = chorus.Client(self._conf['nodes'], timeout)
+        self._chorus_client = chorus.Client(self._conf['nodes'])
         self._peer_ids = self._conf['nodes'].keys()

     def close(self):
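The pooled client below drops timeout support entirely, which is why chain.Client stops forwarding the argument and TODO.rst regains an item for it. Until that lands, one possible interim measure (not part of this PR; the deadline values are illustrative) is a socket-level timeout on each pooled connection. Since socket.timeout subclasses socket.error, a stalled peer would then be evicted by the pool's existing error path:

    import socket

    CONNECT_TIMEOUT = 1.0  # seconds, illustrative
    REQUEST_TIMEOUT = 5.0  # seconds, illustrative

    # Hypothetical variant of the connect step in _get_peer_connection:
    sock = socket.create_connection(('127.0.0.1', 9000),
                                    timeout=CONNECT_TIMEOUT)
    # Every later send/recv on this socket raises socket.timeout (a
    # subclass of socket.error) once the deadline expires.
    sock.settimeout(REQUEST_TIMEOUT)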
345 changes: 54 additions & 291 deletions wade/chorus.py
@@ -8,30 +8,20 @@
 import logging
 import socket
 import threading
+import Queue
 from functools import partial
 from collections import namedtuple
-from select import select
+from contextlib import contextmanager

 import pyuv
 import msgpack
-from posix_ipc import BusyError
-from posix_ipc import Semaphore
-from posix_ipc import O_CREX
-
-from circular_buffer import CircularBuffer
-from circular_buffer import CircularBufferError


 OK = 'ok'
 ERR = 'err'
 CALL_LIMIT = 'call_limit'
 PEER_DISCONNECT = 'peer_disconnect'
 TIMEOUT = 'timeout'
-RECV_BYTES = 1024 * 64
-SEND_BYTES = 1024 * 64
-OUTGOING_PEER_BUFFER_SIZE = 1024 ** 2  # 1 MB, controls max request size
-SOCKET_CREATION_TIMEOUT = 1
-SELECT_TIMEOUT = 1  # timeout for select.select


 class NodeError(Exception):
@@ -330,294 +320,67 @@ def _unwind_outgoing(self, outgoing):
         outgoing.handle.close()


 """ Begin client code """


 class ClientError(Exception):
     pass


-class Peer(object):
-    def __init__(self, peer_id, socket):
-        self.peer_id = peer_id
-        self.socket = socket
-        self.pending_requests = {}  # req_id -> ValueEvent
-        self.incoming_buffer = msgpack.Unpacker()
-        self.outgoing_buffer = CircularBuffer(OUTGOING_PEER_BUFFER_SIZE)
-        self.lock = threading.Lock()
-
-    def close(self):
-        try:
-            self.lock.release()
-        except threading.ThreadError:
-            pass  # lock may not have been acquired.
-        self.socket.close()
-
-    def add_request(self, req_id, request_bytes, value_event):
-        """Add ValueEvent to pending requests and write request_bytes to
-        outgoing_buffer."""
-        with self.lock:
-            self.pending_requests[req_id] = value_event
-            self.outgoing_buffer.write(request_bytes)
-
-    def handle_response(self, response):
-        """Extract req_id from response and check if there is a pending
-        request for the req_id. If so, set the value.
-        """
-        with self.lock:
-            req_id, status, message = response
-            if req_id in self.pending_requests:  # request may have timed out
-                self.pending_requests[req_id].set((status, message))
-
-    def remove_request(self, req_id):
-        with self.lock:
-            del self.pending_requests[req_id]
-
-
-class ValueEventTimeout(Exception):
-    def __init__(self, timeout):
-        super(ValueEventTimeout, self).__init__(
-            'Attempt to wait for ValueEvent timed out after %s s' % timeout
-        )
-
-
-class ValueEvent(object):
-    """Provides behavior similar to threading.Event. However, this allows
-    associating a value when "setting" (or notifying) the Event object.
-
-    The ValueEvent object is used to communicate between two threads when one
-    thread needs a value that is computed by another. The reading thread waits
-    on the value (but does not set it), and the writing thread sets the value
-    then triggers the event. In other words, there is exactly one write to the
-    value from one thread, and one read from a different thread, and the
-    ValueEvent object guarantees they don't overlap so long as the get and set
-    methods are used by the reading and writing threads, respectively.
-
-    Additionally, Python's threading.Event object provides a timeout feature
-    like ours but introduces unacceptable delays. For that reason we use
-    posix_ipc.Semaphore to mimic threading.Event without the performance
-    penalty.
-
-    http://stackoverflow.com/questions/21779183/python-eventwait-with-timeout-gives-delay
-    """
-    def __init__(self, name=None):
-        self._value = None
-        self._semaphore = Semaphore(name, flags=O_CREX)
-
-    def __del__(self):
-        # If unlink isn't explicitly called the OS will *not* release the
-        # semaphore object, even if the program crashes. We may want to spawn
-        # a new process to manage them or give the semaphores known names when
-        # creating them so they can be reclaimed on restart.
-        self._semaphore.unlink()
-
-    def get(self, timeout):
-        try:
-            self._semaphore.acquire(timeout)
-        except BusyError:
-            raise ValueEventTimeout(timeout)
-        return self._value
-
-    def set(self, value):
-        self._value = value
-        self._semaphore.release()

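The removed ValueEvent class above is the rendezvous point between a caller blocked in reqrep and the background I/O thread: exactly one thread calls get, and exactly one other thread calls set. A minimal usage sketch of that contract, assuming posix_ipc is available:

    import threading

    ev = ValueEvent()  # the class removed above

    def writer():
        ev.set(('ok', 'pong'))  # exactly one writer thread calls set()

    threading.Thread(target=writer).start()
    status, message = ev.get(timeout=1)  # reader blocks for up to 1 second
    assert (status, message) == ('ok', 'pong')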
 class Client(object):
     """Multithreaded client implementation.
     """

-    def __init__(self, conf, timeout):
+    def __init__(self, conf, per_peer_pool_size=4):
         self._conf = conf
-        self._timeout = timeout
-        self._unpacker = msgpack.Unpacker()
-        self._packer = msgpack.Packer()
+        self._recv_bytes = 64 * 1024
+        self._per_peer_pool_size = per_peer_pool_size
+        self._next_req_id = 1
+        self._next_req_id_lock = threading.Lock()
+
+        self._sockets = {}
+        for peer_id in self._conf.keys():
+            self._sockets[peer_id] = Queue.Queue(self._per_peer_pool_size)
+            for i in xrange(self._per_peer_pool_size):
+                self._sockets[peer_id].put([None])
+
+    @contextmanager
+    def _get_peer_connection(self, peer_id):
+        # fixme: handle timeout
+        entry = self._sockets[peer_id].get(block=True)
+        sock = entry[0]
+        if sock is None:
+            sock = socket.create_connection(self._conf[peer_id])
+            entry[0] = sock

-        # connection variables
-        self._peers = {}  # peer_id -> Peer
-        self._sock_to_peer = {}  # socket.connection -> Peer
-        self._peers_lock = threading.Lock()  # for _peers and _sock_to_peer
+        try:
+            yield sock
+        except socket.error as e:
+            logging.warning(
+                "disconnect from peer %d due to socket error: %s",
+                peer_id,
+                e,
+            )
+            del sock
+            entry[0] = None

-        # request / response variables
-        self._req_count = 0
-        self._req_count_lock = threading.Lock()
+        self._sockets[peer_id].put(entry)

-        # For reuse of ValueEvent objects by a thread.
-        self._threadlocal = threading.local()
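Note the shape of the pool being introduced: each peer gets a Queue.Queue of single-element lists, and the list acts as a mutable slot, so a connection can be created lazily on checkout or cleared to None after an error while the slot itself always returns to the queue. The same checkout/checkin pattern in isolation (connect and use are hypothetical stand-ins, not names from this PR):

    import Queue

    pool = Queue.Queue(2)
    for _ in xrange(2):
        pool.put([None])           # every slot starts out empty

    slot = pool.get(block=True)    # blocks while all slots are checked out
    try:
        if slot[0] is None:
            slot[0] = connect()    # hypothetical: open a real connection
        use(slot[0])               # hypothetical: do work on it
    except Exception:
        slot[0] = None             # drop the broken connection; the next
                                   # checkout reconnects lazily
    finally:
        pool.put(slot)             # the slot itself always goes back

Returning the slot unconditionally keeps the pool at a fixed size, which is what bounds the number of concurrent connections per peer.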
+    def reqrep(self, peer_id, message):
+        """Grabs a connection from the pool and makes a synchronous
+        request/reply call.

-        self._patch_client_for_gevent()
+        """

-        self._bg_thread = threading.Thread(
-            target=self._process_requests_in_background
-        )
-        self._bg_thread.setDaemon(True)
-        self._bg_thread.start()
+        with self._next_req_id_lock:
+            req_id = self._next_req_id
+            self._next_req_id += 1

-    def _patch_client_for_gevent(self):
-        try:
-            import gevent
-            import gevent.monkey
-        except ImportError:
-            gevent_enabled = False
-        else:
-            gevent_enabled = bool(gevent.monkey.saved)
-
-        if gevent_enabled:
-            self._Timeout = gevent.Timeout
-            self._sleep = gevent.sleep
-            self._get_value_event = lambda: gevent.event.AsyncResult()
-        else:
-            self._Timeout = ValueEventTimeout
-            self._sleep = lambda _: None
-            self._get_value_event = self._ensure_value_event
-
-    def _ensure_value_event(self):
-        """A ValueEvent object is used by one thread at a time, so it can be
-        safely reused instead of creating a new one for each call to reqrep.
-        Creating a ValueEvent is expensive; see its docstring.
-        """
-        ev = getattr(self._threadlocal, 'event', None)
-        if ev is None:
-            ev = ValueEvent()
-            self._threadlocal.event = ev
-        return ev
-
-    def _bg_clean_up_peer(self, peer):
-        """Remove peer and associated sockets from _peers and _sock_to_peer.
-        Also close the peer.
-        """
-        with self._peers_lock:
-            del self._peers[peer.peer_id]
-            del self._sock_to_peer[peer.socket]
-        peer.close()
-        peer = None
-
-    def _bg_select_peers(self, timeout=SELECT_TIMEOUT):
-        """Similar to select.select, but instead of returning sockets this
-        returns the associated Peer objects.
-        """
-        with self._peers_lock:
-            socks = self._sock_to_peer.keys()
-
-        if not socks:
-            self._sleep(0)
-            return [], [], []
-
-        readable, writable, exceptional = select(socks, socks, socks, timeout)
-        with self._peers_lock:
-            readable = [self._sock_to_peer[s] for s in readable]
-            writable = [self._sock_to_peer[s] for s in writable]
-            exceptional = [self._sock_to_peer[s] for s in exceptional]
-        return readable, writable, exceptional
-
-    def _process_requests_in_background(self):
-        """Executes forever; handles sending and receiving messages from
-        peers. Meant to be called in a background thread (see constructor).
-
-        This checks for peers with readable data (added by reqrep) and feeds
-        the data from each socket into the peer's incoming_buffer until a
-        message can be read. The request id is read from the message, and if
-        a pending request exists the associated ValueEvent object is "set"
-        with the message contents.
-
-        For each writable peer, this function reads data from the peer's
-        outgoing buffer, if it's not empty, and sends it along on the peer's
-        socket.
-
-        This loop also handles client disconnects / errors.
-        """
-        while True:
-            readable, writable, exceptional = self._bg_select_peers()
-
-            for peer in readable:
-                data = peer.socket.recv(RECV_BYTES)
-                if data:
-                    peer.incoming_buffer.feed(data)
-                    try:
-                        response = peer.incoming_buffer.unpack()
-                    except msgpack.OutOfData:
-                        continue
-                    peer.handle_response(response)
-                else:
-                    self._bg_clean_up_peer(peer)
-                    if peer in writable:
-                        writable.remove(peer)
-                    if peer in exceptional:
-                        exceptional.remove(peer)
-
-            for peer in writable:
-                # single-reader configuration means we can safely unlock
-                # between peeking and committing.
-                with peer.lock:
-                    next_bytes = peer.outgoing_buffer.peek(SEND_BYTES)
-                if not next_bytes:
-                    continue
-
-                sent_bytes = peer.socket.send(next_bytes)
-                if sent_bytes == 0:
-                    self._bg_clean_up_peer(peer)
-                    if peer in exceptional:
-                        exceptional.remove(peer)
-                    continue
-
-                with peer.lock:
-                    peer.outgoing_buffer.commit_read(sent_bytes)
-
-            for peer in exceptional:
-                self._bg_clean_up_peer(peer)
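The block above removes the client's event loop: a single background thread multiplexed every peer socket through select, draining readable sockets into per-peer unpackers and flushing outgoing buffers on writable ones. A stripped-down sketch of that pattern, assuming peer objects expose feed/pending/sent buffer methods (an interface invented here for illustration):

    from select import select

    def io_loop(peers_by_sock, recv_bytes=64 * 1024, timeout=1):
        # One thread services every peer socket.
        while True:
            socks = peers_by_sock.keys()
            readable, writable, _ = select(socks, socks, [], timeout)
            for sock in readable:
                peers_by_sock[sock].feed(sock.recv(recv_bytes))
            for sock in writable:
                data = peers_by_sock[sock].pending()
                if data:
                    peers_by_sock[sock].sent(sock.send(data))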

-    def _ensure_connection(self, peer_id, timeout=SOCKET_CREATION_TIMEOUT):
-        """Connects to the peer's socket and creates a Peer object.
-
-        @return: Peer
-        """
-        with self._peers_lock:
-            if peer_id not in self._peers:
-                sock = socket.create_connection(self._conf[peer_id],
-                                                timeout=timeout)
-                peer = Peer(peer_id, sock)
-                self._peers[peer_id] = peer
-                self._sock_to_peer[sock] = peer
-
-        return self._peers[peer_id]
-
-    def _inc_req_count(self):
-        with self._req_count_lock:
-            curr = self._req_count
-            self._req_count += 1
-        return curr
+        with self._get_peer_connection(peer_id) as sock:
+            # We'll let socket errors propagate up so that we can get
+            # rid of it from the pool.

-    def close(self):
-        for peer in self._peers.values():
-            self._bg_clean_up_peer(peer)

-    def reqrep(self, peer_id, message, timeout=False):
-        """The message is serialized and added to the peer's outgoing buffer
-        with an associated request id (add_request). The request id is also
-        used to associate a ValueEvent (or AsyncResult) object with the call
-        in the peer's pending_requests dict, such that the background thread,
-        _process_requests_in_background, can "respond" to the reqrep call by
-        setting the ValueEvent object.
-
-        @param timeout: float, in seconds. Optional if set at client
-        initialization. Specify None for no timeout.
-        """
-        if timeout is False and self._timeout is False:
-            raise ClientError('must specify timeout value')
-        elif timeout is False:
-            timeout = self._timeout

+            packer = msgpack.Packer()
+            sock.sendall(packer.pack([req_id, message]))

-        req_id = self._inc_req_count()
-        peer = self._ensure_connection(peer_id)
-        value_event = self._get_value_event()
-        outgoing_bytes = self._packer.pack([req_id, message])
+            unpacker = msgpack.Unpacker()
+            while True:
+                data = sock.recv(self._recv_bytes)
+                unpacker.feed(data)
+                for payload in unpacker:
+                    req_id, status, message = payload
+                    return status, message

-        peer.add_request(req_id, outgoing_bytes, value_event)
-        try:
-            status, message = value_event.get(timeout)
-        except self._Timeout:
-            status = TIMEOUT
-            message = 'request timed out after %s s' % timeout
-        peer.remove_request(req_id)
-        return status, message
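The replacement read path leans on msgpack's streaming Unpacker: keep feeding whatever recv returns until a whole object decodes. One detail worth noting: recv returning an empty string means the peer closed the connection, and the loop above would then spin forever feeding empty strings. A sketch of the same loop with that case surfaced as a socket error (the error choice is this sketch's, not the PR's), which the pool's except clause would treat as a dead connection:

    import socket
    import msgpack

    def read_one_response(sock, recv_bytes=64 * 1024):
        unpacker = msgpack.Unpacker()
        while True:
            data = sock.recv(recv_bytes)
            if not data:
                # Peer closed the socket; raising a socket.error lets
                # _get_peer_connection evict this connection from the pool.
                raise socket.error('peer closed connection')
            unpacker.feed(data)
            for payload in unpacker:
                return payload  # [req_id, status, message]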
+    def close(self):
+        pass
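Putting it together, a minimal usage sketch of the pooled client. The conf mapping follows what chain.Client passes in, peer_id to a (host, port) address that socket.create_connection accepts; the addresses and message body here are illustrative:

    from wade import chorus

    conf = {
        0: ('127.0.0.1', 9000),  # peer_id -> (host, port), illustrative
        1: ('127.0.0.1', 9001),
    }

    client = chorus.Client(conf, per_peer_pool_size=4)
    # Blocks until peer 0 replies; the message body is illustrative.
    status, message = client.reqrep(0, {'cmd': 'ping'})
    if status != chorus.OK:
        print 'call failed: %r' % (message,)
    client.close()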