Skip to content

Commit

Permalink
Do not use core Airflow Flask related resources in FAB provider (test…
Browse files Browse the repository at this point in the history
…s of `www`)
  • Loading branch information
vincbeck committed Jan 8, 2025
1 parent adbe4e2 commit f5b453c
Show file tree
Hide file tree
Showing 19 changed files with 324 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@
from airflow.providers.fab.www.security import permissions
from airflow.providers.fab.www.security_manager import AirflowSecurityManagerV2
from airflow.providers.fab.www.session import (
AirflowDatabaseSessionInterface,
AirflowDatabaseSessionInterface as FabAirflowDatabaseSessionInterface,
)
from airflow.www.session import AirflowDatabaseSessionInterface

if TYPE_CHECKING:
from airflow.providers.fab.www.security.permissions import RESOURCE_ASSET
Expand Down
44 changes: 29 additions & 15 deletions providers/src/airflow/providers/fab/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,25 @@
# under the License.
from __future__ import annotations

from os.path import isabs

from flask import Flask
from flask_appbuilder import SQLA
from flask_wtf.csrf import CSRFProtect
from sqlalchemy.engine.url import make_url

from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.logging_config import configure_logging
from airflow.providers.fab.www.extensions.init_appbuilder import init_appbuilder
from airflow.providers.fab.www.extensions.init_jinja_globals import init_jinja_globals
from airflow.providers.fab.www.extensions.init_manifest_files import configure_manifest_files
from airflow.providers.fab.www.extensions.init_security import init_api_auth, init_xframe_protection
from airflow.providers.fab.www.extensions.init_views import init_error_handlers, init_plugins
from airflow.providers.fab.www.extensions.init_views import (
init_api_auth_provider,
init_api_connexion,
init_api_error_handlers,
init_error_handlers,
init_plugins,
)
from airflow.utils.json import AirflowJsonProvider

app: Flask | None = None

Expand All @@ -41,44 +44,55 @@
csrf = CSRFProtect()


def create_app():
def create_app(config=None, testing=False):
"""Create a new instance of Airflow WWW app."""
flask_app = Flask(__name__)
flask_app.secret_key = conf.get("webserver", "SECRET_KEY")
webserver_config = conf.get_mandatory_value("webserver", "config_file")
# Enable customizations in webserver_config.py to be applied via Flask.current_app.
with flask_app.app_context():
flask_app.config.from_pyfile(webserver_config, silent=True)

flask_app.config["TESTING"] = testing
flask_app.config["SQLALCHEMY_DATABASE_URI"] = conf.get("database", "SQL_ALCHEMY_CONN")

url = make_url(flask_app.config["SQLALCHEMY_DATABASE_URI"])
if url.drivername == "sqlite" and url.database and not isabs(url.database):
raise AirflowConfigException(
f'Cannot use relative path: `{conf.get("database", "SQL_ALCHEMY_CONN")}` to connect to sqlite. '
"Please use absolute path such as `sqlite:////tmp/airflow.db`."
)
if config:
flask_app.config.from_mapping(config)

if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config:
flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args()

# Configure the JSON encoder used by `|tojson` filter from Flask
flask_app.json_provider_class = AirflowJsonProvider
flask_app.json = AirflowJsonProvider(flask_app)

csrf.init_app(flask_app)

db = SQLA()
db.session = settings.Session
db.init_app(flask_app)

init_api_auth(flask_app)
configure_logging()
configure_manifest_files(flask_app)
init_api_auth(flask_app)

with flask_app.app_context():
init_appbuilder(flask_app)
init_plugins(flask_app)
init_api_auth_provider(flask_app)
init_error_handlers(flask_app)
init_api_connexion(flask_app)
init_api_error_handlers(flask_app) # needs to be after all api inits to let them add their path first
init_jinja_globals(flask_app)
init_xframe_protection(flask_app)
return flask_app


def cached_app():
def cached_app(config=None, testing=False):
"""Return cached instance of Airflow WWW app."""
global app
if not app:
app = create_app()
app = create_app(config=config, testing=testing)
return app


