Allow internal retries when pending k8s pod is deleted (apache#45184)
* Remove code that fails task upon pending pod deletion

* Remove clear_not_launched_queued_tasks

* Remove unit tests for clear_not_launched_queued_tasks

* Remove worker_pods_queued_check_interval configuration

* Remove worker_pods_queued_check_interval from provider.yaml

---------

Co-authored-by: Ryan Hatter <[email protected]>
Co-authored-by: Daniel Imberman <[email protected]>
3 people authored Jan 24, 2025
1 parent f5f5200 commit f871e01
Showing 5 changed files with 2 additions and 568 deletions.
providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

@@ -38,7 +38,7 @@

 from deprecated import deprecated
 from kubernetes.dynamic import DynamicClient
-from sqlalchemy import or_, select, update
+from sqlalchemy import select

 try:
     from airflow.cli.cli_config import ARG_LOGICAL_DATE
@@ -60,7 +60,6 @@
 from airflow.configuration import conf
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.executors.base_executor import BaseExecutor
-from airflow.executors.executor_constants import KUBERNETES_EXECUTOR
 from airflow.providers.cncf.kubernetes.exceptions import PodMutationHookException, PodReconciliationError
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
     ADOPTED,
@@ -69,7 +68,6 @@
 from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
 from airflow.stats import Stats
-from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import remove_escape_codes
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import TaskInstanceState
@@ -145,7 +143,6 @@ def __init__(self):
         self.kube_scheduler: AirflowKubernetesScheduler | None = None
         self.kube_client: client.CoreV1Api | None = None
         self.scheduler_job_id: str | None = None
-        self.event_scheduler: EventScheduler | None = None
         self.last_handled: dict[TaskInstanceKey, float] = {}
         self.kubernetes_queue: str | None = None
         self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
@@ -218,96 +215,6 @@ def get_pod_combined_search_str_to_pod_map(self) -> dict[str, k8s.V1Pod]:
             pod_combined_search_str_to_pod_map[search_str] = pod
         return pod_combined_search_str_to_pod_map

-    @provide_session
-    def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> None:
-        """
-        Clear tasks that were not yet launched, but were previously queued.
-
-        Tasks can end up in a "Queued" state when a rescheduled/deferred operator
-        comes back up for execution (with the same try_number) before the
-        pod of its previous incarnation has been fully removed (we think).
-
-        It's also possible when an executor abruptly shuts down (leaving a non-empty
-        task_queue on that executor), but that scenario is handled via normal adoption.
-
-        This method checks each of our queued tasks to see if the corresponding pod
-        is around, and if not, and there's no matching entry in our own
-        task_queue, marks it for re-execution.
-        """
-        if TYPE_CHECKING:
-            assert self.kube_client
-        from airflow.models.taskinstance import TaskInstance
-
-        hybrid_executor_enabled = hasattr(TaskInstance, "executor")
-        default_executor_alias = None
-        if hybrid_executor_enabled:
-            from airflow.executors.executor_loader import ExecutorLoader
-
-            default_executor_name = ExecutorLoader.get_default_executor_name()
-            default_executor_alias = default_executor_name.alias
-
-        with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"):
-            self.log.debug("Clearing tasks that have not been launched")
-            query = select(TaskInstance).where(
-                TaskInstance.state == TaskInstanceState.QUEUED,
-                TaskInstance.queued_by_job_id == self.job_id,
-            )
-            if self.kubernetes_queue:
-                query = query.where(TaskInstance.queue == self.kubernetes_queue)
-            # KUBERNETES_EXECUTOR is the string name/alias of the "core" executor represented by this
-            # module. The ExecutorName for "core" executors always contains an alias and cannot be modified
-            # to be different from the constant (in this case KUBERNETES_EXECUTOR).
-            elif hybrid_executor_enabled and default_executor_alias == KUBERNETES_EXECUTOR:
-                query = query.where(
-                    or_(
-                        TaskInstance.executor == KUBERNETES_EXECUTOR,
-                        TaskInstance.executor.is_(None),
-                    ),
-                )
-            elif hybrid_executor_enabled:
-                query = query.where(TaskInstance.executor == KUBERNETES_EXECUTOR)
-            queued_tis: list[TaskInstance] = session.scalars(query).all()
-            self.log.info("Found %s queued task instances", len(queued_tis))
-
-            # Go through the "last seen" dictionary and clean out old entries
-            allowed_age = self.kube_config.worker_pods_queued_check_interval * 3
-            for key, timestamp in list(self.last_handled.items()):
-                if time.time() - timestamp > allowed_age:
-                    del self.last_handled[key]
-
-            if not queued_tis:
-                return
-
-            pod_combined_search_str_to_pod_map = self.get_pod_combined_search_str_to_pod_map()
-
-            for ti in queued_tis:
-                self.log.debug("Checking task instance %s", ti)
-
-                # Check to see if we've handled it ourselves recently
-                if ti.key in self.last_handled:
-                    continue
-
-                # Build the pod selector
-                base_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}"
-                if ti.map_index >= 0:
-                    # Old tasks _couldn't_ be mapped, so we don't have to worry about compat
-                    base_selector += f",map_index={ti.map_index}"
-
-                search_str = f"{base_selector},run_id={ti.run_id}"
-                if search_str in pod_combined_search_str_to_pod_map:
-                    continue
-                self.log.info("TaskInstance: %s found in queued state but was not launched, rescheduling", ti)
-                session.execute(
-                    update(TaskInstance)
-                    .where(
-                        TaskInstance.dag_id == ti.dag_id,
-                        TaskInstance.task_id == ti.task_id,
-                        TaskInstance.run_id == ti.run_id,
-                        TaskInstance.map_index == ti.map_index,
-                    )
-                    .values(state=TaskInstanceState.SCHEDULED)
-                )
-
     def start(self) -> None:
         """Start the executor."""
         self.log.info("Start Kubernetes executor")
@@ -325,15 +232,6 @@ def start(self) -> None:
             kube_client=self.kube_client,
             scheduler_job_id=self.scheduler_job_id,
         )
-        self.event_scheduler = EventScheduler()
-
-        self.event_scheduler.call_regular_interval(
-            self.kube_config.worker_pods_queued_check_interval,
-            self.clear_not_launched_queued_tasks,
-        )
-        # We also call this at startup as that's the most likely time to see
-        # stuck queued tasks
-        self.clear_not_launched_queued_tasks()

     def execute_async(
         self,
@@ -378,7 +276,6 @@ def sync(self) -> None:
             assert self.kube_config
             assert self.result_queue
             assert self.task_queue
-            assert self.event_scheduler

         if self.running:
             self.log.debug("self.running: %s", self.running)
@@ -466,10 +363,6 @@ def sync(self) -> None:
             finally:
                 self.task_queue.task_done()

-        # Run any pending timed events
-        next_event = self.event_scheduler.run(blocking=False)
-        self.log.debug("Next timed event is in %f", next_event)
-
     @provide_session
     def _change_state(
         self,
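
For context on the removed logic: clear_not_launched_queued_tasks matched each queued task instance against running pods via a combined search string, and flipped unmatched tasks back to SCHEDULED. A minimal self-contained sketch of that matching, with QueuedTI standing in for the real TaskInstance and a plain dict standing in for get_pod_combined_search_str_to_pod_map():

from dataclasses import dataclass


@dataclass
class QueuedTI:
    # Stand-in for the TaskInstance fields the removed method consulted.
    dag_id: str
    task_id: str
    run_id: str
    map_index: int = -1


def pod_search_str(ti: QueuedTI) -> str:
    # Mirrors the selector the removed method built: dag_id and task_id,
    # plus map_index only for mapped tasks, then run_id.
    base_selector = f"dag_id={ti.dag_id},task_id={ti.task_id}"
    if ti.map_index >= 0:
        base_selector += f",map_index={ti.map_index}"
    return f"{base_selector},run_id={ti.run_id}"


# A queued TI whose search string had no matching pod was rescheduled.
pod_map = {"dag_id=example,task_id=t1,run_id=r1": "pod-object"}
launched = QueuedTI("example", "t1", "r1")
orphaned = QueuedTI("example", "t2", "r1")
print(pod_search_str(launched) in pod_map)  # True  -> left alone
print(pod_search_str(orphaned) in pod_map)  # False -> would have been rescheduled

With this commit, that periodic sweep (and the EventScheduler that drove it) is gone; recovery of never-launched pods is left to the executor's normal retry and adoption paths.
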
providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -231,12 +231,8 @@ def process_status(
             )
         elif status == "Pending":
             # deletion_timestamp is set by kube server when a graceful deletion is requested.
-            # since kube server have received request to delete pod set TI state failed
-            if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
-                self.log.info("Event: Failed to start pod %s, annotations: %s", pod_name, annotations_string)
-                self.watcher_queue.put(
-                    (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version)
-                )
-            elif (
+            if (
                 self.kube_config.worker_pod_pending_fatal_container_state_reasons
                 and "status" in event["raw_object"]
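
The behavioral effect of this hunk can be sketched as a toy decision function (a simplified model, not the real process_status signature; FAILED stands in for TaskInstanceState.FAILED):

from __future__ import annotations

FAILED = "failed"  # stand-in for TaskInstanceState.FAILED


def pending_pod_outcome(event_type: str, deletion_timestamp: str | None, after_change: bool) -> str | None:
    # Models only the DELETED-while-Pending branch of the pod watcher.
    if event_type == "DELETED" and deletion_timestamp is not None:
        if after_change:
            # Post-commit: no terminal state is pushed; the task stays queued
            # and the executor's normal retry path relaunches the pod.
            return None
        # Pre-commit: the watcher immediately reported the task as FAILED.
        return FAILED
    return None


assert pending_pod_outcome("DELETED", "2025-01-24T00:00:00Z", after_change=False) == FAILED
assert pending_pod_outcome("DELETED", "2025-01-24T00:00:00Z", after_change=True) is None

Deleting a still-pending worker pod (for example via kubectl delete pod) therefore no longer hard-fails the task instance, which is what allows the internal retries named in the commit title.
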
providers/src/airflow/providers/cncf/kubernetes/kube_config.py
@@ -76,10 +76,6 @@ def __init__(self):
         # interact with cluster components.
         self.executor_namespace = conf.get(self.kubernetes_section, "namespace")

-        self.worker_pods_queued_check_interval = conf.getint(
-            self.kubernetes_section, "worker_pods_queued_check_interval"
-        )
-
         self.kube_client_request_args = conf.getjson(
             self.kubernetes_section, "kube_client_request_args", fallback={}
         )
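
With the attribute gone from KubeConfig, a worker_pods_queued_check_interval value still present in airflow.cfg is read by nothing and silently ignored. A hypothetical probe for such a leftover setting, assuming a configured Airflow environment (the fallback avoids the AirflowConfigException a read of a now-undefined option would otherwise raise):

from airflow.configuration import conf

# -1 is a sentinel: returned only when the option is absent everywhere.
leftover = conf.getint("kubernetes_executor", "worker_pods_queued_check_interval", fallback=-1)
if leftover != -1:
    print(f"airflow.cfg still sets worker_pods_queued_check_interval={leftover}; it is now ignored")
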
providers/src/airflow/providers/cncf/kubernetes/provider.yaml (7 changes: 0 additions & 7 deletions)
@@ -368,13 +368,6 @@ config:
       type: boolean
       example: ~
       default: "True"
-      worker_pods_queued_check_interval:
-        description: |
-          How often in seconds to check for task instances stuck in "queued" status without a pod
-        version_added: ~
-        type: integer
-        example: ~
-        default: "60"
       ssl_ca_cert:
         description: |
           Path to a CA certificate to be used by the Kubernetes client to verify the server's SSL certificate.
[Diff for the fifth changed file — the removed unit tests for clear_not_launched_queued_tasks — did not render on this page.]
