diff --git a/.github/workflows/bsd.yml b/.github/workflows/bsd.yml new file mode 100644 index 000000000..0b8cbfe9b --- /dev/null +++ b/.github/workflows/bsd.yml @@ -0,0 +1,67 @@ +name: bsd +on: + push: + branches: + - master + paths: + - '*.py' + - 'tox.ini' + - '.github/workflows/bsd.yml' + pull_request: + branches: + - master +permissions: + # BOLD WARNING: do not add permissions, this workflow executes remote code + contents: read +env: + FORCE_COLOR: 1 +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true +jobs: + freebsd: + name: freebsd + timeout-minutes: 20 + runs-on: ubuntu-latest + strategy: + fail-fast: true + steps: + - uses: actions/checkout@v4 + - uses: vmactions/freebsd-vm@v1 + with: + prepare: pkg install -y nginx python311 py311-pip py311-tox py311-sqlite3 + usesh: true + copyback: false + # not a typo: "openssl --version" != "openssl version" + run: | + uname -a \ + && python3.11 --version \ + && python3.11 -m tox --version \ + && openssl version \ + && pkg info nginx \ + && python3.11 -m tox -e run-module \ + && python3.11 -m tox -e run-entrypoint \ + && python3.11 -m tox -e py + + openbsd: + name: openbsd + timeout-minutes: 20 + runs-on: ubuntu-latest + strategy: + fail-fast: true + steps: + - uses: actions/checkout@v4 + - uses: vmactions/openbsd-vm@v1 + with: + prepare: pkg_add python py3-pip py3-tox py3-sqlite3 nginx + usesh: true + copyback: false + run: | + uname -a \ + && python3 --version \ + && python3 -m tox --version \ + && openssl version \ + && pkg_info nginx \ + && python3 -m tox -e run-module \ + && python3 -m tox -e run-entrypoint \ + && python3 -m tox -e py diff --git a/.github/workflows/illumos.yml b/.github/workflows/illumos.yml new file mode 100644 index 000000000..2e3b059cc --- /dev/null +++ b/.github/workflows/illumos.yml @@ -0,0 +1,52 @@ +name: illumos +on: + push: + branches: + - master + paths: + - '*.py' + - 'tox.ini' + - '.github/workflows/illumos.yml' + pull_request: + branches: + - master +permissions: + # BOLD WARNING: do not add permissions, this workflow executes remote code + contents: read +env: + FORCE_COLOR: 1 +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true +jobs: + omnios: + name: illumos + timeout-minutes: 20 + runs-on: ubuntu-latest + strategy: + fail-fast: true + steps: + - uses: actions/checkout@v4 + - uses: vmactions/omnios-vm@v1 + with: + # need gcc: compile greenlet from source + # autoconf must pretend inotify unavail: libev FTBFS + # /tmp/.nginx must exist because nginx will not create configured tmp + prepare: | + pkg install pip-311 python-311 sqlite-3 nginx gcc13 + usesh: true + copyback: false + run: | + cat /etc/release \ + && uname -a \ + && python3 --version \ + && openssl version \ + && pkg info nginx \ + && gcc -dM -E - > 8 - if exitcode != 0: - self.log.error('Worker (pid:%s) exited with code %s', wpid, exitcode) + continue + + if os.WIFEXITED(status): + # A worker was normally terminated. If the termination + # reason was that it could not boot, we'll halt the server + # to avoid infinite start/stop cycles. + exitcode = os.WEXITSTATUS(status) + log = self.log.error if exitcode != 0 else self.log.debug + log('Worker (pid:%s) exited with code %s', wpid, exitcode) if exitcode == self.WORKER_BOOT_ERROR: reason = "Worker failed to boot." raise HaltServer(reason, self.WORKER_BOOT_ERROR) if exitcode == self.APP_LOAD_ERROR: reason = "App failed to load." raise HaltServer(reason, self.APP_LOAD_ERROR) - - if exitcode > 0: - # If the exit code of the worker is greater than 0, - # let the user know. - self.log.error("Worker (pid:%s) exited with code %s.", - wpid, exitcode) - elif status > 0: - # If the exit code of the worker is 0 and the status - # is greater than 0, then it was most likely killed - # via a signal. - try: - sig_name = signal.Signals(status).name - except ValueError: - sig_name = "code {}".format(status) - msg = "Worker (pid:{}) was sent {}!".format( - wpid, sig_name) - - # Additional hint for SIGKILL - if status == signal.SIGKILL: - msg += " Perhaps out of memory?" - self.log.error(msg) - - worker = self.WORKERS.pop(wpid, None) - if not worker: - continue + elif os.WIFSIGNALED(status): + # A worker was terminated by a signal. + sig = os.WTERMSIG(status) + try: + sig_name = signal.Signals(sig).name + except ValueError: + sig_name = "signal {}".format(sig) + msg = "Worker (pid:{}) was terminated by {}!".format( + wpid, sig_name) + + # Additional hint for SIGKILL + if sig == signal.SIGKILL: + msg += " Perhaps out of memory?" + self.log.error(msg) + + worker = self.WORKERS.pop(wpid, None) + if worker: worker.tmp.close() self.cfg.child_exit(self, worker) except OSError as e: @@ -583,6 +547,16 @@ def manage_workers(self): "value": active_worker_count, "mtype": "gauge"}) + if self.cfg.enable_backlog_metric: + backlog = sum(sock.get_backlog() or 0 + for sock in self.LISTENERS) + + if backlog >= 0: + self.log.debug("socket backlog: {0}".format(backlog), + extra={"metric": "gunicorn.backlog", + "value": backlog, + "mtype": "histogram"}) + def spawn_worker(self): self.worker_age += 1 worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS, @@ -604,6 +578,8 @@ def spawn_worker(self): try: util._setproctitle("worker [%s]" % self.proc_name) self.log.info("Booting worker with pid: %s", worker.pid) + if self.cfg.reuse_port: + worker.sockets = sock.create_sockets(self.cfg, self.log) self.cfg.post_fork(self, worker) worker.init_process() sys.exit(0) diff --git a/gunicorn/config.py b/gunicorn/config.py index 402a26b68..e21dcf58a 100644 --- a/gunicorn/config.py +++ b/gunicorn/config.py @@ -4,6 +4,7 @@ # Please remember to run "make -C docs html" after update "desc" attributes. +from functools import reduce import argparse import copy import grp @@ -237,10 +238,24 @@ def paste_global_conf(self): class SettingMeta(type): def __new__(cls, name, bases, attrs): super_new = super().__new__ - parents = [b for b in bases if isinstance(b, SettingMeta)] - if not parents: + + # creating parent class, return without registering known setting + if "desc" not in attrs: return super_new(cls, name, bases, attrs) + # creating new leaf class, register setting + # FIXME: put idiomatic expression for inheritance in metaclass here + parent_attrs = [ + {k: v for (k, v) in vars(b).items() if not k.startswith("__")} + for b in bases[::-1] # backwards: last in class hierarchy wins + ] + + # Python 3.9 permits dict() | dict() => operator.or_ suffices + def dict_or(a, b): + c = a.copy() + c.update(b) + return c + attrs = reduce(dict_or, parent_attrs + [attrs, ]) attrs["order"] = len(KNOWN_SETTINGS) attrs["validator"] = staticmethod(attrs["validator"]) @@ -255,20 +270,21 @@ def fmt_desc(cls, desc): setattr(cls, "short", desc.splitlines()[0]) -class Setting: +class BaseSetting: name = None value = None section = None cli = None validator = None - type = None + type = str meta = None - action = None + action = "store" default = None short = None desc = None nargs = None const = None + hidden_in_help = False def __init__(self): if self.default is not None: @@ -284,10 +300,10 @@ def add_option(self, parser): kwargs = { "dest": self.name, - "action": self.action or "store", - "type": self.type or str, + "action": self.action, + "type": self.type, "default": None, - "help": help_txt + "help": argparse.SUPPRESS if self.hidden_in_help else help_txt, } if self.meta is not None: @@ -329,7 +345,16 @@ def __repr__(self): ) -Setting = SettingMeta('Setting', (Setting,), {}) +class Setting(BaseSetting, metaclass=SettingMeta): + pass + + +class Unstable: + pass # likely: move to separate document + + +class Deprecated(Unstable): + hidden_in_help = True def validate_bool(val): @@ -547,6 +572,31 @@ def get_default_config_file(): return None +class PosIntSetting(Setting): + meta = "INT" + validator = validate_pos_int + type = int + + +class HexPosIntSetting(Setting): + meta = "INT" + validator = validate_pos_int + type = auto_int + + +class BoolSetting(Setting): + validator = validate_bool + default = False + action = 'store_true' + + +class HookSetting(Setting): + # typically; not defined here to keep # of args in leaf: + # validator = validate_callable(-1) + section = "Server Hooks" + type = callable + + class ConfigFile(Setting): name = "config" section = "Config File" @@ -619,13 +669,10 @@ class Bind(Setting): """ -class Backlog(Setting): +class Backlog(PosIntSetting): name = "backlog" section = "Server Socket" cli = ["--backlog"] - meta = "INT" - validator = validate_pos_int - type = int default = 2048 desc = """\ The maximum number of pending connections. @@ -639,13 +686,10 @@ class Backlog(Setting): """ -class Workers(Setting): +class Workers(PosIntSetting): name = "workers" section = "Worker Processes" cli = ["-w", "--workers"] - meta = "INT" - validator = validate_pos_int - type = int default = int(os.environ.get("WEB_CONCURRENCY", 1)) desc = """\ The number of worker processes for handling requests. @@ -694,13 +738,10 @@ class WorkerClass(Setting): """ -class WorkerThreads(Setting): +class WorkerThreads(PosIntSetting): name = "threads" section = "Worker Processes" cli = ["--threads"] - meta = "INT" - validator = validate_pos_int - type = int default = 1 desc = """\ The number of worker threads for handling requests. @@ -722,13 +763,10 @@ class WorkerThreads(Setting): """ -class WorkerConnections(Setting): +class WorkerConnections(PosIntSetting): name = "worker_connections" section = "Worker Processes" cli = ["--worker-connections"] - meta = "INT" - validator = validate_pos_int - type = int default = 1000 desc = """\ The maximum number of simultaneous clients. @@ -737,13 +775,10 @@ class WorkerConnections(Setting): """ -class MaxRequests(Setting): +class MaxRequests(PosIntSetting): name = "max_requests" section = "Worker Processes" cli = ["--max-requests"] - meta = "INT" - validator = validate_pos_int - type = int default = 0 desc = """\ The maximum number of requests a worker will process before restarting. @@ -757,13 +792,10 @@ class MaxRequests(Setting): """ -class MaxRequestsJitter(Setting): +class MaxRequestsJitter(PosIntSetting): name = "max_requests_jitter" section = "Worker Processes" cli = ["--max-requests-jitter"] - meta = "INT" - validator = validate_pos_int - type = int default = 0 desc = """\ The maximum jitter to add to the *max_requests* setting. @@ -776,13 +808,10 @@ class MaxRequestsJitter(Setting): """ -class Timeout(Setting): +class Timeout(PosIntSetting): name = "timeout" section = "Worker Processes" cli = ["-t", "--timeout"] - meta = "INT" - validator = validate_pos_int - type = int default = 30 desc = """\ Workers silent for more than this many seconds are killed and restarted. @@ -798,13 +827,10 @@ class Timeout(Setting): """ -class GracefulTimeout(Setting): +class GracefulTimeout(PosIntSetting): name = "graceful_timeout" section = "Worker Processes" cli = ["--graceful-timeout"] - meta = "INT" - validator = validate_pos_int - type = int default = 30 desc = """\ Timeout for graceful workers restart. @@ -815,13 +841,10 @@ class GracefulTimeout(Setting): """ -class Keepalive(Setting): +class Keepalive(PosIntSetting): name = "keepalive" section = "Worker Processes" cli = ["--keep-alive"] - meta = "INT" - validator = validate_pos_int - type = int default = 2 desc = """\ The number of seconds to wait for requests on a Keep-Alive connection. @@ -837,13 +860,10 @@ class Keepalive(Setting): """ -class LimitRequestLine(Setting): +class LimitRequestLine(PosIntSetting): name = "limit_request_line" section = "Security" cli = ["--limit-request-line"] - meta = "INT" - validator = validate_pos_int - type = int default = 4094 desc = """\ The maximum size of HTTP request line in bytes. @@ -861,13 +881,10 @@ class LimitRequestLine(Setting): """ -class LimitRequestFields(Setting): +class LimitRequestFields(PosIntSetting): name = "limit_request_fields" section = "Security" cli = ["--limit-request-fields"] - meta = "INT" - validator = validate_pos_int - type = int default = 100 desc = """\ Limit the number of HTTP headers fields in a request. @@ -879,13 +896,10 @@ class LimitRequestFields(Setting): """ -class LimitRequestFieldSize(Setting): +class LimitRequestFieldSize(PosIntSetting): name = "limit_request_field_size" section = "Security" cli = ["--limit-request-field_size"] - meta = "INT" - validator = validate_pos_int - type = int default = 8190 desc = """\ Limit the allowed size of an HTTP request header field. @@ -899,13 +913,10 @@ class LimitRequestFieldSize(Setting): """ -class Reload(Setting): +class Reload(BoolSetting): name = "reload" section = 'Debugging' cli = ['--reload'] - validator = validate_bool - action = 'store_true' - default = False desc = '''\ Restart workers when code changes. @@ -921,6 +932,10 @@ class Reload(Setting): system polling. Generally, inotify should be preferred if available because it consumes less system resources. + .. note:: + If the application fails to load while this option is used, + the (potentially sensitive!) traceback will be shared in + the response to subsequent HTTP requests. .. note:: In order to use the inotify reloader, you must have the ``inotify`` package installed. @@ -956,20 +971,20 @@ class ReloadExtraFiles(Setting): validator = validate_list_of_existing_files default = [] desc = """\ - Extends :ref:`reload` option to also watch and reload on additional files + Alternative or extension to :ref:`reload` option to (also) watch + and reload on additional files (e.g., templates, configurations, specifications, etc.). .. versionadded:: 19.8 + .. versionchanged:: 23.FIXME + Option no longer silently ignored if used without :ref:`reload`. """ -class Spew(Setting): +class Spew(BoolSetting): name = "spew" section = "Debugging" cli = ["--spew"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Install a trace function that spews every line executed by the server. @@ -977,38 +992,29 @@ class Spew(Setting): """ -class ConfigCheck(Setting): +class ConfigCheck(BoolSetting): name = "check_config" section = "Debugging" cli = ["--check-config"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Check the configuration and exit. The exit status is 0 if the configuration is correct, and 1 if the configuration is incorrect. """ -class PrintConfig(Setting): +class PrintConfig(BoolSetting): name = "print_config" section = "Debugging" cli = ["--print-config"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Print the configuration settings as fully resolved. Implies :ref:`check-config`. """ -class PreloadApp(Setting): +class PreloadApp(BoolSetting): name = "preload_app" section = "Server Mechanics" cli = ["--preload"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Load application code before the worker processes are forked. @@ -1025,6 +1031,7 @@ class Sendfile(Setting): cli = ["--no-sendfile"] validator = validate_bool action = "store_const" + # FIXME: how should the default look in docs? const = False desc = """\ @@ -1042,13 +1049,10 @@ class Sendfile(Setting): """ -class ReusePort(Setting): +class ReusePort(BoolSetting): name = "reuse_port" section = "Server Mechanics" cli = ["--reuse-port"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Set the ``SO_REUSEPORT`` flag on the listening socket. @@ -1069,13 +1073,10 @@ class Chdir(Setting): """ -class Daemon(Setting): +class Daemon(BoolSetting): name = "daemon" section = "Server Mechanics" cli = ["-D", "--daemon"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Daemonize the Gunicorn process. @@ -1123,6 +1124,10 @@ class Pidfile(Setting): A filename to use for the PID file. If not set, no PID file will be written. + + .. note:: + During master re-exec, a ``.2`` suffix is added to + this path to store the PID of the newly launched master. """ @@ -1182,13 +1187,10 @@ class Group(Setting): """ -class Umask(Setting): +class Umask(HexPosIntSetting): name = "umask" section = "Server Mechanics" cli = ["-m", "--umask"] - meta = "INT" - validator = validate_pos_int - type = auto_int default = 0 desc = """\ A bit mask for the file mode on files written by Gunicorn. @@ -1202,13 +1204,10 @@ class Umask(Setting): """ -class Initgroups(Setting): +class Initgroups(BoolSetting): name = "initgroups" section = "Server Mechanics" cli = ["--initgroups"] - validator = validate_bool - action = 'store_true' - default = False desc = """\ If true, set the worker process's group access list with all of the @@ -1373,13 +1372,10 @@ class AccessLog(Setting): """ -class DisableRedirectAccessToSyslog(Setting): +class DisableRedirectAccessToSyslog(BoolSetting): name = "disable_redirect_access_to_syslog" section = "Logging" cli = ["--disable-redirect-access-to-syslog"] - validator = validate_bool - action = 'store_true' - default = False desc = """\ Disable redirect access logs to syslog. @@ -1469,13 +1465,10 @@ class Loglevel(Setting): """ -class CaptureOutput(Setting): +class CaptureOutput(BoolSetting): name = "capture_output" section = "Logging" cli = ["--capture-output"] - validator = validate_bool - action = 'store_true' - default = False desc = """\ Redirect stdout/stderr to specified file in :ref:`errorlog`. @@ -1582,13 +1575,10 @@ class SyslogTo(Setting): """ -class Syslog(Setting): +class Syslog(BoolSetting): name = "syslog" section = "Logging" cli = ["--log-syslog"] - validator = validate_bool - action = 'store_true' - default = False desc = """\ Send *Gunicorn* logs to syslog. @@ -1625,13 +1615,10 @@ class SyslogFacility(Setting): """ -class EnableStdioInheritance(Setting): +class EnableStdioInheritance(BoolSetting): name = "enable_stdio_inheritance" section = "Logging" cli = ["-R", "--enable-stdio-inheritance"] - validator = validate_bool - default = False - action = "store_true" desc = """\ Enable stdio inheritance. @@ -1693,6 +1680,20 @@ class StatsdPrefix(Setting): """ +class BacklogMetric(Setting): + name = "enable_backlog_metric" + section = "Logging" + cli = ["--enable-backlog-metric"] + validator = validate_bool + default = False + action = "store_true" + desc = """\ + Enable socket backlog metric (only supported on Linux). + + .. versionadded:: 23.1 + """ + + class Procname(Setting): name = "proc_name" section = "Process Naming" @@ -1754,11 +1755,9 @@ class Paste(Setting): """ -class OnStarting(Setting): +class OnStarting(HookSetting): name = "on_starting" - section = "Server Hooks" validator = validate_callable(1) - type = callable def on_starting(server): pass @@ -1770,11 +1769,9 @@ def on_starting(server): """ -class OnReload(Setting): +class OnReload(HookSetting): name = "on_reload" - section = "Server Hooks" validator = validate_callable(1) - type = callable def on_reload(server): pass @@ -1786,11 +1783,9 @@ def on_reload(server): """ -class WhenReady(Setting): +class WhenReady(HookSetting): name = "when_ready" - section = "Server Hooks" validator = validate_callable(1) - type = callable def when_ready(server): pass @@ -1802,11 +1797,9 @@ def when_ready(server): """ -class Prefork(Setting): +class Prefork(HookSetting): name = "pre_fork" - section = "Server Hooks" validator = validate_callable(2) - type = callable def pre_fork(server, worker): pass @@ -1819,11 +1812,9 @@ def pre_fork(server, worker): """ -class Postfork(Setting): +class Postfork(HookSetting): name = "post_fork" - section = "Server Hooks" validator = validate_callable(2) - type = callable def post_fork(server, worker): pass @@ -1836,11 +1827,9 @@ def post_fork(server, worker): """ -class PostWorkerInit(Setting): +class PostWorkerInit(HookSetting): name = "post_worker_init" - section = "Server Hooks" validator = validate_callable(1) - type = callable def post_worker_init(worker): pass @@ -1854,11 +1843,9 @@ def post_worker_init(worker): """ -class WorkerInt(Setting): +class WorkerInt(HookSetting): name = "worker_int" - section = "Server Hooks" validator = validate_callable(1) - type = callable def worker_int(worker): pass @@ -1872,11 +1859,9 @@ def worker_int(worker): """ -class WorkerAbort(Setting): +class WorkerAbort(HookSetting): name = "worker_abort" - section = "Server Hooks" validator = validate_callable(1) - type = callable def worker_abort(worker): pass @@ -1892,11 +1877,9 @@ def worker_abort(worker): """ -class PreExec(Setting): +class PreExec(HookSetting): name = "pre_exec" - section = "Server Hooks" validator = validate_callable(1) - type = callable def pre_exec(server): pass @@ -1908,11 +1891,9 @@ def pre_exec(server): """ -class PreRequest(Setting): +class PreRequest(HookSetting): name = "pre_request" - section = "Server Hooks" validator = validate_callable(2) - type = callable def pre_request(worker, req): worker.log.debug("%s %s", req.method, req.path) @@ -1942,11 +1923,9 @@ def post_request(worker, req, environ, resp): """ -class ChildExit(Setting): +class ChildExit(HookSetting): name = "child_exit" - section = "Server Hooks" validator = validate_callable(2) - type = callable def child_exit(server, worker): pass @@ -1961,11 +1940,9 @@ def child_exit(server, worker): """ -class WorkerExit(Setting): +class WorkerExit(HookSetting): name = "worker_exit" - section = "Server Hooks" validator = validate_callable(2) - type = callable def worker_exit(server, worker): pass @@ -1978,11 +1955,9 @@ def worker_exit(server, worker): """ -class NumWorkersChanged(Setting): +class NumWorkersChanged(HookSetting): name = "nworkers_changed" - section = "Server Hooks" validator = validate_callable(3) - type = callable def nworkers_changed(server, new_value, old_value): pass @@ -1998,9 +1973,8 @@ def nworkers_changed(server, new_value, old_value): """ -class OnExit(Setting): +class OnExit(HookSetting): name = "on_exit" - section = "Server Hooks" validator = validate_callable(1) def on_exit(server): @@ -2014,11 +1988,9 @@ def on_exit(server): """ -class NewSSLContext(Setting): +class NewSSLContext(HookSetting): name = "ssl_context" - section = "Server Hooks" validator = validate_callable(2) - type = callable def ssl_context(config, default_ssl_context_factory): return default_ssl_context_factory() @@ -2049,13 +2021,10 @@ def ssl_context(conf, default_ssl_context_factory): """ -class ProxyProtocol(Setting): +class ProxyProtocol(BoolSetting): name = "proxy_protocol" section = "Server Mechanics" cli = ["--proxy-protocol"] - validator = validate_bool - default = False - action = "store_true" desc = """\ Enable detect PROXY protocol (PROXY mode). @@ -2120,7 +2089,7 @@ class CertFile(Setting): """ -class SSLVersion(Setting): +class SSLVersion(Setting, Deprecated): name = "ssl_version" section = "SSL" cli = ["--ssl-version"] @@ -2196,25 +2165,20 @@ class CACerts(Setting): """ -class SuppressRaggedEOFs(Setting): +class SuppressRaggedEOFs(BoolSetting): name = "suppress_ragged_eofs" section = "SSL" cli = ["--suppress-ragged-eofs"] - action = "store_true" - default = True - validator = validate_bool + default = True # (sic!) desc = """\ Suppress ragged EOFs (see stdlib ssl module's) """ -class DoHandshakeOnConnect(Setting): +class DoHandshakeOnConnect(BoolSetting): name = "do_handshake_on_connect" section = "SSL" cli = ["--do-handshake-on-connect"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Whether to perform SSL handshake on socket connect (see stdlib ssl module's) """ @@ -2266,13 +2230,10 @@ class PasteGlobalConf(Setting): """ -class PermitObsoleteFolding(Setting): +class PermitObsoleteFolding(BoolSetting): name = "permit_obsolete_folding" section = "Server Mechanics" cli = ["--permit-obsolete-folding"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Permit requests employing obsolete HTTP line folding mechanism @@ -2287,13 +2248,10 @@ class PermitObsoleteFolding(Setting): """ -class StripHeaderSpaces(Setting): +class StripHeaderSpaces(BoolSetting, Deprecated): name = "strip_header_spaces" section = "Server Mechanics" cli = ["--strip-header-spaces"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Strip spaces present between the header name and the the ``:``. @@ -2306,13 +2264,11 @@ class StripHeaderSpaces(Setting): """ -class PermitUnconventionalHTTPMethod(Setting): +class PermitUnconventionalHTTPMethod(BoolSetting, Unstable): name = "permit_unconventional_http_method" section = "Server Mechanics" cli = ["--permit-unconventional-http-method"] - validator = validate_bool - action = "store_true" - default = False + # when removed or changed: consider making True the new default desc = """\ Permit HTTP methods not matching conventions, such as IANA registration guidelines @@ -2332,13 +2288,10 @@ class PermitUnconventionalHTTPMethod(Setting): """ -class PermitUnconventionalHTTPVersion(Setting): +class PermitUnconventionalHTTPVersion(BoolSetting, Unstable): name = "permit_unconventional_http_version" section = "Server Mechanics" cli = ["--permit-unconventional-http-version"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Permit HTTP version not matching conventions of 2023 @@ -2353,13 +2306,10 @@ class PermitUnconventionalHTTPVersion(Setting): """ -class CasefoldHTTPMethod(Setting): +class CasefoldHTTPMethod(BoolSetting, Deprecated): name = "casefold_http_method" section = "Server Mechanics" cli = ["--casefold-http-method"] - validator = validate_bool - action = "store_true" - default = False desc = """\ Transform received HTTP methods to uppercase diff --git a/gunicorn/http/body.py b/gunicorn/http/body.py index d7ee29e78..01ad4637d 100644 --- a/gunicorn/http/body.py +++ b/gunicorn/http/body.py @@ -6,7 +6,7 @@ import sys from gunicorn.http.errors import (NoMoreData, ChunkMissingTerminator, - InvalidChunkSize) + InvalidChunkSize, InvalidChunkExtension) class ChunkedReader: @@ -91,6 +91,10 @@ def parse_chunk_size(self, unreader, data=None): chunk_size, *chunk_ext = line.split(b";", 1) if chunk_ext: chunk_size = chunk_size.rstrip(b" \t") + # Security: Don't allow CRs and LFs in chunk extensions + # This can cause request smuggling issues with some proxies + if any(c in chunk_ext[0] for c in (b"\n", b"\r")): + raise InvalidChunkExtension(chunk_ext[0]) if any(n not in b"0123456789abcdefABCDEF" for n in chunk_size): raise InvalidChunkSize(chunk_size) if len(chunk_size) == 0: diff --git a/gunicorn/http/errors.py b/gunicorn/http/errors.py index bcb970072..8ed1d0b75 100644 --- a/gunicorn/http/errors.py +++ b/gunicorn/http/errors.py @@ -97,6 +97,14 @@ def __str__(self): return "Invalid chunk size: %r" % self.data +class InvalidChunkExtension(IOError): + def __init__(self, data): + self.data = data + + def __str__(self): + return "Invalid chunk extension: %r" % self.data + + class ChunkMissingTerminator(IOError): def __init__(self, term): self.term = term diff --git a/gunicorn/http/message.py b/gunicorn/http/message.py index 59ce0bf4b..5279a07bd 100644 --- a/gunicorn/http/message.py +++ b/gunicorn/http/message.py @@ -178,8 +178,9 @@ def set_body_reader(self): elif name == "TRANSFER-ENCODING": # T-E can be a list # https://datatracker.ietf.org/doc/html/rfc9112#name-transfer-encoding - vals = [v.strip() for v in value.split(',')] - for val in vals: + te_split_at_comma = [v.strip() for v in value.split(',')] + # N.B. we might have split in the middle of quoted transfer-parameter + for val in te_split_at_comma: if val.lower() == "chunked": # DANGER: transfer codings stack, and stacked chunking is never intended if chunked: @@ -187,7 +188,7 @@ def set_body_reader(self): chunked = True elif val.lower() == "identity": # does not do much, could still plausibly desync from what the proxy does - # safe option: nuke it, its never needed + # safe option: reject, its never needed if chunked: raise InvalidHeader("TRANSFER-ENCODING", req=self) elif val.lower() in ('compress', 'deflate', 'gzip'): @@ -196,6 +197,8 @@ def set_body_reader(self): raise InvalidHeader("TRANSFER-ENCODING", req=self) self.force_close() else: + # DANGER: this not only rejects unknown encodings, but also + # leftovers from not splitting at transfer-coding boundary raise UnsupportedTransferCoding(value) if chunked: @@ -203,11 +206,13 @@ def set_body_reader(self): # a) CL + TE (TE overrides CL.. only safe if the recipient sees it that way too) # b) chunked HTTP/1.0 (always faulty) if self.version < (1, 1): - # framing wonky, see RFC 9112 Section 6.1 + # framing is faulty + # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-16 raise InvalidHeader("TRANSFER-ENCODING", req=self) if content_length is not None: # we cannot be certain the message framing we understood matches proxy intent # -> whatever happens next, remaining input must not be trusted + # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-15 raise InvalidHeader("CONTENT-LENGTH", req=self) self.body = Body(ChunkedReader(self, self.unreader)) elif content_length is not None: diff --git a/gunicorn/instrument/statsd.py b/gunicorn/instrument/statsd.py index 7bc4e6ffd..102e2b534 100644 --- a/gunicorn/instrument/statsd.py +++ b/gunicorn/instrument/statsd.py @@ -17,6 +17,7 @@ GAUGE_TYPE = "gauge" COUNTER_TYPE = "counter" HISTOGRAM_TYPE = "histogram" +TIMER_TYPE = "timer" class Statsd(Logger): @@ -31,11 +32,14 @@ def __init__(self, cfg): else: address_family = socket.AF_INET + self.sock = None try: self.sock = socket.socket(address_family, socket.SOCK_DGRAM) self.sock.connect(cfg.statsd_host) except Exception: - self.sock = None + if self.sock is not None: + self.sock.close() + self.sock = None self.dogstatsd_tags = cfg.dogstatsd_tags @@ -80,6 +84,8 @@ def log(self, lvl, msg, *args, **kwargs): self.increment(metric, value) elif typ == HISTOGRAM_TYPE: self.histogram(metric, value) + elif typ == TIMER_TYPE: + self.timer(metric, value) else: pass @@ -101,7 +107,7 @@ def access(self, resp, req, environ, request_time): status = status.decode('utf-8') if isinstance(status, str): status = int(status.split(None, 1)[0]) - self.histogram("gunicorn.request.duration", duration_in_ms) + self.timer("gunicorn.request.duration", duration_in_ms) self.increment("gunicorn.requests", 1) self.increment("gunicorn.request.status.%d" % status, 1) @@ -116,9 +122,12 @@ def increment(self, name, value, sampling_rate=1.0): def decrement(self, name, value, sampling_rate=1.0): self._sock_send("{0}{1}:-{2}|c|@{3}".format(self.prefix, name, value, sampling_rate)) - def histogram(self, name, value): + def timer(self, name, value): self._sock_send("{0}{1}:{2}|ms".format(self.prefix, name, value)) + def histogram(self, name, value): + self._sock_send("{0}{1}:{2}|h".format(self.prefix, name, value)) + def _sock_send(self, msg): try: if isinstance(msg, str): diff --git a/gunicorn/reloader.py b/gunicorn/reloader.py index 1c67f2a7d..51fd15c88 100644 --- a/gunicorn/reloader.py +++ b/gunicorn/reloader.py @@ -14,17 +14,21 @@ class Reloader(threading.Thread): - def __init__(self, extra_files=None, interval=1, callback=None): + def __init__(self, extra_files=None, interval=1, callback=None, auto_detect=False): super().__init__() self.daemon = True self._extra_files = set(extra_files or ()) self._interval = interval self._callback = callback + self._auto_detect = auto_detect def add_extra_file(self, filename): self._extra_files.add(filename) def get_files(self): + if not self._auto_detect: + return self._extra_files + fnames = [ COMPILED_EXT_RE.sub('py', module.__file__) for module in tuple(sys.modules.values()) @@ -71,12 +75,13 @@ class InotifyReloader(threading.Thread): | inotify.constants.IN_MOVE_SELF | inotify.constants.IN_MOVED_FROM | inotify.constants.IN_MOVED_TO) - def __init__(self, extra_files=None, callback=None): + def __init__(self, extra_files=None, callback=None, auto_detect=False): super().__init__() self.daemon = True self._callback = callback self._dirs = set() self._watcher = Inotify() + self._auto_detect = auto_detect for extra_file in extra_files: self.add_extra_file(extra_file) @@ -91,6 +96,9 @@ def add_extra_file(self, filename): self._dirs.add(dirname) def get_dirs(self): + if not self._auto_detect: + return set() + fnames = [ os.path.dirname(os.path.abspath(COMPILED_EXT_RE.sub('py', module.__file__))) for module in tuple(sys.modules.values()) @@ -100,6 +108,7 @@ def get_dirs(self): return set(fnames) def run(self): + # FIXME: _watchers/_dirs inconsistent - latter gets reset self._dirs = self.get_dirs() for dirname in self._dirs: @@ -117,7 +126,7 @@ def run(self): else: class InotifyReloader: - def __init__(self, extra_files=None, callback=None): + def __init__(self, extra_files=None, callback=None, auto_detect=False): raise ImportError('You must have the inotify module installed to ' 'use the inotify reloader') diff --git a/gunicorn/sock.py b/gunicorn/sock.py index eb2b6fa9c..8153ea323 100644 --- a/gunicorn/sock.py +++ b/gunicorn/sock.py @@ -4,13 +4,16 @@ import errno import os +import platform import socket import ssl import stat import sys import time +import struct from gunicorn import util +PLATFORM = sys.platform class BaseSocket: @@ -70,6 +73,9 @@ def close(self): self.sock = None + def get_backlog(self): + return -1 + class TCPSocket(BaseSocket): @@ -88,6 +94,23 @@ def set_options(self, sock, bound=False): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) return super().set_options(sock, bound=bound) + if PLATFORM == "linux": + def get_backlog(self): + if self.sock: + # tcp_info struct from include/uapi/linux/tcp.h + fmt = 'B' * 8 + 'I' * 24 + try: + tcp_info_struct = self.sock.getsockopt(socket.IPPROTO_TCP, + socket.TCP_INFO, 104) + # 12 is tcpi_unacked + return struct.unpack(fmt, tcp_info_struct)[12] + except AttributeError: + pass + return 0 + else: + def get_backlog(self): + return -1 + class TCP6Socket(TCPSocket): @@ -189,6 +212,10 @@ def create_sockets(conf, log, fds=None): log.error("Invalid address: %s", str(addr)) msg = "connection to {addr} failed: {error}" log.error(msg.format(addr=str(addr), error=str(e))) + if e.errno == errno.EOPNOTSUPP: # (sic!) + if "microsoft" in platform.release().lower(): + log.info("hint: mixing win32 filesystems and unix " + "sockets is not supported.") if i < 5: log.debug("Retrying in 1 second.") time.sleep(1) diff --git a/gunicorn/util.py b/gunicorn/util.py index e66dbebf3..8114a8206 100644 --- a/gunicorn/util.py +++ b/gunicorn/util.py @@ -4,7 +4,6 @@ import ast import email.utils import errno -import fcntl import html import importlib import inspect @@ -31,6 +30,7 @@ import urllib.parse REDIRECT_TO = getattr(os, 'devnull', '/dev/null') +REASON_PHRASE_RE = re.compile(rb'[ \t\x21-\x7e\x80-\xff]*') # Server and Date aren't technically hop-by-hop # headers, but they are in the purview of the @@ -251,14 +251,17 @@ def parse_address(netloc, default_port='8000'): def close_on_exec(fd): - flags = fcntl.fcntl(fd, fcntl.F_GETFD) - flags |= fcntl.FD_CLOEXEC - fcntl.fcntl(fd, fcntl.F_SETFD, flags) + # available since Python 3.4, equivalent to either: + # ioctl(fd, FIOCLEX) + # fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC) + os.set_inheritable(fd, False) def set_non_blocking(fd): - flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK - fcntl.fcntl(fd, fcntl.F_SETFL, flags) + # available since Python 3.5, equivalent to either: + # ioctl(fd, FIONBIO) + # fcntl(fd, fcntl.F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) + os.set_blocking(fd, False) def close(sock): @@ -307,6 +310,16 @@ def write_nonblock(sock, data, chunked=False): def write_error(sock, status_int, reason, mesg): + # we may reflect user input in mesg + # .. as long as it is escaped appropriately for indicated Content-Type + # we should send our own reason text + # .. we shall never send misleading or invalid HTTP status lines + if not REASON_PHRASE_RE.fullmatch(reason.encode("latin-1")): + raise AssertionError("Attempted to return malformed error reason: %r" % (reason, )) + # we should avoid chosing status codes that are already in use + # indicating special handling in our proxies + if not (100 <= status_int <= 599): # RFC9110 15 + raise AssertionError("Attempted to return invalid error status code: %r" % (status_int, )) html_error = textwrap.dedent("""\ diff --git a/gunicorn/workers/base.py b/gunicorn/workers/base.py index 93c465c98..cf4e3372e 100644 --- a/gunicorn/workers/base.py +++ b/gunicorn/workers/base.py @@ -29,6 +29,8 @@ class Worker: + WORKAROUND_BASE_EXCEPTIONS = () # none + SIGNALS = [getattr(signal, "SIG%s" % x) for x in ( "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split() )] @@ -109,7 +111,7 @@ def init_process(self): # Prevent fd inheritance for s in self.sockets: - util.close_on_exec(s) + util.close_on_exec(s.fileno()) util.close_on_exec(self.tmp.fileno()) self.wait_fds = self.sockets + [self.PIPE[0]] @@ -119,7 +121,7 @@ def init_process(self): self.init_signals() # start the reloader - if self.cfg.reload: + if self.cfg.reload or self.cfg.reload_extra_files: def changed(fname): self.log.info("Worker reloading: %s modified", fname) self.alive = False @@ -130,7 +132,7 @@ def changed(fname): reloader_cls = reloader_engines[self.cfg.reload_engine] self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files, - callback=changed) + callback=changed, auto_detect=self.cfg.reload) self.load_wsgi() if self.reloader: @@ -262,7 +264,7 @@ def handle_error(self, req, client, addr, exc): if hasattr(req, "uri"): self.log.exception("Error handling request %s", req.uri) else: - self.log.exception("Error handling request (no URI read)") + self.log.debug("Error handling request (no URI read)") status_int = 500 reason = "Internal Server Error" mesg = "" diff --git a/gunicorn/workers/base_async.py b/gunicorn/workers/base_async.py index 9466d6aaa..bb6d560d6 100644 --- a/gunicorn/workers/base_async.py +++ b/gunicorn/workers/base_async.py @@ -81,7 +81,11 @@ def handle(self, listener, client, addr): self.log.debug("Ignoring socket not connected") else: self.log.debug("Ignoring EPIPE") - except BaseException as e: + except self.WORKAROUND_BASE_EXCEPTIONS as e: + self.log.warning("Catched async exception (compat workaround). " + "If this is not a bug in your app, please file a report.") + self.handle_error(req, client, addr, e) + except Exception as e: self.handle_error(req, client, addr, e) finally: util.close(client) diff --git a/gunicorn/workers/geventlet.py b/gunicorn/workers/geventlet.py index 087eb61ec..87f9c4e1f 100644 --- a/gunicorn/workers/geventlet.py +++ b/gunicorn/workers/geventlet.py @@ -123,6 +123,8 @@ def patch_sendfile(): class EventletWorker(AsyncWorker): + WORKAROUND_BASE_EXCEPTIONS = (eventlet.Timeout, ) + def patch(self): hubs.use_hub() eventlet.monkey_patch() diff --git a/gunicorn/workers/ggevent.py b/gunicorn/workers/ggevent.py index b9b9b4408..d45066410 100644 --- a/gunicorn/workers/ggevent.py +++ b/gunicorn/workers/ggevent.py @@ -31,6 +31,8 @@ class GeventWorker(AsyncWorker): + WORKAROUND_BASE_EXCEPTIONS = (gevent.Timeout, ) + server_class = None wsgi_handler = None @@ -41,7 +43,7 @@ def patch(self): sockets = [] for s in self.sockets: sockets.append(socket.socket(s.FAMILY, socket.SOCK_STREAM, - fileno=s.sock.fileno())) + fileno=s.sock.detach())) self.sockets = sockets def notify(self): diff --git a/gunicorn/workers/gthread.py b/gunicorn/workers/gthread.py index 7a23228cd..b47ddaef5 100644 --- a/gunicorn/workers/gthread.py +++ b/gunicorn/workers/gthread.py @@ -13,6 +13,7 @@ from concurrent import futures import errno import os +import queue import selectors import socket import ssl @@ -21,7 +22,6 @@ from collections import deque from datetime import datetime from functools import partial -from threading import RLock from . import base from .. import http @@ -40,44 +40,64 @@ def __init__(self, cfg, sock, client, server): self.timeout = None self.parser = None - self.initialized = False - - # set the socket to non blocking - self.sock.setblocking(False) def init(self): - self.initialized = True - self.sock.setblocking(True) - if self.parser is None: # wrap the socket if needed if self.cfg.is_ssl: self.sock = sock.ssl_wrap_socket(self.sock, self.cfg) - # initialize the parser self.parser = http.RequestParser(self.cfg, self.sock, self.client) - def set_timeout(self): - # set the timeout - self.timeout = time.time() + self.cfg.keepalive + def is_initialized(self): + return bool(self.parser) + + def set_keepalive_timeout(self): + self.timeout = time.monotonic() + self.cfg.keepalive def close(self): util.close(self.sock) +class PollableMethodQueue(object): + + def __init__(self): + self.fds = [] + self.method_queue = None + + def init(self): + self.fds = os.pipe() + self.method_queue = queue.SimpleQueue() + + def close(self): + for fd in self.fds: + os.close(fd) + + def get_fd(self): + return self.fds[0] + + def defer(self, callback, *args): + self.method_queue.put(partial(callback, *args)) + os.write(self.fds[1], b'0') + + def run_callbacks(self, max_callbacks_at_a_time=10): + zeroes = os.read(self.fds[0], max_callbacks_at_a_time) + for _ in range(0, len(zeroes)): + method = self.method_queue.get() + method() + + class ThreadWorker(base.Worker): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.worker_connections = self.cfg.worker_connections self.max_keepalived = self.cfg.worker_connections - self.cfg.threads - # initialise the pool - self.tpool = None + self.thread_pool = None self.poller = None - self._lock = None - self.futures = deque() - self._keep = deque() + self.keepalived_conns = deque() self.nr_conns = 0 + self.method_queue = PollableMethodQueue() @classmethod def check_config(cls, cfg, log): @@ -88,100 +108,67 @@ def check_config(cls, cfg, log): "Check the number of worker connections and threads.") def init_process(self): - self.tpool = self.get_thread_pool() + self.thread_pool = self.get_thread_pool() self.poller = selectors.DefaultSelector() - self._lock = RLock() + self.method_queue.init() super().init_process() def get_thread_pool(self): """Override this method to customize how the thread pool is created""" return futures.ThreadPoolExecutor(max_workers=self.cfg.threads) + def handle_exit(self, sig, frame): + if self.alive: + self.alive = False + self.method_queue.defer(lambda: None) # To wake up poller.select() + def handle_quit(self, sig, frame): - self.alive = False - # worker_int callback - self.cfg.worker_int(self) - self.tpool.shutdown(False) - time.sleep(0.1) - sys.exit(0) - - def _wrap_future(self, fs, conn): - fs.conn = conn - self.futures.append(fs) - fs.add_done_callback(self.finish_request) - - def enqueue_req(self, conn): - conn.init() - # submit the connection to a worker - fs = self.tpool.submit(self.handle, conn) - self._wrap_future(fs, conn) + self.thread_pool.shutdown(False) + super().handle_quit(sig, frame) + + def set_accept_enabled(self, enabled): + for sock in self.sockets: + if enabled: + self.poller.register(sock, selectors.EVENT_READ, self.accept) + else: + self.poller.unregister(sock) - def accept(self, server, listener): + def accept(self, listener): try: sock, client = listener.accept() - # initialize the connection object - conn = TConn(self.cfg, sock, client, server) - self.nr_conns += 1 - # wait until socket is readable - with self._lock: - self.poller.register(conn.sock, selectors.EVENT_READ, - partial(self.on_client_socket_readable, conn)) + sock.setblocking(True) # Explicitly set behavior since it differs per OS + conn = TConn(self.cfg, sock, client, listener.getsockname()) + + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.on_client_socket_readable, conn)) except OSError as e: if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK): raise def on_client_socket_readable(self, conn, client): - with self._lock: - # unregister the client from the poller - self.poller.unregister(client) + self.poller.unregister(client) - if conn.initialized: - # remove the connection from keepalive - try: - self._keep.remove(conn) - except ValueError: - # race condition - return + if conn.is_initialized(): + self.keepalived_conns.remove(conn) + conn.init() - # submit the connection to a worker - self.enqueue_req(conn) + fs = self.thread_pool.submit(self.handle, conn) + fs.add_done_callback( + lambda fut: self.method_queue.defer(self.finish_request, conn, fut)) def murder_keepalived(self): - now = time.time() - while True: - with self._lock: - try: - # remove the connection from the queue - conn = self._keep.popleft() - except IndexError: - break - - delta = conn.timeout - now + now = time.monotonic() + while self.keepalived_conns: + delta = self.keepalived_conns[0].timeout - now if delta > 0: - # add the connection back to the queue - with self._lock: - self._keep.appendleft(conn) break - else: - self.nr_conns -= 1 - # remove the socket from the poller - with self._lock: - try: - self.poller.unregister(conn.sock) - except OSError as e: - if e.errno != errno.EBADF: - raise - except KeyError: - # already removed by the system, continue - pass - except ValueError: - # already removed by the system continue - pass - - # close the socket - conn.close() + + conn = self.keepalived_conns.popleft() + self.poller.unregister(conn.sock) + self.nr_conns -= 1 + conn.close() def is_parent_alive(self): # If our parent changed then we shut down. @@ -190,39 +177,23 @@ def is_parent_alive(self): return False return True + def wait_for_and_dispatch_events(self, timeout): + for key, _ in self.poller.select(timeout): + callback = key.data + callback(key.fileobj) + def run(self): - # init listeners, add them to the event loop - for sock in self.sockets: - sock.setblocking(False) - # a race condition during graceful shutdown may make the listener - # name unavailable in the request handler so capture it once here - server = sock.getsockname() - acceptor = partial(self.accept, server) - self.poller.register(sock, selectors.EVENT_READ, acceptor) + self.set_accept_enabled(True) + self.poller.register(self.method_queue.get_fd(), + selectors.EVENT_READ, + self.method_queue.run_callbacks) while self.alive: # notify the arbiter we are alive self.notify() - # can we accept more connections? - if self.nr_conns < self.worker_connections: - # wait for an event - events = self.poller.select(1.0) - for key, _ in events: - callback = key.data - callback(key.fileobj) - - # check (but do not wait) for finished requests - result = futures.wait(self.futures, timeout=0, - return_when=futures.FIRST_COMPLETED) - else: - # wait for a request to finish - result = futures.wait(self.futures, timeout=1.0, - return_when=futures.FIRST_COMPLETED) - - # clean up finished requests - for fut in result.done: - self.futures.remove(fut) + new_connections_accepted = self.nr_conns < self.worker_connections + self.wait_for_and_dispatch_events(timeout=1) if not self.is_parent_alive(): break @@ -230,57 +201,53 @@ def run(self): # handle keepalive timeouts self.murder_keepalived() - self.tpool.shutdown(False) + new_connections_still_accepted = self.nr_conns < self.worker_connections + if new_connections_accepted != new_connections_still_accepted: + self.set_accept_enabled(new_connections_still_accepted) + + # Don't accept any new connections, as we're about to shut down + if self.nr_conns < self.worker_connections: + self.set_accept_enabled(False) + + # ... but try handle all already accepted connections within the grace period + graceful_timeout = time.monotonic() + self.cfg.graceful_timeout + while self.nr_conns > 0: + time_remaining = max(graceful_timeout - time.monotonic(), 0) + if time_remaining == 0: + break + self.wait_for_and_dispatch_events(timeout=time_remaining) + + self.thread_pool.shutdown(wait=False) self.poller.close() + self.method_queue.close() for s in self.sockets: s.close() - futures.wait(self.futures, timeout=self.cfg.graceful_timeout) - - def finish_request(self, fs): - if fs.cancelled(): - self.nr_conns -= 1 - fs.conn.close() - return - + def finish_request(self, conn, fs): try: - (keepalive, conn) = fs.result() - # if the connection should be kept alived add it - # to the eventloop and record it + keepalive = not fs.cancelled() and fs.result() if keepalive and self.alive: - # flag the socket as non blocked - conn.sock.setblocking(False) - - # register the connection - conn.set_timeout() - with self._lock: - self._keep.append(conn) - - # add the socket to the event loop - self.poller.register(conn.sock, selectors.EVENT_READ, - partial(self.on_client_socket_readable, conn)) + conn.set_keepalive_timeout() + self.keepalived_conns.append(conn) + self.poller.register(conn.sock, selectors.EVENT_READ, + partial(self.on_client_socket_readable, conn)) else: self.nr_conns -= 1 conn.close() except Exception: - # an exception happened, make sure to close the - # socket. self.nr_conns -= 1 - fs.conn.close() + conn.close() def handle(self, conn): - keepalive = False req = None try: req = next(conn.parser) if not req: - return (False, conn) + return False # handle the request - keepalive = self.handle_request(req, conn) - if keepalive: - return (keepalive, conn) + return self.handle_request(req, conn) except http.errors.NoMoreData as e: self.log.debug("Ignored premature client disconnection. %s", e) @@ -307,7 +274,7 @@ def handle(self, conn): except Exception as e: self.handle_error(req, conn.sock, conn.client, e) - return (False, conn) + return False def handle_request(self, req, conn): environ = {} @@ -327,7 +294,7 @@ def handle_request(self, req, conn): if not self.alive or not self.cfg.keepalive: resp.force_close() - elif len(self._keep) >= self.max_keepalived: + elif len(self.keepalived_conns) >= self.max_keepalived: resp.force_close() respiter = self.wsgi(environ, resp.start_response) diff --git a/gunicorn/workers/sync.py b/gunicorn/workers/sync.py index 4c029f912..608dfe99d 100644 --- a/gunicorn/workers/sync.py +++ b/gunicorn/workers/sync.py @@ -27,7 +27,7 @@ class SyncWorker(base.Worker): def accept(self, listener): client, addr = listener.accept() client.setblocking(1) - util.close_on_exec(client) + util.close_on_exec(client.fileno()) self.handle(listener, client, addr) def wait(self, timeout): @@ -110,6 +110,8 @@ def run_for_multiple(self, timeout): return def run(self): + assert len(self.WORKAROUND_BASE_EXCEPTIONS) == 0 + # if no timeout is given the worker will never wait and will # use the CPU for nothing. This minimal timeout prevent it. timeout = self.timeout or 0.5 @@ -153,7 +155,7 @@ def handle(self, listener, client, addr): self.log.debug("Ignoring socket not connected") else: self.log.debug("Ignoring EPIPE") - except BaseException as e: + except Exception as e: self.handle_error(req, client, addr, e) finally: util.close(client) diff --git a/pyproject.toml b/pyproject.toml index eaca1eac0..24fae1d6c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,9 @@ main = "gunicorn.app.pasterapp:serve" norecursedirs = ["examples", "lib", "local", "src"] testpaths = ["tests/"] addopts = "--assert=plain --cov=gunicorn --cov-report=xml" +filterwarnings = [ + "error", +] [tool.setuptools] zip-safe = false diff --git a/tests/requests/invalid/chunked_14.http b/tests/requests/invalid/chunked_14.http new file mode 100644 index 000000000..e51e15d95 --- /dev/null +++ b/tests/requests/invalid/chunked_14.http @@ -0,0 +1,7 @@ +POST /chunked_newline_in_chunk_ext HTTP/1.1\r\n +Transfer-Encoding: chunked\r\n +\r\n +5;foo\nbar\r\n +hello\r\n +0\r\n +\r\n diff --git a/tests/requests/invalid/chunked_14.py b/tests/requests/invalid/chunked_14.py new file mode 100644 index 000000000..14ef63b43 --- /dev/null +++ b/tests/requests/invalid/chunked_14.py @@ -0,0 +1,2 @@ +from gunicorn.http.errors import InvalidChunkExtension +request = InvalidChunkExtension diff --git a/tests/t.py b/tests/t.py index 4f1fcaf50..ad400adfe 100644 --- a/tests/t.py +++ b/tests/t.py @@ -64,3 +64,6 @@ def send(self, data): def seek(self, offset, whence=0): self.tmp.seek(offset, whence) + + def close(self): + self.tmp.close() diff --git a/tests/test_http.py b/tests/test_http.py index 3aa4808f9..0f6907235 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -104,6 +104,23 @@ def test_http_header_encoding(): with pytest.raises(UnicodeEncodeError): mocked_socket.sendall(util.to_bytestring(header_str, "ascii")) +def test_http_reflected_xss_in_error(): + """ If we put arbitrary user input into the HTTP status line, our proxy could get confused """ + + mocked_socket = mock.MagicMock() + with pytest.raises(UnicodeEncodeError): + util.write_error( + mocked_socket, 501, + "Not latin-1: \N{egg}", + "unused_", + ) + + with pytest.raises(AssertionError): + util.write_error( + mocked_socket, 501, + "Extra newline shall not appear in HTTP Status line: \n", + "harmless, will appear properly quoted in html", + ) def test_http_invalid_response_header(): """ tests whether http response headers are contains control chars """ @@ -182,11 +199,14 @@ def test_socket_unreader_chunk(): fake_sock = t.FakeSocket(io.BytesIO(b'Lorem ipsum dolor')) sock_unreader = SocketUnreader(fake_sock, max_chunk=5) - assert sock_unreader.chunk() == b'Lorem' - assert sock_unreader.chunk() == b' ipsu' - assert sock_unreader.chunk() == b'm dol' - assert sock_unreader.chunk() == b'or' - assert sock_unreader.chunk() == b'' + try: + assert sock_unreader.chunk() == b'Lorem' + assert sock_unreader.chunk() == b' ipsu' + assert sock_unreader.chunk() == b'm dol' + assert sock_unreader.chunk() == b'or' + assert sock_unreader.chunk() == b'' + finally: + fake_sock.close() def test_length_reader_read(): diff --git a/tests/test_nginx.py b/tests/test_nginx.py new file mode 100644 index 000000000..8d1ad69ed --- /dev/null +++ b/tests/test_nginx.py @@ -0,0 +1,470 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +# hint: can see stdout as the (complex) test progresses using: +# python -B -m pytest -s -vvvv --ff \ +# --override-ini=addopts=--strict-markers --exitfirst \ +# -- tests/test_nginx.py + +import importlib +import os +import secrets +import shutil +import signal +import subprocess +import shutil +import sys +import time +from itertools import chain +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + import http.client + from typing import Any, NamedTuple, Self + +# test runner may not be system administrator. not needed here, to run nginx +PATH = "/usr/sbin:/usr/local/sbin:"+os.environ.get("PATH", "/usr/local/bin:/usr/bin") +CMD_OPENSSL = shutil.which("openssl", path=PATH) +CMD_NGINX = shutil.which("nginx", path=PATH) + +pytestmark = pytest.mark.skipif( + CMD_OPENSSL is None or CMD_NGINX is None, + reason="need nginx and openssl binaries", +) + +STDOUT = 0 +STDERR = 1 + +TEST_SIMPLE = [ + pytest.param("sync"), + "eventlet", + "gevent", + "gevent_wsgi", + "gevent_pywsgi", + # "tornado", + "gthread", + # "aiohttp.GunicornWebWorker", # different app signature + # "aiohttp.GunicornUVLoopWebWorker", # " +] # type: list[str|NamedTuple] + +WORKER_DEPENDS = { + "sync": [], + "gthread": [], + "aiohttp.GunicornWebWorker": ["aiohttp"], + "aiohttp.GunicornUVLoopWebWorker": ["aiohttp", "uvloop"], + "uvicorn.workers.UvicornWorker": ["uvicorn"], # deprecated + "uvicorn.workers.UvicornH11Worker": ["uvicorn"], # deprecated + "uvicorn_worker.UvicornWorker": ["uvicorn_worker"], + "uvicorn_worker.UvicornH11Worker": ["uvicorn_worker"], + "eventlet": ["eventlet"], + "gevent": ["gevent"], + "gevent_wsgi": ["gevent"], + "gevent_pywsgi": ["gevent"], + "tornado": ["tornado"], +} +DEP_WANTED = set(chain(*WORKER_DEPENDS.values())) # type: set[str] +DEP_INSTALLED = set() # type: set[str] +WORKER_ORDER = list(WORKER_DEPENDS.keys()) + +for dependency in DEP_WANTED: + try: + importlib.import_module(dependency) + DEP_INSTALLED.add(dependency) + except ImportError: + pass + +for worker_name, worker_needs in WORKER_DEPENDS.items(): + missing = list(pkg for pkg in worker_needs if pkg not in DEP_INSTALLED) + if missing: + for T in (TEST_SIMPLE,): + if worker_name not in T: + continue + T.remove(worker_name) + skipped_worker = pytest.param( + worker_name, marks=pytest.mark.skip("%s not installed" % (missing[0])) + ) + T.append(skipped_worker) + +WORKER_COUNT = 2 +GRACEFUL_TIMEOUT = 3 +APP_IMPORT_NAME = "testsyntax" +APP_FUNC_NAME = "myapp" +HTTP_HOST = "local.test" + +PY_APPLICATION = f""" +import time +def {APP_FUNC_NAME}(environ, start_response): + body = b"response body from app" + response_head = [ + ("Content-Type", "text/plain"), + ("Content-Length", "%d" % len(body)), + ] + start_response("200 OK", response_head) + time.sleep(0.02) + return iter([body]) +""" + +# used in string.format() - duplicate {{ and }} +NGINX_CONFIG_TEMPLATE = """ +pid {pid_path}; +worker_processes 1; +error_log stderr notice; +events {{ + worker_connections 1024; +}} +worker_shutdown_timeout 1; +http {{ + default_type application/octet-stream; + access_log /dev/stdout combined; + upstream upstream_gunicorn {{ + server {gunicorn_upstream} fail_timeout=0; + }} + + server {{ listen {server_bind} default_server; return 400; }} + server {{ + listen {server_bind}; client_max_body_size 4G; + server_name {server_name}; + root {static_dir}; + location / {{ try_files $uri @proxy_to_app; }} + + location @proxy_to_app {{ + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header Host $http_host; + proxy_http_version 1.1; + proxy_redirect off; + proxy_pass {proxy_method}://upstream_gunicorn; + }} + }} +}} +""" + + +class SubProcess: + GRACEFUL_SIGNAL = signal.SIGTERM + + def __enter__(self): + # type: () -> Self + self.run() + return self + + def __exit__(self, *exc): + # type: (*Any) -> None + if self.p is None: + return + self.p.send_signal(signal.SIGKILL) + stdout, stderr = self.p.communicate(timeout=1 + GRACEFUL_TIMEOUT) + ret = self.p.returncode + assert stdout == b"", stdout + assert ret == 0, (ret, stdout, stderr) + + def read_stdio(self, *, key, timeout_sec, wait_for_keyword, expect=None): + # type: (int, int, str, set[str]|None) -> str + # try: + # stdout, stderr = self.p.communicate(timeout=timeout) + # except subprocess.TimeoutExpired: + buf = ["", ""] + seen_keyword = 0 + unseen_keywords = list(expect or []) + poll_per_second = 20 + assert key in {0, 1}, key + assert self.p is not None # this helps static type checkers + assert self.p.stdout is not None # this helps static type checkers + assert self.p.stderr is not None # this helps static type checkers + for _ in range(timeout_sec * poll_per_second): + print("parsing", buf, "waiting for", wait_for_keyword, unseen_keywords) + for fd, file in enumerate([self.p.stdout, self.p.stderr]): + read = file.read(64 * 1024) + if read is not None: + buf[fd] += read.decode("utf-8", "surrogateescape") + if seen_keyword or wait_for_keyword in buf[key]: + seen_keyword += 1 + for additional_keyword in tuple(unseen_keywords): + for somewhere in buf: + if additional_keyword in somewhere: + unseen_keywords.remove(additional_keyword) + # gathered all the context we wanted + if seen_keyword and not unseen_keywords: + break + # not seen expected output? wait for % of original timeout + # .. maybe we will still see better error context that way + if seen_keyword > (0.5 * timeout_sec * poll_per_second): + break + # retcode = self.p.poll() + # if retcode is not None: + # break + time.sleep(1.0 / poll_per_second) + # assert buf[abs(key - 1)] == "" + assert wait_for_keyword in buf[key], (wait_for_keyword, *buf) + assert not unseen_keywords, (unseen_keywords, *buf) + return buf[key] + + def run(self): + # type: () -> None + self.p = subprocess.Popen( + self._argv, + bufsize=0, # allow read to return short + cwd=self.temp_path, + shell=False, + close_fds=True, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + # creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, + ) + os.set_blocking(self.p.stdout.fileno(), False) + os.set_blocking(self.p.stderr.fileno(), False) + assert self.p.stdout is not None # this helps static type checkers + + def graceful_quit(self, expect=None): + # type: (set[str]|None) -> str + if self.p is None: + raise AssertionError("called graceful_quit() when not running") + self.p.send_signal(self.GRACEFUL_SIGNAL) + # self.p.kill() + stdout = self.p.stdout.read(64 * 1024) or b"" + stderr = self.p.stderr.read(64 * 1024) or b"" + try: + o, e = self.p.communicate(timeout=GRACEFUL_TIMEOUT) + stdout += o + stderr += e + except subprocess.TimeoutExpired: + pass + assert stdout == b"" + self.p.stdout.close() + self.p.stderr.close() + exitcode = self.p.poll() # will return None if running + assert exitcode == 0, (exitcode, stdout, stderr) + print("output after signal: ", stdout, stderr, exitcode) + self.p = None + ret = stderr.decode("utf-8", "surrogateescape") + for keyword in expect or (): + assert keyword in ret, (keyword, ret) + return ret + + +class NginxProcess(SubProcess): + GRACEFUL_SIGNAL = signal.SIGQUIT + + def __init__( + self, + *, + temp_path, + config, + ): + assert isinstance(temp_path, Path) + self.conf_path = (temp_path / ("%s.nginx" % APP_IMPORT_NAME)).absolute() + self.p = None # type: subprocess.Popen[bytes] | None + self.temp_path = temp_path + with open(self.conf_path, "w+") as f: + f.write(config) + self._argv = [ + CMD_NGINX, + # nginx 1.19.5+ added the -e cmdline flag - may be testing earlier + # "-e", "stderr", + "-c", + "%s" % self.conf_path, + ] + + +def generate_dummy_ssl_cert(cert_path, key_path): + # dummy self-signed cert + subprocess.check_output( + [ + CMD_OPENSSL, + "req", + "-new", + "-newkey", + # "ed25519", + # OpenBSD 7.5 / LibreSSL 3.9.0 / Python 3.10.13 + # ssl.SSLError: [SSL: UNKNOWN_CERTIFICATE_TYPE] unknown certificate type (_ssl.c:3900) + # workaround: use RSA keys for testing + "rsa", + "-outform", + "PEM", + "-subj", + "/C=DE", + "-addext", + "subjectAltName=DNS:%s" % (HTTP_HOST), + "-days", + "1", + "-nodes", + "-x509", + "-keyout", + "%s" % (key_path), + "-out", + "%s" % (cert_path), + ], + shell=False, + ) + + +@pytest.fixture(scope="session") +def dummy_ssl_cert(tmp_path_factory): + base_tmp_dir = tmp_path_factory.getbasetemp().parent + crt = base_tmp_dir / "dummy.crt" + key = base_tmp_dir / "dummy.key" + print(crt, key) + # generate once, reuse for all tests + # with FileLock("%s.lock" % crt): + if not crt.is_file(): + generate_dummy_ssl_cert(crt, key) + return crt, key + + +class GunicornProcess(SubProcess): + def __init__( + self, + *, + temp_path, + server_bind, + read_size=1024, + ssl_files=None, + worker_class="sync", + ): + self.conf_path = Path(os.devnull) + self.p = None # type: subprocess.Popen[bytes] | None + assert isinstance(temp_path, Path) + self.temp_path = temp_path + self.py_path = (temp_path / ("%s.py" % APP_IMPORT_NAME)).absolute() + with open(self.py_path, "w+") as f: + f.write(PY_APPLICATION) + + ssl_opt = [] + if ssl_files is not None: + cert_path, key_path = ssl_files + ssl_opt = [ + "--do-handshake-on-connect", + "--certfile=%s" % cert_path, + "--keyfile=%s" % key_path, + ] + + self._argv = [ + sys.executable, + "-m", + "gunicorn", + "--config=%s" % self.conf_path, + "--log-level=debug", + "--worker-class=%s" % (worker_class, ), + "--workers=%d" % WORKER_COUNT, + # unsupported at the time this test was submitted + # "--buf-read-size=%d" % read_size, + "--enable-stdio-inheritance", + "--access-logfile=-", + "--disable-redirect-access-to-syslog", + "--graceful-timeout=%d" % (GRACEFUL_TIMEOUT,), + "--bind=%s" % server_bind, + "--reuse-port", + *ssl_opt, + "--", + f"{APP_IMPORT_NAME}:{APP_FUNC_NAME}", + ] + + +class Client: + def __init__(self, host_port): + # type: (str) -> None + self._host_port = host_port + + def __enter__(self): + # type: () -> Self + import http.client + + self.conn = http.client.HTTPConnection(self._host_port, timeout=2) + return self + + def __exit__(self, *exc): + self.conn.close() + + def get(self, path): + # type: () -> http.client.HTTPResponse + self.conn.request("GET", path, headers={"Host": HTTP_HOST}, body="GETBODY!") + return self.conn.getresponse() + + +# @pytest.mark.parametrize("read_size", [50+secrets.randbelow(2048)]) +@pytest.mark.parametrize("ssl", [False, True], ids=["plain", "ssl"]) +@pytest.mark.parametrize("worker_class", TEST_SIMPLE) +def test_nginx_proxy(*, ssl, worker_class, dummy_ssl_cert, read_size=1024): + # avoid ports <= 6144 which may be in use by CI runner + # avoid quickly reusing ports as they might not be cleared immediately on BSD + worker_index = WORKER_ORDER.index(worker_class) + fixed_port = 1024 * 6 + (2 if ssl else 0) + (4 * worker_index) + # FIXME: should also test inherited socket (LISTEN_FDS) + # FIXME: should also test non-inherited (named) UNIX socket + gunicorn_bind = "[::1]:%d" % fixed_port + + # syntax matches between nginx conf and http client + nginx_bind = "[::1]:%d" % (fixed_port + 1) + + static_dir = "/run/gunicorn/nonexist" + # gunicorn_upstream = "unix:/run/gunicorn/for-nginx.sock" + # syntax "[ipv6]:port" matches between gunicorn and nginx + gunicorn_upstream = gunicorn_bind + + with TemporaryDirectory(suffix="_temp_py") as tempdir_name, Client( + nginx_bind + ) as client: + temp_path = Path(tempdir_name) + nginx_config = NGINX_CONFIG_TEMPLATE.format( + server_bind=nginx_bind, + pid_path="%s" % (temp_path / "nginx.pid"), + gunicorn_upstream=gunicorn_upstream, + server_name=HTTP_HOST, + static_dir=static_dir, + proxy_method="https" if ssl else "http", + ) + + with GunicornProcess( + server_bind=gunicorn_bind, + worker_class=worker_class, + read_size=read_size, + ssl_files=dummy_ssl_cert if ssl else None, + temp_path=temp_path, + ) as server, NginxProcess( + config=nginx_config, + temp_path=temp_path, + ) as proxy: + proxy.read_stdio( + key=STDERR, + timeout_sec=4, + wait_for_keyword="start worker processes", + ) + + server.read_stdio( + key=STDERR, + wait_for_keyword="Arbiter booted", + timeout_sec=4, + expect={ + "Booting worker", + }, + ) + + for num_request in range(5): + path = "/pytest/%d" % (num_request) + response = client.get(path) + assert response.status == 200 + assert response.read() == b"response body from app" + + # using 1.1 to not fail on tornado reporting for 1.0 + # nginx sees our HTTP/1.1 request + proxy.read_stdio( + key=STDOUT, timeout_sec=2, wait_for_keyword="GET %s HTTP/1.1" % path + ) + # gunicorn sees the HTTP/1.1 request from nginx + server.read_stdio( + key=STDOUT, timeout_sec=2, wait_for_keyword="GET %s HTTP/1.1" % path + ) + + server.graceful_quit( + expect={ + "Handling signal: SIGTERM", + "Shutting down: Master", + }, + ) + proxy.graceful_quit() diff --git a/tests/test_reload.py b/tests/test_reload.py index f5b182583..5ea0549f1 100644 --- a/tests/test_reload.py +++ b/tests/test_reload.py @@ -39,9 +39,12 @@ def test_reload_on_syntax_error(): log = mock.Mock() worker = MyWorker(age=0, ppid=0, sockets=[], app=app, timeout=0, cfg=cfg, log=log) - worker.init_process() - reloader.start.assert_called_with() - reloader.add_extra_file.assert_called_with('syntax_error_filename') + try: + worker.init_process() + reloader.start.assert_called_with() + reloader.add_extra_file.assert_called_with('syntax_error_filename') + finally: + worker.tmp.close() def test_start_reloader_after_load_wsgi(): @@ -56,13 +59,16 @@ def test_start_reloader_after_load_wsgi(): log = mock.Mock() worker = MyWorker(age=0, ppid=0, sockets=[], app=app, timeout=0, cfg=cfg, log=log) - worker.load_wsgi = mock.Mock() - mock_parent = mock.Mock() - mock_parent.attach_mock(worker.load_wsgi, 'load_wsgi') - mock_parent.attach_mock(reloader.start, 'reloader_start') - - worker.init_process() - mock_parent.assert_has_calls([ - mock.call.load_wsgi(), - mock.call.reloader_start(), - ]) + try: + worker.load_wsgi = mock.Mock() + mock_parent = mock.Mock() + mock_parent.attach_mock(worker.load_wsgi, 'load_wsgi') + mock_parent.attach_mock(reloader.start, 'reloader_start') + + worker.init_process() + mock_parent.assert_has_calls([ + mock.call.load_wsgi(), + mock.call.reloader_start(), + ]) + finally: + worker.tmp.close() diff --git a/tests/test_wrk.py b/tests/test_wrk.py new file mode 100644 index 000000000..f5ce229d5 --- /dev/null +++ b/tests/test_wrk.py @@ -0,0 +1,406 @@ +# +# This file is part of gunicorn released under the MIT license. +# See the NOTICE for more information. + +# hint: can see stdout as the (complex) test progresses using: +# python -B -m pytest -s -vvvv --ff \ +# --override-ini=addopts=--strict-markers --exitfirst \ +# -- tests/test_nginx.py + +import importlib +import os +import secrets +import shutil +import signal +import subprocess +import sys +import re +import time +from itertools import chain +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + import http.client + from typing import Any, NamedTuple, Self + +# path may be /usr/local/bin for packages ported from other OS +CMD_OPENSSL = shutil.which("openssl") +CMD_WRK = shutil.which("wrk") + +RATE = re.compile(r"^Requests/sec: *([0-9]+(?:\.[0-9]+)?)$", re.MULTILINE) + +pytestmark = pytest.mark.skipif( + CMD_OPENSSL is None or CMD_WRK is None, + reason="need openssl and wrk binaries", +) + +STDOUT = 0 +STDERR = 1 + +TEST_SIMPLE = [ + pytest.param("sync"), + "eventlet", + "gevent", + "gevent_wsgi", + "gevent_pywsgi", + # "tornado", + "gthread", + # "aiohttp.GunicornWebWorker", # different app signature + # "aiohttp.GunicornUVLoopWebWorker", # " +] # type: list[str|NamedTuple] + +WORKER_DEPENDS = { + "aiohttp.GunicornWebWorker": ["aiohttp"], + "aiohttp.GunicornUVLoopWebWorker": ["aiohttp", "uvloop"], + "uvicorn.workers.UvicornWorker": ["uvicorn"], # deprecated + "uvicorn.workers.UvicornH11Worker": ["uvicorn"], # deprecated + "uvicorn_worker.UvicornWorker": ["uvicorn_worker"], + "uvicorn_worker.UvicornH11Worker": ["uvicorn_worker"], + "eventlet": ["eventlet"], + "gevent": ["gevent"], + "gevent_wsgi": ["gevent"], + "gevent_pywsgi": ["gevent"], + "tornado": ["tornado"], +} +DEP_WANTED = set(chain(*WORKER_DEPENDS.values())) # type: set[str] +DEP_INSTALLED = set() # type: set[str] + +for dependency in DEP_WANTED: + try: + importlib.import_module(dependency) + DEP_INSTALLED.add(dependency) + except ImportError: + pass + +for worker_name, worker_needs in WORKER_DEPENDS.items(): + missing = list(pkg for pkg in worker_needs if pkg not in DEP_INSTALLED) + if missing: + for T in (TEST_SIMPLE,): + if worker_name not in T: + continue + T.remove(worker_name) + skipped_worker = pytest.param( + worker_name, marks=pytest.mark.skip("%s not installed" % (missing[0])) + ) + T.append(skipped_worker) + +WORKER_COUNT = 2 +GRACEFUL_TIMEOUT = 10 +APP_IMPORT_NAME = "testsyntax" +APP_FUNC_NAME = "myapp" +HTTP_HOST = "local.test" + +PY_APPLICATION = f""" +import time +def {APP_FUNC_NAME}(environ, start_response): + body = b"response body from app" + response_head = [ + ("Content-Type", "text/plain"), + ("Content-Length", "%d" % len(body)), + ] + start_response("200 OK", response_head) + time.sleep(0.1) + return iter([body]) +""" + +class SubProcess: + GRACEFUL_SIGNAL = signal.SIGTERM + + def __enter__(self): + # type: () -> Self + self.run() + return self + + def __exit__(self, *exc): + # type: (*Any) -> None + if self.p is None: + return + self.p.send_signal(signal.SIGKILL) + stdout, stderr = self.p.communicate(timeout=1 + GRACEFUL_TIMEOUT) + ret = self.p.returncode + assert stdout[-512:] == b"", stdout + assert ret == 0, (ret, stdout, stderr) + + def read_stdio(self, *, key, timeout_sec, wait_for_keyword, expect=None): + # type: (int, int, str, set[str]|None) -> str + # try: + # stdout, stderr = self.p.communicate(timeout=timeout) + # except subprocess.TimeoutExpired: + buf = ["", ""] + seen_keyword = 0 + unseen_keywords = list(expect or []) + poll_per_second = 20 + assert key in {0, 1}, key + assert self.p is not None # this helps static type checkers + assert self.p.stdout is not None # this helps static type checkers + assert self.p.stderr is not None # this helps static type checkers + for _ in range(timeout_sec * poll_per_second): + keep_reading = False + for fd, file in enumerate([self.p.stdout, self.p.stderr]): + read = file.read(64 * 1024) + if read is not None: + buf[fd] += read.decode("utf-8", "surrogateescape") + keep_reading = True + if seen_keyword or wait_for_keyword in buf[key]: + seen_keyword += 1 + for additional_keyword in tuple(unseen_keywords): + for somewhere in buf: + if additional_keyword in somewhere: + unseen_keywords.remove(additional_keyword) + # gathered all the context we wanted + if seen_keyword and not unseen_keywords: + if not keep_reading: + break + # not seen expected output? wait for % of original timeout + # .. maybe we will still see better error context that way + if seen_keyword > (0.5 * timeout_sec * poll_per_second): + break + # retcode = self.p.poll() + # if retcode is not None: + # break + time.sleep(1.0 / poll_per_second) + # assert buf[abs(key - 1)] == "" + assert wait_for_keyword in buf[key], (wait_for_keyword, *buf) + assert not unseen_keywords, (unseen_keywords, *buf) + return buf[key] + + def run(self): + # type: () -> None + self.p = subprocess.Popen( + self._argv, + bufsize=0, # allow read to return short + cwd=self.temp_path, + shell=False, + close_fds=True, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + # creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, + ) + os.set_blocking(self.p.stdout.fileno(), False) + os.set_blocking(self.p.stderr.fileno(), False) + assert self.p.stdout is not None # this helps static type checkers + + def graceful_quit(self, expect=None, ignore=None): + # type: (set[str]|None) -> str + if self.p is None: + raise AssertionError("called graceful_quit() when not running") + self.p.send_signal(self.GRACEFUL_SIGNAL) + # self.p.kill() + stdout = self.p.stdout.read(64 * 1024) or b"" + stderr = self.p.stderr.read(64 * 1024) or b"" + try: + o, e = self.p.communicate(timeout=GRACEFUL_TIMEOUT) + stdout += o + stderr += e + except subprocess.TimeoutExpired: + pass + out = stdout.decode("utf-8", "surrogateescape") + for line in out.split("\n"): + if any(i in line for i in (ignore or ())): + continue + assert line == "" + exitcode = self.p.poll() # will return None if running + self.p.stdout.close() + self.p.stderr.close() + assert exitcode == 0, (exitcode, stdout, stderr) + # print("output after signal: ", stdout, stderr, exitcode) + self.p = None + ret = stderr.decode("utf-8", "surrogateescape") + for keyword in expect or (): + assert keyword in ret, (keyword, ret) + return ret + + +def generate_dummy_ssl_cert(cert_path, key_path): + # dummy self-signed cert + subprocess.check_output( + [ + CMD_OPENSSL, + "req", + "-new", + "-newkey", + # "ed25519", + # OpenBSD 7.5 / LibreSSL 3.9.0 / Python 3.10.13 + # ssl.SSLError: [SSL: UNKNOWN_CERTIFICATE_TYPE] unknown certificate type (_ssl.c:3900) + # workaround: use RSA keys for testing + "rsa", + "-outform", + "PEM", + "-subj", + "/C=DE", + "-addext", + "subjectAltName=DNS:%s" % (HTTP_HOST), + "-days", + "1", + "-nodes", + "-x509", + "-keyout", + "%s" % (key_path), + "-out", + "%s" % (cert_path), + ], + shell=False, + ) + + +@pytest.fixture(scope="session") +def dummy_ssl_cert(tmp_path_factory): + base_tmp_dir = tmp_path_factory.getbasetemp().parent + crt = base_tmp_dir / "dummy.crt" + key = base_tmp_dir / "dummy.key" + print(crt, key) + # generate once, reuse for all tests + # with FileLock("%s.lock" % crt): + if not crt.is_file(): + generate_dummy_ssl_cert(crt, key) + return crt, key + + +class GunicornProcess(SubProcess): + def __init__( + self, + *, + temp_path, + server_bind, + read_size=1024, + ssl_files=None, + worker_class="sync", + ): + self.conf_path = Path(os.devnull) + self.p = None # type: subprocess.Popen[bytes] | None + assert isinstance(temp_path, Path) + self.temp_path = temp_path + self.py_path = (temp_path / ("%s.py" % APP_IMPORT_NAME)).absolute() + with open(self.py_path, "w+") as f: + f.write(PY_APPLICATION) + + ssl_opt = [] + if ssl_files is not None: + cert_path, key_path = ssl_files + ssl_opt = [ + "--do-handshake-on-connect", + "--certfile=%s" % cert_path, + "--keyfile=%s" % key_path, + ] + thread_opt = [] + if worker_class != "sync": + thread_opt = ["--threads=50"] + + self._argv = [ + sys.executable, + "-m", + "gunicorn", + "--config=%s" % self.conf_path, + "--log-level=info", + "--worker-class=%s" % worker_class, + "--workers=%d" % WORKER_COUNT, + # unsupported at the time this test was submitted + # "--buf-read-size=%d" % read_size, + "--enable-stdio-inheritance", + "--access-logfile=-", + "--disable-redirect-access-to-syslog", + "--graceful-timeout=%d" % (GRACEFUL_TIMEOUT,), + "--bind=%s" % server_bind, + "--reuse-port", + *thread_opt, + *ssl_opt, + "--", + f"{APP_IMPORT_NAME}:{APP_FUNC_NAME}", + ] + + +class Client: + def __init__(self, url_base): + # type: (str) -> None + self._url_base = url_base + self._env = os.environ.copy() + self._env["LC_ALL"] = "C" + + def __enter__(self): + # type: () -> Self + return self + + def __exit__(self, *exc): + pass + + def get(self, path): + # type: () -> http.client.HTTPResponse + assert path.startswith("/") + threads = 10 + connections = 100 + out = subprocess.check_output([CMD_WRK, "-t", "%d" % threads, "-c","%d" % connections, "-d5s","%s%s" % (self._url_base, path, )], shell=False, env=self._env) + + return out.decode("utf-8", "replace") + + +# @pytest.mark.parametrize("read_size", [50+secrets.randbelow(2048)]) +@pytest.mark.parametrize("ssl", [False, True], ids=["plain", "ssl"]) +@pytest.mark.parametrize("worker_class", TEST_SIMPLE) +def test_wrk(*, ssl, worker_class, dummy_ssl_cert, read_size=1024): + + if worker_class == "eventlet" and ssl: + pytest.skip("eventlet worker does not catch errors in ssl.wrap_socket") + + # avoid ports <= 6144 which may be in use by CI runner + fixed_port = 1024 * 6 + secrets.randbelow(1024 * 9) + # FIXME: should also test inherited socket (LISTEN_FDS) + # FIXME: should also test non-inherited (named) UNIX socket + gunicorn_bind = "[::1]:%d" % fixed_port + + proxy_method="https" if ssl else "http" + + with TemporaryDirectory(suffix="_temp_py") as tempdir_name, Client( + proxy_method + "://" + gunicorn_bind + ) as client: + temp_path = Path(tempdir_name) + + with GunicornProcess( + server_bind=gunicorn_bind, + worker_class=worker_class, + read_size=read_size, + ssl_files=dummy_ssl_cert if ssl else None, + temp_path=temp_path, + ) as server: + server.read_stdio( + key=STDERR, + wait_for_keyword="[INFO] Starting gunicorn", + timeout_sec=6, + expect={ + "[INFO] Booting worker", + }, + ) + + path = "/pytest/basic" + out = client.get(path) + print("##############\n" + out) + + extract = RATE.search(out) + assert extract is not None, out + rate = float(extract.groups()[0]) + if worker_class == "sync": + assert rate > 5 + else: + assert rate > 50 + + server.read_stdio( + key=STDOUT, timeout_sec=2, wait_for_keyword="GET %s HTTP/1.1" % path + ) + if ssl: + pass + #server.read_stdio( + # key=STDERR, + # wait_for_keyword="[DEBUG] ssl connection closed", + # timeout_sec=4, + #) + + server.graceful_quit( + ignore={"GET %s HTTP/1.1" % path, "Ignoring connection epipe", "Ignoring connection reset"}, + expect={ + "[INFO] Handling signal: SIGTERM", + }, + )