Expand Down
125 changes: 125 additions & 0 deletions providers/src/airflow/providers/fab/www/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from functools import wraps
from typing import TYPE_CHECKING, Callable, TypeVar, cast

from flask import flash, redirect, render_template, request, url_for

from airflow.api_fastapi.app import get_auth_manager
from airflow.auth.managers.models.resource_details import (
AccessView,
DagAccessEntity,
DagDetails,
)
from airflow.configuration import conf
from airflow.utils.net import get_hostname

if TYPE_CHECKING:
from airflow.auth.managers.base_auth_manager import ResourceMethod

T = TypeVar("T", bound=Callable)

log = logging.getLogger(__name__)


def get_access_denied_message():
return conf.get("webserver", "access_denied_message")


def _has_access(*, is_authorized: bool, func: Callable, args, kwargs):
"""
Define the behavior whether the user is authorized to access the resource.
:param is_authorized: whether the user is authorized to access the resource
:param func: the function to call if the user is authorized
:param args: the arguments of ``func``
:param kwargs: the keyword arguments ``func``
:meta private:
"""
if is_authorized:
return func(*args, **kwargs)
elif get_auth_manager().is_logged_in() and not get_auth_manager().is_authorized_view(
access_view=AccessView.WEBSITE
):
return (
render_template(
"airflow/no_roles_permissions.html",
hostname=get_hostname() if conf.getboolean("webserver", "EXPOSE_HOSTNAME") else "",
logout_url=get_auth_manager().get_url_logout(),
),
403,
)
elif not get_auth_manager().is_logged_in():
return redirect(get_auth_manager().get_url_login(next_url=request.url))
else:
access_denied = get_access_denied_message()
flash(access_denied, "danger")
return redirect(url_for("Airflow.index"))


def has_access_dag(method: ResourceMethod, access_entity: DagAccessEntity | None = None) -> Callable[[T], T]:
def has_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
dag_id_kwargs = kwargs.get("dag_id")
dag_id_args = request.args.get("dag_id")
dag_id_form = request.form.get("dag_id")
dag_id_json = request.json.get("dag_id") if request.is_json else None
all_dag_ids = [dag_id_kwargs, dag_id_args, dag_id_form, dag_id_json]
unique_dag_ids = set(dag_id for dag_id in all_dag_ids if dag_id is not None)

if len(unique_dag_ids) > 1:
log.warning(
"There are different dag_ids passed in the request: %s. Returning 403.", unique_dag_ids
)
log.warning(
"kwargs: %s, args: %s, form: %s, json: %s",
dag_id_kwargs,
dag_id_args,
dag_id_form,
dag_id_json,
)
return (
render_template(
"airflow/no_roles_permissions.html",
hostname=get_hostname() if conf.getboolean("webserver", "EXPOSE_HOSTNAME") else "",
logout_url=get_auth_manager().get_url_logout(),
),
403,
)
dag_id = unique_dag_ids.pop() if unique_dag_ids else None

is_authorized = get_auth_manager().is_authorized_dag(
method=method,
access_entity=access_entity,
details=None if not dag_id else DagDetails(id=dag_id),
)

return _has_access(
is_authorized=is_authorized,
func=func,
args=args,
kwargs=kwargs,
)

return cast(T, decorated)

return has_access_decorator
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from flask_appbuilder.views import IndexView

from airflow import settings
from airflow.api_fastapi.app import create_auth_manager
from airflow.api_fastapi.app import create_auth_manager, get_auth_manager
from airflow.configuration import conf
from airflow.providers.fab.www.security_manager import AirflowSecurityManagerV2

Expand Down Expand Up @@ -283,6 +283,8 @@ def _add_admin_views(self):
self.indexview = self._check_and_init(self.indexview)
self.add_view_no_menu(self.indexview)

get_auth_manager().register_views()

def _add_addon_views(self):
"""Register declared addons."""
for addon in self._addon_managers:
Expand Down
97 changes: 95 additions & 2 deletions providers/src/airflow/providers/fab/www/extensions/init_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,47 @@

