Skip to content

Commit

Permalink
Initial version (#3)
Browse files Browse the repository at this point in the history
* add dependency

* update:

* update

* update

* update
  • Loading branch information
goodwanghan authored Nov 16, 2020
1 parent 364ef7e commit 369a9e1
Show file tree
Hide file tree
Showing 11 changed files with 302 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.6, 3.7, 3.8]
python-version: [3.7, 3.8]

steps:
- uses: actions/checkout@v2
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ devenv:
dev:
pip3 install -r requirements.txt

jupyter:
jupyter notebook --port=8888 --ip=0.0.0.0 --no-browser --allow-root --NotebookApp.token='' --NotebookApp.password=''

docs:
rm -rf docs/api
rm -rf docs/build
Expand Down
3 changes: 3 additions & 0 deletions fuggle/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# flake8: noqa
from fuggle_version import __version__

from fuggle._setup import setup
from fuggle.execution_engine import KaggleNativeExecutionEngine
111 changes: 111 additions & 0 deletions fuggle/_setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# flake8: noqa
import html
from datetime import datetime
from typing import Any, List

import pandas as pd
from fugue import ExecutionEngine, Schema
from fugue.extensions._builtins.outputters import Show
from fugue_spark import SparkExecutionEngine
from fugue_sql import FugueSQLWorkflow
from IPython.core.magic import register_cell_magic
from IPython.display import HTML, Javascript, display
from pyspark.sql import SparkSession
from triad.utils.convert import to_instance

from fuggle.execution_engine import KaggleNativeExecutionEngine


class EngineFactory(object):
def __init__(self, default_engine: Any = None):
self._default_engine = self.make_engine(default_engine)

def make_engine(self, engine: Any) -> ExecutionEngine:
if engine is None or (isinstance(engine, str) and engine in ["native", ""]):
return KaggleNativeExecutionEngine(use_sqlite=False)
if isinstance(engine, str) and engine == "spark":
spark_session = (
SparkSession.builder.master("local[*]")
.config("spark.driver.memory", "12g")
.config("fugue.spark.use_pandas_udf", True)
.getOrCreate()
)
return SparkExecutionEngine(spark_session)
return to_instance(engine, ExecutionEngine)

@property
def default_engine(self) -> ExecutionEngine:
return self._default_engine


ENGINE_FACTORY = EngineFactory("native")


HIGHLIGHT_JS = """
require(["codemirror/lib/codemirror"]);
function set(str) {
var obj = {}, words = str.split(" ");
for (var i = 0; i < words.length; ++i) obj[words[i]] = true;
return obj;
}
var fugue_keywords = "fill hash rand even presort persist broadcast params process output outtransform rowcount concurrency prepartition zip print title save append parquet csv json single checkpoint weak strong deterministic yield";
CodeMirror.defineMIME("text/x-mssql", {
name: "sql",
keywords: set(fugue_keywords + " add after all alter analyze and anti archive array as asc at between bucket buckets by cache cascade case cast change clear cluster clustered codegen collection column columns comment commit compact compactions compute concatenate cost create cross cube current current_date current_timestamp database databases datata dbproperties defined delete delimited deny desc describe dfs directories distinct distribute drop else end escaped except exchange exists explain export extended external false fields fileformat first following for format formatted from full function functions global grant group grouping having if ignore import in index indexes inner inpath inputformat insert intersect interval into is items join keys last lateral lazy left like limit lines list load local location lock locks logical macro map minus msck natural no not null nulls of on optimize option options or order out outer outputformat over overwrite partition partitioned partitions percent preceding principals purge range recordreader recordwriter recover reduce refresh regexp rename repair replace reset restrict revoke right rlike role roles rollback rollup row rows schema schemas select semi separated serde serdeproperties set sets show skewed sort sorted start statistics stored stratify struct table tables tablesample tblproperties temp temporary terminated then to touch transaction transactions transform true truncate unarchive unbounded uncache union unlock unset use using values view when where window with"),
builtin: set("tinyint smallint int bigint boolean float double string binary timestamp decimal array map struct uniontype delimited serde sequencefile textfile rcfile inputformat outputformat"),
atoms: set("false true null unknown"),
operatorChars: /^[*\/+\-%<>!=&|^\/#@?~]/,
dateSQL: set("datetime date time timestamp"),
support: set("ODBCdotTable doubleQuote binaryNumber hexNumber commentSlashSlash commentHash")
});
require(['notebook/js/codecell'], function(codecell) {
codecell.CodeCell.options_default.highlight_modes['magic_text/x-mssql'] = {'reg':[/%%fsql/]} ;
Jupyter.notebook.events.one('kernel_ready.Kernel', function(){
Jupyter.notebook.get_cells().map(function(cell){
if (cell.cell_type == 'code'){ cell.auto_highlight(); } }) ;
});
});
"""


