Monkeypatch BigQuery adapter to retrieve SQL for async execution #1474
base: main
@@ -371,32 +370,6 @@ def generate_task_or_group(
    return task_or_group


def _add_dbt_compile_task(
) -> Tuple[BigQueryAdapterResponse, agate.Table]:
    return BigQueryAdapterResponse("mock_bigquery_adapter_response"), empty_table()

BigQueryConnectionManager.execute = execute
We may have to mock a different method when using dbt build. That said, this is outside the scope of the original bug this PR solves, so I've logged a follow-up ticket: #1476
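For readers following the thread, here is a minimal sketch of the monkey-patching idea under discussion. It uses a hypothetical stand-in class, FakeConnectionManager, rather than the real BigQueryConnectionManager, and the execute signature shown is an assumption made for illustration only. The point is simply that replacing execute on the class lets the caller collect the SQL without running it.

```python
from typing import Any, List, Tuple


class FakeConnectionManager:
    """Hypothetical stand-in for an adapter connection manager (not dbt's class)."""

    def execute(self, sql: str, auto_begin: bool = False, fetch: bool = False) -> Tuple[str, List[Any]]:
        # A real adapter would send the statement to the data warehouse here.
        raise RuntimeError("would run against the warehouse")


# Collected statements; the name is illustrative only.
captured_sql: List[str] = []


def capture_execute(
    self: FakeConnectionManager, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[str, List[Any]]:
    # Record the SQL instead of executing it and return a placeholder response.
    captured_sql.append(sql)
    return "mock_adapter_response", []


# The monkey patch: every instance now uses the capturing function.
FakeConnectionManager.execute = capture_execute

response, rows = FakeConnectionManager().execute("select 1 as id")
```

As the comment above notes, a different method might need the same treatment for dbt build; this sketch does not cover that case.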
cosmos/operators/airflow_async.py
Outdated
"gcp_project",
"dataset",
Why did we remove these as templated fields?
@@ -60,13 +55,11 @@ class DbtSourceAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtSourceLocalO
    pass


class DbtRunAirflowAsyncOperator(BigQueryInsertJobOperator):  # type: ignore
class DbtRunAirflowAsyncOperator(BigQueryInsertJobOperator, DbtRunLocalOperator):  # type: ignore
It would be great if we could see the SQL statement in the operator's Templated Field, as we can with the Cosmos Local operators. If this is too much work, I'm happy for it to be done in a follow-up PR/ticket.
9590f54 to e521051
cosmos/operators/local.py
Outdated
        return_sql: bool = False,
        sql_context: dict[str, Any] | None = None,
    ) -> FullOutputSubprocessResult | dbtRunnerResult | str:
I was confused by this method, given what the LocalOperator run_command does:
astronomer-cosmos/cosmos/operators/local.py
Lines 422 to 427 in 74c7c7d
def run_command(
    self,
    cmd: list[str],
    env: dict[str, str | bytes | os.PathLike[Any]],
    context: Context,
) -> FullOutputSubprocessResult | dbtRunnerResult:
The LocalOperator currently:
- runs the dbt command
- runs the actual transformation in the data warehouse
I suggest one of the following is done:
a) have both methods accomplish the same goal (not only running the dbt command, but actually executing it in the data warehouse) or
b) name this method something else
If we decide to do approach (a), I believe we should:
- stick to the existing method interface, exposing it in the cosmos BaseOperator class
- change this method in this PR to also execute the SQL statement
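To make the two options concrete, here is a rough sketch of what option (b) could look like. The class SketchOperator and the method name render_sql are hypothetical and are not part of Cosmos; the sketch only illustrates keeping run_command's existing contract while giving the SQL-only behaviour its own name.

```python
from typing import List


class SketchOperator:
    """Hypothetical operator; the names here are illustrative, not Cosmos APIs."""

    def __init__(self) -> None:
        # Stands in for "work that reached the data warehouse".
        self.executed: List[str] = []

    def run_command(self, cmd: List[str]) -> str:
        # Existing contract: when this returns, the transformation has run.
        self.executed.append(" ".join(cmd))
        return "ok"

    def render_sql(self, cmd: List[str]) -> str:
        # Option (b): a differently named method that only returns SQL
        # and never touches the warehouse.
        joined = " ".join(cmd)
        return f"-- SQL that '{joined}' would execute"


op = SketchOperator()
sql_only = op.render_sql(["dbt", "run"])
executed_before = list(op.executed)
op.run_command(["dbt", "run"])
```

With option (a), by contrast, the single method would keep its current signature and also execute the statement, so callers in every ExecutionMode could rely on the same outcome.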
@@ -647,11 +669,15 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope
        )

    def build_and_run_cmd(
        self, context: Context, cmd_flags: list[str] | None = None
        self,
Similar feedback to #1474 (comment) here.
Concerns regarding:
- inconsistent interface between different ExecutionMode implementations
- expectation that this method would not only build the command with necessary flags, but actually have executed the dbt transformation by the end of its run
@pankajkoti I'm very excited that we now have a more reliable way of calculating the full dbt SQL query. This approach fixes #1260 and solves many of the async tickets we have open.
Monkey-patching always carries a risk, but it is worth it at this stage.
It would be great if, either as part of this PR or as a priority follow-up PR, we had an efficient way of testing that the monkey patching works across multiple versions of dbt, including the latest releases, and that the transformation is not executed when we run the dbt command. I believe this must be done before we release this feature in 1.9.0.
I've logged two follow-up tickets that are relevant:
- one is to consider the re-introduction of the compile task, if that means we can avoid having dbt installed in all dbt worker nodes, and executing the command in most Cosmos tasks: Re-evaluate adding compile task when using ExecutionMode.AIRFLOW_ASYNC #1477
- the other is to support TestBehavior.BUILD: Support TestBehavior.BUILD when using ExecutionMode.AIRFLOW_ASYNC #1476
It would be great if these could be accomplished before the 1.9.0 release, but I'm also happy with us sticking to this approach if time does not allow further analysis / implementation.
cc: @joppevos for visibility on the ongoing work
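One possible shape for the test requested above, sketched with the standard library's unittest.mock. FakeConnectionManager is a hypothetical stand-in; a real test would presumably import the adapter class from whichever dbt version is installed and run once per version. The original execute raises, so the check fails loudly if the patch does not take effect, and the patch is undone when the with block exits.

```python
from typing import Callable, List
from unittest import mock


class FakeConnectionManager:
    """Hypothetical stand-in for the adapter class under test."""

    def execute(self, sql: str) -> str:
        # Reaching this body means the transformation would hit the warehouse.
        raise AssertionError("the transformation must not reach the warehouse")


def capture_sql(statements: List[str]) -> Callable[..., str]:
    # Build a replacement `execute` that records SQL instead of running it.
    def _execute(self: FakeConnectionManager, sql: str) -> str:
        statements.append(sql)
        return "mock_adapter_response"

    return _execute


def check_patch_prevents_execution() -> List[str]:
    statements: List[str] = []
    # patch.object swaps the class attribute and restores it on exit.
    with mock.patch.object(FakeConnectionManager, "execute", capture_sql(statements)):
        FakeConnectionManager().execute("create table t as select 1")
    return statements


recorded = check_patch_prevents_execution()
```

Scoping the patch to a context manager is a design choice of this sketch, not something the PR is confirmed to do; the diff above assigns the replacement directly on the class.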
8ef4ac5 to bd85529
c161db2 to 94eada9
1a413d6 to 379d997
closes: #1260
related: #1261
related: #1266
closes: #1265