Support multiple SQL queries in Dataproc SQL job (apache#44890)
* Send multiple queries on dataproc sql job

DataProcJobBuilder.add_query(query) is misleading: it suggests that you can call it several times with different queries and that all of them will be sent on execution, but in fact only the last one is sent, because each call overwrites the query list.

I've added a set_queries function, which takes a list of strings and sends it.
Dataproc supports a list of queries.

* Append query to the queries sent to dataproc instead of replacing

As requested by the reviewer: instead of adding a function that sets a list of queries, fix the original add_query function so that it actually appends the query to the list of queries sent to the Dataproc job.

* formatter

---------

Co-authored-by: Amir Mor <[email protected]>
amirmor1 and Amir Mor authored Dec 15, 2024
1 parent a4a5864 commit 9d68013
Showing 2 changed files with 17 additions and 6 deletions.
10 changes: 7 additions & 3 deletions providers/src/airflow/providers/google/cloud/hooks/dataproc.py
@@ -116,13 +116,17 @@ def add_args(self, args: list[str] | None = None) -> None:
         if args is not None:
             self.job["job"][self.job_type]["args"] = args

-    def add_query(self, query: str) -> None:
+    def add_query(self, query: str | list[str]) -> None:
         """
-        Set query for Dataproc job.
+        Add query for Dataproc job.

         :param query: query for the job.
         """
-        self.job["job"][self.job_type]["query_list"] = {"queries": [query]}
+        queries = self.job["job"][self.job_type].setdefault("query_list", {"queries": []})["queries"]
+        if isinstance(query, str):
+            queries.append(query)
+        elif isinstance(query, list):
+            queries.extend(query)

     def add_query_uri(self, query_uri: str) -> None:
         """
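The behavioural difference between the removed and added lines can be sketched in isolation. This is a minimal standalone illustration, not the provider's class: MiniJobBuilder, add_query_old, and the "hive_job" job type key are hypothetical names chosen for the example, and only the query-list handling from the hunk above is reproduced.

```python
from __future__ import annotations


class MiniJobBuilder:
    """Hypothetical stand-in for the job builder; only the query logic is modelled."""

    def __init__(self, job_type: str = "hive_job") -> None:
        # "hive_job" is an assumed job type key, used here purely for illustration.
        self.job_type = job_type
        self.job: dict = {"job": {job_type: {}}}

    def add_query_old(self, query: str) -> None:
        # Behaviour of the removed line: each call replaces the whole query list.
        self.job["job"][self.job_type]["query_list"] = {"queries": [query]}

    def add_query(self, query: str | list[str]) -> None:
        # Behaviour of the added lines: create the list on first use, then accumulate.
        queries = self.job["job"][self.job_type].setdefault("query_list", {"queries": []})["queries"]
        if isinstance(query, str):
            queries.append(query)
        elif isinstance(query, list):
            queries.extend(query)


old = MiniJobBuilder()
old.add_query_old("SELECT 1")
old.add_query_old("SELECT 2")
old_queries = old.job["job"]["hive_job"]["query_list"]["queries"]

new = MiniJobBuilder()
new.add_query("SELECT 1")
new.add_query(["SELECT 2", "SELECT 3"])
new_queries = new.job["job"]["hive_job"]["query_list"]["queries"]

print(old_queries)  # ['SELECT 2']
print(new_queries)  # ['SELECT 1', 'SELECT 2', 'SELECT 3']
```

With the two isinstance branches as written in the diff, a value that is neither a str nor a list (a tuple, for example) matches neither branch and is silently ignored.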
13 changes: 10 additions & 3 deletions providers/tests/google/cloud/hooks/test_dataproc.py
@@ -1097,9 +1097,16 @@ def test_add_args(self):
         assert args == self.builder.job["job"][self.job_type]["args"]

     def test_add_query(self):
-        query = ["query"]
-        self.builder.add_query(query)
-        assert self.builder.job["job"][self.job_type]["query_list"] == {"queries": [query]}
+        query1 = "query1"
+        self.builder.add_query(query1)
+        query2 = "query2"
+        self.builder.add_query(query2)
+        assert self.builder.job["job"][self.job_type]["query_list"] == {"queries": [query1, query2]}
+        new_queries = ["query3", "query4"]
+        self.builder.add_query(new_queries)
+        assert self.builder.job["job"][self.job_type]["query_list"] == {
+            "queries": [query1, query2] + new_queries
+        }

     def test_add_query_uri(self):
         query_uri = "query_uri"
