Skip to content

Commit

Permalink
fix(ingest): avoid multiprocessing "fork" start method
Browse files Browse the repository at this point in the history
We use threads in a variety of places within our CLI. The multiprocessing "fork" start method is not safe to use with threads.
MacOS and Windows already default to "spawn", and Linux will as well starting in Python 3.14.
See https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

This fixes a bug where the classification module would deadlock when
using more than one worker process.

Eventually it may make sense to use "forkserver" as the default where available, but we can revisit that in the future.
  • Loading branch information
hsheth2 committed Feb 3, 2025
1 parent 6f0d475 commit a813ab8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
9 changes: 9 additions & 0 deletions metadata-ingestion/src/datahub/entrypoints.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import multiprocessing
import os
import platform
import sys
Expand Down Expand Up @@ -217,6 +218,14 @@ def init(use_password: bool = False) -> None:


def main(**kwargs):
# We use threads in a variety of places within our CLI. The multiprocessing
# "fork" start method is not safe to use with threads.
# MacOS and Windows already default to "spawn", and Linux will as well starting in Python 3.14.
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# Eventually it may make sense to use "forkserver" as the default where available,
# but we can revisit that in the future.
multiprocessing.set_start_method("spawn", force=True)

# This wrapper prevents click from suppressing errors.
try:
sys.exit(datahub(standalone_mode=False, **kwargs))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import concurrent.futures
import logging
import multiprocessing
from dataclasses import dataclass, field
from functools import partial
from math import ceil
Expand Down Expand Up @@ -182,6 +183,11 @@ def async_classify(

with concurrent.futures.ProcessPoolExecutor(
max_workers=self.config.classification.max_workers,
# The fork start method, which is the default on Linux for Python < 3.14, is not
# safe when the main process uses threads. The default start method on windows/macOS is
# already spawn, and will be changed to spawn for Linux in Python 3.14.
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
mp_context=multiprocessing.get_context("spawn"),
) as executor:
column_info_proposal_futures = [
executor.submit(
Expand Down

0 comments on commit a813ab8

Please sign in to comment.