diff --git a/providers/src/airflow/providers/fab/auth_manager/security_manager/override.py b/providers/src/airflow/providers/fab/auth_manager/security_manager/override.py index d0e00b0977ce8..5c8b58c2139d6 100644 --- a/providers/src/airflow/providers/fab/auth_manager/security_manager/override.py +++ b/providers/src/airflow/providers/fab/auth_manager/security_manager/override.py @@ -110,9 +110,9 @@ from airflow.providers.fab.www.security import permissions from airflow.providers.fab.www.security_manager import AirflowSecurityManagerV2 from airflow.providers.fab.www.session import ( + AirflowDatabaseSessionInterface, AirflowDatabaseSessionInterface as FabAirflowDatabaseSessionInterface, ) -from airflow.www.session import AirflowDatabaseSessionInterface if TYPE_CHECKING: from airflow.providers.fab.www.security.permissions import RESOURCE_ASSET diff --git a/providers/src/airflow/providers/fab/www/app.py b/providers/src/airflow/providers/fab/www/app.py index 0414fc5e408b5..43dd36742997d 100644 --- a/providers/src/airflow/providers/fab/www/app.py +++ b/providers/src/airflow/providers/fab/www/app.py @@ -17,22 +17,25 @@ # under the License. from __future__ import annotations -from os.path import isabs - from flask import Flask from flask_appbuilder import SQLA from flask_wtf.csrf import CSRFProtect -from sqlalchemy.engine.url import make_url from airflow import settings from airflow.configuration import conf -from airflow.exceptions import AirflowConfigException from airflow.logging_config import configure_logging from airflow.providers.fab.www.extensions.init_appbuilder import init_appbuilder from airflow.providers.fab.www.extensions.init_jinja_globals import init_jinja_globals from airflow.providers.fab.www.extensions.init_manifest_files import configure_manifest_files from airflow.providers.fab.www.extensions.init_security import init_api_auth, init_xframe_protection -from airflow.providers.fab.www.extensions.init_views import init_error_handlers, init_plugins +from airflow.providers.fab.www.extensions.init_views import ( + init_api_auth_provider, + init_api_connexion, + init_api_error_handlers, + init_error_handlers, + init_plugins, +) +from airflow.utils.json import AirflowJsonProvider app: Flask | None = None @@ -41,44 +44,55 @@ csrf = CSRFProtect() -def create_app(): +def create_app(config=None, testing=False): """Create a new instance of Airflow WWW app.""" flask_app = Flask(__name__) flask_app.secret_key = conf.get("webserver", "SECRET_KEY") + webserver_config = conf.get_mandatory_value("webserver", "config_file") + # Enable customizations in webserver_config.py to be applied via Flask.current_app. + with flask_app.app_context(): + flask_app.config.from_pyfile(webserver_config, silent=True) + + flask_app.config["TESTING"] = testing flask_app.config["SQLALCHEMY_DATABASE_URI"] = conf.get("database", "SQL_ALCHEMY_CONN") - url = make_url(flask_app.config["SQLALCHEMY_DATABASE_URI"]) - if url.drivername == "sqlite" and url.database and not isabs(url.database): - raise AirflowConfigException( - f'Cannot use relative path: `{conf.get("database", "SQL_ALCHEMY_CONN")}` to connect to sqlite. ' - "Please use absolute path such as `sqlite:////tmp/airflow.db`." - ) + if config: + flask_app.config.from_mapping(config) if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config: flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args() + # Configure the JSON encoder used by `|tojson` filter from Flask + flask_app.json_provider_class = AirflowJsonProvider + flask_app.json = AirflowJsonProvider(flask_app) + + csrf.init_app(flask_app) + db = SQLA() db.session = settings.Session db.init_app(flask_app) + init_api_auth(flask_app) configure_logging() configure_manifest_files(flask_app) - init_api_auth(flask_app) with flask_app.app_context(): init_appbuilder(flask_app) init_plugins(flask_app) + init_api_auth_provider(flask_app) init_error_handlers(flask_app) + init_api_connexion(flask_app) + init_api_error_handlers(flask_app) # needs to be after all api inits to let them add their path first init_jinja_globals(flask_app) init_xframe_protection(flask_app) return flask_app -def cached_app(): +def cached_app(config=None, testing=False): """Return cached instance of Airflow WWW app.""" global app if not app: - app = create_app() + app = create_app(config=config, testing=testing) return app diff --git a/providers/src/airflow/providers/fab/www/auth.py b/providers/src/airflow/providers/fab/www/auth.py new file mode 100644 index 0000000000000..198acb29f9a69 --- /dev/null +++ b/providers/src/airflow/providers/fab/www/auth.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from functools import wraps +from typing import TYPE_CHECKING, Callable, TypeVar, cast + +from flask import flash, redirect, render_template, request, url_for + +from airflow.api_fastapi.app import get_auth_manager +from airflow.auth.managers.models.resource_details import ( + AccessView, + DagAccessEntity, + DagDetails, +) +from airflow.configuration import conf +from airflow.utils.net import get_hostname + +if TYPE_CHECKING: + from airflow.auth.managers.base_auth_manager import ResourceMethod + +T = TypeVar("T", bound=Callable) + +log = logging.getLogger(__name__) + + +def get_access_denied_message(): + return conf.get("webserver", "access_denied_message") + + +def _has_access(*, is_authorized: bool, func: Callable, args, kwargs): + """ + Define the behavior whether the user is authorized to access the resource. + + :param is_authorized: whether the user is authorized to access the resource + :param func: the function to call if the user is authorized + :param args: the arguments of ``func`` + :param kwargs: the keyword arguments ``func`` + + :meta private: + """ + if is_authorized: + return func(*args, **kwargs) + elif get_auth_manager().is_logged_in() and not get_auth_manager().is_authorized_view( + access_view=AccessView.WEBSITE + ): + return ( + render_template( + "airflow/no_roles_permissions.html", + hostname=get_hostname() if conf.getboolean("webserver", "EXPOSE_HOSTNAME") else "", + logout_url=get_auth_manager().get_url_logout(), + ), + 403, + ) + elif not get_auth_manager().is_logged_in(): + return redirect(get_auth_manager().get_url_login(next_url=request.url)) + else: + access_denied = get_access_denied_message() + flash(access_denied, "danger") + return redirect(url_for("Airflow.index")) + + +def has_access_dag(method: ResourceMethod, access_entity: DagAccessEntity | None = None) -> Callable[[T], T]: + def has_access_decorator(func: T): + @wraps(func) + def decorated(*args, **kwargs): + dag_id_kwargs = kwargs.get("dag_id") + dag_id_args = request.args.get("dag_id") + dag_id_form = request.form.get("dag_id") + dag_id_json = request.json.get("dag_id") if request.is_json else None + all_dag_ids = [dag_id_kwargs, dag_id_args, dag_id_form, dag_id_json] + unique_dag_ids = set(dag_id for dag_id in all_dag_ids if dag_id is not None) + + if len(unique_dag_ids) > 1: + log.warning( + "There are different dag_ids passed in the request: %s. Returning 403.", unique_dag_ids + ) + log.warning( + "kwargs: %s, args: %s, form: %s, json: %s", + dag_id_kwargs, + dag_id_args, + dag_id_form, + dag_id_json, + ) + return ( + render_template( + "airflow/no_roles_permissions.html", + hostname=get_hostname() if conf.getboolean("webserver", "EXPOSE_HOSTNAME") else "", + logout_url=get_auth_manager().get_url_logout(), + ), + 403, + ) + dag_id = unique_dag_ids.pop() if unique_dag_ids else None + + is_authorized = get_auth_manager().is_authorized_dag( + method=method, + access_entity=access_entity, + details=None if not dag_id else DagDetails(id=dag_id), + ) + + return _has_access( + is_authorized=is_authorized, + func=func, + args=args, + kwargs=kwargs, + ) + + return cast(T, decorated) + + return has_access_decorator diff --git a/providers/src/airflow/providers/fab/www/extensions/init_appbuilder.py b/providers/src/airflow/providers/fab/www/extensions/init_appbuilder.py index 9cf353490c3ac..ce2d559d3ad2a 100644 --- a/providers/src/airflow/providers/fab/www/extensions/init_appbuilder.py +++ b/providers/src/airflow/providers/fab/www/extensions/init_appbuilder.py @@ -39,7 +39,7 @@ from flask_appbuilder.views import IndexView from airflow import settings -from airflow.api_fastapi.app import create_auth_manager +from airflow.api_fastapi.app import create_auth_manager, get_auth_manager from airflow.configuration import conf from airflow.providers.fab.www.security_manager import AirflowSecurityManagerV2 @@ -283,6 +283,8 @@ def _add_admin_views(self): self.indexview = self._check_and_init(self.indexview) self.add_view_no_menu(self.indexview) + get_auth_manager().register_views() + def _add_addon_views(self): """Register declared addons.""" for addon in self._addon_managers: diff --git a/providers/src/airflow/providers/fab/www/extensions/init_views.py b/providers/src/airflow/providers/fab/www/extensions/init_views.py index 382bcaf9ca748..e8e6c6fa6c41a 100644 --- a/providers/src/airflow/providers/fab/www/extensions/init_views.py +++ b/providers/src/airflow/providers/fab/www/extensions/init_views.py @@ -18,17 +18,47 @@ import logging from functools import cached_property +from pathlib import Path from typing import TYPE_CHECKING -from connexion import Resolver +from connexion import FlaskApi, Resolver from connexion.decorators.validation import RequestBodyValidator -from connexion.exceptions import BadRequestProblem +from connexion.exceptions import BadRequestProblem, ProblemException +from flask import request + +from airflow.api_connexion.exceptions import common_error_handler +from airflow.api_fastapi.app import get_auth_manager +from airflow.configuration import conf +from airflow.providers.fab.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED +from airflow.utils.yaml import safe_load if TYPE_CHECKING: from flask import Flask log = logging.getLogger(__name__) +# providers/src/airflow/providers/fab/www/extensions/init_views.py => airflow/ +ROOT_APP_DIR = Path(__file__).parents[7].joinpath("airflow").resolve() + + +def set_cors_headers_on_response(response): + """Add response headers.""" + allow_headers = conf.get("api", "access_control_allow_headers") + allow_methods = conf.get("api", "access_control_allow_methods") + allow_origins = conf.get("api", "access_control_allow_origins") + if allow_headers: + response.headers["Access-Control-Allow-Headers"] = allow_headers + if allow_methods: + response.headers["Access-Control-Allow-Methods"] = allow_methods + if allow_origins == "*": + response.headers["Access-Control-Allow-Origin"] = "*" + elif allow_origins: + allowed_origins = allow_origins.split(" ") + origin = request.environ.get("HTTP_ORIGIN", allowed_origins[0]) + if origin in allowed_origins: + response.headers["Access-Control-Allow-Origin"] = origin + return response + class _LazyResolution: """ @@ -78,6 +108,59 @@ def validate_schema(self, data, url): return super().validate_schema(data, url) +base_paths: list[str] = [] # contains the list of base paths that have api endpoints + + +def init_api_error_handlers(app: Flask) -> None: + """Add error handlers for 404 and 405 errors for existing API paths.""" + + @app.errorhandler(404) + def _handle_api_not_found(ex): + if any([request.path.startswith(p) for p in base_paths]): + # 404 errors are never handled on the blueprint level + # unless raised from a view func so actual 404 errors, + # i.e. "no route for it" defined, need to be handled + # here on the application level + return common_error_handler(ex) + else: + from airflow.providers.fab.www.views import not_found + + return not_found(ex) + + @app.errorhandler(405) + def _handle_method_not_allowed(ex): + if any([request.path.startswith(p) for p in base_paths]): + return common_error_handler(ex) + else: + from airflow.providers.fab.www.views import method_not_allowed + + return method_not_allowed(ex) + + app.register_error_handler(ProblemException, common_error_handler) + + +def init_api_connexion(app: Flask) -> None: + """Initialize Stable API.""" + base_path = "/api/v1" + base_paths.append(base_path) + + with ROOT_APP_DIR.joinpath("api_connexion", "openapi", "v1.yaml").open() as f: + specification = safe_load(f) + api_bp = FlaskApi( + specification=specification, + resolver=_LazyResolver(), + base_path=base_path, + options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()}, + strict_validation=True, + validate_responses=True, + validator_map={"body": _CustomErrorRequestBodyValidator}, + ).blueprint + api_bp.after_request(set_cors_headers_on_response) + + app.register_blueprint(api_bp) + app.extensions["csrf"].exempt(api_bp) + + def init_plugins(app): """Integrate Flask and FAB with plugins.""" from airflow import plugins_manager @@ -118,3 +201,13 @@ def init_error_handlers(app: Flask): app.register_error_handler(500, views.show_traceback) app.register_error_handler(404, views.not_found) + + +def init_api_auth_provider(app): + """Initialize the API offered by the auth manager.""" + auth_mgr = get_auth_manager() + blueprint = auth_mgr.get_api_endpoints() + if blueprint: + base_paths.append(blueprint.url_prefix) + app.register_blueprint(blueprint) + app.extensions["csrf"].exempt(blueprint) diff --git a/providers/src/airflow/providers/fab/www/templates/airflow/no_roles_permissions.html b/providers/src/airflow/providers/fab/www/templates/airflow/no_roles_permissions.html new file mode 100644 index 0000000000000..fa619c403c030 --- /dev/null +++ b/providers/src/airflow/providers/fab/www/templates/airflow/no_roles_permissions.html @@ -0,0 +1,42 @@ +{# + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +#} + + + +
+Unfortunately your user has no roles, and therefore you cannot use Airflow.
+Please contact your Airflow administrator + (authentication + may be misconfigured) or +
+ +{{ hostname }}
+