From 36bcf513e218f5df25ebca7ddf1519fc5168cc5e Mon Sep 17 00:00:00 2001
From: Richard Eklycke
Date: Sun, 18 Feb 2024 00:29:44 +0100
Subject: [PATCH] workers/gthread: Remove locks + one event queue + general cleanup

The main purpose is to remove complexity from gthread by:

* Removing the lock for handling self._keep and self.poller. This is
  possible since we now do all such manipulation on the main thread
  instead. When a connection is done, it posts a callback through the
  PollableMethodQueue, which gets executed on the main thread.

* Having a single event queue (self.poller), as opposed to also
  managing a set of futures. This fixes #3146 (although there are more
  minimal ways of doing it).

There are other, more minor, changes as well:

* Renaming some variables, e.g. self._keep to self.keepalived_conns.
* Removing self-explanatory comments (ones that describe what the code
  does rather than why).
* Always putting the client socket in blocking mode, since the default
  behavior differs per OS.
* Using time.monotonic() for timeouts in gthread.

Some complexity has been added to the shutdown sequence, but hopefully
for good reason: it makes sure that all already accepted connections
are served within the grace period.
---
 gunicorn/workers/gthread.py | 260 ++++++++++++++++--------------------
 1 file changed, 113 insertions(+), 147 deletions(-)

diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py
index c9c42345f..49946d775 100644
--- a/gunicorn/workers/gthread.py
+++ b/gunicorn/workers/gthread.py
@@ -14,6 +14,7 @@
 from concurrent import futures
 import errno
 import os
+import queue
 import selectors
 import socket
 import ssl
@@ -22,7 +23,6 @@
 from collections import deque
 from datetime import datetime
 from functools import partial
-from threading import RLock
 
 from . import base
 from .. import http
@@ -41,44 +41,64 @@ def __init__(self, cfg, sock, client, server):
         self.timeout = None
         self.parser = None
-        self.initialized = False
-
-        # set the socket to non blocking
-        self.sock.setblocking(False)
 
     def init(self):
-        self.initialized = True
-        self.sock.setblocking(True)
-
         if self.parser is None:
             # wrap the socket if needed
             if self.cfg.is_ssl:
                 self.sock = sock.ssl_wrap_socket(self.sock, self.cfg)
 
-            # initialize the parser
             self.parser = http.RequestParser(self.cfg, self.sock, self.client)
 
-    def set_timeout(self):
-        # set the timeout
-        self.timeout = time.time() + self.cfg.keepalive
+    def is_initialized(self):
+        return bool(self.parser)
+
+    def set_keepalive_timeout(self):
+        self.timeout = time.monotonic() + self.cfg.keepalive
 
     def close(self):
         util.close(self.sock)
 
 
+class PollableMethodQueue(object):
+
+    def __init__(self):
+        self.fds = []
+        self.method_queue = None
+
+    def init(self):
+        self.fds = os.pipe()
+        self.method_queue = queue.SimpleQueue()
+
+    def close(self):
+        for fd in self.fds:
+            os.close(fd)
+
+    def get_fd(self):
+        return self.fds[0]
+
+    def defer(self, callback, *args):
+        self.method_queue.put(partial(callback, *args))
+        os.write(self.fds[1], b'0')
+
+    def run_callbacks(self, max_callbacks_at_a_time=10):
+        zeroes = os.read(self.fds[0], max_callbacks_at_a_time)
+        for _ in range(0, len(zeroes)):
+            method = self.method_queue.get()
+            method()
+
+
 class ThreadWorker(base.Worker):
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.worker_connections = self.cfg.worker_connections
         self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
-        # initialise the pool
-        self.tpool = None
+        self.thread_pool = None
         self.poller = None
-        self._lock = None
-        self.futures = deque()
-        self._keep = deque()
+        self.keepalived_conns = deque()
         self.nr_conns = 0
+        self.method_queue = PollableMethodQueue()
 
     @classmethod
     def check_config(cls, cfg, log):
@@ -89,100 +109,66 @@ def check_config(cls, cfg, log):
                         "Check the number of worker connections and threads.")
 
     def init_process(self):
-        self.tpool = self.get_thread_pool()
+        self.thread_pool = self.get_thread_pool()
         self.poller = selectors.DefaultSelector()
-        self._lock = RLock()
+        self.method_queue.init()
         super().init_process()
 
     def get_thread_pool(self):
         """Override this method to customize how the thread pool is created"""
         return futures.ThreadPoolExecutor(max_workers=self.cfg.threads)
 
-    def handle_quit(self, sig, frame):
+    def handle_exit(self, sig, frame):
         self.alive = False
-        # worker_int callback
-        self.cfg.worker_int(self)
-        self.tpool.shutdown(False)
-        time.sleep(0.1)
-        sys.exit(0)
-
-    def _wrap_future(self, fs, conn):
-        fs.conn = conn
-        self.futures.append(fs)
-        fs.add_done_callback(self.finish_request)
-
-    def enqueue_req(self, conn):
-        conn.init()
-        # submit the connection to a worker
-        fs = self.tpool.submit(self.handle, conn)
-        self._wrap_future(fs, conn)
+        self.method_queue.defer(lambda: None)  # To wake up poller.select()
+
+    def handle_quit(self, sig, frame):
+        self.thread_pool.shutdown(False)
+        super().handle_quit(sig, frame)
+
+    def set_accept_enabled(self, enabled):
+        for sock in self.sockets:
+            if enabled:
+                self.poller.register(sock, selectors.EVENT_READ, self.accept)
+            else:
+                self.poller.unregister(sock)
 
-    def accept(self, server, listener):
+    def accept(self, listener):
         try:
             sock, client = listener.accept()
-            # initialize the connection object
-            conn = TConn(self.cfg, sock, client, server)
-
             self.nr_conns += 1
-            # wait until socket is readable
-            with self._lock:
-                self.poller.register(conn.sock, selectors.EVENT_READ,
-                                     partial(self.on_client_socket_readable, conn))
+            sock.setblocking(True)  # Explicitly set behavior since it differs per OS
+            conn = TConn(self.cfg, sock, client, listener.getsockname())
+
+            self.poller.register(conn.sock, selectors.EVENT_READ,
+                                 partial(self.on_client_socket_readable, conn))
         except EnvironmentError as e:
             if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                                errno.EWOULDBLOCK):
                 raise
 
     def on_client_socket_readable(self, conn, client):
-        with self._lock:
-            # unregister the client from the poller
-            self.poller.unregister(client)
+        self.poller.unregister(client)
 
-            if conn.initialized:
-                # remove the connection from keepalive
-                try:
-                    self._keep.remove(conn)
-                except ValueError:
-                    # race condition
-                    return
+        if conn.is_initialized():
+            self.keepalived_conns.remove(conn)
+        conn.init()
 
-        # submit the connection to a worker
-        self.enqueue_req(conn)
+        fs = self.thread_pool.submit(self.handle, conn)
+        fs.add_done_callback(
+            lambda fut: self.method_queue.defer(self.finish_request, conn, fut))
 
     def murder_keepalived(self):
-        now = time.time()
-        while True:
-            with self._lock:
-                try:
-                    # remove the connection from the queue
-                    conn = self._keep.popleft()
-                except IndexError:
-                    break
-
-            delta = conn.timeout - now
+        now = time.monotonic()
+        while self.keepalived_conns:
+            delta = self.keepalived_conns[0].timeout - now
             if delta > 0:
-                # add the connection back to the queue
-                with self._lock:
-                    self._keep.appendleft(conn)
                 break
-            else:
-                self.nr_conns -= 1
-                # remove the socket from the poller
-                with self._lock:
-                    try:
-                        self.poller.unregister(conn.sock)
-                    except EnvironmentError as e:
-                        if e.errno != errno.EBADF:
-                            raise
-                    except KeyError:
-                        # already removed by the system, continue
-                        pass
-                    except ValueError:
-                        # already removed by the system continue
-                        pass
-
-                # close the socket
-                conn.close()
+
+            conn = self.keepalived_conns.popleft()
+            self.poller.unregister(conn.sock)
+            self.nr_conns -= 1
+            conn.close()
 
     def is_parent_alive(self):
         # If our parent changed then we shut down.
@@ -191,39 +177,23 @@ def is_parent_alive(self):
             return False
         return True
 
+    def wait_for_and_dispatch_events(self, timeout):
+        for key, _ in self.poller.select(timeout):
+            callback = key.data
+            callback(key.fileobj)
+
     def run(self):
-        # init listeners, add them to the event loop
-        for sock in self.sockets:
-            sock.setblocking(False)
-            # a race condition during graceful shutdown may make the listener
-            # name unavailable in the request handler so capture it once here
-            server = sock.getsockname()
-            acceptor = partial(self.accept, server)
-            self.poller.register(sock, selectors.EVENT_READ, acceptor)
+        self.set_accept_enabled(True)
+        self.poller.register(self.method_queue.get_fd(),
+                             selectors.EVENT_READ,
+                             self.method_queue.run_callbacks)
 
         while self.alive:
             # notify the arbiter we are alive
             self.notify()
 
-            # can we accept more connections?
-            if self.nr_conns < self.worker_connections:
-                # wait for an event
-                events = self.poller.select(1.0)
-                for key, _ in events:
-                    callback = key.data
-                    callback(key.fileobj)
-
-                # check (but do not wait) for finished requests
-                result = futures.wait(self.futures, timeout=0,
-                                      return_when=futures.FIRST_COMPLETED)
-            else:
-                # wait for a request to finish
-                result = futures.wait(self.futures, timeout=1.0,
-                                      return_when=futures.FIRST_COMPLETED)
-
-            # clean up finished requests
-            for fut in result.done:
-                self.futures.remove(fut)
+            new_connections_accepted = self.nr_conns < self.worker_connections
+            self.wait_for_and_dispatch_events(timeout=1)
 
             if not self.is_parent_alive():
                 break
@@ -231,57 +201,53 @@ def run(self):
             # handle keepalive timeouts
             self.murder_keepalived()
 
-        self.tpool.shutdown(False)
+            new_connections_still_accepted = self.nr_conns < self.worker_connections
+            if new_connections_accepted != new_connections_still_accepted:
+                self.set_accept_enabled(new_connections_still_accepted)
+
+        # Don't accept any new connections, as we're about to shut down
+        if self.nr_conns < self.worker_connections:
+            self.set_accept_enabled(False)
+
+        # ... but try to handle all already accepted connections within the grace period
+        graceful_timeout = time.monotonic() + self.cfg.graceful_timeout
+        while self.nr_conns > 0:
+            time_remaining = max(graceful_timeout - time.monotonic(), 0)
+            if time_remaining == 0:
+                break
+            self.wait_for_and_dispatch_events(timeout=time_remaining)
+
+        self.thread_pool.shutdown(wait=False)
         self.poller.close()
+        self.method_queue.close()
 
         for s in self.sockets:
             s.close()
 
-        futures.wait(self.futures, timeout=self.cfg.graceful_timeout)
-
-    def finish_request(self, fs):
-        if fs.cancelled():
-            self.nr_conns -= 1
-            fs.conn.close()
-            return
-
+    def finish_request(self, conn, fs):
         try:
-            (keepalive, conn) = fs.result()
-            # if the connection should be kept alived add it
-            # to the eventloop and record it
+            keepalive = not fs.cancelled() and fs.result()
             if keepalive and self.alive:
-                # flag the socket as non blocked
-                conn.sock.setblocking(False)
-
-                # register the connection
-                conn.set_timeout()
-                with self._lock:
-                    self._keep.append(conn)
-
-                # add the socket to the event loop
-                self.poller.register(conn.sock, selectors.EVENT_READ,
-                                     partial(self.on_client_socket_readable, conn))
+                conn.set_keepalive_timeout()
+                self.keepalived_conns.append(conn)
+                self.poller.register(conn.sock, selectors.EVENT_READ,
+                                     partial(self.on_client_socket_readable, conn))
             else:
                 self.nr_conns -= 1
                 conn.close()
         except Exception:
-            # an exception happened, make sure to close the
-            # socket.
             self.nr_conns -= 1
-            fs.conn.close()
+            conn.close()
 
     def handle(self, conn):
-        keepalive = False
         req = None
         try:
             req = next(conn.parser)
             if not req:
-                return (False, conn)
+                return False
 
             # handle the request
-            keepalive = self.handle_request(req, conn)
-            if keepalive:
-                return (keepalive, conn)
+            return self.handle_request(req, conn)
 
         except http.errors.NoMoreData as e:
             self.log.debug("Ignored premature client disconnection. %s", e)
@@ -308,7 +274,7 @@ def handle(self, conn):
         except Exception as e:
             self.handle_error(req, conn.sock, conn.client, e)
 
-        return (False, conn)
+        return False
 
     def handle_request(self, req, conn):
         environ = {}
@@ -328,7 +294,7 @@ def handle_request(self, req, conn):
 
         if not self.alive or not self.cfg.keepalive:
             resp.force_close()
-        elif len(self._keep) >= self.max_keepalived:
+        elif len(self.keepalived_conns) >= self.max_keepalived:
            resp.force_close()
 
        respiter = self.wsgi(environ, resp.start_response)
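
A note on the core mechanism, with a minimal standalone sketch. This is
illustrative only and not part of the patch; the names MethodQueueDemo,
worker and results are invented for the demo. PollableMethodQueue is the
classic self-pipe trick: a worker thread enqueues a callback and writes one
byte to a pipe, and the pipe's read end is registered with the same selector
that watches the listener and client sockets. The main thread wakes up,
reads one byte per queued callback and runs them all, so shared state such
as keepalived_conns and the poller is only ever touched from the main
thread, which is what makes removing the RLock safe. The same event loop
also drives the shutdown drain at the end of run().

    import os
    import queue
    import selectors
    import threading
    import time
    from functools import partial

    class MethodQueueDemo:
        def __init__(self):
            self.read_fd, self.write_fd = os.pipe()
            self.methods = queue.SimpleQueue()

        def defer(self, callback, *args):
            # Callable from any thread: queue a callback, then write one
            # byte so the pipe becomes readable and wakes the selector.
            self.methods.put(partial(callback, *args))
            os.write(self.write_fd, b'0')

        def run_callbacks(self, max_at_a_time=10):
            # Runs on the main thread once the read end is readable;
            # one byte read == one queued callback executed.
            zeroes = os.read(self.read_fd, max_at_a_time)
            for _ in range(len(zeroes)):
                self.methods.get()()

    def main():
        mq = MethodQueueDemo()
        poller = selectors.DefaultSelector()
        poller.register(mq.read_fd, selectors.EVENT_READ, mq.run_callbacks)

        results = []  # mutated only on the main thread, so no lock needed

        def worker(n):
            time.sleep(0.05 * n)         # simulate handling a request
            mq.defer(results.append, n)  # marshal the result to the main thread

        for n in range(3):
            threading.Thread(target=worker, args=(n,)).start()

        while len(results) < 3:
            for key, _ in poller.select(timeout=1.0):
                key.data()  # mq.run_callbacks
        print(results)  # typically [0, 1, 2]

    if __name__ == "__main__":
        main()

The same pattern explains handle_exit() above: deferring a no-op callback
is enough to wake poller.select() so the loop can notice self.alive is
False and start the graceful shutdown.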