Skip to content

Commit

Permalink
only retrieve task-level info for mapped dags
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Jan 31, 2025
1 parent 07a4a2e commit 7d4d050
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def fetch_state(self) -> SerializedAirflowDefinitionsData:
airflow_instance=self.airflow_instance,
mapped_assets=self.mapped_assets,
dag_selector_fn=self.dag_selector_fn,
automapping_enabled=False,
)

def defs_from_state(
Expand Down Expand Up @@ -252,6 +253,7 @@ def fetch_state(self) -> SerializedAirflowDefinitionsData:
airflow_instance=self.airflow_instance,
mapped_assets=self.mapped_assets,
dag_selector_fn=None,
automapping_enabled=True,
)

def defs_from_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,22 @@ def fetch_all_airflow_data(
airflow_instance: AirflowInstance,
mapping_info: AirliftMetadataMappingInfo,
dag_selector_fn: Optional[DagSelectorFn],
automapping_enabled: bool,
) -> FetchedAirflowData:
dag_infos = {
dag.dag_id: dag
for dag in airflow_instance.list_dags()
if dag_selector_fn is None or dag_selector_fn(dag)
}
# To limit the number of API calls, only fetch task infos for the dags that we absolutely have to.
# Airflow has no batch API for fetching task infos, so we have to fetch them one dag
# at a time.
task_info_map = defaultdict(dict)
for dag_id in dag_infos:
for dag_id in mapping_info.dag_ids:
# Explicitly don't fetch task information for dags that have no mapped tasks,
# unless automapping is enabled.
if len(mapping_info.task_id_map[dag_id]) == 0 and not automapping_enabled:
continue
task_info_map[dag_id] = {
task_info.task_id: task_info
for task_info in airflow_instance.get_task_infos(dag_id=dag_id)
Expand All @@ -165,9 +173,12 @@ def compute_serialized_data(
airflow_instance: AirflowInstance,
mapped_assets: Iterable["MappedAsset"],
dag_selector_fn: Optional[DagSelectorFn],
automapping_enabled: bool,
) -> "SerializedAirflowDefinitionsData":
mapping_info = build_airlift_metadata_mapping_info(mapped_assets)
fetched_airflow_data = fetch_all_airflow_data(airflow_instance, mapping_info, dag_selector_fn)
fetched_airflow_data = fetch_all_airflow_data(
airflow_instance, mapping_info, dag_selector_fn, automapping_enabled=automapping_enabled
)
return SerializedAirflowDefinitionsData(
instance_name=airflow_instance.name,
key_scoped_task_handles=[
Expand Down

0 comments on commit 7d4d050

Please sign in to comment.