Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for keyring errors when initializing Flyte for_sandbox config client #2962

Merged
merged 43 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
21ebec7
Fix for keyring errors when initializing Flyte for_sandbox config cli…
taieeuu Nov 25, 2024
f358477
no_msg
taieeuu Nov 27, 2024
77bc862
no_msg
taieeuu Nov 27, 2024
5c9bed0
Merge branch 'master' into issue_4354
taieeuu Nov 27, 2024
b42a90f
fix: run linting on codebase
taieeuu Nov 27, 2024
6cb2d3f
add grpc 401 comments
taieeuu Dec 3, 2024
2214e3d
fix: import
taieeuu Dec 7, 2024
a82cd3a
no_msg
taieeuu Dec 7, 2024
4475cc5
Merge branch 'master' into issue_4354
taieeuu Dec 7, 2024
8f28333
fix: update method for initializing authenticator
taieeuu Dec 22, 2024
db7cb2f
fix: update the unit_test
taieeuu Dec 22, 2024
48600a7
fix: add grpc's health check to requirements.txt
taieeuu Dec 22, 2024
2542435
no_msg
taieeuu Dec 22, 2024
3e27029
no_msg
taieeuu Dec 22, 2024
61b35fd
fix: package dependency
taieeuu Dec 23, 2024
42509fb
fix: ci dependencies
taieeuu Dec 23, 2024
0a5afe0
fix: dependencies
taieeuu Dec 23, 2024
a50078e
no_msg
taieeuu Dec 24, 2024
456cd5d
fix: lint
taieeuu Dec 24, 2024
9253dc0
fix: add dependencies
taieeuu Dec 24, 2024
c5d9426
no_msg
taieeuu Dec 24, 2024
7bb5ef3
no_msg
taieeuu Dec 24, 2024
b7e5146
no_msg
taieeuu Dec 24, 2024
b97107b
no_msg
taieeuu Dec 24, 2024
89cf6f6
no_msg
taieeuu Dec 27, 2024
17c840f
no_msg
taieeuu Dec 27, 2024
27a6527
Merge branch 'master' into issue_4354
taieeuu Dec 27, 2024
857db92
no_msg
taieeuu Dec 27, 2024
1e56ac7
no_msg
taieeuu Dec 27, 2024
34f4aba
no_msg
taieeuu Dec 27, 2024
3bbbf41
no_msg
taieeuu Dec 27, 2024
9b3916c
no_msg
taieeuu Dec 27, 2024
0028436
no_msg
taieeuu Dec 27, 2024
d3ea8e4
no_msg
taieeuu Dec 28, 2024
5c19192
no_msg
taieeuu Dec 28, 2024
cf442e6
no_msg
taieeuu Dec 28, 2024
569558e
no_msg
taieeuu Dec 28, 2024
c4817cc
no_msg
taieeuu Dec 28, 2024
3d142c3
no_msg
taieeuu Dec 28, 2024
e207a64
Add lazy loading to AuthUnaryInterceptor.
taieeuu Jan 25, 2025
3f989f7
Merge master branch
taieeuu Jan 25, 2025
1f62483
remove not use
taieeuu Jan 25, 2025
eb43e3a
feat: implements test for keyring_exception
taieeuu Feb 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/monodocs_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,5 @@ jobs:
DOCSEARCH_API_KEY: fake_docsearch_api_key # must be set to get doc build to succeed
run: |
conda activate monodocs-env
pip install grpcio-health-checking==1.49.0
make -C docs clean html SPHINXOPTS="-W -vvv"
4 changes: 4 additions & 0 deletions dev-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ ipykernel

orjson
kubernetes>=12.0.1

grpcio-health-checking
grpcio-tools
grpcio
2 changes: 2 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -564,3 +564,5 @@ zipp==3.19.1

# The following packages are considered to be unsafe in a requirements file:
# setuptools

