diff --git a/.github/workflows/test_core.yml b/.github/workflows/test_core.yml index 4ef1728e..b10da64b 100644 --- a/.github/workflows/test_core.yml +++ b/.github/workflows/test_core.yml @@ -22,6 +22,8 @@ jobs: uses: actions/setup-python@v1 with: python-version: ${{ matrix.python-version }} + - name: Fix setuptools_scm + run: pip install "setuptools_scm<7" - name: Install dependencies run: make devenv - name: Test diff --git a/.github/workflows/test_win.yml b/.github/workflows/test_win.yml index b1aa9218..273cf9f1 100644 --- a/.github/workflows/test_win.yml +++ b/.github/workflows/test_win.yml @@ -14,7 +14,7 @@ jobs: runs-on: windows-latest strategy: matrix: - python-version: [3.7, 3.8, 3.9, "3.10"] + python-version: [3.7, 3.8, 3.9] steps: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} diff --git a/fugue/dataframe/dataframe.py b/fugue/dataframe/dataframe.py index d2a16c45..2d648200 100644 --- a/fugue/dataframe/dataframe.py +++ b/fugue/dataframe/dataframe.py @@ -1,12 +1,12 @@ import json from abc import ABC, abstractmethod -from threading import RLock from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union import pandas as pd import pyarrow as pa from fugue.collections.yielded import Yielded from fugue.exceptions import FugueDataFrameEmptyError, FugueDataFrameOperationError +from triad import SerializableRLock from triad.collections.dict import ParamDict from triad.collections.schema import Schema from triad.exceptions import InvalidOperationError @@ -28,7 +28,7 @@ class DataFrame(ABC): implementing a new :class:`~fugue.execution.execution_engine.ExecutionEngine` """ - _SHOW_LOCK = RLock() + _SHOW_LOCK = SerializableRLock() def __init__(self, schema: Any = None, metadata: Any = None): if not callable(schema): @@ -45,7 +45,7 @@ def __init__(self, schema: Any = None, metadata: Any = None): else ParamDict(metadata, deep=True) ) self._metadata.set_readonly() - self._lazy_schema_lock = RLock() + self._lazy_schema_lock = SerializableRLock() @property def metadata(self) -> ParamDict: diff --git a/fugue/execution/execution_engine.py b/fugue/execution/execution_engine.py index 680e44d9..4b1854ae 100644 --- a/fugue/execution/execution_engine.py +++ b/fugue/execution/execution_engine.py @@ -1,6 +1,5 @@ import logging from abc import ABC, abstractmethod -from threading import RLock from typing import Any, Callable, Dict, Iterable, List, Optional, Union from uuid import uuid4 @@ -17,7 +16,7 @@ from fugue.dataframe.utils import deserialize_df, serialize_df from fugue.exceptions import FugueBug from fugue.rpc import RPCServer, make_rpc_server -from triad import ParamDict, Schema, assert_or_throw +from triad import ParamDict, Schema, SerializableRLock, assert_or_throw from triad.collections.fs import FileSystem from triad.exceptions import InvalidOperationError from triad.utils.convert import to_size @@ -83,7 +82,7 @@ def __init__(self, conf: Any): self._conf = ParamDict({**_FUGUE_GLOBAL_CONF, **_conf}) self._compile_conf = ParamDict() self._rpc_server = make_rpc_server(self.conf) - self._engine_start_lock = RLock() + self._engine_start_lock = SerializableRLock() self._engine_start = 0 self._sql_engine: Optional[SQLEngine] = None diff --git a/fugue/extensions/_builtins/outputters.py b/fugue/extensions/_builtins/outputters.py index 4a515295..051cdf01 100644 --- a/fugue/extensions/_builtins/outputters.py +++ b/fugue/extensions/_builtins/outputters.py @@ -1,4 +1,3 @@ -from threading import RLock from typing import Callable, List, Optional, no_type_check from fugue.collections.partition import PartitionCursor @@ -11,6 +10,7 @@ from fugue.extensions.transformer.convert import _to_output_transformer from fugue.extensions.transformer.transformer import CoTransformer, Transformer from fugue.rpc import EmptyRPCHandler, to_rpc_handler +from triad import SerializableRLock from triad.collections.dict import ParamDict from triad.collections.schema import Schema from triad.utils.assertion import assert_or_throw @@ -18,7 +18,7 @@ class Show(Outputter): - LOCK = RLock() + LOCK = SerializableRLock() _hook: Optional[Callable] = None def process(self, dfs: DataFrames) -> None: diff --git a/fugue/rpc/base.py b/fugue/rpc/base.py index 5af904c7..f51fe682 100644 --- a/fugue/rpc/base.py +++ b/fugue/rpc/base.py @@ -1,12 +1,11 @@ +import pickle from abc import ABC, abstractmethod -from threading import RLock +from types import LambdaType from typing import Any, Callable, Dict from uuid import uuid4 -from triad import ParamDict, assert_or_throw, to_uuid -from triad.utils.convert import to_type, get_full_type_path -import pickle -from types import LambdaType +from triad import ParamDict, SerializableRLock, assert_or_throw, to_uuid +from triad.utils.convert import get_full_type_path, to_type class RPCClient(object): @@ -20,7 +19,7 @@ class RPCHandler(RPCClient): """RPC handler hosting the real logic on driver side""" def __init__(self): - self._rpchandler_lock = RLock() + self._rpchandler_lock = SerializableRLock() self._running = 0 @property diff --git a/fugue/workflow/_workflow_context.py b/fugue/workflow/_workflow_context.py index e747f275..fdc2008f 100644 --- a/fugue/workflow/_workflow_context.py +++ b/fugue/workflow/_workflow_context.py @@ -1,4 +1,3 @@ -from threading import RLock from typing import Any, Dict from uuid import uuid4 @@ -14,6 +13,7 @@ from fugue.execution.execution_engine import ExecutionEngine from fugue.execution.factory import make_execution_engine from fugue.workflow._checkpoint import CheckpointPath +from triad import SerializableRLock class FugueWorkflowContext(WorkflowContext): @@ -26,7 +26,7 @@ def __init__( ): ee = make_execution_engine(execution_engine) self._fugue_engine = ee - self._lock = RLock() + self._lock = SerializableRLock() self._results: Dict[Any, DataFrame] = {} self._execution_id = "" self._checkpoint_path = CheckpointPath(self.execution_engine) diff --git a/fugue/workflow/workflow.py b/fugue/workflow/workflow.py index ad7b99ae..ccdf8cdd 100644 --- a/fugue/workflow/workflow.py +++ b/fugue/workflow/workflow.py @@ -1,6 +1,5 @@ import sys from collections import defaultdict -from threading import RLock from typing import Any, Callable, Dict, Iterable, List, Optional, Set, TypeVar, Union from uuid import uuid4 @@ -65,7 +64,13 @@ from fugue.workflow._checkpoint import FileCheckpoint, WeakCheckpoint from fugue.workflow._tasks import Create, CreateData, FugueTask, Output, Process from fugue.workflow._workflow_context import FugueWorkflowContext -from triad import ParamDict, Schema, assert_or_throw, extensible_class +from triad import ( + ParamDict, + Schema, + SerializableRLock, + assert_or_throw, + extensible_class, +) _DEFAULT_IGNORE_ERRORS: List[Any] = [] @@ -1435,7 +1440,7 @@ class FugueWorkflow: """ def __init__(self, *args: Any, **kwargs: Any): - self._lock = RLock() + self._lock = SerializableRLock() self._spec = WorkflowSpec() self._workflow_ctx = self._to_ctx(*args, **kwargs) self._computed = False diff --git a/fugue_duckdb/execution_engine.py b/fugue_duckdb/execution_engine.py index 22f205d6..4e7cf911 100644 --- a/fugue_duckdb/execution_engine.py +++ b/fugue_duckdb/execution_engine.py @@ -1,6 +1,5 @@ import logging -from threading import RLock -from typing import Any, Callable, Dict, List, Optional, Union, Iterable +from typing import Any, Callable, Dict, Iterable, List, Optional, Union import duckdb import pyarrow as pa @@ -25,6 +24,7 @@ ExecutionEngine, SQLEngine, ) +from triad import SerializableRLock from triad.collections.fs import FileSystem from triad.utils.assertion import assert_or_throw @@ -32,7 +32,6 @@ from fugue_duckdb._utils import encode_value_to_expr, get_temp_df_name from fugue_duckdb.dataframe import DuckDataFrame - _FUGUE_DUCKDB_PRAGMA_CONFIG_PREFIX = "fugue.duckdb.pragma." @@ -89,7 +88,7 @@ def __init__( self._native_engine = NativeExecutionEngine(conf) self._con = connection or duckdb.connect() self._external_con = connection is not None - self._context_lock = RLock() + self._context_lock = SerializableRLock() try: for pg in list(self._get_pragmas()): # transactional diff --git a/fugue_spark/dataframe.py b/fugue_spark/dataframe.py index eff0ee3a..01f6bf81 100644 --- a/fugue_spark/dataframe.py +++ b/fugue_spark/dataframe.py @@ -1,4 +1,3 @@ -from threading import RLock from typing import Any, Dict, Iterable, List, Optional import pandas as pd @@ -12,10 +11,12 @@ PandasDataFrame, ) from fugue.exceptions import FugueDataFrameOperationError -from fugue_spark._utils.convert import to_cast_expression, to_schema, to_type_safe_input +from triad import SerializableRLock from triad.collections.schema import SchemaError from triad.utils.assertion import assert_or_throw +from fugue_spark._utils.convert import to_cast_expression, to_schema, to_type_safe_input + class SparkDataFrame(DataFrame): """DataFrame that wraps Spark DataFrame. Please also read @@ -37,7 +38,7 @@ class SparkDataFrame(DataFrame): def __init__( # noqa: C901 self, df: Any = None, schema: Any = None, metadata: Any = None ): - self._lock = RLock() + self._lock = SerializableRLock() if isinstance(df, ps.DataFrame): if schema is not None: schema = to_schema(schema).assert_not_empty() diff --git a/setup.cfg b/setup.cfg index e8b48d7e..f67dde73 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,7 +25,7 @@ omit = fugue_sql/_antlr/* [flake8] -ignore = E24,E203,W503,C401,C408,A001,A003,W504,C407,C405 +ignore = E24,E203,W503,C401,C408,A001,A003,W504,C407,C405,B023 max-line-length = 88 format = pylint exclude = .svc,CVS,.bzr,.hg,.git,__pycache__,venv,tests/*,docs/* diff --git a/setup.py b/setup.py index 6ca9025e..e443b6fa 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ def get_version() -> str: keywords="distributed spark dask sql dsl domain specific language", url="http://github.com/fugue-project/fugue", install_requires=[ - "triad>=0.6.1", + "triad>=0.6.4", "adagio>=0.2.4", "qpd==0.3.0.dev2", "fugue-sql-antlr>=0.1.0", diff --git a/tests/fugue/dataframe/test_utils.py b/tests/fugue/dataframe/test_utils.py index 0ae76d37..1ae235d6 100644 --- a/tests/fugue/dataframe/test_utils.py +++ b/tests/fugue/dataframe/test_utils.py @@ -1,6 +1,5 @@ import concurrent.futures import os -from threading import RLock import numpy as np import pyarrow as pa