From 7d4d050ee4ecf6e3c7117a60db87630406867085 Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 27 Jan 2025 12:04:38 -0800 Subject: [PATCH] only retrieve task-level info for mapped dags --- .../dagster_airlift/core/load_defs.py | 2 ++ .../dagster_airlift/core/serialization/compute.py | 15 +++++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift/core/load_defs.py b/python_modules/libraries/dagster-airlift/dagster_airlift/core/load_defs.py index f2cd3ec09795f..6c4e3df78ec50 100644 --- a/python_modules/libraries/dagster-airlift/dagster_airlift/core/load_defs.py +++ b/python_modules/libraries/dagster-airlift/dagster_airlift/core/load_defs.py @@ -51,6 +51,7 @@ def fetch_state(self) -> SerializedAirflowDefinitionsData: airflow_instance=self.airflow_instance, mapped_assets=self.mapped_assets, dag_selector_fn=self.dag_selector_fn, + automapping_enabled=False, ) def defs_from_state( @@ -252,6 +253,7 @@ def fetch_state(self) -> SerializedAirflowDefinitionsData: airflow_instance=self.airflow_instance, mapped_assets=self.mapped_assets, dag_selector_fn=None, + automapping_enabled=True, ) def defs_from_state( diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift/core/serialization/compute.py b/python_modules/libraries/dagster-airlift/dagster_airlift/core/serialization/compute.py index 13ea2bc333b49..12f4705013d52 100644 --- a/python_modules/libraries/dagster-airlift/dagster_airlift/core/serialization/compute.py +++ b/python_modules/libraries/dagster-airlift/dagster_airlift/core/serialization/compute.py @@ -141,14 +141,22 @@ def fetch_all_airflow_data( airflow_instance: AirflowInstance, mapping_info: AirliftMetadataMappingInfo, dag_selector_fn: Optional[DagSelectorFn], + automapping_enabled: bool, ) -> FetchedAirflowData: dag_infos = { dag.dag_id: dag for dag in airflow_instance.list_dags() if dag_selector_fn is None or dag_selector_fn(dag) } + # To limit the number of API calls, only fetch task infos for the dags that we absolutely have to. + # Airflow has no batch API for fetching task infos, so we have to fetch them one dag + # at a time. task_info_map = defaultdict(dict) - for dag_id in dag_infos: + for dag_id in mapping_info.dag_ids: + # Explicitly don't fetch task information for dags that have no mapped tasks, + # unless automapping is enabled. + if len(mapping_info.task_id_map[dag_id]) == 0 and not automapping_enabled: + continue task_info_map[dag_id] = { task_info.task_id: task_info for task_info in airflow_instance.get_task_infos(dag_id=dag_id) @@ -165,9 +173,12 @@ def compute_serialized_data( airflow_instance: AirflowInstance, mapped_assets: Iterable["MappedAsset"], dag_selector_fn: Optional[DagSelectorFn], + automapping_enabled: bool, ) -> "SerializedAirflowDefinitionsData": mapping_info = build_airlift_metadata_mapping_info(mapped_assets) - fetched_airflow_data = fetch_all_airflow_data(airflow_instance, mapping_info, dag_selector_fn) + fetched_airflow_data = fetch_all_airflow_data( + airflow_instance, mapping_info, dag_selector_fn, automapping_enabled=automapping_enabled + ) return SerializedAirflowDefinitionsData( instance_name=airflow_instance.name, key_scoped_task_handles=[