Skip to content

Commit

Permalink
Replace RLock with SerializableRLock (#335)
Browse files Browse the repository at this point in the history
* Replace RLock with SerializableRLock

* fix build

* fix build

* fix build

* fix build
  • Loading branch information
goodwanghan authored Jul 10, 2022
1 parent 7bbeabd commit 52bd892
Show file tree
Hide file tree
Showing 13 changed files with 34 additions and 30 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test_core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- name: Fix setuptools_scm
run: pip install "setuptools_scm<7"
- name: Install dependencies
run: make devenv
- name: Test
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: windows-latest
strategy:
matrix:
python-version: [3.7, 3.8, 3.9, "3.10"]
python-version: [3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
Expand Down
6 changes: 3 additions & 3 deletions fugue/dataframe/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import json
from abc import ABC, abstractmethod
from threading import RLock
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

import pandas as pd
import pyarrow as pa
from fugue.collections.yielded import Yielded
from fugue.exceptions import FugueDataFrameEmptyError, FugueDataFrameOperationError
from triad import SerializableRLock
from triad.collections.dict import ParamDict
from triad.collections.schema import Schema
from triad.exceptions import InvalidOperationError
Expand All @@ -28,7 +28,7 @@ class DataFrame(ABC):
implementing a new :class:`~fugue.execution.execution_engine.ExecutionEngine`
"""

_SHOW_LOCK = RLock()
_SHOW_LOCK = SerializableRLock()

def __init__(self, schema: Any = None, metadata: Any = None):
if not callable(schema):
Expand All @@ -45,7 +45,7 @@ def __init__(self, schema: Any = None, metadata: Any = None):
else ParamDict(metadata, deep=True)
)
self._metadata.set_readonly()
self._lazy_schema_lock = RLock()
self._lazy_schema_lock = SerializableRLock()

@property
def metadata(self) -> ParamDict:
Expand Down
5 changes: 2 additions & 3 deletions fugue/execution/execution_engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
from abc import ABC, abstractmethod
from threading import RLock
from typing import Any, Callable, Dict, Iterable, List, Optional, Union
from uuid import uuid4

Expand All @@ -17,7 +16,7 @@
from fugue.dataframe.utils import deserialize_df, serialize_df
from fugue.exceptions import FugueBug
from fugue.rpc import RPCServer, make_rpc_server
from triad import ParamDict, Schema, assert_or_throw
from triad import ParamDict, Schema, SerializableRLock, assert_or_throw
from triad.collections.fs import FileSystem
from triad.exceptions import InvalidOperationError
from triad.utils.convert import to_size
Expand Down Expand Up @@ -83,7 +82,7 @@ def __init__(self, conf: Any):
self._conf = ParamDict({**_FUGUE_GLOBAL_CONF, **_conf})
self._compile_conf = ParamDict()
self._rpc_server = make_rpc_server(self.conf)
self._engine_start_lock = RLock()
self._engine_start_lock = SerializableRLock()
self._engine_start = 0
self._sql_engine: Optional[SQLEngine] = None

Expand Down
4 changes: 2 additions & 2 deletions fugue/extensions/_builtins/outputters.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from threading import RLock
from typing import Callable, List, Optional, no_type_check

from fugue.collections.partition import PartitionCursor
Expand All @@ -11,14 +10,15 @@
from fugue.extensions.transformer.convert import _to_output_transformer
from fugue.extensions.transformer.transformer import CoTransformer, Transformer
from fugue.rpc import EmptyRPCHandler, to_rpc_handler
from triad import SerializableRLock
from triad.collections.dict import ParamDict
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import to_type


class Show(Outputter):
LOCK = RLock()
LOCK = SerializableRLock()
_hook: Optional[Callable] = None

def process(self, dfs: DataFrames) -> None:
Expand Down
11 changes: 5 additions & 6 deletions fugue/rpc/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import pickle
from abc import ABC, abstractmethod
from threading import RLock
from types import LambdaType
from typing import Any, Callable, Dict
from uuid import uuid4

from triad import ParamDict, assert_or_throw, to_uuid
from triad.utils.convert import to_type, get_full_type_path
import pickle
from types import LambdaType
from triad import ParamDict, SerializableRLock, assert_or_throw, to_uuid
from triad.utils.convert import get_full_type_path, to_type


class RPCClient(object):
Expand All @@ -20,7 +19,7 @@ class RPCHandler(RPCClient):
"""RPC handler hosting the real logic on driver side"""

def __init__(self):
self._rpchandler_lock = RLock()
self._rpchandler_lock = SerializableRLock()
self._running = 0

@property
Expand Down
4 changes: 2 additions & 2 deletions fugue/workflow/_workflow_context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from threading import RLock
from typing import Any, Dict
from uuid import uuid4

Expand All @@ -14,6 +13,7 @@
from fugue.execution.execution_engine import ExecutionEngine
from fugue.execution.factory import make_execution_engine
from fugue.workflow._checkpoint import CheckpointPath
from triad import SerializableRLock


class FugueWorkflowContext(WorkflowContext):
Expand All @@ -26,7 +26,7 @@ def __init__(
):
ee = make_execution_engine(execution_engine)
self._fugue_engine = ee
self._lock = RLock()
self._lock = SerializableRLock()
self._results: Dict[Any, DataFrame] = {}
self._execution_id = ""
self._checkpoint_path = CheckpointPath(self.execution_engine)
Expand Down
11 changes: 8 additions & 3 deletions fugue/workflow/workflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import sys
from collections import defaultdict
from threading import RLock
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, TypeVar, Union
from uuid import uuid4

Expand Down Expand Up @@ -65,7 +64,13 @@
from fugue.workflow._checkpoint import FileCheckpoint, WeakCheckpoint
from fugue.workflow._tasks import Create, CreateData, FugueTask, Output, Process
from fugue.workflow._workflow_context import FugueWorkflowContext
from triad import ParamDict, Schema, assert_or_throw, extensible_class
from triad import (
ParamDict,
Schema,
SerializableRLock,
assert_or_throw,
extensible_class,
)

_DEFAULT_IGNORE_ERRORS: List[Any] = []

Expand Down Expand Up @@ -1435,7 +1440,7 @@ class FugueWorkflow:
"""

def __init__(self, *args: Any, **kwargs: Any):
self._lock = RLock()
self._lock = SerializableRLock()
self._spec = WorkflowSpec()
self._workflow_ctx = self._to_ctx(*args, **kwargs)
self._computed = False
Expand Down
7 changes: 3 additions & 4 deletions fugue_duckdb/execution_engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
from threading import RLock
from typing import Any, Callable, Dict, List, Optional, Union, Iterable
from typing import Any, Callable, Dict, Iterable, List, Optional, Union

import duckdb
import pyarrow as pa
Expand All @@ -25,14 +24,14 @@
ExecutionEngine,
SQLEngine,
)
from triad import SerializableRLock
from triad.collections.fs import FileSystem
from triad.utils.assertion import assert_or_throw

from fugue_duckdb._io import DuckDBIO
from fugue_duckdb._utils import encode_value_to_expr, get_temp_df_name
from fugue_duckdb.dataframe import DuckDataFrame


_FUGUE_DUCKDB_PRAGMA_CONFIG_PREFIX = "fugue.duckdb.pragma."


Expand Down Expand Up @@ -89,7 +88,7 @@ def __init__(
self._native_engine = NativeExecutionEngine(conf)
self._con = connection or duckdb.connect()
self._external_con = connection is not None
self._context_lock = RLock()
self._context_lock = SerializableRLock()

try:
for pg in list(self._get_pragmas()): # transactional
Expand Down
7 changes: 4 additions & 3 deletions fugue_spark/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from threading import RLock
from typing import Any, Dict, Iterable, List, Optional

import pandas as pd
Expand All @@ -12,10 +11,12 @@
PandasDataFrame,
)
from fugue.exceptions import FugueDataFrameOperationError
from fugue_spark._utils.convert import to_cast_expression, to_schema, to_type_safe_input
from triad import SerializableRLock
from triad.collections.schema import SchemaError
from triad.utils.assertion import assert_or_throw

from fugue_spark._utils.convert import to_cast_expression, to_schema, to_type_safe_input


class SparkDataFrame(DataFrame):
"""DataFrame that wraps Spark DataFrame. Please also read
Expand All @@ -37,7 +38,7 @@ class SparkDataFrame(DataFrame):
def __init__( # noqa: C901
self, df: Any = None, schema: Any = None, metadata: Any = None
):
self._lock = RLock()
self._lock = SerializableRLock()
if isinstance(df, ps.DataFrame):
if schema is not None:
schema = to_schema(schema).assert_not_empty()
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ omit =
fugue_sql/_antlr/*

[flake8]
ignore = E24,E203,W503,C401,C408,A001,A003,W504,C407,C405
ignore = E24,E203,W503,C401,C408,A001,A003,W504,C407,C405,B023
max-line-length = 88
format = pylint
exclude = .svc,CVS,.bzr,.hg,.git,__pycache__,venv,tests/*,docs/*
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def get_version() -> str:
keywords="distributed spark dask sql dsl domain specific language",
url="http://github.com/fugue-project/fugue",
install_requires=[
"triad>=0.6.1",
"triad>=0.6.4",
"adagio>=0.2.4",
"qpd==0.3.0.dev2",
"fugue-sql-antlr>=0.1.0",
Expand Down
1 change: 0 additions & 1 deletion tests/fugue/dataframe/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import concurrent.futures
import os
from threading import RLock

import numpy as np
import pyarrow as pa
Expand Down

0 comments on commit 52bd892

Please sign in to comment.