def register_magic(default_engine: Any) -> None:
engine = ENGINE_FACTORY.make_engine(default_engine)
display(HTML(f"<strong>{engine} is set as backend<strong>"))

@register_cell_magic
def fsql(line: Any, cell: Any) -> None: # type: ignore
start = datetime.now()
try:
dag = FugueSQLWorkflow()
dag(cell)
dag.run(engine if line == "" else ENGINE_FACTORY.make_engine(line))
finally:
sec = (datetime.now() - start).total_seconds()
display(HTML(f"<small><u>{sec} seconds</u></small>"))


def set_print_hook() -> None:
def pprint(
schema: Schema, head_rows: List[List[Any]], title: Any, rows: int, count: int
):
if title is not None:
display(HTML(f"<h3>{html.escape(title)}</h3>"))
pdf = pd.DataFrame(head_rows, columns=list(schema.names))
display(pdf)
if count >= 0:
display(HTML(f"<strong>total count: {count}</strong>"))
display(HTML(f"<small>schema: {schema}</small>"))

Show.set_hook(pprint)


def setup(default_engine: Any = None) -> Any:
register_magic(default_engine)
set_print_hook()
return Javascript(HIGHLIGHT_JS)
32 changes: 32 additions & 0 deletions fuggle/execution_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Any

from fugue import (
DataFrame,
DataFrames,
ExecutionEngine,
NativeExecutionEngine,
PandasDataFrame,
SQLEngine,
)
from qpd_pandas import run_sql_on_pandas


class QPDPandasEngine(SQLEngine):
"""QPD execution implementation.
:param execution_engine: the execution engine this sql engine will run on
"""

def __init__(self, execution_engine: ExecutionEngine):
super().__init__(execution_engine)

def select(self, dfs: DataFrames, statement: str) -> DataFrame:
pd_dfs = {k: self.execution_engine.to_df(v).as_pandas() for k, v in dfs.items()}
df = run_sql_on_pandas(statement, pd_dfs)
return PandasDataFrame(df)


