diff --git a/README.md b/README.md index e7d7b87..c9842c9 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,25 @@ cursor = engine.execute(query) print([row for row in cursor]) ``` +### Using with Apache Superset + +[Apache Superset](https://github.com/apache/superset), starting from [version 1.5](https://github.com/apache/superset/blob/1c1beb653a52c1fcc67a97e539314f138117c6ba/RELEASING/release-notes-1-5/README.md), also supports the Kusto database engine spec. \ +When connecting to a new data source, you may choose either KustoSQL or KustoKQL as the data source type, depending on the dialect you want to use. + +The following connection string formats are supported: + +```shell +# KustoSQL +kustosql+https:///?azure_ad_client_id=&azure_ad_client_secret=&azure_ad_tenant_id=&msi=False + +# KustoKQL +kustokql+https:///?azure_ad_client_id=&azure_ad_client_secret=&azure_ad_tenant_id=&msi=False +``` +> Important notice on package version compatibility. \ +> Apache Superset stable releases 1.5 and 2.0 depend on `sqlalchemy==1.3.24`. If you want to use `sqlalchemy-kusto` with these versions, you need to install version `1.*` of the package. +> +> The current `master` branch of `apache/superset` depends on `sqlalchemy==1.4.36`. If you want to use `sqlalchemy-kusto` with the latest unstable version of `apache/superset`, you need to install version `2.*` of the package. + ## Contributing Please see the [CONTRIBUTING.md](.github/CONTRIBUTING.md) for development setup and contributing process guidelines. 
diff --git a/setup.py b/setup.py index 7cdba68..7b4b9ca 100644 --- a/setup.py +++ b/setup.py @@ -2,11 +2,12 @@ NAME = "sqlalchemy-kusto" DESCRIPTION = "Azure Data Explorer (Kusto) dialect for SQLAlchemy" -VERSION = "1.1.0" +VERSION = "2.0.0" REQUIREMENTS = [ - "azure-kusto-data==2.1.1", - "sqlalchemy==1.3.24", + "azure-kusto-data==3.*", + "sqlalchemy==1.4.*", + "typing-extensions~=3.10", ] EXTRAS = { "dev": [ diff --git a/sqlalchemy_kusto/dialect_base.py b/sqlalchemy_kusto/dialect_base.py index 95e8def..335d602 100644 --- a/sqlalchemy_kusto/dialect_base.py +++ b/sqlalchemy_kusto/dialect_base.py @@ -124,7 +124,7 @@ def get_view_names(self, connection: Connection, schema: Optional[str] = None, * result = connection.execute(".show materialized-views | project Name") return [row.Name for row in result] - def get_pk_constraint(self, conn: Connection, table_name: str, schema: Optional[str] = None, **kw): + def get_pk_constraint(self, connection: Connection, table_name: str, schema: Optional[str] = None, **kw): return {"constrained_columns": [], "name": None} def get_foreign_keys(self, connection, table_name, schema=None, **kwargs): diff --git a/sqlalchemy_kusto/dialect_kql.py b/sqlalchemy_kusto/dialect_kql.py index 876f917..c60b43a 100644 --- a/sqlalchemy_kusto/dialect_kql.py +++ b/sqlalchemy_kusto/dialect_kql.py @@ -39,24 +39,24 @@ class KustoKqlCompiler(compiler.SQLCompiler): def visit_select( self, - select: selectable.Select, + select_stmt: selectable.Select, asfrom=False, - parens=True, + insert_into=True, fromhints=None, compound_index: int = 0, - nested_join_translation=False, select_wraps_for=None, lateral=False, + from_linter=None, **kwargs, ): - logger.debug("Incoming query: %s", select) + logger.debug("Incoming query: %s", select_stmt) - if len(select.froms) != 1: + if len(select_stmt.get_final_froms()) != 1: raise NotSupportedError('Only single "select from" query is supported in kql compiler') compiled_query_lines = [] - from_object = select.froms[0] + 
from_object = select_stmt.get_final_froms()[0] if hasattr(from_object, "element"): query = self._get_most_inner_element(from_object.element) (main, lets) = self._extract_let_statements(query.text) @@ -67,23 +67,24 @@ def visit_select( if from_object.schema is not None: unquoted_schema = from_object.schema.strip("\"'") compiled_query_lines.append(f'database("{unquoted_schema}").') - compiled_query_lines.append(from_object.name) + unquoted_name = from_object.name.strip("\"'") + compiled_query_lines.append(f'["{unquoted_name}"]') else: compiled_query_lines.append(self._convert_schema_in_statement(from_object.text)) - if select._whereclause is not None: - where_clause = select._whereclause._compiler_dispatch(self, **kwargs) + if select_stmt._whereclause is not None: + where_clause = select_stmt._whereclause._compiler_dispatch(self, **kwargs) if where_clause: compiled_query_lines.append(f"| where {where_clause}") - projections = self._get_projection_or_summarize(select) + projections = self._get_projection_or_summarize(select_stmt) if projections: compiled_query_lines.append(projections) - if select._limit_clause is not None: # pylint: disable=protected-access + if select_stmt._limit_clause is not None: # pylint: disable=protected-access kwargs["literal_execute"] = True compiled_query_lines.append( - f"| take {self.process(select._limit_clause, **kwargs)}" + f"| take {self.process(select_stmt._limit_clause, **kwargs)}" ) # pylint: disable=protected-access compiled_query_lines = list(filter(None, compiled_query_lines)) @@ -155,26 +156,34 @@ def _convert_schema_in_statement(query: str) -> str: Converts schema in the query from SQL notation to KQL notation. Returns converted query. 
Examples: - - schema.table -> database("schema").table - - schema."table.name" -> database("schema")."table.name" - - "schema.name".table -> database("schema.name").table - - "schema.name"."table.name" -> database("schema.name")."table.name" - - "schema name"."table name" -> database("schema name")."table name" - - "table.name" -> "table.name" - - MyTable -> MyTable + - schema.table -> database("schema").["table"] + - schema."table.name" -> database("schema").["table.name"] + - "schema.name".table -> database("schema.name").["table"] + - "schema.name"."table.name" -> database("schema.name").["table.name"] + - "schema name"."table name" -> database("schema name").["table name"] + - "table.name" -> ["table.name"] + - MyTable -> ["MyTable"] + - ["schema"].["table"] -> database("schema").["table"] + - ["table"] -> ["table"] """ - pattern = r"^([a-zA-Z0-9]+\b|\"[a-zA-Z0-9 \-_.]+\")?\.?([a-zA-Z0-9]+\b|\"[a-zA-Z0-9 \-_.]+\")" + pattern = r"^\[?([a-zA-Z0-9]+\b|\"[a-zA-Z0-9 \-_.]+\")?\]?\.?\[?([a-zA-Z0-9]+\b|\"[a-zA-Z0-9 \-_.]+\")\]?" 
match = re.search(pattern, query) - - if not match or not match.group(1): + if not match: return query + original = match.group(0) + unquoted_table = match.group(2).strip("\"'") + + if not match.group(1): + return query.replace(original, f'["{unquoted_table}"]', 1) + unquoted_schema = match.group(1).strip("\"'") - return query.replace(query, f'database("{unquoted_schema}").{match.group(2)}', 1) + return query.replace(original, f'database("{unquoted_schema}").["{unquoted_table}"]', 1) class KustoKqlHttpsDialect(KustoBaseDialect): name = "kustokql" statement_compiler = KustoKqlCompiler preparer = KustoKqlIdentifierPreparer + supports_statement_cache = True diff --git a/sqlalchemy_kusto/dialect_sql.py b/sqlalchemy_kusto/dialect_sql.py index 2b9f63e..6404080 100644 --- a/sqlalchemy_kusto/dialect_sql.py +++ b/sqlalchemy_kusto/dialect_sql.py @@ -34,3 +34,6 @@ def delete_extra_from_clause(self, update_stmt, from_table, extra_froms, from_hi class KustoSqlHttpsDialect(KustoBaseDialect): name = "kustosql" statement_compiler = KustoSqlCompiler + # supports_statement_cache must be declared on each dialect class itself: SQLAlchemy only honours the flag + # when it is set directly on the concrete class, so a value inherited from KustoBaseDialect is ignored. 
+ supports_statement_cache = True diff --git a/tests/unit/test_dialect_kql.py b/tests/unit/test_dialect_kql.py index 4ae6f5b..e621075 100644 --- a/tests/unit/test_dialect_kql.py +++ b/tests/unit/test_dialect_kql.py @@ -22,10 +22,10 @@ def test_compiler_with_projection(): query_compiled = str(query.compile(engine)).replace("\n", "") query_expected = ( - "let virtual_table = (logs | take 10);" + 'let virtual_table = (["logs"] | take 10);' "virtual_table" "| project id = Id, tId = TypeId, Type" - "| take %(param_1)s" + "| take __[POSTCOMPILE_param_1]" ) assert query_compiled == query_expected @@ -42,7 +42,7 @@ def test_compiler_with_star(): query = query.limit(10) query_compiled = str(query.compile(engine)).replace("\n", "") - query_expected = "let virtual_table = (logs | take 10);" "virtual_table" "| take %(param_1)s" + query_expected = 'let virtual_table = (["logs"] | take 10);' "virtual_table" "| take __[POSTCOMPILE_param_1]" assert query_compiled == query_expected @@ -50,7 +50,7 @@ def test_compiler_with_star(): def test_select_from_text(): query = select([column("Field1"), column("Field2")]).select_from(text("logs")).limit(100) query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "") - query_expected = "logs" "| project Field1, Field2" "| take 100" + query_expected = '["logs"]' "| project Field1, Field2" "| take 100" assert query_compiled == query_expected @@ -67,7 +67,7 @@ def test_use_table(): query = stream.select().limit(5) query_compiled = str(query.compile(engine)).replace("\n", "") - query_expected = "logs" "| project Field1, Field2" "| take %(param_1)s" + query_expected = '["logs"]' "| project Field1, Field2" "| take __[POSTCOMPILE_param_1]" assert query_compiled == query_expected @@ -78,7 +78,7 @@ def test_limit(): query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "") - query_expected = "let inner_qry = (logs);" "inner_qry" "| take 5" + query_expected = 'let 
inner_qry = (["logs"]);' "inner_qry" "| take 5" assert query_compiled == query_expected @@ -98,7 +98,7 @@ def test_select_count(): query_compiled = str(query.compile(engine, compile_kwargs={"literal_binds": True})).replace("\n", "") query_expected = ( - "let inner_qry = (logs);" + 'let inner_qry = (["logs"]);' "inner_qry" "| where Field1 > 1 and Field2 < 2" "| summarize count = count()" @@ -117,7 +117,7 @@ def test_select_with_let(): query_expected = ( "let x = 5;" "let y = 3;" - "let inner_qry = (MyTable | where Field1 == x and Field2 == y);" + 'let inner_qry = (["MyTable"] | where Field1 == x and Field2 == y);' "inner_qry" "| take 5" ) @@ -140,9 +140,9 @@ def test_quotes(): # fmt: off query_expected = ( - "logs" + '["logs"]' '| project ["Field1"], ["Field2"]' - "| take %(param_1)s" + "| take __[POSTCOMPILE_param_1]" ) # fmt: on @@ -152,13 +152,13 @@ def test_quotes(): @pytest.mark.parametrize( "schema_name,table_name,expected_table_name", [ - ("schema", "table", 'database("schema").table'), - ("schema", '"table.name"', 'database("schema")."table.name"'), - ('"schema.name"', "table", 'database("schema.name").table'), - ('"schema.name"', '"table.name"', 'database("schema.name")."table.name"'), - ('"schema name"', '"table name"', 'database("schema name")."table name"'), - (None, '"table.name"', '"table.name"'), - (None, "MyTable", "MyTable"), + ("schema", "table", 'database("schema").["table"]'), + ("schema", '"table.name"', 'database("schema").["table.name"]'), + ('"schema.name"', "table", 'database("schema.name").["table"]'), + ('"schema.name"', '"table.name"', 'database("schema.name").["table.name"]'), + ('"schema name"', '"table name"', 'database("schema name").["table name"]'), + (None, '"table.name"', '["table.name"]'), + (None, "MyTable", '["MyTable"]'), ], ) def test_schema_from_metadata(table_name: str, schema_name: str, expected_table_name: str): @@ -171,20 +171,22 @@ def test_schema_from_metadata(table_name: str, schema_name: str, expected_table_ 
query_compiled = str(query.compile(engine)).replace("\n", "") - query_expected = f"{expected_table_name}| take %(param_1)s" + query_expected = f"{expected_table_name}| take __[POSTCOMPILE_param_1]" assert query_compiled == query_expected @pytest.mark.parametrize( "query_table_name,expected_table_name", [ - ("schema.table", 'database("schema").table'), - ('schema."table.name"', 'database("schema")."table.name"'), - ('"schema.name".table', 'database("schema.name").table'), - ('"schema.name"."table.name"', 'database("schema.name")."table.name"'), - ('"schema name"."table name"', 'database("schema name")."table name"'), - ('"table.name"', '"table.name"'), - ("MyTable", "MyTable"), + ("schema.table", 'database("schema").["table"]'), + ('schema."table.name"', 'database("schema").["table.name"]'), + ('"schema.name".table', 'database("schema.name").["table"]'), + ('"schema.name"."table.name"', 'database("schema.name").["table.name"]'), + ('"schema name"."table name"', 'database("schema name").["table name"]'), + ('"table.name"', '["table.name"]'), + ("MyTable", '["MyTable"]'), + ('["schema"].["table"]', 'database("schema").["table"]'), + ('["table"]', '["table"]'), ], ) def test_schema_from_query(query_table_name: str, expected_table_name: str):