From e60e4fba2028d2943d11d0207572dd6e55b12d98 Mon Sep 17 00:00:00 2001 From: Mikhail Kumachev Date: Wed, 5 Jan 2022 00:00:03 +0300 Subject: [PATCH 1/4] feat: [KQL] Add support for "schema" (used in cross-database joins) --- sqlalchemy_kusto/dialect_kql.py | 22 ++++++++++++++++-- tests/unit/test_dialect_kql.py | 41 +++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/sqlalchemy_kusto/dialect_kql.py b/sqlalchemy_kusto/dialect_kql.py index 94b11d9..18a02bd 100644 --- a/sqlalchemy_kusto/dialect_kql.py +++ b/sqlalchemy_kusto/dialect_kql.py @@ -60,12 +60,14 @@ def visit_select( query = self._get_most_inner_element(from_object.element) (main, lets) = self._extract_let_statements(query.text) compiled_query_lines.extend(lets) - compiled_query_lines.append(f"let {from_object.name} = ({main});") + compiled_query_lines.append(f"let {from_object.name} = ({self._convert_schema_in_statement(main)});") compiled_query_lines.append(from_object.name) elif hasattr(from_object, "name"): + if from_object.schema is not None: + compiled_query_lines.append(f'database("{from_object.schema}").') compiled_query_lines.append(from_object.name) else: - compiled_query_lines.append(from_object.text) + compiled_query_lines.append(self._convert_schema_in_statement(from_object.text)) if select._whereclause is not None: where_clause = select._whereclause._compiler_dispatch(self, **kwargs) @@ -145,6 +147,22 @@ def _build_column_projection(column_name: str, column_alias: str = None): """Generates column alias semantic for project statement""" return f"{column_alias} = {column_name}" if column_alias else column_name + @staticmethod + def _convert_schema_in_statement(query: str) -> str: + """ + Converts schema in the query from SQL notation to KQL notation. Returns converted query. + + Example: + schema.table_name -> database("schema").table_name + """ + query_parts = query.split() + + if "." in query_parts[0]: + schema, table_name = query_parts[0].split(".") + query_parts[0] = f'database("{schema}").{table_name}' + + return " ".join(query_parts) + class KustoKqlHttpsDialect(KustoBaseDialect): name = "kustokql" diff --git a/tests/unit/test_dialect_kql.py b/tests/unit/test_dialect_kql.py index 99b4c8c..f54dfce 100644 --- a/tests/unit/test_dialect_kql.py +++ b/tests/unit/test_dialect_kql.py @@ -146,3 +146,44 @@ def test_quotes(): # fmt: on assert query_compiled == query_expected + + +def test_schema_from_metadata(): + quote = engine.dialect.identifier_preparer.quote + metadata = MetaData(schema="mydb") + stream = Table( + "logs", + metadata, + Column(quote("Field1"), String), + Column(quote("Field2"), String), + ) + query = stream.select().limit(5) + + query_compiled = str(query.compile(engine)).replace("\n", "") + + # fmt: off + query_expected = ( + 'database("mydb").logs' + '| project ["Field1"], ["Field2"]' + "| take %(param_1)s" + ) + # fmt: on + + assert query_compiled == query_expected + + +def test_schema_from_query(): + kql_query = "let x = 5; let y = 3; mydb.MyTable | where Field1 == x and Field2 == y" + query = select("*").select_from(TextAsFrom(text(kql_query), ["*"]).alias("inner_qry")).limit(5) + + query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "") + + query_expected = ( + "let x = 5;" + "let y = 3;" + 'let inner_qry = (database("mydb").MyTable | where Field1 == x and Field2 == y);' + "inner_qry" + "| take 5" + ) + + assert query_compiled == query_expected From 09444c9c5ac8cb2a91a83419caabcbd91dd5a86e Mon Sep 17 00:00:00 2001 From: Mikhail Kumachev Date: Mon, 10 Jan 2022 16:25:01 +0300 Subject: [PATCH 2/4] feat: [KQL] Handle situation when query contains schema and dot-separated table name --- sqlalchemy_kusto/dialect_kql.py | 12 +++++++++--- tests/unit/test_dialect_kql.py | 34 ++++++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/sqlalchemy_kusto/dialect_kql.py b/sqlalchemy_kusto/dialect_kql.py index 18a02bd..128f93e 100644 --- a/sqlalchemy_kusto/dialect_kql.py +++ b/sqlalchemy_kusto/dialect_kql.py @@ -157,9 +157,15 @@ def _convert_schema_in_statement(query: str) -> str: """ query_parts = query.split() - if "." in query_parts[0]: - schema, table_name = query_parts[0].split(".") - query_parts[0] = f'database("{schema}").{table_name}' + # We handle the following possible situations here: + # - mydb.MyTable (schema with table name); + # - mydb."my.table" (schema with dot-separated table name, it should be quoted by Kusto syntax rules); + # - "my.table" (dot-separated table name without schema) + if "." in query_parts[0] and not query_parts[0].startswith('"'): + table_name_parts = query_parts[0].split(".") + schema = table_name_parts[0] + table_name_parts[0] = f'database("{schema}")' + query_parts[0] = ".".join(table_name_parts) return " ".join(query_parts) diff --git a/tests/unit/test_dialect_kql.py b/tests/unit/test_dialect_kql.py index f54dfce..b88f7b1 100644 --- a/tests/unit/test_dialect_kql.py +++ b/tests/unit/test_dialect_kql.py @@ -173,17 +173,33 @@ def test_schema_from_metadata(): def test_schema_from_query(): - kql_query = "let x = 5; let y = 3; mydb.MyTable | where Field1 == x and Field2 == y" - query = select("*").select_from(TextAsFrom(text(kql_query), ["*"]).alias("inner_qry")).limit(5) + kql_query = "mydb.MyTable | limit 100" + query = select("*").select_from(TextAsFrom(text(kql_query), ["*"]).alias("inner_qry")) query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "") - query_expected = ( - "let x = 5;" - "let y = 3;" - 'let inner_qry = (database("mydb").MyTable | where Field1 == x and Field2 == y);' - "inner_qry" - "| take 5" - ) + query_expected = 'let inner_qry = (database("mydb").MyTable | limit 100);' "inner_qry" + + assert query_compiled == query_expected + + +def test_schema_with_table_name_contains_dots(): + kql_query = 'mydb."my.table" | limit 100' + query = select("*").select_from(TextAsFrom(text(kql_query), ["*"]).alias("inner_qry")) + + query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "") + + query_expected = 'let inner_qry = (database("mydb")."my.table" | limit 100);' "inner_qry" + + assert query_compiled == query_expected + + +def test_with_table_name_contains_dots_without_schema(): + kql_query = '"my.table" | limit 100' + query = select("*").select_from(TextAsFrom(text(kql_query), ["*"]).alias("inner_qry")) + + query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "") + + query_expected = 'let inner_qry = ("my.table" | limit 100);' "inner_qry" assert query_compiled == query_expected From 453d67ab7189254b8b83e2de925af9cc224262ec Mon Sep 17 00:00:00 2001 From: Mikhail Kumachev Date: Mon, 10 Jan 2022 16:52:51 +0100 Subject: [PATCH 3/4] feat: [KQL] Handle all possible combinations with schema and table using RegEx --- sqlalchemy_kusto/dialect_kql.py | 36 ++++++++-------- tests/unit/test_dialect_kql.py | 73 +++++++++++++++------------------ 2 files changed, 52 insertions(+), 57 deletions(-) diff --git a/sqlalchemy_kusto/dialect_kql.py b/sqlalchemy_kusto/dialect_kql.py index 128f93e..7d80088 100644 --- a/sqlalchemy_kusto/dialect_kql.py +++ b/sqlalchemy_kusto/dialect_kql.py @@ -1,4 +1,5 @@ import logging +import re from typing import List, Optional, Tuple from sqlalchemy import Column, exc @@ -64,7 +65,8 @@ def visit_select( compiled_query_lines.append(from_object.name) elif hasattr(from_object, "name"): if from_object.schema is not None: - compiled_query_lines.append(f'database("{from_object.schema}").') + unquoted_schema = from_object.schema.strip('"') + compiled_query_lines.append(f'database("{unquoted_schema}").') compiled_query_lines.append(from_object.name) else: compiled_query_lines.append(self._convert_schema_in_statement(from_object.text)) @@ -152,22 +154,24 @@ def _convert_schema_in_statement(query: str) -> str: """ Converts schema in the query from SQL notation to KQL notation. Returns converted query. - Example: - schema.table_name -> database("schema").table_name + Examples: + - schema.table -> database("schema").table + - schema."table.name" -> database("schema")."table.name" + - "schema.name".table -> database("schema.name").table + - "schema.name"."table.name" -> database("schema.name")."table.name" + - "schema name"."table name" -> database("schema name")."table name" + - "table.name" -> "table.name" + - MyTable -> MyTable """ - query_parts = query.split() - - # We handle the following possible situations here: - # - mydb.MyTable (schema with table name); - # - mydb."my.table" (schema with dot-separated table name, it should be quoted by Kusto syntax rules); - # - "my.table" (dot-separated table name without schema) - if "." in query_parts[0] and not query_parts[0].startswith('"'): - table_name_parts = query_parts[0].split(".") - schema = table_name_parts[0] - table_name_parts[0] = f'database("{schema}")' - query_parts[0] = ".".join(table_name_parts) - - return " ".join(query_parts) + + pattern = r"^([a-zA-Z0-9]+\b|\"[a-zA-Z0-9 \-_.]+\")?\.?([a-zA-Z0-9]+\b|\"[a-zA-Z0-9 \-_.]+\")" + match = re.search(pattern, query) + + if not match or not match.group(1): + return query + + unquoted_schema = match.group(1).strip('"') + return query.replace(query, f'database("{unquoted_schema}").{match.group(2)}', 1) class KustoKqlHttpsDialect(KustoBaseDialect): diff --git a/tests/unit/test_dialect_kql.py b/tests/unit/test_dialect_kql.py index b88f7b1..4ae6f5b 100644 --- a/tests/unit/test_dialect_kql.py +++ b/tests/unit/test_dialect_kql.py @@ -1,3 +1,4 @@ +import pytest import sqlalchemy as sa from sqlalchemy import Column, MetaData, String, Table, column, create_engine, literal_column, select, text from sqlalchemy.sql.selectable import TextAsFrom @@ -148,58 +149,48 @@ def test_quotes(): assert query_compiled == query_expected -def test_schema_from_metadata(): - quote = engine.dialect.identifier_preparer.quote - metadata = MetaData(schema="mydb") +@pytest.mark.parametrize( + "schema_name,table_name,expected_table_name", + [ + ("schema", "table", 'database("schema").table'), + ("schema", '"table.name"', 'database("schema")."table.name"'), + ('"schema.name"', "table", 'database("schema.name").table'), + ('"schema.name"', '"table.name"', 'database("schema.name")."table.name"'), + ('"schema name"', '"table name"', 'database("schema name")."table name"'), + (None, '"table.name"', '"table.name"'), + (None, "MyTable", "MyTable"), + ], +) +def test_schema_from_metadata(table_name: str, schema_name: str, expected_table_name: str): + metadata = MetaData(schema=schema_name) if schema_name else MetaData() stream = Table( - "logs", + table_name, metadata, - Column(quote("Field1"), String), - Column(quote("Field2"), String), ) query = stream.select().limit(5) query_compiled = str(query.compile(engine)).replace("\n", "") - # fmt: off - query_expected = ( - 'database("mydb").logs' - '| project ["Field1"], ["Field2"]' - "| take %(param_1)s" - ) - # fmt: on - - assert query_compiled == query_expected - - -def test_schema_from_query(): - kql_query = "mydb.MyTable | limit 100" - query = select("*").select_from(TextAsFrom(text(kql_query), ["*"]).alias("inner_qry")) - - query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "") - - query_expected = 'let inner_qry = (database("mydb").MyTable | limit 100);' "inner_qry" - + query_expected = f"{expected_table_name}| take %(param_1)s" assert query_compiled == query_expected -def test_schema_with_table_name_contains_dots(): - kql_query = 'mydb."my.table" | limit 100' - query = select("*").select_from(TextAsFrom(text(kql_query), ["*"]).alias("inner_qry")) +@pytest.mark.parametrize( + "query_table_name,expected_table_name", + [ + ("schema.table", 'database("schema").table'), + ('schema."table.name"', 'database("schema")."table.name"'), + ('"schema.name".table', 'database("schema.name").table'), + ('"schema.name"."table.name"', 'database("schema.name")."table.name"'), + ('"schema name"."table name"', 'database("schema name")."table name"'), + ('"table.name"', '"table.name"'), + ("MyTable", "MyTable"), + ], +) +def test_schema_from_query(query_table_name: str, expected_table_name: str): + query = select("*").select_from(TextAsFrom(text(query_table_name), ["*"]).alias("inner_qry")).limit(5) query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "") - query_expected = 'let inner_qry = (database("mydb")."my.table" | limit 100);' "inner_qry" - - assert query_compiled == query_expected - - -def test_with_table_name_contains_dots_without_schema(): - kql_query = '"my.table" | limit 100' - query = select("*").select_from(TextAsFrom(text(kql_query), ["*"]).alias("inner_qry")) - - query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "") - - query_expected = 'let inner_qry = ("my.table" | limit 100);' "inner_qry" - + query_expected = f"let inner_qry = ({expected_table_name});inner_qry| take 5" assert query_compiled == query_expected From cd00a0b6263470a95dc9da70e6d849a86318d44c Mon Sep 17 00:00:00 2001 From: Mikhail Kumachev Date: Mon, 10 Jan 2022 16:59:10 +0100 Subject: [PATCH 4/4] fix: Strip both types of quotes --- sqlalchemy_kusto/dialect_kql.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqlalchemy_kusto/dialect_kql.py b/sqlalchemy_kusto/dialect_kql.py index 7d80088..876f917 100644 --- a/sqlalchemy_kusto/dialect_kql.py +++ b/sqlalchemy_kusto/dialect_kql.py @@ -65,7 +65,7 @@ def visit_select( compiled_query_lines.append(from_object.name) elif hasattr(from_object, "name"): if from_object.schema is not None: - unquoted_schema = from_object.schema.strip('"') + unquoted_schema = from_object.schema.strip("\"'") compiled_query_lines.append(f'database("{unquoted_schema}").') compiled_query_lines.append(from_object.name) else: @@ -170,7 +170,7 @@ def _convert_schema_in_statement(query: str) -> str: if not match or not match.group(1): return query - unquoted_schema = match.group(1).strip('"') + unquoted_schema = match.group(1).strip("\"'") return query.replace(query, f'database("{unquoted_schema}").{match.group(2)}', 1)