From a62e181b23d57acfda6c9672311853dc78e4aece Mon Sep 17 00:00:00 2001 From: Daniel Rizk Date: Wed, 8 Jan 2025 11:05:47 -0500 Subject: [PATCH 1/2] adds intersect and setdiff --- NEWS.md | 7 ++ Project.toml | 2 +- src/TidierDB.jl | 3 +- src/docstrings.jl | 113 ++++++++++++++++++++++- src/joins_sq.jl | 145 ----------------------------- src/union_intersect_setdiff.jl | 162 +++++++++++++++++++++++++++++++++ 6 files changed, 284 insertions(+), 148 deletions(-) create mode 100644 src/union_intersect_setdiff.jl diff --git a/NEWS.md b/NEWS.md index 7cd0a81..3752f65 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,11 @@ # TidierDB.jl updates +## v0.6.2 - 2025-01-08 +- adds `@intersect` and `@setdiff` (SQLs `INTERSECT` and `EXCEPT`) respectively, with optional `all` argument +- adds support for `all` arg to `@union` (equivalent to `@union_all`) + +## v0.6.1 - 2025-01-07 +- Bumps julia LTS to 1.10 + ## v0.6.0 - 2025-01-07 - Adds support for joining on multiple columns - Adds support for inequality joins diff --git a/Project.toml b/Project.toml index 402f274..5e3c57c 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "TidierDB" uuid = "86993f9b-bbba-4084-97c5-ee15961ad48b" authors = ["Daniel Rizk and contributors"] -version = "0.6.1" +version = "0.6.2" [deps] Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" diff --git a/src/TidierDB.jl b/src/TidierDB.jl index 244ae0e..5eb0588 100644 --- a/src/TidierDB.jl +++ b/src/TidierDB.jl @@ -19,7 +19,7 @@ using GZip @slice_min, @slice_sample, @rename, copy_to, duckdb_open, duckdb_connect, @semi_join, @full_join, @anti_join, connect, from_query, @interpolate, add_interp_parameter!, update_con, @head, clickhouse, duckdb, sqlite, mysql, mssql, postgres, athena, snowflake, gbq, oracle, databricks, SQLQuery, show_tables, - t, @union, @create_view, drop_view, @compute, warnings, @relocate, @union_all + t, @union, @create_view, drop_view, @compute, warnings, @relocate, @union_all, @setdiff, @intersect abstract type SQLBackend end @@ 
-63,6 +63,7 @@ include("slices_sq.jl") include("windows.jl") include("view_compute.jl") include("relocate.jl") +include("union_intersect_setdiff.jl") # Unified expr_to_sql function to use right mode diff --git a/src/docstrings.jl b/src/docstrings.jl index ee1286a..0252f7e 100644 --- a/src/docstrings.jl +++ b/src/docstrings.jl @@ -1355,13 +1355,14 @@ SQLQuery("", "df_mem", "", "", "", "", "", "", false, false, 4×4 DataFrame const docstring_union = """ - @union(sql_query1, sql_query2) + @union(sql_query1, sql_query2, all = false) Combine two SQL queries using the `UNION` operator. # Arguments - `sql_query1`: The first SQL query to combine. - `sql_query2`: The second SQL query to combine. +- `all`: Defaults to false, when true it will return duplicates. `UNION ALL` # Returns - A lazy query of all distinct rows in the second query bound to the first @@ -1420,6 +1421,18 @@ julia> @chain t(df1_table) begin 1 │ 1 10 2 │ 2 20 3 │ 3 30 + +julia> @chain t(df1_table) @union(df1_table, all = true) @collect +6×2 DataFrame + Row │ id value + │ Int64 Int64 +─────┼────────────── + 1 │ 1 10 + 2 │ 2 20 + 3 │ 3 30 + 4 │ 1 10 + 5 │ 2 20 + 6 │ 3 30 ``` """ @@ -1460,6 +1473,104 @@ julia> @chain t(df1_table) @union_all(df1_table) @collect ``` """ +const docstring_intersect = +""" + @intersect(sql_query1, sql_query2, all = false) + +Combine two SQL queries/tables using `INTERSECT` + +# Arguments +- `sql_query1`: The first SQL query to combine. +- `sql_query2`: The second SQL query to combine. +- `all`: Defaults to false, when true it will return duplicates. 
`INTERSECT ALL` + +# Returns +- A lazy query of the rows that appear in both the first and the second query + +# Examples +```jldoctest +julia> db = connect(duckdb()); + +julia> df1 = DataFrame(id = [1, 2, 2, 3, 4], + name = ["Alice", "Bob", "Bob", "Charlie", "David"]); + +julia> df2 = DataFrame( id = [2, 2, 3, 5], + name = ["Bob", "Bob", "Charlie", "Eve"]); + +julia> copy_to(db, df1, "df1"); copy_to(db, df2, "df2"); + +julia> df1_table = db_table(db, "df1"); + +julia> df2_table = db_table(db, "df2"); + +julia> @chain t(df1_table) @intersect(df2_table) @collect +2×2 DataFrame + Row │ id name + │ Int64 String +─────┼──────────────── + 1 │ 2 Bob + 2 │ 3 Charlie + +julia> @chain t(df1_table) @intersect(df2_table, all = true) @collect +3×2 DataFrame + Row │ id name + │ Int64 String +─────┼──────────────── + 1 │ 3 Charlie + 2 │ 2 Bob + 3 │ 2 Bob +``` +""" + +const docstring_setdiff = +""" + @setdiff(sql_query1, sql_query2, all = false) + +Combine two SQL queries/tables using `EXCEPT` + +# Arguments +- `sql_query1`: The first SQL query to combine. +- `sql_query2`: The second SQL query to combine. +- `all`: Defaults to false, when true it will return duplicates. 
`EXCEPT ALL` + +# Returns +- A lazy query of the rows in the first query that do not appear in the second query + +# Examples +```jldoctest +julia> db = connect(duckdb()); + +julia> df1 = DataFrame(id = [1, 1, 2, 2, 3, 4], + name = ["Alice", "Alice", "Bob", "Bob", "Charlie", "David"]); + +julia> df2 = DataFrame(id = [2, 2, 3, 5], + name = ["Bob", "Bob", "Charlie", "Eve"]); + +julia> copy_to(db, df1, "df1"); copy_to(db, df2, "df2"); + +julia> df1_table = db_table(db, "df1"); + +julia> df2_table = db_table(db, "df2"); + +julia> @chain t(df1_table) @setdiff(df2_table) @collect +2×2 DataFrame + Row │ id name + │ Int64 String +─────┼─────────────── + 1 │ 1 Alice + 2 │ 4 David + +julia> @chain t(df1_table) @setdiff(df2_table, all = true) @collect +3×2 DataFrame + Row │ id name + │ Int64 String +─────┼─────────────── + 1 │ 1 Alice + 2 │ 1 Alice + 3 │ 4 David +``` +""" + const docstring_create_view = """ @view(sql_query, name, replace = true) diff --git a/src/joins_sq.jl b/src/joins_sq.jl index 518a8cc..f28095a 100644 --- a/src/joins_sq.jl +++ b/src/joins_sq.jl @@ -507,149 +507,4 @@ macro anti_join(sqlquery, join_table, expr... ) $as_of ) end -end - - - -""" -$docstring_union -""" -macro union(sqlquery, union_query) - return quote - sq = $(esc(sqlquery)) - uq = $(esc(union_query)) - - if isa(sq, SQLQuery) - # Determine if sq needs a new CTE - needs_new_cte_sq = !isempty(sq.select) || !isempty(sq.where) || sq.is_aggregated || !isempty(sq.ctes) - if needs_new_cte_sq - sq.cte_count += 1 - cte_name_sq = "cte_" * string(sq.cte_count) - most_recent_source_sq = !isempty(sq.ctes) ? 
"cte_" * string(sq.cte_count - 1) : sq.from - select_sql_sq = "SELECT * FROM " * most_recent_source_sq - new_cte_sq = CTE(name=cte_name_sq, select=select_sql_sq) - push!(sq.ctes, new_cte_sq) - sq.from = cte_name_sq - end - - # Prepare the union query - if isa(uq, SQLQuery) - # Determine if uq needs a new CTE - needs_new_cte_uq = !isempty(uq.select) || !isempty(uq.where) || uq.is_aggregated || !isempty(uq.ctes) - if needs_new_cte_uq - sq.join_count +=1 - joinc = "j" * string(sq.join_count) - for cte in uq.ctes - cte.name = joinc * cte.name - end - uq.cte_count += 1 - cte_name_uq = joinc * "cte_" * string(uq.cte_count) - most_recent_source_uq = !isempty(uq.ctes) ? joinc * "cte_" * string(uq.cte_count - 1) : uq.from - select_sql_uq = finalize_query_jq(uq, most_recent_source_uq) - new_cte_uq = CTE(name=cte_name_uq, select=select_sql_uq) - push!(uq.ctes, new_cte_uq) - uq.from = cte_name_uq - end - - # Combine the queries using UNION - union_sql = "SELECT * FROM " * sq.from * " UNION SELECT * FROM " * uq.from - - # Merge CTEs and metadata - sq.ctes = vcat(sq.ctes, uq.ctes) - # sq.metadata = vcat(sq.metadata, uq.metadata) - else - # Treat uq as a table name - union_sql = "SELECT * FROM " * sq.from * " UNION SELECT * FROM " * string(uq) - # Update metadata - if current_sql_mode[] != :athena - new_metadata = get_table_metadata(sq.db, string(uq)) - else - new_metadata = get_table_metadata_athena(sq.db, string(uq), sq.athena_params) - end - # sq.metadata = vcat(sq.metadata, new_metadata) - end - - # Create a new CTE for the union - sq.cte_count += 1 - union_cte_name = "cte_" * string(sq.cte_count) - union_cte = CTE(name=union_cte_name, select=union_sql) - push!(sq.ctes, union_cte) - sq.from = union_cte_name - else - error("Expected sqlquery to be an instance of SQLQuery") - end - sq - end -end - - -""" -$docstring_union_all -""" -macro union_all(sqlquery, union_query) - return quote - sq = $(esc(sqlquery)) - uq = $(esc(union_query)) - - if isa(sq, SQLQuery) - # Determine if 
sq needs a new CTE - needs_new_cte_sq = !isempty(sq.select) || !isempty(sq.where) || sq.is_aggregated || !isempty(sq.ctes) - if needs_new_cte_sq - sq.cte_count += 1 - cte_name_sq = "cte_" * string(sq.cte_count) - most_recent_source_sq = !isempty(sq.ctes) ? "cte_" * string(sq.cte_count - 1) : sq.from - select_sql_sq = "SELECT * FROM " * most_recent_source_sq - new_cte_sq = CTE(name=cte_name_sq, select=select_sql_sq) - push!(sq.ctes, new_cte_sq) - sq.from = cte_name_sq - end - - # Prepare the union query - if isa(uq, SQLQuery) - # Determine if uq needs a new CTE - needs_new_cte_uq = !isempty(uq.select) || !isempty(uq.where) || uq.is_aggregated || !isempty(uq.ctes) - if needs_new_cte_uq - sq.join_count +=1 - joinc = "j" * string(sq.join_count) - for cte in uq.ctes - cte.name = joinc * cte.name - end - uq.cte_count += 1 - cte_name_uq = joinc * "cte_" * string(uq.cte_count) - most_recent_source_uq = !isempty(uq.ctes) ? joinc * "cte_" * string(uq.cte_count - 1) : uq.from - select_sql_uq = finalize_query_jq(uq, most_recent_source_uq) - new_cte_uq = CTE(name=cte_name_uq, select=select_sql_uq) - push!(uq.ctes, new_cte_uq) - uq.from = cte_name_uq - end - - # Combine the queries using UNION - union_sql = "SELECT * FROM " * sq.from * " UNION ALL SELECT * FROM " * uq.from - - # Merge CTEs and metadata - sq.ctes = vcat(sq.ctes, uq.ctes) - # sq.metadata = vcat(sq.metadata, uq.metadata) - else - # Treat uq as a table name - union_sql = "SELECT * FROM " * sq.from * " UNION ALL SELECT * FROM " * string(uq) - # Update metadata - if current_sql_mode[] != :athena - new_metadata = get_table_metadata(sq.db, string(uq)) - else - new_metadata = get_table_metadata_athena(sq.db, string(uq), sq.athena_params) - end - # sq.metadata = vcat(sq.metadata, new_metadata) - end - - # Create a new CTE for the union - sq.cte_count += 1 - union_cte_name = "cte_" * string(sq.cte_count) - union_cte = CTE(name=union_cte_name, select=union_sql) - push!(sq.ctes, union_cte) - sq.from = union_cte_name - else - 
error("Expected sqlquery to be an instance of SQLQuery") - end - sq - end end \ No newline at end of file diff --git a/src/union_intersect_setdiff.jl b/src/union_intersect_setdiff.jl new file mode 100644 index 0000000..532332d --- /dev/null +++ b/src/union_intersect_setdiff.jl @@ -0,0 +1,162 @@ +# A unified function that performs the set operation, preserving existing logic + +function perform_set_operation(sq::SQLQuery, uq_or_table, op::String; all::Bool=false) + + if !isa(sq, SQLQuery) + error("Expected sqlquery to be an instance of SQLQuery") + end + + # 1) Possibly create a new CTE for the left query (sq) + needs_new_cte_sq = !isempty(sq.select) || !isempty(sq.where) || sq.is_aggregated || !isempty(sq.ctes) + if needs_new_cte_sq + sq.cte_count += 1 + cte_name_sq = "cte_" * string(sq.cte_count) + most_recent_source_sq = !isempty(sq.ctes) ? "cte_" * string(sq.cte_count - 1) : sq.from + select_sql_sq = "SELECT * FROM " * most_recent_source_sq + new_cte_sq = CTE(name=cte_name_sq, select=select_sql_sq) + push!(sq.ctes, new_cte_sq) + sq.from = cte_name_sq + end + + local op_clause + if all + op_clause = op * " ALL" + else + op_clause = op + end + + local union_sql + if isa(uq_or_table, SQLQuery) + uq = uq_or_table + # Possibly create a new CTE for the right query (uq) + needs_new_cte_uq = !isempty(uq.select) || !isempty(uq.where) || uq.is_aggregated || !isempty(uq.ctes) + if needs_new_cte_uq + sq.join_count += 1 + joinc = "j" * string(sq.join_count) + for cte in uq.ctes + cte.name = joinc * cte.name + end + uq.cte_count += 1 + cte_name_uq = joinc * "cte_" * string(uq.cte_count) + most_recent_source_uq = !isempty(uq.ctes) ? 
joinc * "cte_" * string(uq.cte_count - 1) : uq.from + select_sql_uq = finalize_query_jq(uq, most_recent_source_uq) + new_cte_uq = CTE(name=cte_name_uq, select=select_sql_uq) + push!(uq.ctes, new_cte_uq) + uq.from = cte_name_uq + end + + # Combine + union_sql = "SELECT * FROM " * sq.from * " " * op_clause * " SELECT * FROM " * uq.from + + sq.ctes = vcat(sq.ctes, uq.ctes) + # sq.metadata = vcat(sq.metadata, uq.metadata) + + else + # Treat uq_or_table as a table name + tbl_name = string(uq_or_table) + union_sql = "SELECT * FROM " * sq.from * " " * op_clause * " SELECT * FROM " * tbl_name + + # Update metadata (commented out as in original) + if current_sql_mode[] != :athena + new_metadata = get_table_metadata(sq.db, tbl_name) + else + new_metadata = get_table_metadata_athena(sq.db, tbl_name, sq.athena_params) + end + # sq.metadata = vcat(sq.metadata, new_metadata) + end + + # 4) Create a new CTE for the combined result + sq.cte_count += 1 + union_cte_name = "cte_" * string(sq.cte_count) + union_cte = CTE(name=union_cte_name, select=union_sql) + push!(sq.ctes, union_cte) + sq.from = union_cte_name + + return sq +end + + +""" +$docstring_union +""" +macro union(sqlquery, union_query, args...) + # parse the `all` argument exactly as in the original logic + all_flag = false + for arg in args + if isa(arg, Expr) && arg.head == :(=) + if arg.args[1] == :all && arg.args[2] == true + all_flag = true + end + end + end + return quote + perform_set_operation( + $(esc(sqlquery)), + $(esc(union_query)), + "UNION"; + all = $(all_flag) + ) + end +end + +""" +$docstring_union_all +""" +macro union_all(sqlquery, union_query) + return quote + perform_set_operation( + $(esc(sqlquery)), + $(esc(union_query)), + "UNION"; # We'll let the function append " ALL" + all = true + ) + end +end + +""" +$docstring_intersect +""" +macro intersect(sqlquery, union_query, args...) 
+ # parse the `all` argument exactly as in the original logic + all_flag = false + for arg in args + if isa(arg, Expr) && arg.head == :(=) + if arg.args[1] == :all && arg.args[2] == true + all_flag = true + end + end + end + + return quote + perform_set_operation( + $(esc(sqlquery)), + $(esc(union_query)), + "INTERSECT"; + all = $(all_flag) + ) + end +end + +""" +$docstring_setdiff +""" +macro setdiff(sqlquery, union_query, args...) + # parse the `all` argument exactly as in the original logic + all_flag = false + for arg in args + if isa(arg, Expr) && arg.head == :(=) + if arg.args[1] == :all && arg.args[2] == true + all_flag = true + end + end + end + + return quote + perform_set_operation( + $(esc(sqlquery)), + $(esc(union_query)), + "EXCEPT"; + all = $(all_flag) + ) + end +end From 96165e25529d09243355022f0209196a42993527 Mon Sep 17 00:00:00 2001 From: Daniel Rizk Date: Thu, 9 Jan 2025 10:17:52 -0500 Subject: [PATCH 2/2] small join fix --- NEWS.md | 2 +- src/joins_sq.jl | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index 3752f65..de71742 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,5 @@ # TidierDB.jl updates -## v0.6.2 - 2025-01-08 +## v0.6.2 - 2025-01-09 - adds `@intersect` and `@setdiff` (SQLs `INTERSECT` and `EXCEPT`) respectively, with optional `all` argument - adds support for `all` arg to `@union` (equivalent to `@union_all`) diff --git a/src/joins_sq.jl b/src/joins_sq.jl index f28095a..32b2e9e 100644 --- a/src/joins_sq.jl +++ b/src/joins_sq.jl @@ -221,6 +221,7 @@ function do_join( end # matching_indices_sq = findall(vq.metadata.name == rhs) + jq = jq isa String ? db_table(sq.db, jq) : jq jq.metadata = filter(row -> !(row.name in rhs_d), jq.metadata) join_sql = " " *