From 4aa31651b1d8e7e65caaf11b25f0fd2527408137 Mon Sep 17 00:00:00 2001
From: Han Wang <goodwanghan@gmail.com>
Date: Fri, 3 Sep 2021 10:55:25 -0700
Subject: [PATCH] 0.3.1 (#33)

* update

* update

* update
---
 README.md                  |  5 +++++
 fuggle/execution_engine.py | 16 +++++++++++++---
 fuggle_version/__init__.py |  2 +-
 setup.py                   |  3 ++-
 4 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 78d5bf2..7b29cf2 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,11 @@ Fugue for Kaggle users
 
 ## Release History
 
+### 0.3.2
+
+* Use Fugue 0.6.2
+* Make spark config smarter
+
 ### 0.3.1
 
 * Use Fugue 0.6.0
diff --git a/fuggle/execution_engine.py b/fuggle/execution_engine.py
index aa6d621..cbb485b 100644
--- a/fuggle/execution_engine.py
+++ b/fuggle/execution_engine.py
@@ -3,6 +3,7 @@
 from typing import Any, Dict, Iterable, Optional, Tuple
 
 import pandas as pd
+import psutil
 from fugue import (
     DataFrame,
     DataFrames,
@@ -86,8 +87,8 @@ def __init__(self, spark_session: Optional[SparkSession] = None, conf: Any = Non
         configs = _process_confs(
             {
                 "fugue.spark.use_pandas_udf": True,
-                "spark.driver.memory": "14g",
-                "spark.sql.shuffle.partitions": "16",
+                "spark.driver.memory": _get_optimal_mem(),
+                "spark.sql.shuffle.partitions": _get_optimal_partition(),
                 "spark.sql.execution.arrow.pyspark.fallback.enabled": True,
                 "spark.driver.extraJavaOptions": "-Dio.netty.tryReflectionSetAccessible=true",  # noqa: E501
                 "spark.executor.extraJavaOptions": "-Dio.netty.tryReflectionSetAccessible=true",  # noqa: E501
@@ -105,7 +106,7 @@ def __init__(self, spark_session: Optional[SparkSession] = None, conf: Any = Non
 class KaggleDaskExecutionEngine(DaskExecutionEngine):
     def __init__(self, conf: Any = None):
         configs = _process_confs(
-            {FUGUE_DASK_CONF_DATAFRAME_DEFAULT_PARTITIONS: 16},
+            {FUGUE_DASK_CONF_DATAFRAME_DEFAULT_PARTITIONS: _get_optimal_partition()},
             ParamDict(conf),
         )
         super().__init__(conf=configs)
@@ -173,3 +174,12 @@ def _process_conf(conf: Dict[str, Any]) -> Iterable[Tuple[str, Any]]:
             yield "fugue.rpc.server", "fugue.rpc.base.NativeRPCServer"
         else:
             yield k, v
+
+
+def _get_optimal_mem(ratio: float = 0.8) -> str:
+    mem = psutil.virtual_memory()
+    return str(int(mem.total * ratio / 1024 / 1024)) + "m"
+
+
+def _get_optimal_partition() -> int:
+    return psutil.cpu_count() * 4
diff --git a/fuggle_version/__init__.py b/fuggle_version/__init__.py
index 260c070..f9aa3e1 100644
--- a/fuggle_version/__init__.py
+++ b/fuggle_version/__init__.py
@@ -1 +1 @@
-__version__ = "0.3.1"
+__version__ = "0.3.2"
diff --git a/setup.py b/setup.py
index d5b9a9d..820cc57 100644
--- a/setup.py
+++ b/setup.py
@@ -18,7 +18,7 @@
     keywords="fugue kaggle sql spark dask pandas",
     url="http://github.com/fugue-project/fuggle",
     install_requires=[
-        "fugue[spark,dask,sql]==0.6.0",
+        "fugue[spark,dask,sql]==0.6.2",
         "tune[all]==0.0.6",
         "notebook",
         "kaggle",
@@ -26,6 +26,7 @@
         "qpd",
         "dask[dataframe]",
         "pandavro",
+        "psutil",
     ],
     extras_require={
         "dasksql": ["dask-sql!=0.3.4"],
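
Reviewer note (not part of the patch): the two new helpers size the Spark driver memory and the
partition counts from the machine running the kernel instead of the old hard-coded "14g" and "16".
The standalone sketch below mirrors that arithmetic so reviewers can see what a given Kaggle image
would resolve to; it only needs psutil and does not import fuggle, and the helper names here are
illustrative copies, not the patched functions themselves.

    import psutil

    def optimal_mem(ratio: float = 0.8) -> str:
        # 80% of total RAM, expressed in megabytes the way Spark expects ("13107m", etc.)
        total_bytes = psutil.virtual_memory().total
        return str(int(total_bytes * ratio / 1024 / 1024)) + "m"

    def optimal_partitions() -> int:
        # 4 partitions per CPU, the same multiplier the patch uses
        return psutil.cpu_count() * 4

    if __name__ == "__main__":
        # On a 16 GB / 4 CPU Kaggle kernel this prints roughly 13107m and 16,
        # i.e. close to the previous hard-coded defaults on that hardware.
        print("spark.driver.memory          =", optimal_mem())
        print("spark.sql.shuffle.partitions =", optimal_partitions())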
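
On the configuration side, the constructors still merge whatever conf the caller passes through
_process_confs, so the auto-detected values act as defaults rather than fixed settings. The sketch
below assumes that user-supplied keys take precedence over the computed ones and that the Spark
engine class is named KaggleSparkExecutionEngine; only the module path, the Dask class name, and
the conf keys appear verbatim in this diff, everything else is a hypothetical usage example.

    # Hypothetical override sketch; requires pyspark and fuggle to be installed.
    # KaggleSparkExecutionEngine and the precedence of user conf over the
    # auto-computed defaults are assumptions, not shown in this diff.
    from fuggle.execution_engine import KaggleSparkExecutionEngine

    engine = KaggleSparkExecutionEngine(
        conf={
            "spark.driver.memory": "8g",           # instead of the psutil-derived value
            "spark.sql.shuffle.partitions": "32",  # instead of cpu_count * 4
        }
    )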