[airflow] Add lint rule to show error for removed context variables in airflow #15144

Open
wants to merge 2 commits into
base: main
124 changes: 124 additions & 0 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR302_context.py
Contributor

Do we have a case where a function should not raise a warning?

Author

I have added two of them, but I can add more.

@@ -0,0 +1,124 @@
import pendulum
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
from airflow.plugins_manager import AirflowPlugin
from airflow.decorators import task, get_current_context
from airflow.models.baseoperator import BaseOperator
from airflow.decorators import dag, task
from airflow.providers.standard.operators.python import PythonOperator


def access_invalid_key_in_context(**context):
    print("access invalid key", context["conf"])


@task
def access_invalid_key_task_out_of_dag(**context):
    print("access invalid key", context.get("conf"))



@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=[""],
)
def invalid_dag():
    @task()
    def access_invalid_key_task(**context):
        print("access invalid key", context.get("conf"))

    task1 = PythonOperator(
        task_id="task1",
        python_callable=access_invalid_key_in_context,
    )
    access_invalid_key_task() >> task1
    access_invalid_key_task_out_of_dag()


invalid_dag()

@task
def print_config(**context):
    # This should not throw an error as logical_date is part of airflow context.
    logical_date = context["logical_date"]
Contributor

@sunank200 and I discussed this earlier. What we're trying to check is whether there's a variable named context in a function (most commonly seen in TaskFlow and PythonOperator callables) and whether it is accessed like a dict with one of the keys we want to check. I think it's unlikely that users have something like this outside the Airflow context, but I would like to know whether there are any concerns.

@MichaReiser @uranusjr

Author

I have added logic for other ways of accessing context values as well. They are covered in the tests.

Contributor

It’s probably better to detect

  1. Arguments of a function decorated with @task (either ** or simple named arguments). (As a follow-up, any functions called by such a function)
  2. The execute function of a BaseOperator subclass (As a follow-up, any functions called by execute)
  3. The dict returned by get_current_context.

This should be better than detecting with variable name.
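
To make the three sources listed above concrete, here is a rough sketch in plain Python using only the standard-library `ast` module. It is not how ruff implements the rule: the function name `find_context_receivers` and its helpers are hypothetical, and matching is purely syntactic (it looks for the bare names `task`, `BaseOperator` and `get_current_context` without resolving imports, which a real check would have to do).

```python
import ast


def _decorator_name(decorator):
    # Unwrap `@task(...)` to `task`, so both `@task` and `@task()` match.
    target = decorator.func if isinstance(decorator, ast.Call) else decorator
    return target.id if isinstance(target, ast.Name) else None


def _param_names(func):
    # Collect named parameters plus the `**` parameter, if any.
    args = func.args
    names = [a.arg for a in args.posonlyargs + args.args + args.kwonlyargs]
    if args.kwarg is not None:
        names.append("**" + args.kwarg.arg)
    return names


def find_context_receivers(source):
    """Map each place that can receive the Airflow context to (kind, parameters).

    Assumption: names are matched syntactically, with no import resolution.
    """
    receivers = {}
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.FunctionDef):
            # 1. Functions decorated with @task.
            if any(_decorator_name(d) == "task" for d in node.decorator_list):
                receivers[node.name] = ("task", _param_names(node))
        elif isinstance(node, ast.ClassDef):
            # 2. The execute method of a direct BaseOperator subclass.
            if any(isinstance(b, ast.Name) and b.id == "BaseOperator" for b in node.bases):
                for item in node.body:
                    if isinstance(item, ast.FunctionDef) and item.name == "execute":
                        receivers[node.name + ".execute"] = ("execute", _param_names(item))
        elif isinstance(node, ast.Assign) and isinstance(node.value, ast.Call):
            # 3. Names bound to the dict returned by get_current_context().
            func = node.value.func
            if isinstance(func, ast.Name) and func.id == "get_current_context":
                for target in node.targets:
                    if isinstance(target, ast.Name):
                        receivers[target.id] = ("get_current_context", [])
    return receivers
```

None of the three branches depends on the variable being called `context`, which is the point of the suggestion. The follow-ups mentioned in the list (functions called from a task or from `execute`) are not covered by this sketch.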

Contributor

What about python_callable?

Contributor

I don’t think python_callable takes the context though? It only accepts things you provide in self.op_args and self.op_kwargs.

Member

I agree that it'll be useful to guard this check by first verifying that the parameter is coming from a function which is decorated with a @task.

I think this can be done as a pre-check for context variables by using the checker.semantic().current_statements() method to traverse up the AST to find the function definition node and checking whether the function has a @task decorator that originates from the airflow module.

/// Returns an [`Iterator`] over the current statement hierarchy, from the current [`Stmt`]
/// through to any parents.
pub fn current_statements(&self) -> impl Iterator<Item = &'a Stmt> + '_ {
    let id = self.node_id.expect("No current node");
    self.nodes
        .ancestor_ids(id)
        .filter_map(move |id| self.nodes[id].as_statement())
}
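
As an illustration of the pre-check described in this comment, the sketch below mimics it with Python's standard-library `ast` module instead of ruff's semantic model. Everything here is an assumption made for the example: `build_parents`, `ancestors`, `airflow_task_aliases` and `enclosing_task_function` are hypothetical helpers, and import resolution is reduced to looking for `from airflow.decorators import task` (optionally with an alias).

```python
import ast


def build_parents(tree):
    # Python's ast has no parent pointers, so record them once up front.
    parents = {}
    for parent in ast.walk(tree):
        for child in ast.iter_child_nodes(parent):
            parents[child] = parent
    return parents


def ancestors(node, parents):
    # Lazily yield enclosing nodes, innermost first (no intermediate list).
    while node in parents:
        node = parents[node]
        yield node


def airflow_task_aliases(tree):
    # Local names that refer to `task` imported from `airflow.decorators`.
    aliases = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.ImportFrom) and node.module == "airflow.decorators":
            for alias in node.names:
                if alias.name == "task":
                    aliases.add(alias.asname or alias.name)
    return aliases


def enclosing_task_function(node, parents, task_names):
    """Return the nearest enclosing function decorated with Airflow's task, if any."""
    for ancestor in ancestors(node, parents):
        if not isinstance(ancestor, ast.FunctionDef):
            continue
        for decorator in ancestor.decorator_list:
            # Unwrap `@task(...)` to `task` before comparing names.
            target = decorator.func if isinstance(decorator, ast.Call) else decorator
            if isinstance(target, ast.Name) and target.id in task_names:
                return ancestor
    return None
```

A subscript such as `context["conf"]` would then only be examined when `enclosing_task_function` returns a function, so a locally defined decorator that merely happens to be called `task` is not treated as Airflow's.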

Contributor

> I don’t think python_callable takes the context though? It only accepts things you provide in self.op_args and self.op_kwargs.

I thought we could still get it in the python_callable? https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonoperator

Contributor

Hmm OK I didn’t even realise you can do that… yeah in that case it’s probably a good idea to also detect python_callable arguments.
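
One possible way to find those callables, sketched with Python's standard-library `ast` module rather than ruff's checker: collect every function name passed as a `python_callable=` keyword argument. The helper name `python_callable_targets` is made up for this example, and the sketch deliberately does not verify that the call is to `PythonOperator` or resolve where the name was defined.

```python
import ast


def python_callable_targets(source):
    """Return the names of functions passed as `python_callable=` to any call."""
    targets = set()
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        for keyword in node.keywords:
            # Only plain names are tracked; lambdas and attributes are skipped.
            if keyword.arg == "python_callable" and isinstance(keyword.value, ast.Name):
                targets.add(keyword.value.id)
    return targets
```

The resulting names could then be treated like `@task` functions when deciding which parameters may hold the context.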

Author

I have updated the logic to handle named arguments and functions decorated with @task.


    # Removed usage - should trigger violations
    execution_date = context["execution_date"]
    next_ds = context["next_ds"]
    next_ds_nodash = context["next_ds_nodash"]
    next_execution_date = context["next_execution_date"]
    prev_ds = context["prev_ds"]
    prev_ds_nodash = context["prev_ds_nodash"]
    prev_execution_date = context["prev_execution_date"]
    prev_execution_date_success = context["prev_execution_date_success"]
    tomorrow_ds = context["tomorrow_ds"]
    yesterday_ds = context["yesterday_ds"]
    yesterday_ds_nodash = context["yesterday_ds_nodash"]

with DAG(
    dag_id="example_dag",
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    template_searchpath=["/templates"],
) as dag:
    task1 = DummyOperator(
        task_id="task1",
        params={
            # Removed variables in template
            "execution_date": "{{ execution_date }}",
            "next_ds": "{{ next_ds }}",
            "prev_ds": "{{ prev_ds }}"
        },
    )

class CustomMacrosPlugin(AirflowPlugin):
    name = "custom_macros"
    macros = {
        "execution_date_macro": lambda context: context["execution_date"],
        "next_ds_macro": lambda context: context["next_ds"]
    }

@task
def print_config():
    context = get_current_context()
    execution_date = context["execution_date"]
    next_ds = context["next_ds"]
    next_ds_nodash = context["next_ds_nodash"]
    next_execution_date = context["next_execution_date"]
    prev_ds = context["prev_ds"]
    prev_ds_nodash = context["prev_ds_nodash"]
    prev_execution_date = context["prev_execution_date"]
    prev_execution_date_success = context["prev_execution_date_success"]
    tomorrow_ds = context["tomorrow_ds"]
    yesterday_ds = context["yesterday_ds"]
    yesterday_ds_nodash = context["yesterday_ds_nodash"]

class CustomOperator(BaseOperator):
    def execute(self, context):
        execution_date = context["execution_date"]
        next_ds = context["next_ds"]
        next_ds_nodash = context["next_ds_nodash"]
        next_execution_date = context["next_execution_date"]
        prev_ds = context["prev_ds"]
        prev_ds_nodash = context["prev_ds_nodash"]
        prev_execution_date = context["prev_execution_date"]
        prev_execution_date_success = context["prev_execution_date_success"]
        tomorrow_ds = context["tomorrow_ds"]
        yesterday_ds = context["yesterday_ds"]
        yesterday_ds_nodash = context["yesterday_ds_nodash"]

@task
def access_invalid_argument_task_out_of_dag(execution_date, **context):
    print("execution date", execution_date)
    print("access invalid key", context.get("conf"))

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    print(ds)
    print(kwargs.get("tomorrow_ds"))

run_this = print_context()
4 changes: 3 additions & 1 deletion crates/ruff_linter/src/checkers/ast/analyze/expression.rs
@@ -171,7 +171,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) {
if checker.enabled(Rule::NonPEP646Unpack) {
pyupgrade::rules::use_pep646_unpack(checker, subscript);
}

if checker.enabled(Rule::Airflow3Removal) {
airflow::rules::removed_in_3(checker, expr);
}
pandas_vet::rules::subscript(checker, value, expr);
}
Expr::Tuple(ast::ExprTuple {
1 change: 1 addition & 0 deletions crates/ruff_linter/src/rules/airflow/mod.rs
@@ -18,6 +18,7 @@ mod tests {
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_names.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_class_attribute.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_airflow_plugin.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_context.py"))]
#[test_case(Rule::Airflow3MovedToProvider, Path::new("AIR303.py"))]
fn rules(rule_code: Rule, path: &Path) -> Result<()> {
let snapshot = format!("{}_{}", rule_code.noqa_code(), path.to_string_lossy());
163 changes: 160 additions & 3 deletions crates/ruff_linter/src/rules/airflow/rules/removal_in_3.rs
@@ -1,17 +1,17 @@
use crate::checkers::ast::Checker;
use ruff_diagnostics::{Diagnostic, Edit, Fix, FixAvailability, Violation};
use ruff_macros::{derive_message_formats, ViolationMetadata};
use ruff_python_ast::helpers::map_callable;
use ruff_python_ast::{
name::QualifiedName, Arguments, Expr, ExprAttribute, ExprCall, ExprContext, ExprName,
StmtClassDef,
ExprStringLiteral, ExprSubscript, Stmt, StmtClassDef, StmtFunctionDef,
};
use ruff_python_semantic::analyze::typing;
use ruff_python_semantic::Modules;
use ruff_python_semantic::ScopeKind;
use ruff_text_size::Ranged;
use ruff_text_size::TextRange;

use crate::checkers::ast::Checker;

/// ## What it does
/// Checks for uses of deprecated Airflow functions and values.
///
@@ -71,6 +71,63 @@ impl Violation for Airflow3Removal {
}
}

const REMOVED_CONTEXT_KEYS: [&str; 12] = [
    "conf",
    "execution_date",
    "next_ds",
    "next_ds_nodash",
    "next_execution_date",
    "prev_ds",
    "prev_ds_nodash",
    "prev_execution_date",
    "prev_execution_date_success",
    "tomorrow_ds",
    "yesterday_ds",
    "yesterday_ds_nodash",
];

fn extract_name_from_slice(slice: &Expr) -> Option<String> {
    match slice {
        Expr::StringLiteral(ExprStringLiteral { value, .. }) => Some(value.to_string()),
        _ => None,
    }
}

pub(crate) fn removed_context_variable(checker: &mut Checker, expr: &Expr) {
    if let Expr::Subscript(ExprSubscript { value, slice, .. }) = expr {
        if let Expr::Name(ExprName { id, .. }) = &**value {
            if id.as_str() == "context" {
Contributor

Is it possible to support code like this?

c = get_current_context()
c["execution_date"]

We should probably not hard code the variable name.
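
To illustrate what tracking the binding instead of the name could look like, here is a small sketch using Python's standard-library `ast` module (Python 3.9 or later, where a subscript's slice is the expression itself). It is only an approximation of the idea, not ruff's implementation: `removed_keys_via_current_context` is a hypothetical helper, the call is matched by its bare name `get_current_context`, and bindings are collected file-wide rather than per scope.

```python
import ast

# The keys listed in REMOVED_CONTEXT_KEYS in this PR.
REMOVED_CONTEXT_KEYS = {
    "conf", "execution_date", "next_ds", "next_ds_nodash",
    "next_execution_date", "prev_ds", "prev_ds_nodash",
    "prev_execution_date", "prev_execution_date_success",
    "tomorrow_ds", "yesterday_ds", "yesterday_ds_nodash",
}


def removed_keys_via_current_context(source):
    """Return (variable, key) pairs for removed keys read from get_current_context()."""
    tree = ast.parse(source)

    # Pass 1: every name assigned directly from a get_current_context() call.
    bound = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Assign) and isinstance(node.value, ast.Call):
            func = node.value.func
            if isinstance(func, ast.Name) and func.id == "get_current_context":
                bound.update(t.id for t in node.targets if isinstance(t, ast.Name))

    # Pass 2: subscripts on those names with a removed string key.
    hits = []
    for node in ast.walk(tree):
        if (
            isinstance(node, ast.Subscript)
            and isinstance(node.value, ast.Name)
            and node.value.id in bound
            and isinstance(node.slice, ast.Constant)
            and node.slice.value in REMOVED_CONTEXT_KEYS
        ):
            hits.append((node.value.id, node.slice.value))
    return hits
```

With this approach `c["execution_date"]` is reported because `c` was bound from `get_current_context()`, while an unrelated variable that happens to be named `context` is left alone.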

                if let Some(key) = extract_name_from_slice(slice) {
                    if REMOVED_CONTEXT_KEYS.contains(&key.as_str()) {
                        checker.diagnostics.push(Diagnostic::new(
                            Airflow3Removal {
                                deprecated: key,
                                replacement: Replacement::None,
                            },
                            slice.range(),
                        ));
                    }
                }
            }
        }
    }

    if let Expr::StringLiteral(ExprStringLiteral { value, .. }) = expr {
        let value_str = value.to_string();
        for key in REMOVED_CONTEXT_KEYS {
            if value_str.contains(&format!("{{{{ {key} }}}}")) {
                checker.diagnostics.push(Diagnostic::new(
                    Airflow3Removal {
                        deprecated: key.to_string(),
                        replacement: Replacement::None,
                    },
                    expr.range(),
                ));
            }
        }
    }
}

/// AIR302
pub(crate) fn removed_in_3(checker: &mut Checker, expr: &Expr) {
if !checker.semantic().seen_module(Modules::AIRFLOW) {
@@ -87,6 +144,7 @@ pub(crate) fn removed_in_3(checker: &mut Checker, expr: &Expr) {
check_call_arguments(checker, &qualname, arguments);
};
check_method(checker, call_expr);
check_context_get(checker, call_expr);
}
Expr::Attribute(attribute_expr @ ExprAttribute { attr, .. }) => {
check_name(checker, expr, attr.range());
@@ -100,6 +158,9 @@ pub(crate) fn removed_in_3(checker: &mut Checker, expr: &Expr) {
}
}
}
Expr::Subscript(_) => {
removed_context_variable(checker, expr);
}
_ => {}
}
}
@@ -247,6 +308,50 @@ fn check_class_attribute(checker: &mut Checker, attribute_expr: &ExprAttribute)
}
}

/// Check whether a removed context key is accessed through context.get("key").
///
/// ```python
/// from airflow.decorators import task
///
///
/// @task
/// def access_invalid_key_task_out_of_dag(**context):
///     print("access invalid key", context.get("conf"))
/// ```
fn check_context_get(checker: &mut Checker, call_expr: &ExprCall) {
    if is_task_context_referenced(checker, &call_expr.func) {
        return;
    }

    let Expr::Attribute(ExprAttribute { value, attr, .. }) = &*call_expr.func else {
        return;
    };

    if !value
        .as_name_expr()
        .is_some_and(|name| matches!(name.id.as_str(), "context" | "kwargs"))
Contributor

Same, I think we should just check for all names (as long as it’s **).
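
A minimal sketch of that idea, written with Python's standard-library `ast` module rather than ruff's Rust API: take whatever name the function gives its `**` parameter and look for `.get("<removed key>")` calls on it. The helper `removed_get_calls` is hypothetical, the `@task` decorator check is left out to keep the example focused on the parameter name, and nested functions are not handled separately.

```python
import ast

# The keys listed in REMOVED_CONTEXT_KEYS in this PR.
REMOVED_CONTEXT_KEYS = {
    "conf", "execution_date", "next_ds", "next_ds_nodash",
    "next_execution_date", "prev_ds", "prev_ds_nodash",
    "prev_execution_date", "prev_execution_date_success",
    "tomorrow_ds", "yesterday_ds", "yesterday_ds_nodash",
}


def removed_get_calls(source):
    """Return (function, kwarg_name, key) for `.get(<removed key>)` on a ** parameter."""
    hits = []
    for func in ast.walk(ast.parse(source)):
        if not isinstance(func, ast.FunctionDef) or func.args.kwarg is None:
            continue
        # Whatever the author called the ** parameter: context, kwargs, ...
        kwarg_name = func.args.kwarg.arg
        for node in ast.walk(func):
            if (
                isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr == "get"
                and isinstance(node.func.value, ast.Name)
                and node.func.value.id == kwarg_name
                and node.args
                and isinstance(node.args[0], ast.Constant)
                and node.args[0].value in REMOVED_CONTEXT_KEYS
            ):
                hits.append((func.name, kwarg_name, node.args[0].value))
    return hits
```

Here `settings.get("conf")` on an ordinary parameter is ignored, while `anything.get("conf")` is reported as long as `anything` is the function's `**` parameter.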

    {
        return;
    }

    if attr.as_str() != "get" {
        return;
    }

    for removed_key in REMOVED_CONTEXT_KEYS {
        if let Some(argument) = call_expr.arguments.find_argument_value(removed_key, 0) {
            checker.diagnostics.push(Diagnostic::new(
                Airflow3Removal {
                    deprecated: removed_key.to_string(),
                    replacement: Replacement::None,
                },
                argument.range(),
            ));
            return;
        }
    }
}

/// Check whether a removed Airflow class method is called.
///
/// For example:
@@ -849,3 +954,55 @@ fn is_airflow_builtin_or_provider(segments: &[&str], module: &str, symbol_suffix
_ => false,
}
}

fn is_task_context_referenced(checker: &mut Checker, expr: &Expr) -> bool {
Member

Can you clarify what this function is supposed to do? I'm confused as to why this function also loops over REMOVED_CONTEXT_ARGS and adds diagnostics when the caller does something similar. Additionally, the name and the return type of this function suggest that it checks for a condition, but it adds diagnostics as well.
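
For illustration only, the separation being asked for might look like the following, sketched in Python rather than Rust. Both function names are hypothetical, the diagnostics list stands in for `checker.diagnostics`, and the message text is illustrative rather than ruff's exact output.

```python
# The keys listed in REMOVED_CONTEXT_KEYS in this PR.
REMOVED_CONTEXT_KEYS = (
    "conf", "execution_date", "next_ds", "next_ds_nodash",
    "next_execution_date", "prev_ds", "prev_ds_nodash",
    "prev_execution_date", "prev_execution_date_success",
    "tomorrow_ds", "yesterday_ds", "yesterday_ds_nodash",
)


def removed_parameters(parameter_names):
    """Pure query: which parameters are removed context keys? No side effects."""
    return [name for name in parameter_names if name in REMOVED_CONTEXT_KEYS]


def report_removed_parameters(parameter_names, diagnostics):
    """Reporting step, kept separate so the query above stays a plain check."""
    for name in removed_parameters(parameter_names):
        # Illustrative message; a stand-in for pushing a Diagnostic.
        diagnostics.append(f"AIR302 `{name}` is removed in Airflow 3.0")
```

With this split, a name like `is_...` can be reserved for functions that only return a boolean, and the caller decides whether and where to report.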

    let parents: Vec<_> = checker.semantic().current_statements().collect();

    for stmt in parents {
Comment on lines +959 to +961
Member

We should avoid allocating a vector here as we can directly iterate over the statement tree like so:

for stmt in checker.semantic().current_statements() {
	// ...
}

        if let Stmt::FunctionDef(function_def) = stmt {
            if is_decorated_with(checker, function_def) {
                let arguments = extract_task_function_arguments(function_def);

                for deprecated_arg in REMOVED_CONTEXT_KEYS {
                    if arguments.contains(&deprecated_arg.to_string()) {
                        checker.diagnostics.push(Diagnostic::new(
                            Airflow3Removal {
                                deprecated: deprecated_arg.to_string(),
                                replacement: Replacement::None,
                            },
                            expr.range(),
                        ));
                        return true;
                    }
                }
            }
        }
Comment on lines +962 to +979
Member

We can avoid multiple indentation levels by using let ... else ... which improves readability like so:

        let Stmt::FunctionDef(function_def) = stmt else {
            continue;
        };

        if !is_decorated_with(checker, function_def) {
            continue;
        }

        let arguments = extract_task_function_arguments(function_def);

        for deprecated_arg in REMOVED_CONTEXT_KEYS {
            if arguments.contains(&deprecated_arg.to_string()) {
                checker.diagnostics.push(Diagnostic::new(
                    Airflow3Removal {
                        deprecated: deprecated_arg.to_string(),
                        replacement: Replacement::None,
                    },
                    expr.range(),
                ));
                return true;
            }
        }

    }

    false
}

fn extract_task_function_arguments(stmt: &StmtFunctionDef) -> Vec<String> {
    let mut arguments = Vec::new();
Comment on lines +985 to +986
Member

I think we could avoid this allocation, but I would like to first understand what the is_task_context_referenced function is supposed to do.


    for param in &stmt.parameters.args {
        arguments.push(param.parameter.name.to_string());
    }

    if let Some(vararg) = &stmt.parameters.kwarg {
        arguments.push(format!("**{}", vararg.name));
    }

    arguments
}

fn is_decorated_with(checker: &mut Checker, stmt: &StmtFunctionDef) -> bool {
    stmt.decorator_list.iter().any(|decorator| {
        checker
            .semantic()
            .resolve_qualified_name(map_callable(&decorator.expression))
            .is_some_and(|qualified_name| {
                matches!(qualified_name.segments(), ["airflow", "decorators", "task"])
            })
    })
}
Comment on lines +999 to +1008
Member

Based on the name of the function, did you mean to make this a generic function over any decorator originating from airflow.decorators.*? If not, can you rename this to has_task_decorator?
