Skip to content

Commit

Permalink
Support kaggle sqlite (#8)
Browse files Browse the repository at this point in the history
* support kaggle sqlite

* update

* update

* update
  • Loading branch information
goodwanghan authored Nov 26, 2020
1 parent a6211fc commit bd2c531
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.7, 3.8]
python-version: [3.7]

steps:
- uses: actions/checkout@v2
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Fugue for Kaggle users

## Release History

### 0.0.7

* Add Kaggle SQLite data handling logic

### 0.0.6

* Fix rendering issue
Expand Down
5 changes: 4 additions & 1 deletion fuggle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@
from fuggle_version import __version__

from fuggle._setup import setup
from fuggle.execution_engine import KaggleNativeExecutionEngine
from fuggle.execution_engine import (
KaggleNativeExecutionEngine,
KaggleSparkExecutionEngine,
)
from fuggle.outputters import Plot, PlotBar, PlotBarH, PlotLine
9 changes: 6 additions & 3 deletions fuggle/_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
from pyspark.sql import SparkSession
from triad.utils.convert import get_caller_global_local_vars, to_instance

from fuggle.execution_engine import KaggleNativeExecutionEngine
from fuggle.execution_engine import (
KaggleNativeExecutionEngine,
KaggleSparkExecutionEngine,
)
import inspect


Expand All @@ -32,7 +35,7 @@ def make_engine(self, engine: Any) -> ExecutionEngine:
.config("fugue.spark.use_pandas_udf", True)
.getOrCreate()
)
return SparkExecutionEngine(spark_session)
return KaggleSparkExecutionEngine(spark_session)
return to_instance(engine, ExecutionEngine)

@property
Expand All @@ -43,7 +46,7 @@ def default_engine(self) -> ExecutionEngine:
ENGINE_FACTORY = EngineFactory("native")


