Skip to content

Commit

Permalink
workers/gthread: Remove locks + one event queue + general cleanup
Browse files Browse the repository at this point in the history
The main purpose is to remove complexity from gthread by:

* Removing the lock for handling self._keep and self.poller. This is
  possible since we now do all such manipulation on the main thread
  instead. When a connection is done, it posts a callback through the
  PollableMethodCaller which gets executed on the main thread.

* Having a single event queue (self.poller), as opposed to also
  managing a set of futures. This fixes benoitc#3146 (although there are
  more minimal ways of doing it).

There are other more minor things as well:

* Renaming some variables, e.g. self._keep to self.keepalived_conns.
* Remove self-explanatory comments (what the code does, not why).
* Just decide that socket is blocking.
* Use time.monotonic() for timeouts in gthread.

Some complexity has been added to the shutdown sequence, but hopefully
for good reason: it's to make sure that all already accepted
connections are served within the grace period.
  • Loading branch information
sylt committed Jul 3, 2024
1 parent 9802e21 commit 36bcf51
Showing 1 changed file with 113 additions and 147 deletions.
260 changes: 113 additions & 147 deletions gunicorn/workers/gthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from concurrent import futures
import errno
import os
import queue
import selectors
import socket
import ssl
Expand All @@ -22,7 +23,6 @@
from collections import deque
from datetime import datetime
from functools import partial
from threading import RLock

from . import base
from .. import http
Expand All @@ -41,44 +41,64 @@ def __init__(self, cfg, sock, client, server):

self.timeout = None
self.parser = None
self.initialized = False

# set the socket to non blocking
self.sock.setblocking(False)

def init(self):
self.initialized = True
self.sock.setblocking(True)

if self.parser is None:
# wrap the socket if needed
if self.cfg.is_ssl:
self.sock = sock.ssl_wrap_socket(self.sock, self.cfg)

# initialize the parser
self.parser = http.RequestParser(self.cfg, self.sock, self.client)

def set_timeout(self):
# set the timeout
self.timeout = time.time() + self.cfg.keepalive
def is_initialized(self):
return bool(self.parser)

def set_keepalive_timeout(self):
self.timeout = time.monotonic() + self.cfg.keepalive

def close(self):
util.close(self.sock)


class PollableMethodQueue(object):

def __init__(self):
self.fds = []
self.method_queue = None

def init(self):
self.fds = os.pipe()
self.method_queue = queue.SimpleQueue()

def close(self):
for fd in self.fds:
os.close(fd)

def get_fd(self):
return self.fds[0]

def defer(self, callback, *args):
self.method_queue.put(partial(callback, *args))
os.write(self.fds[1], b'0')

def run_callbacks(self, max_callbacks_at_a_time=10):
zeroes = os.read(self.fds[0], max_callbacks_at_a_time)
for _ in range(0, len(zeroes)):
method = self.method_queue.get()
method()


class ThreadWorker(base.Worker):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.worker_connections = self.cfg.worker_connections
self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
# initialise the pool
self.tpool = None
self.thread_pool = None
self.poller = None
self._lock = None
self.futures = deque()
self._keep = deque()
self.keepalived_conns = deque()
self.nr_conns = 0
self.method_queue = PollableMethodQueue()

@classmethod
def check_config(cls, cfg, log):
Expand All @@ -89,100 +109,66 @@ def check_config(cls, cfg, log):
"Check the number of worker connections and threads.")

def init_process(self):
self.tpool = self.get_thread_pool()
self.thread_pool = self.get_thread_pool()
self.poller = selectors.DefaultSelector()
self._lock = RLock()
self.method_queue.init()
super().init_process()

def get_thread_pool(self):
"""Override this method to customize how the thread pool is created"""
return futures.ThreadPoolExecutor(max_workers=self.cfg.threads)

def handle_quit(self, sig, frame):
def handle_exit(self, sig, frame):
self.alive = False
# worker_int callback
self.cfg.worker_int(self)
self.tpool.shutdown(False)
time.sleep(0.1)
sys.exit(0)

def _wrap_future(self, fs, conn):
fs.conn = conn
self.futures.append(fs)
fs.add_done_callback(self.finish_request)

def enqueue_req(self, conn):
conn.init()
# submit the connection to a worker
fs = self.tpool.submit(self.handle, conn)
self._wrap_future(fs, conn)
self.method_queue.defer(lambda: None) # To wake up poller.select()

def handle_quit(self, sig, frame):
self.thread_pool.shutdown(False)
super().handle_quit(sig, frame)

def set_accept_enabled(self, enabled):
for sock in self.sockets:
if enabled:
self.poller.register(sock, selectors.EVENT_READ, self.accept)
else:
self.poller.unregister(sock)

def accept(self, server, listener):
def accept(self, listener):
try:
sock, client = listener.accept()
# initialize the connection object
conn = TConn(self.cfg, sock, client, server)

self.nr_conns += 1
# wait until socket is readable
with self._lock:
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
sock.setblocking(True) # Explicitly set behavior since it differs per OS
conn = TConn(self.cfg, sock, client, listener.getsockname())

self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
except EnvironmentError as e:
if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
errno.EWOULDBLOCK):
raise

def on_client_socket_readable(self, conn, client):
with self._lock:
# unregister the client from the poller
self.poller.unregister(client)
self.poller.unregister(client)

if conn.initialized:
# remove the connection from keepalive
try:
self._keep.remove(conn)
except ValueError:
# race condition
return
if conn.is_initialized():
self.keepalived_conns.remove(conn)
conn.init()

