Skip to content

Commit

Permalink
Merge pull request #11 from dodopizza/sqlalchemy-1.4
Browse files Browse the repository at this point in the history
Migration to sqlalchemy 1.4.*
  • Loading branch information
Ceridan authored Sep 8, 2022
2 parents 0a2353a + 6f5c1dd commit f1b4dc8
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 52 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ cursor = engine.execute(query)
print([row for row in cursor])
```

### Using with Apache Superset

[Apache Superset](https://github.com/apache/superset) starting from [version 1.5](https://github.com/apache/superset/blob/1c1beb653a52c1fcc67a97e539314f138117c6ba/RELEASING/release-notes-1-5/README.md) also supports Kusto database engine spec. \
When connecting to a new data source you may choose a data source type either KustoSQL or KustoKQL depending on the dialect you want to use.

There are following connection string formats:

```shell
# KustoSQL
kustosql+https://<CLUSTER_URL>/<DATABASE>?azure_ad_client_id=<CLIENT_ID>&azure_ad_client_secret=<CLIENT_SECRET>&azure_ad_tenant_id=<TENANT_ID>&msi=False

# KustoKQL
kustokql+https://<CLUSTER_URL>/<DATABASE>?azure_ad_client_id=<CLIENT_ID>&azure_ad_client_secret=<CLIENT_SECRET>&azure_ad_tenant_id=<TENANT_ID>&msi=False
```
> Important notice on package version compatibility. \
> Apache Superset stable releases 1.5 and 2.0 dependent on `sqlalchemy==1.3.24`. If you want to use `sqlalchemy-kusto` with these versions you need to install version `1.*` of the package.
>
> Current `master` branch of the `apache/superset` dependent on `sqlalchemy==1.4.36`. If you want to use `sqlalchemy-kusto` with the latest unstable version of `apache/superset`, you need to install version `2.*` of the package.
## Contributing

Please see the [CONTRIBUTING.md](.github/CONTRIBUTING.md) for development setup and contributing process guidelines.
Expand Down
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

NAME = "sqlalchemy-kusto"
DESCRIPTION = "Azure Data Explorer (Kusto) dialect for SQLAlchemy"
VERSION = "1.1.0"
VERSION = "2.0.0"

REQUIREMENTS = [
"azure-kusto-data==2.1.1",
"sqlalchemy==1.3.24",
"azure-kusto-data==3.*",
"sqlalchemy==1.4.*",
"typing-extensions~=3.10",
]
EXTRAS = {
"dev": [
Expand Down
2 changes: 1 addition & 1 deletion sqlalchemy_kusto/dialect_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def get_view_names(self, connection: Connection, schema: Optional[str] = None, *
result = connection.execute(".show materialized-views | project Name")
return [row.Name for row in result]

def get_pk_constraint(self, conn: Connection, table_name: str, schema: Optional[str] = None, **kw):
def get_pk_constraint(self, connection: Connection, table_name: str, schema: Optional[str] = None, **kw):
return {"constrained_columns": [], "name": None}

def get_foreign_keys(self, connection, table_name, schema=None, **kwargs):
Expand Down
55 changes: 32 additions & 23 deletions sqlalchemy_kusto/dialect_kql.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,24 @@ class KustoKqlCompiler(compiler.SQLCompiler):

def visit_select(
self,
select: selectable.Select,
select_stmt: selectable.Select,
asfrom=False,
parens=True,
insert_into=True,
fromhints=None,
compound_index: int = 0,
nested_join_translation=False,
select_wraps_for=None,
lateral=False,
from_linter=None,
**kwargs,
):
logger.debug("Incoming query: %s", select)
logger.debug("Incoming query: %s", select_stmt)

if len(select.froms) != 1:
if len(select_stmt.get_final_froms()) != 1:
raise NotSupportedError('Only single "select from" query is supported in kql compiler')

compiled_query_lines = []

from_object = select.froms[0]
from_object = select_stmt.get_final_froms()[0]
if hasattr(from_object, "element"):
query = self._get_most_inner_element(from_object.element)
(main, lets) = self._extract_let_statements(query.text)
Expand All @@ -67,23 +67,24 @@ def visit_select(
if from_object.schema is not None:
unquoted_schema = from_object.schema.strip("\"'")
compiled_query_lines.append(f'database("{unquoted_schema}").')
compiled_query_lines.append(from_object.name)
unquoted_name = from_object.name.strip("\"'")
compiled_query_lines.append(f'["{unquoted_name}"]')
else:
compiled_query_lines.append(self._convert_schema_in_statement(from_object.text))

if select._whereclause is not None:
where_clause = select._whereclause._compiler_dispatch(self, **kwargs)
if select_stmt._whereclause is not None:
where_clause = select_stmt._whereclause._compiler_dispatch(self, **kwargs)
if where_clause:
compiled_query_lines.append(f"| where {where_clause}")

projections = self._get_projection_or_summarize(select)
projections = self._get_projection_or_summarize(select_stmt)
if projections:
compiled_query_lines.append(projections)

if select._limit_clause is not None: # pylint: disable=protected-access
if select_stmt._limit_clause is not None: # pylint: disable=protected-access
kwargs["literal_execute"] = True
compiled_query_lines.append(
f"| take {self.process(select._limit_clause, **kwargs)}"
f"| take {self.process(select_stmt._limit_clause, **kwargs)}"
) # pylint: disable=protected-access

compiled_query_lines = list(filter(None, compiled_query_lines))
Expand Down Expand Up @@ -155,26 +156,34 @@ def _convert_schema_in_statement(query: str) -> str:
Converts schema in the query from SQL notation to KQL notation. Returns converted query.
Examples:
- schema.table -> database("schema").table
- schema."table.name" -> database("schema")."table.name"
- "schema.name".table -> database("schema.name").table
- "schema.name"."table.name" -> database("schema.name")."table.name"
- "schema name"."table name" -> database("schema name")."table name"
- "table.name" -> "table.name"
- MyTable -> MyTable
- schema.table -> database("schema").["table"]
- schema."table.name" -> database("schema").{"table.name"]
- "schema.name".table -> database("schema.name").["table"]
- "schema.name"."table.name" -> database("schema.name").["table.name"]
- "schema name"."table name" -> database("schema name").["table name"]
- "table.name" -> ["table.name"]
- MyTable -> ["MyTable"]
- ["schema"].["table"] -> database("schema").["table"]
- ["table"] -> ["table"]
"""

pattern = r"^([a-zA-Z0-9]+\b|\"[a-zA-Z0-9 \-_.]+\")?\.?([a-zA-Z0-9]+\b|\"[a-zA-Z0-9 \-_.]+\")"
pattern = r"^\[?([a-zA-Z0-9]+\b|\"[a-zA-Z0-9 \-_.]+\")?\]?\.?\[?([a-zA-Z0-9]+\b|\"[a-zA-Z0-9 \-_.]+\")\]?"
match = re.search(pattern, query)

if not match or not match.group(1):
if not match:
return query

original = match.group(0)
unquoted_table = match.group(2).strip("\"'")

if not match.group(1):
return query.replace(original, f'["{unquoted_table}"]', 1)

unquoted_schema = match.group(1).strip("\"'")
return query.replace(query, f'database("{unquoted_schema}").{match.group(2)}', 1)
return query.replace(original, f'database("{unquoted_schema}").["{unquoted_table}"]', 1)


class KustoKqlHttpsDialect(KustoBaseDialect):
name = "kustokql"
statement_compiler = KustoKqlCompiler
preparer = KustoKqlIdentifierPreparer
supports_statement_cache = True
3 changes: 3 additions & 0 deletions sqlalchemy_kusto/dialect_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ def delete_extra_from_clause(self, update_stmt, from_table, extra_froms, from_hi
class KustoSqlHttpsDialect(KustoBaseDialect):
name = "kustosql"
statement_compiler = KustoSqlCompiler
# For some reason supports_statement_cache doesn't work when defined in the KustoBaseDialect.
# Need to investigate why it happens.
supports_statement_cache = True
52 changes: 27 additions & 25 deletions tests/unit/test_dialect_kql.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ def test_compiler_with_projection():

query_compiled = str(query.compile(engine)).replace("\n", "")
query_expected = (
"let virtual_table = (logs | take 10);"
'let virtual_table = (["logs"] | take 10);'
"virtual_table"
"| project id = Id, tId = TypeId, Type"
"| take %(param_1)s"
"| take __[POSTCOMPILE_param_1]"
)

assert query_compiled == query_expected
Expand All @@ -42,15 +42,15 @@ def test_compiler_with_star():
query = query.limit(10)

query_compiled = str(query.compile(engine)).replace("\n", "")
query_expected = "let virtual_table = (logs | take 10);" "virtual_table" "| take %(param_1)s"
query_expected = 'let virtual_table = (["logs"] | take 10);' "virtual_table" "| take __[POSTCOMPILE_param_1]"

assert query_compiled == query_expected


def test_select_from_text():
query = select([column("Field1"), column("Field2")]).select_from(text("logs")).limit(100)
query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "")
query_expected = "logs" "| project Field1, Field2" "| take 100"
query_expected = '["logs"]' "| project Field1, Field2" "| take 100"

assert query_compiled == query_expected

Expand All @@ -67,7 +67,7 @@ def test_use_table():
query = stream.select().limit(5)
query_compiled = str(query.compile(engine)).replace("\n", "")

query_expected = "logs" "| project Field1, Field2" "| take %(param_1)s"
query_expected = '["logs"]' "| project Field1, Field2" "| take __[POSTCOMPILE_param_1]"
assert query_compiled == query_expected


Expand All @@ -78,7 +78,7 @@ def test_limit():

query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "")

query_expected = "let inner_qry = (logs);" "inner_qry" "| take 5"
query_expected = 'let inner_qry = (["logs"]);' "inner_qry" "| take 5"

assert query_compiled == query_expected

Expand All @@ -98,7 +98,7 @@ def test_select_count():
query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "")

query_expected = (
"let inner_qry = (logs);"
'let inner_qry = (["logs"]);'
"inner_qry"
"| where Field1 > 1 and Field2 < 2"
"| summarize count = count()"
Expand All @@ -117,7 +117,7 @@ def test_select_with_let():
query_expected = (
"let x = 5;"
"let y = 3;"
"let inner_qry = (MyTable | where Field1 == x and Field2 == y);"
'let inner_qry = (["MyTable"] | where Field1 == x and Field2 == y);'
"inner_qry"
"| take 5"
)
Expand All @@ -140,9 +140,9 @@ def test_quotes():

# fmt: off
query_expected = (
"logs"
'["logs"]'
'| project ["Field1"], ["Field2"]'
"| take %(param_1)s"
"| take __[POSTCOMPILE_param_1]"
)
# fmt: on

Expand All @@ -152,13 +152,13 @@ def test_quotes():
@pytest.mark.parametrize(
"schema_name,table_name,expected_table_name",
[
("schema", "table", 'database("schema").table'),
("schema", '"table.name"', 'database("schema")."table.name"'),
('"schema.name"', "table", 'database("schema.name").table'),
('"schema.name"', '"table.name"', 'database("schema.name")."table.name"'),
('"schema name"', '"table name"', 'database("schema name")."table name"'),
(None, '"table.name"', '"table.name"'),
(None, "MyTable", "MyTable"),
("schema", "table", 'database("schema").["table"]'),
("schema", '"table.name"', 'database("schema").["table.name"]'),
('"schema.name"', "table", 'database("schema.name").["table"]'),
('"schema.name"', '"table.name"', 'database("schema.name").["table.name"]'),
('"schema name"', '"table name"', 'database("schema name").["table name"]'),
(None, '"table.name"', '["table.name"]'),
(None, "MyTable", '["MyTable"]'),
],
)
def test_schema_from_metadata(table_name: str, schema_name: str, expected_table_name: str):
Expand All @@ -171,20 +171,22 @@ def test_schema_from_metadata(table_name: str, schema_name: str, expected_table_

query_compiled = str(query.compile(engine)).replace("\n", "")

query_expected = f"{expected_table_name}| take %(param_1)s"
query_expected = f"{expected_table_name}| take __[POSTCOMPILE_param_1]"
assert query_compiled == query_expected


@pytest.mark.parametrize(
"query_table_name,expected_table_name",
[
("schema.table", 'database("schema").table'),
('schema."table.name"', 'database("schema")."table.name"'),
('"schema.name".table', 'database("schema.name").table'),
('"schema.name"."table.name"', 'database("schema.name")."table.name"'),
('"schema name"."table name"', 'database("schema name")."table name"'),
('"table.name"', '"table.name"'),
("MyTable", "MyTable"),
("schema.table", 'database("schema").["table"]'),
('schema."table.name"', 'database("schema").["table.name"]'),
('"schema.name".table', 'database("schema.name").["table"]'),
('"schema.name"."table.name"', 'database("schema.name").["table.name"]'),
('"schema name"."table name"', 'database("schema name").["table name"]'),
('"table.name"', '["table.name"]'),
("MyTable", '["MyTable"]'),
('["schema"].["table"]', 'database("schema").["table"]'),
('["table"]', '["table"]'),
],
)
def test_schema_from_query(query_table_name: str, expected_table_name: str):
Expand Down

0 comments on commit f1b4dc8

Please sign in to comment.