class KaggleNativeExecutionEngine(NativeExecutionEngine):
def __init__(self, conf: Any = None, use_sqlite: bool = False):
super().__init__(conf)
if not use_sqlite:
self._default_sql_engine = QPDPandasEngine(self)
126 changes: 126 additions & 0 deletions fuggle_examples/setup.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"sys.path.insert(0,\"/root/workspace/fuggle\")\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from fuggle import setup\n",
"from fugue import NativeExecutionEngine"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": "<strong>NativeExecutionEngine is set as backend<strong>",
"text/plain": "<IPython.core.display.HTML object>"
},
"metadata": {
"transient": {}
},
"output_type": "display_data"
},
{
"data": {
"application/javascript": "\nrequire([\"codemirror/lib/codemirror\"]);\n\nfunction set(str) {\n var obj = {}, words = str.split(\" \");\n for (var i = 0; i < words.length; ++i) obj[words[i]] = true;\n return obj;\n }\n\nvar fugue_keywords = \"fill hash rand even presort persist broadcast params process output outtransform rowcount concurrency prepartition zip print title save append parquet csv json single checkpoint weak strong deterministic yield\";\n\nCodeMirror.defineMIME(\"text/x-mssql\", {\n name: \"sql\",\n keywords: set(fugue_keywords + \" add after all alter analyze and anti archive array as asc at between bucket buckets by cache cascade case cast change clear cluster clustered codegen collection column columns comment commit compact compactions compute concatenate cost create cross cube current current_date current_timestamp database databases datata dbproperties defined delete delimited deny desc describe dfs directories distinct distribute drop else end escaped except exchange exists explain export extended external false fields fileformat first following for format formatted from full function functions global grant group grouping having if ignore import in index indexes inner inpath inputformat insert intersect interval into is items join keys last lateral lazy left like limit lines list load local location lock locks logical macro map minus msck natural no not null nulls of on optimize option options or order out outer outputformat over overwrite partition partitioned partitions percent preceding principals purge range recordreader recordwriter recover reduce refresh regexp rename repair replace reset restrict revoke right rlike role roles rollback rollup row rows schema schemas select semi separated serde serdeproperties set sets show skewed sort sorted start statistics stored stratify struct table tables tablesample tblproperties temp temporary terminated then to touch transaction transactions transform true truncate unarchive unbounded uncache union unlock unset use using values view when where window with\"),\n builtin: set(\"tinyint smallint int bigint boolean float double string binary timestamp decimal array map struct uniontype delimited serde sequencefile textfile rcfile inputformat outputformat\"),\n atoms: set(\"false true null unknown\"),\n operatorChars: /^[*\\/+\\-%<>!=&|^\\/#@?~]/,\n dateSQL: set(\"datetime date time timestamp\"),\n support: set(\"ODBCdotTable doubleQuote binaryNumber hexNumber commentSlashSlash commentHash\")\n });\n\n\nrequire(['notebook/js/codecell'], function(codecell) {\n codecell.CodeCell.options_default.highlight_modes['magic_text/x-mssql'] = {'reg':[/%%fsql/]} ;\n Jupyter.notebook.events.one('kernel_ready.Kernel', function(){\n Jupyter.notebook.get_cells().map(function(cell){\n if (cell.cell_type == 'code'){ cell.auto_highlight(); } }) ;\n });\n });\n\n",
"text/plain": "<IPython.core.display.Javascript object>"
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"setup()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": "<div>\n<style scoped>\n .dataframe tbody tr th:only-of-type {\n vertical-align: middle;\n }\n\n .dataframe tbody tr th {\n vertical-align: top;\n }\n\n .dataframe thead th {\n text-align: right;\n }\n</style>\n<table border=\"1\" class=\"dataframe\">\n <thead>\n <tr style=\"text-align: right;\">\n <th></th>\n <th>a</th>\n </tr>\n </thead>\n <tbody>\n <tr>\n <th>0</th>\n <td>0</td>\n </tr>\n </tbody>\n</table>\n</div>",
"text/plain": " a\n0 0"
},
"metadata": {
"transient": {}
},
"output_type": "display_data"
},
{
"data": {
"text/html": "<strong>total count: 1</strong>",
"text/plain": "<IPython.core.display.HTML object>"
},
"metadata": {
"transient": {}
},
"output_type": "display_data"
},
{
"data": {
"text/html": "<small>schema: a:int</small>",
"text/plain": "<IPython.core.display.HTML object>"
},
"metadata": {
"transient": {}
},
"output_type": "display_data"
},
{
"data": {
"text/html": "<small><u>0.129279 seconds</u></small>",
"text/plain": "<IPython.core.display.HTML object>"
},
"metadata": {
"transient": {}
},
"output_type": "display_data"
}
],
"source": [
"%%fsql\n",
"CREATE [[0]] SCHEMA a:int\n",
"PRINT ROWCOUNT"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
2 changes: 1 addition & 1 deletion fuggle_version/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.0.1"
__version__ = "0.0.2"
4 changes: 4 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ addopts =
--cov-report=term-missing:skip-covered
-vvv

[coverage:run]
omit =
fuggle/_setup.py

[flake8]
ignore = E24,E203,W503
max-line-length = 88
Expand Down
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@
author_email="[email protected]",
keywords="fugue kaggle",
url="http://github.com/fugue-project/fuggle",
install_requires=["fugue[spark]>=0.4.7"],
install_requires=["fugue[spark]>=0.4.7", "notebook", "kaggle"],
extras_require={},
classifiers=[
# "3 - Alpha", "4 - Beta" or "5 - Production/Stable"
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Topic :: Software Development :: Libraries :: Python Modules",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3 :: Only",
],
python_requires=">=3.6",
python_requires=">=3.7",
)
2 changes: 0 additions & 2 deletions tests/test_dummy.py

This file was deleted.

19 changes: 19 additions & 0 deletions tests/test_execution_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from fugue_test.builtin_suite import BuiltInTests
from fugue_test.execution_suite import ExecutionEngineTests
from fuggle import KaggleNativeExecutionEngine


class KaggleNativeExecutionEngineTests(ExecutionEngineTests.Tests):
def make_engine(self):
e = KaggleNativeExecutionEngine(dict(test=True))
return e

def test_map_with_dict_col(self):
# TODO: add back
return


class KaggleNativeExecutionEngineBuiltInTests(BuiltInTests.Tests):
def make_engine(self):
e = KaggleNativeExecutionEngine(dict(test=True))
return e

0 comments on commit 369a9e1

Please sign in to comment.