import logging
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING

from connexion import Resolver
from connexion import FlaskApi, Resolver
from connexion.decorators.validation import RequestBodyValidator
from connexion.exceptions import BadRequestProblem
from connexion.exceptions import BadRequestProblem, ProblemException
from flask import request

from airflow.api_connexion.exceptions import common_error_handler
from airflow.api_fastapi.app import get_auth_manager
from airflow.configuration import conf
from airflow.providers.fab.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED
from airflow.utils.yaml import safe_load

if TYPE_CHECKING:
from flask import Flask

log = logging.getLogger(__name__)

# providers/src/airflow/providers/fab/www/extensions/init_views.py => airflow/
ROOT_APP_DIR = Path(__file__).parents[7].joinpath("airflow").resolve()


def set_cors_headers_on_response(response):
"""Add response headers."""
allow_headers = conf.get("api", "access_control_allow_headers")
allow_methods = conf.get("api", "access_control_allow_methods")
allow_origins = conf.get("api", "access_control_allow_origins")
if allow_headers:
response.headers["Access-Control-Allow-Headers"] = allow_headers
if allow_methods:
response.headers["Access-Control-Allow-Methods"] = allow_methods
if allow_origins == "*":
response.headers["Access-Control-Allow-Origin"] = "*"
elif allow_origins:
allowed_origins = allow_origins.split(" ")
origin = request.environ.get("HTTP_ORIGIN", allowed_origins[0])
if origin in allowed_origins:
response.headers["Access-Control-Allow-Origin"] = origin
return response


class _LazyResolution:
"""
Expand Down Expand Up @@ -78,6 +108,59 @@ def validate_schema(self, data, url):
return super().validate_schema(data, url)


base_paths: list[str] = [] # contains the list of base paths that have api endpoints


def init_api_error_handlers(app: Flask) -> None:
"""Add error handlers for 404 and 405 errors for existing API paths."""

@app.errorhandler(404)
def _handle_api_not_found(ex):
if any([request.path.startswith(p) for p in base_paths]):
# 404 errors are never handled on the blueprint level
# unless raised from a view func so actual 404 errors,
# i.e. "no route for it" defined, need to be handled
# here on the application level
return common_error_handler(ex)
else:
from airflow.providers.fab.www.views import not_found

return not_found(ex)

@app.errorhandler(405)
def _handle_method_not_allowed(ex):
if any([request.path.startswith(p) for p in base_paths]):
return common_error_handler(ex)
else:
from airflow.providers.fab.www.views import method_not_allowed

return method_not_allowed(ex)

app.register_error_handler(ProblemException, common_error_handler)


def init_api_connexion(app: Flask) -> None:
"""Initialize Stable API."""
base_path = "/api/v1"
base_paths.append(base_path)

with ROOT_APP_DIR.joinpath("api_connexion", "openapi", "v1.yaml").open() as f:
specification = safe_load(f)
api_bp = FlaskApi(
specification=specification,
resolver=_LazyResolver(),
base_path=base_path,
options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()},
strict_validation=True,
validate_responses=True,
validator_map={"body": _CustomErrorRequestBodyValidator},
).blueprint
api_bp.after_request(set_cors_headers_on_response)

app.register_blueprint(api_bp)
app.extensions["csrf"].exempt(api_bp)


def init_plugins(app):
"""Integrate Flask and FAB with plugins."""
from airflow import plugins_manager
Expand Down Expand Up @@ -118,3 +201,13 @@ def init_error_handlers(app: Flask):

app.register_error_handler(500, views.show_traceback)
app.register_error_handler(404, views.not_found)


def init_api_auth_provider(app):
"""Initialize the API offered by the auth manager."""
auth_mgr = get_auth_manager()
blueprint = auth_mgr.get_api_endpoints()
if blueprint:
base_paths.append(blueprint.url_prefix)
app.register_blueprint(blueprint)
app.extensions["csrf"].exempt(blueprint)
Loading

0 comments on commit f5b453c

Please sign in to comment.