# submit the connection to a worker
self.enqueue_req(conn)
fs = self.thread_pool.submit(self.handle, conn)
fs.add_done_callback(
lambda fut: self.method_queue.defer(self.finish_request, conn, fut))

def murder_keepalived(self):
now = time.time()
while True:
with self._lock:
try:
# remove the connection from the queue
conn = self._keep.popleft()
except IndexError:
break

delta = conn.timeout - now
now = time.monotonic()
while self.keepalived_conns:
delta = self.keepalived_conns[0].timeout - now
if delta > 0:
# add the connection back to the queue
with self._lock:
self._keep.appendleft(conn)
break
else:
self.nr_conns -= 1
# remove the socket from the poller
with self._lock:
try:
self.poller.unregister(conn.sock)
except EnvironmentError as e:
if e.errno != errno.EBADF:
raise
except KeyError:
# already removed by the system, continue
pass
except ValueError:
# already removed by the system continue
pass

# close the socket
conn.close()

conn = self.keepalived_conns.popleft()
self.poller.unregister(conn.sock)
self.nr_conns -= 1
conn.close()

def is_parent_alive(self):
# If our parent changed then we shut down.
Expand All @@ -191,97 +177,77 @@ def is_parent_alive(self):
return False
return True

def wait_for_and_dispatch_events(self, timeout):
for key, _ in self.poller.select(timeout):
callback = key.data
callback(key.fileobj)

def run(self):
# init listeners, add them to the event loop
for sock in self.sockets:
sock.setblocking(False)
# a race condition during graceful shutdown may make the listener
# name unavailable in the request handler so capture it once here
server = sock.getsockname()
acceptor = partial(self.accept, server)
self.poller.register(sock, selectors.EVENT_READ, acceptor)
self.set_accept_enabled(True)
self.poller.register(self.method_queue.get_fd(),
selectors.EVENT_READ,
self.method_queue.run_callbacks)

while self.alive:
# notify the arbiter we are alive
self.notify()

# can we accept more connections?
if self.nr_conns < self.worker_connections:
# wait for an event
events = self.poller.select(1.0)
for key, _ in events:
callback = key.data
callback(key.fileobj)

# check (but do not wait) for finished requests
result = futures.wait(self.futures, timeout=0,
return_when=futures.FIRST_COMPLETED)
else:
# wait for a request to finish
result = futures.wait(self.futures, timeout=1.0,
return_when=futures.FIRST_COMPLETED)

# clean up finished requests
for fut in result.done:
self.futures.remove(fut)
new_connections_accepted = self.nr_conns < self.worker_connections
self.wait_for_and_dispatch_events(timeout=1)

if not self.is_parent_alive():
break

# handle keepalive timeouts
self.murder_keepalived()

self.tpool.shutdown(False)
new_connections_still_accepted = self.nr_conns < self.worker_connections
if new_connections_accepted != new_connections_still_accepted:
self.set_accept_enabled(new_connections_still_accepted)

# Don't accept any new connections, as we're about to shut down
if self.nr_conns < self.worker_connections:
self.set_accept_enabled(False)

# ... but try handle all already accepted connections within the grace period
graceful_timeout = time.monotonic() + self.cfg.graceful_timeout
while self.nr_conns > 0:
time_remaining = max(graceful_timeout - time.monotonic(), 0)
if time_remaining == 0:
break
self.wait_for_and_dispatch_events(timeout=time_remaining)

self.thread_pool.shutdown(wait=False)
self.poller.close()
self.method_queue.close()

for s in self.sockets:
s.close()

futures.wait(self.futures, timeout=self.cfg.graceful_timeout)

def finish_request(self, fs):
if fs.cancelled():
self.nr_conns -= 1
fs.conn.close()
return

def finish_request(self, conn, fs):
try:
(keepalive, conn) = fs.result()
# if the connection should be kept alived add it
# to the eventloop and record it
keepalive = not fs.cancelled() and fs.result()
if keepalive and self.alive:
# flag the socket as non blocked
conn.sock.setblocking(False)

# register the connection
conn.set_timeout()
with self._lock:
self._keep.append(conn)

# add the socket to the event loop
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
conn.set_keepalive_timeout()
self.keepalived_conns.append(conn)
self.poller.register(conn.sock, selectors.EVENT_READ,
partial(self.on_client_socket_readable, conn))
else:
self.nr_conns -= 1
conn.close()
except Exception:
# an exception happened, make sure to close the
# socket.
self.nr_conns -= 1
fs.conn.close()
conn.close()

def handle(self, conn):
keepalive = False
req = None
try:
req = next(conn.parser)
if not req:
return (False, conn)
return False

# handle the request
keepalive = self.handle_request(req, conn)
if keepalive:
return (keepalive, conn)
return self.handle_request(req, conn)
except http.errors.NoMoreData as e:
self.log.debug("Ignored premature client disconnection. %s", e)

Expand All @@ -308,7 +274,7 @@ def handle(self, conn):
except Exception as e:
self.handle_error(req, conn.sock, conn.client, e)

return (False, conn)
return False

def handle_request(self, req, conn):
environ = {}
Expand All @@ -328,7 +294,7 @@ def handle_request(self, req, conn):

if not self.alive or not self.cfg.keepalive:
resp.force_close()
elif len(self._keep) >= self.max_keepalived:
elif len(self.keepalived_conns) >= self.max_keepalived:
resp.force_close()

respiter = self.wsgi(environ, resp.start_response)
Expand Down

0 comments on commit 36bcf51

Please sign in to comment.