grpcio-health-checking==1.49.0
14 changes: 10 additions & 4 deletions flytekit/clients/auth_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,12 @@ def upgrade_channel_to_proxy_authenticated(cfg: PlatformConfig, in_channel: grpc
:param in_channel: grpc.Channel Precreated channel
:return: grpc.Channel. New composite channel
"""

def authenticator_factory():
return get_proxy_authenticator(cfg)
Comment on lines +126 to +127
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider caching authenticator instance

Consider caching the authenticator instance instead of creating a new one on each call to authenticator_factory(). This could improve performance by avoiding unnecessary object creation.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def authenticator_factory():
return get_proxy_authenticator(cfg)
_cached_authenticator = None
def authenticator_factory():
nonlocal _cached_authenticator
if _cached_authenticator is None:
_cached_authenticator = get_proxy_authenticator(cfg)
return _cached_authenticator

Code Review Run #5371e4


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


if cfg.proxy_command:
proxy_authenticator = get_proxy_authenticator(cfg)
return grpc.intercept_channel(in_channel, AuthUnaryInterceptor(proxy_authenticator))
return grpc.intercept_channel(in_channel, AuthUnaryInterceptor(authenticator_factory))
else:
return in_channel

Expand All @@ -137,8 +140,11 @@ def upgrade_channel_to_authenticated(cfg: PlatformConfig, in_channel: grpc.Chann
:param in_channel: grpc.Channel Precreated channel
:return: grpc.Channel. New composite channel
"""
authenticator = get_authenticator(cfg, RemoteClientConfigStore(in_channel))
return grpc.intercept_channel(in_channel, AuthUnaryInterceptor(authenticator))

def authenticator_factory():
return get_authenticator(cfg, RemoteClientConfigStore(in_channel))
Comment on lines +144 to +145
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider caching authenticator instance

Consider caching the authenticator instance instead of creating a new one on every call to authenticator_factory(). This could improve performance since authentication configuration is unlikely to change during runtime.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def authenticator_factory():
return get_authenticator(cfg, RemoteClientConfigStore(in_channel))
authenticator = None
def authenticator_factory():
nonlocal authenticator
if authenticator is None:
authenticator = get_authenticator(cfg, RemoteClientConfigStore(in_channel))
return authenticator

Code Review Run #5371e4


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


return grpc.intercept_channel(in_channel, AuthUnaryInterceptor(authenticator_factory))


def get_authenticated_channel(cfg: PlatformConfig) -> grpc.Channel:
Expand Down
15 changes: 12 additions & 3 deletions flytekit/clients/grpc_utils/auth_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,22 @@
is needed.
"""

def __init__(self, authenticator: Authenticator):
self._authenticator = authenticator
def __init__(self, get_authenticator: typing.Callable[[], Authenticator]):
self._get_authenticator = get_authenticator
self._authenticator = None

@property
def authenticator(self) -> Authenticator:
if self._authenticator is None:
self._authenticator = self._get_authenticator()
return self._authenticator

def _call_details_with_auth_metadata(self, client_call_details: grpc.ClientCallDetails) -> grpc.ClientCallDetails:
"""
Returns new ClientCallDetails with metadata added.
"""
metadata = client_call_details.metadata
auth_metadata = self._authenticator.fetch_grpc_call_auth_metadata()
auth_metadata = self.authenticator.fetch_grpc_call_auth_metadata()
if auth_metadata:
metadata = []
if client_call_details.metadata:
Expand Down Expand Up @@ -65,6 +72,7 @@
raise e
if e.code() == grpc.StatusCode.UNAUTHENTICATED or e.code() == grpc.StatusCode.UNKNOWN:
self._authenticator.refresh_credentials()
self.authenticator.refresh_credentials()

Check warning on line 75 in flytekit/clients/grpc_utils/auth_interceptor.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clients/grpc_utils/auth_interceptor.py#L75

Added line #L75 was not covered by tests
updated_call_details = self._call_details_with_auth_metadata(client_call_details)
return continuation(updated_call_details, request)
return fut
Expand All @@ -77,6 +85,7 @@
c: grpc.Call = continuation(updated_call_details, request)
if c.code() == grpc.StatusCode.UNAUTHENTICATED:
self._authenticator.refresh_credentials()
self.authenticator.refresh_credentials()

Check warning on line 88 in flytekit/clients/grpc_utils/auth_interceptor.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clients/grpc_utils/auth_interceptor.py#L88

Added line #L88 was not covered by tests
updated_call_details = self._call_details_with_auth_metadata(client_call_details)
return continuation(updated_call_details, request)
return c
32 changes: 32 additions & 0 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
import typing

import grpc
Expand All @@ -18,6 +19,12 @@
wrap_exceptions_channel,
)
from flytekit.configuration import PlatformConfig
from flytekit.exceptions.system import FlyteSystemUnavailableException
from flytekit.exceptions.user import (
FlyteEntityAlreadyExistsException,
FlyteEntityNotExistException,
FlyteInvalidInputException,
)
from flytekit.loggers import logger


Expand Down Expand Up @@ -51,12 +58,14 @@
# 32KB for error messages, 20MB for actual messages.
options = (("grpc.max_metadata_size", 32 * 1024), ("grpc.max_receive_message_length", 20 * 1024 * 1024))
self._cfg = cfg

self._channel = wrap_exceptions_channel(
cfg,
upgrade_channel_to_authenticated(
cfg, upgrade_channel_to_proxy_authenticated(cfg, get_channel(cfg, options=options))
),
)

self._stub = _admin_service.AdminServiceStub(self._channel)
self._signal = signal_service.SignalServiceStub(self._channel)
self._dataproxy_stub = dataproxy_service.DataProxyServiceStub(self._channel)
Expand All @@ -67,6 +76,29 @@
# metadata will hold the value of the token to send to the various endpoints.
self._metadata = None

@staticmethod
def check_grpc_health_with_authentication(in_channel):
from grpc_health.v1 import health_pb2, health_pb2_grpc

Check warning on line 81 in flytekit/clients/raw.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clients/raw.py#L81

Added line #L81 was not covered by tests

health_stub = health_pb2_grpc.HealthStub(in_channel)
request = health_pb2.HealthCheckRequest()
try:
response = health_stub.Check(request)

Check warning on line 86 in flytekit/clients/raw.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clients/raw.py#L83-L86

Added lines #L83 - L86 were not covered by tests
if response.status == health_pb2.HealthCheckResponse.SERVING:
logging.info("Service is healthy and ready to serve.")
return True
except grpc.RpcError as e:

Check warning on line 90 in flytekit/clients/raw.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clients/raw.py#L88-L90

Added lines #L88 - L90 were not covered by tests
if e.code() == grpc.StatusCode.UNAUTHENTICATED:
return False

Check warning on line 92 in flytekit/clients/raw.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clients/raw.py#L92

Added line #L92 was not covered by tests
elif e.code() == grpc.StatusCode.ALREADY_EXISTS:
raise FlyteEntityAlreadyExistsException() from e

Check warning on line 94 in flytekit/clients/raw.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clients/raw.py#L94

Added line #L94 was not covered by tests
elif e.code() == grpc.StatusCode.NOT_FOUND:
raise FlyteEntityNotExistException() from e

Check warning on line 96 in flytekit/clients/raw.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clients/raw.py#L96

Added line #L96 was not covered by tests
elif e.code() == grpc.StatusCode.INVALID_ARGUMENT:
raise FlyteInvalidInputException(request) from e

Check warning on line 98 in flytekit/clients/raw.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clients/raw.py#L98

Added line #L98 was not covered by tests
elif e.code() == grpc.StatusCode.UNAVAILABLE:
raise FlyteSystemUnavailableException() from e

Check warning on line 100 in flytekit/clients/raw.py

View check run for this annotation

Codecov / codecov/patch

flytekit/clients/raw.py#L100

Added line #L100 was not covered by tests

@classmethod
def with_root_certificate(cls, cfg: PlatformConfig, root_cert_file: str) -> RawSynchronousFlyteClient:
b = None
Expand Down
2 changes: 1 addition & 1 deletion tests/flytekit/unit/clients/test_auth_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def test_upgrade_channel_to_proxy_auth():
ch,
)
assert isinstance(out_ch._interceptor, AuthUnaryInterceptor)
assert isinstance(out_ch._interceptor._authenticator, CommandAuthenticator)
assert isinstance(out_ch._interceptor.authenticator, CommandAuthenticator)


def test_get_proxy_authenticated_session():
Expand Down
11 changes: 7 additions & 4 deletions tests/flytekit/unit/clients/test_friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,29 @@
from flytekit.clients.friendly import SynchronousFlyteClient as _SynchronousFlyteClient
from flytekit.configuration import PlatformConfig
from flytekit.models.project import Project as _Project

from grpc_health.v1 import health_pb2

@mock.patch("flytekit.clients.friendly._RawSynchronousFlyteClient.update_project")
def test_update_project(mock_raw_update_project):
@mock.patch("flytekit.clients.raw.RawSynchronousFlyteClient.check_grpc_health_with_authentication", return_value=health_pb2.HealthCheckResponse.SERVING)
def test_update_project(mock_check_health, mock_raw_update_project):
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("a.b.com", True))
project = _Project("foo", "name", "description", state=_Project.ProjectState.ACTIVE)
client.update_project(project)
mock_raw_update_project.assert_called_with(project.to_flyte_idl())


@mock.patch("flytekit.clients.friendly._RawSynchronousFlyteClient.list_projects")
def test_list_projects_paginated(mock_raw_list_projects):
@mock.patch("flytekit.clients.raw.RawSynchronousFlyteClient.check_grpc_health_with_authentication", return_value=health_pb2.HealthCheckResponse.SERVING)
def test_list_projects_paginated(mock_check_health, mock_raw_list_projects):
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("a.b.com", True))
client.list_projects_paginated(limit=100, token="")
project_list_request = _project_pb2.ProjectListRequest(limit=100, token="", filters=None, sort_by=None)
mock_raw_list_projects.assert_called_with(project_list_request=project_list_request)


@mock.patch("flytekit.clients.friendly._RawSynchronousFlyteClient.create_upload_location")
def test_create_upload_location(mock_raw_create_upload_location):
@mock.patch("flytekit.clients.raw.RawSynchronousFlyteClient.check_grpc_health_with_authentication", return_value=health_pb2.HealthCheckResponse.SERVING)
def test_create_upload_location(mock_check_health, mock_raw_create_upload_location):
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("a.b.com", True))
client.get_upload_signed_url("foo", "bar", bytes(), "baz.qux", timedelta(minutes=42), add_content_md5_metadata=True)
duration_pb = Duration()
Expand Down
9 changes: 6 additions & 3 deletions tests/flytekit/unit/clients/test_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

from flytekit.clients.raw import RawSynchronousFlyteClient
from flytekit.configuration import PlatformConfig

from grpc_health.v1 import health_pb2

@mock.patch("flytekit.clients.raw._admin_service")
@mock.patch("flytekit.clients.raw.grpc.insecure_channel")
def test_update_project(mock_channel, mock_admin):
@mock.patch.object(RawSynchronousFlyteClient, "check_grpc_health_with_authentication", return_value=True)
def test_update_project(mock_check_health, mock_channel, mock_admin):
mock_health_stub = mock.Mock()
client = RawSynchronousFlyteClient(PlatformConfig(endpoint="a.b.com", insecure=True))
project = _project_pb2.Project(id="foo", name="name", description="description", state=_project_pb2.Project.ACTIVE)
client.update_project(project)
Expand All @@ -17,7 +19,8 @@ def test_update_project(mock_channel, mock_admin):

@mock.patch("flytekit.clients.raw._admin_service")
@mock.patch("flytekit.clients.raw.grpc.insecure_channel")
def test_list_projects_paginated(mock_channel, mock_admin):
@mock.patch("flytekit.clients.raw.RawSynchronousFlyteClient.check_grpc_health_with_authentication", return_value=health_pb2.HealthCheckResponse.SERVING)
def test_list_projects_paginated(mock_check_health, mock_channel, mock_admin):
client = RawSynchronousFlyteClient(PlatformConfig(endpoint="a.b.com", insecure=True))
project_list_request = _project_pb2.ProjectListRequest(limit=100, token="", filters=None, sort_by=None)
client.list_projects(project_list_request)
Expand Down
Loading