Skip to content

Commit

Permalink
K8s yaml templates not rendered by k8sexecutor (apache#12303)
Browse files Browse the repository at this point in the history
* K8s yaml templates not rendered by k8sexecutor

There is a bug in the yaml template rendering caused by the logic that
yaml templates are only generated when the current executor is the
k8sexecutor. This is a problem as the templates are generated by the
task pod, which is itself running a LocalExecutor. Also generates a
"base" template if this taskInstance has not run yet.

* fix tests

* fix taskinstance test

* fix taskinstance

* fix pod generator tests

* fix podgen

* Update tests/kubernetes/test_pod_generator.py

Co-authored-by: Ash Berlin-Taylor <[email protected]>

* @ashb comment

Co-authored-by: Ash Berlin-Taylor <[email protected]>
  • Loading branch information
dimberman and ashb authored Nov 13, 2020
1 parent d54f087 commit 4e362c1
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 7 deletions.
1 change: 1 addition & 0 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ def construct_pod( # pylint: disable=too-many-arguments
name="base",
command=command,
image=image,
env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")],
)
]
),
Expand Down
5 changes: 3 additions & 2 deletions airflow/models/renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""Save Rendered Template Fields"""
import os
from typing import Optional

import sqlalchemy_jsonfield
Expand All @@ -26,7 +27,7 @@
from airflow.models.base import ID_LEN, Base
from airflow.models.taskinstance import TaskInstance
from airflow.serialization.helpers import serialize_template_field
from airflow.settings import IS_K8S_OR_K8SCELERY_EXECUTOR, json
from airflow.settings import json
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime

Expand All @@ -50,7 +51,7 @@ def __init__(self, ti: TaskInstance, render_templates=True):
self.ti = ti
if render_templates:
ti.render_templates()
if IS_K8S_OR_K8SCELERY_EXECUTOR:
if os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", None):
self.k8s_pod_yaml = ti.render_k8s_pod_yaml()
self.rendered_fields = {
field: serialize_template_field(getattr(self.task, field)) for field in self.task.template_fields
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1671,7 +1671,7 @@ def get_rendered_k8s_spec(self):
rendered_k8s_spec = RenderedTaskInstanceFields.get_k8s_pod_yaml(self)
if not rendered_k8s_spec:
try:
self.render_k8s_pod_yaml()
rendered_k8s_spec = self.render_k8s_pod_yaml()
except (TemplateAssertionError, UndefinedError) as e:
raise AirflowException(f"Unable to render a k8s spec for this taskinstance: {e}") from e
return rendered_k8s_spec
Expand Down
2 changes: 0 additions & 2 deletions scripts/ci/libraries/_kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
function kind::get_kind_cluster_name() {
# Name of the KinD cluster to connect to
export KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME:="airflow-python-${PYTHON_MAJOR_MINOR_VERSION}-${KUBERNETES_VERSION}"}
readonly KIND_CLUSTER_NAME
# Name of the KinD cluster to connect to when referred to via kubectl
export KUBECTL_CLUSTER_NAME=kind-${KIND_CLUSTER_NAME}
readonly KUBECTL_CLUSTER_NAME
export KUBECONFIG="${BUILD_CACHE_DIR}/.kube/config"
mkdir -pv "${BUILD_CACHE_DIR}/.kube/"
touch "${KUBECONFIG}"
Expand Down
9 changes: 9 additions & 0 deletions tests/kubernetes/test_pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ def test_construct_pod(self, mock_uuid):
expected.spec.containers[0].command = ['command']
expected.spec.containers[0].image = 'airflow_image'
expected.spec.containers[0].resources = {'limits': {'cpu': '1m', 'memory': '1G'}}
expected.spec.containers[0].env.append(
k8s.V1EnvVar(
name="AIRFLOW_IS_K8S_EXECUTOR_POD",
value='True',
)
)
result_dict = self.k8s_client.sanitize_for_serialization(result)
expected_dict = self.k8s_client.sanitize_for_serialization(self.expected)

Expand Down Expand Up @@ -473,6 +479,9 @@ def test_construct_pod_empty_executor_config(self, mock_uuid):
worker_config.metadata.labels['app'] = 'myapp'
worker_config.metadata.name = 'pod_id-' + self.static_uuid.hex
worker_config.metadata.namespace = 'namespace'
worker_config.spec.containers[0].env.append(
k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value='True')
)
worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config)
self.assertEqual(worker_config_result, sanitized_result)

Expand Down
4 changes: 3 additions & 1 deletion tests/models/test_renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

"""Unit tests for RenderedTaskInstanceFields."""

import os
import unittest
from datetime import date, timedelta
from unittest import mock
Expand Down Expand Up @@ -227,7 +228,7 @@ def test_write(self):
('test_write', 'test', {'bash_command': 'echo test_val_updated', 'env': None}), result_updated
)

@mock.patch("airflow.models.renderedtifields.IS_K8S_OR_K8SCELERY_EXECUTOR", new=True)
@mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"})
@mock.patch("airflow.settings.pod_mutation_hook")
def test_get_k8s_pod_yaml(self, mock_pod_mutation_hook):
"""
Expand Down Expand Up @@ -281,6 +282,7 @@ def test_get_k8s_pod_yaml(self, mock_pod_mutation_hook):
],
'image': ':',
'name': 'base',
'env': [{'name': 'AIRFLOW_IS_K8S_EXECUTOR_POD', 'value': 'True'}],
}
]
},
Expand Down
3 changes: 2 additions & 1 deletion tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1814,7 +1814,7 @@ def test_get_rendered_template_fields(self):
with create_session() as session:
session.query(RenderedTaskInstanceFields).delete()

@patch("airflow.models.renderedtifields.IS_K8S_OR_K8SCELERY_EXECUTOR", new=True)
@mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"})
def test_get_rendered_k8s_spec(self):
with DAG('test_get_rendered_k8s_spec', start_date=DEFAULT_DATE):
task = BashOperator(task_id='op1', bash_command="{{ task.task_id }}")
Expand Down Expand Up @@ -1854,6 +1854,7 @@ def test_get_rendered_k8s_spec(self):
],
'image': ':',
'name': 'base',
'env': [{'name': 'AIRFLOW_IS_K8S_EXECUTOR_POD', 'value': 'True'}],
}
]
},
Expand Down

0 comments on commit 4e362c1

Please sign in to comment.