Skip to content

Commit

Permalink
CORE-426 Optimize duplicate query (sodadata#1781)
Browse files: browse the repository at this point in the history
* CORE-426 Optimize duplicate query
  • Loading branch information
m1n0 authored Feb 6, 2023
1 parent 13f3e20 commit f000c70
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 13 deletions.
9 changes: 3 additions & 6 deletions soda/core/soda/execution/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,20 +713,17 @@ def sql_get_duplicates_count(
column_names: str,
table_name: str,
filter: str,
exclude_patterns: list[str] | None = None,
) -> str | None:
main_query_columns = f"{column_names}, frequency" if exclude_patterns else "*"
sql = dedent(
f"""
WITH frequencies AS (
SELECT {column_names}, COUNT(*) AS frequency
SELECT COUNT(*) AS frequency
FROM {table_name}
WHERE {filter}
GROUP BY {column_names})
SELECT {main_query_columns}
SELECT count(*)
FROM frequencies
WHERE frequency > 1
ORDER BY frequency DESC"""
WHERE frequency > 1"""
)

return sql
Expand Down
8 changes: 4 additions & 4 deletions soda/core/soda/execution/query/duplicates_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def __init__(self, partition: "Partition", metric: "Metric"):
column_names,
table_name,
values_filter,
exclude_patterns=exclude_patterns,
)
)

Expand Down Expand Up @@ -96,10 +95,11 @@ def __init__(self, partition: "Partition", metric: "Metric"):
)

def execute(self):
self.fetchall()
self.metric.set_value(len(self.rows))
self.fetchone()
duplicates_count = self.row[0]
self.metric.set_value(duplicates_count)

if self.rows:
if duplicates_count:
sample_query = SampleQuery(
self.data_source_scan,
self.metric,
Expand Down
6 changes: 3 additions & 3 deletions soda/core/tests/data_source/test_duplicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_duplicates_single_column(data_source_fixture: DataSourceFixture):
scan.assert_all_checks_pass()

# This is a simple use case, verify that count(*) is used in the main query.
scan.assert_log("SELECT *")
scan.assert_log("count(*)")


def test_duplicates_multiple_columns(data_source_fixture: DataSourceFixture):
Expand Down Expand Up @@ -51,8 +51,8 @@ def test_duplicates_with_exclude_columns(data_source_fixture: DataSourceFixture)
scan.assert_all_checks_pass()

# Exclude columns are present, so the query should list the columns explicitly.
scan.assert_log("SELECT cat, frequency")
scan.assert_no_log("SELECT *")
scan.assert_log("cat, frequency")
scan.assert_no_log(" * ")


def test_duplicates_with_filter(data_source_fixture: DataSourceFixture):
Expand Down

0 comments on commit f000c70

Please sign in to comment.