HIGHLIGHT_JS = """
HIGHLIGHT_JS = r"""
require(["codemirror/lib/codemirror"]);
function set(str) {
Expand Down
21 changes: 21 additions & 0 deletions fuggle/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import re
from typing import Callable, Optional, Tuple
from triad.utils.assertion import assert_or_throw

# Matches references of the form "<file>.sqlite[3].<table>", capturing the
# sqlite file name (group 1) and the table name (group 2).
SQLITE_TABLE_REGEX = re.compile(r"([0-9a-zA-Z\-_]+\.sqlite3?)\.([0-9a-zA-Z\-_]+)")


def transform_sqlite_sql(
    sql: str, validate: Callable[[str], None]
) -> Tuple[str, Optional[str]]:
    """Strip sqlite-file prefixes from table references in ``sql``.

    Every ``<file>.sqlite[3].<table>`` reference is rewritten to the bare
    table name. All references must point at the same sqlite file; that
    single file name is passed to ``validate`` and returned.

    :param sql: the SQL text to rewrite
    :param validate: callback invoked with the sqlite file name (when one is
        referenced); it should raise if the file is not acceptable
    :return: tuple of (rewritten sql, sqlite file name or ``None`` when the
        statement references no sqlite file)
    :raises NotImplementedError: when more than one sqlite file is referenced
    """
    sources = {m.group(1) for m in SQLITE_TABLE_REGEX.finditer(sql)}
    # Raise directly instead of assert_or_throw(cond, exc): the original
    # constructed the NotImplementedError (and its args) on every call,
    # even on the success path.
    if len(sources) > 1:
        raise NotImplementedError(
            "can't have multiple sources in one statement", sources
        )
    # Compute the single source once instead of list(...)[0] twice.
    source = next(iter(sources)) if sources else None
    if source is not None:
        validate(source)
    return SQLITE_TABLE_REGEX.sub(r"\2", sql), source
49 changes: 47 additions & 2 deletions fuggle/execution_engine.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
from typing import Any
import os
import sqlite3
from typing import Any, Optional

import pandas as pd
from fugue import (
DataFrame,
DataFrames,
ExecutionEngine,
NativeExecutionEngine,
PandasDataFrame,
SQLEngine,
SqliteEngine,
)
from fugue_spark.execution_engine import SparkExecutionEngine, SparkSQLEngine
from pyspark.sql import SparkSession
from qpd_pandas import run_sql_on_pandas
from triad.utils.assertion import assert_or_throw

from fuggle._utils import transform_sqlite_sql

DEFAULT_KAGGLE_SQLITE_PATH = ""


class QPDPandasEngine(SQLEngine):
Expand All @@ -25,8 +36,42 @@ def select(self, dfs: DataFrames, statement: str) -> DataFrame:
return PandasDataFrame(df)


class KaggleSQLEngineWrapper(SQLEngine):
    """A SQLEngine decorator adding Kaggle sqlite file support.

    SQL statements may reference tables as ``<file>.sqlite[3].<table>``; such
    statements are executed directly against the sqlite database file found
    under the configured Kaggle input directory. All other statements are
    delegated unchanged to the wrapped engine.

    :param execution_engine: the execution engine this SQL engine binds to
    :param engine: the underlying SQLEngine handling non-sqlite statements
    """

    def __init__(self, execution_engine: ExecutionEngine, engine: SQLEngine):
        super().__init__(execution_engine)
        self.engine = engine
        # Directory holding the Kaggle sqlite files; overridable via conf.
        self.database_path = execution_engine.conf.get(
            "fugue.kaggle.sqlite.path", "/kaggle/input"
        )

    def select(self, dfs: DataFrames, statement: str) -> DataFrame:
        """Run ``statement``, routing sqlite-file queries to sqlite itself.

        :param dfs: dataframes the statement may refer to; must be empty when
            the statement queries a sqlite file
        :param statement: the SQL statement to execute
        :return: the query result
        """
        sql, sqlite_file = transform_sqlite_sql(statement, self._validate_database)
        if sqlite_file is None:  # no sqlite reference -> delegate untouched
            return self.engine.select(dfs, statement)
        assert_or_throw(len(dfs) == 0, "sql to query sqlite can't have other tables")
        # BUG FIX: sqlite3's connection context manager only manages the
        # transaction (commit/rollback) -- it does NOT close the connection,
        # so the original leaked a file handle per query. Close explicitly.
        connection = sqlite3.connect(os.path.join(self.database_path, sqlite_file))
        try:
            df = pd.read_sql_query(sql, connection)
        finally:
            connection.close()
        return PandasDataFrame(df)

    def _validate_database(self, name: str):
        # Fail fast if the referenced sqlite file is absent from database_path.
        path = os.path.join(self.database_path, name)
        assert_or_throw(self.execution_engine.fs.exists(path), FileNotFoundError(path))


class KaggleNativeExecutionEngine(NativeExecutionEngine):
    """NativeExecutionEngine whose SQL engine understands Kaggle sqlite files.

    :param conf: engine configuration, passed through to the parent engine
    :param use_sqlite: when True use fugue's SqliteEngine as the underlying
        SQL engine instead of the QPD pandas engine
    """

    def __init__(self, conf: Any = None, use_sqlite: bool = False):
        super().__init__(conf)
        # Wrap the chosen SQL engine so <file>.sqlite.<table> references work.
        # (Removed a dead assignment that built a throwaway QPDPandasEngine
        # and immediately overwrote it.)
        if not use_sqlite:
            self._default_sql_engine = KaggleSQLEngineWrapper(
                self, QPDPandasEngine(self)
            )
        else:  # pragma: no cover
            self._default_sql_engine = KaggleSQLEngineWrapper(self, SqliteEngine(self))


class KaggleSparkExecutionEngine(SparkExecutionEngine):
    """SparkExecutionEngine whose SQL engine understands Kaggle sqlite files."""

    def __init__(self, spark_session: Optional[SparkSession] = None, conf: Any = None):
        super().__init__(spark_session=spark_session, conf=conf)
        # Decorate the Spark SQL engine so sqlite-file queries are routed.
        spark_sql = SparkSQLEngine(self)
        self._default_sql_engine = KaggleSQLEngineWrapper(self, spark_sql)
2 changes: 1 addition & 1 deletion fuggle_version/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.0.6"
__version__ = "0.0.7"
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pylint
pytest
pytest-cov
pytest-mock
pytest-spark
sphinx
sphinx-rtd-theme

Expand Down
Binary file added tests/data/customer.sqlite
Binary file not shown.
78 changes: 74 additions & 4 deletions tests/test_execution_engine.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import os

import pytest
from fuggle import KaggleNativeExecutionEngine, KaggleSparkExecutionEngine
from fugue_sql import FugueSQLWorkflow
from fugue_test.builtin_suite import BuiltInTests
from fugue_test.execution_suite import ExecutionEngineTests
from fuggle import KaggleNativeExecutionEngine
from pyspark.sql import SparkSession


class KaggleNativeExecutionEngineTests(ExecutionEngineTests.Tests):
def make_engine(self):
e = KaggleNativeExecutionEngine(dict(test=True))
e = KaggleNativeExecutionEngine(conf={"test": True})
return e

def test_map_with_dict_col(self):
Expand All @@ -15,5 +20,70 @@ def test_map_with_dict_col(self):

class KaggleNativeExecutionEngineBuiltInTests(BuiltInTests.Tests):
    """Built-in workflow suite run on KaggleNativeExecutionEngine, plus a
    sqlite end-to-end test."""

    def make_engine(self):
        # BUG FIX: the body contained unreachable code after the first
        # ``return`` -- the stale engine it returned lacked the sqlite path
        # conf that test_sqlite needs. Keep only the configured construction,
        # pointing fugue.kaggle.sqlite.path at the test fixture directory so
        # tests/data/customer.sqlite resolves.
        return KaggleNativeExecutionEngine(
            conf={
                "test": True,
                "fugue.kaggle.sqlite.path": os.path.join(os.getcwd(), "tests/data"),
            }
        )

    def dag(self) -> FugueSQLWorkflow:
        return FugueSQLWorkflow(self.engine)

    def test_sqlite(self):
        # End-to-end: query a table inside a sqlite file through Fugue SQL.
        with self.dag() as dag:
            dag(
                """
                SELECT COUNT(*) AS ct FROM customer.sqlite.customer
                PRINT
                """
            )


class KaggleSparkExecutionEngineTests(ExecutionEngineTests.Tests):
    """Standard execution-engine suite run against KaggleSparkExecutionEngine."""

    @pytest.fixture(autouse=True)
    def init_session(self, spark_session):
        # Capture the pytest-spark session so make_engine can reuse it.
        self.spark_session = spark_session

    def make_engine(self):
        return KaggleSparkExecutionEngine(
            spark_session=SparkSession.builder.getOrCreate(), conf={"test": True}
        )

    def test_map_with_dict_col(self):
        # TODO: add back
        return

    def test__join_outer_pandas_incompatible(self):
        # Skipped for this engine.
        return


class KaggleSparkExecutionEngineBuiltInTests(BuiltInTests.Tests):
    """Built-in workflow suite run against KaggleSparkExecutionEngine."""

    @pytest.fixture(autouse=True)
    def init_session(self, spark_session):
        # Capture the pytest-spark session used by the engine under test.
        self.spark_session = spark_session

    def make_engine(self):
        # Point the sqlite path at the fixture directory for test_sqlite.
        conf = {
            "test": True,
            "fugue.kaggle.sqlite.path": os.path.join(os.getcwd(), "tests/data"),
        }
        return KaggleSparkExecutionEngine(conf=conf)

    def dag(self) -> FugueSQLWorkflow:
        return FugueSQLWorkflow(self.engine)

    def test_sqlite(self):
        # Query a sqlite fixture file through Fugue SQL on Spark.
        with self.dag() as dag:
            dag(
                """
                SELECT COUNT(*) AS ct FROM customer.sqlite.customer
                PRINT
                """
            )

    def test_repartition(self):
        # Intentionally skipped for this engine.
        pass
30 changes: 30 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from fuggle._utils import transform_sqlite_sql
from pytest import raises
from triad.utils.assertion import assert_or_throw


def test_transform_sqlite_sql():
    """Behavior of transform_sqlite_sql: rewriting, validation, and errors."""

    def should_not_call(s):
        # The validator must never run when no sqlite table is referenced.
        raise ValueError

    def expect(name):
        # Validator asserting the sqlite file name it receives.
        return lambda x: assert_or_throw(name == x)

    # (input sql, expected (rewritten sql, source), validator)
    cases = [
        ("", ("", None), should_not_call),
        ("a.sqlite.x", ("x", "a.sqlite"), expect("a.sqlite")),
        ("b a.sqlite3.y ", ("b y ", "a.sqlite3"), expect("a.sqlite3")),
        (
            "SELECT * FROM a.sqlite3.p INNER JOIN a.sqlite3.z",
            ("SELECT * FROM p INNER JOIN z", "a.sqlite3"),
            expect("a.sqlite3"),
        ),
    ]
    for sql, expected, validator in cases:
        assert expected == transform_sqlite_sql(sql, validator)

    # multiple source is not allowed
    with raises(NotImplementedError):
        transform_sqlite_sql(
            "SELECT * FROM a.sqlite.p INNER JOIN a.sqlite3.z", should_not_call
        )
    # validation failed
    with raises(AssertionError):
        transform_sqlite_sql(
            "b a.sqlite3.y ", lambda x: assert_or_throw("a.sqlite" == x)
        )

0 comments on commit bd2c531

Please sign in to comment.