Skip to content

Commit

Permalink
Add token method (>^.^)>
Browse files Browse the repository at this point in the history
We expect users of this method to provide a YAML-structured set of params including a uri, an authentication string, and whatever paramters might be needed to construct the correct payload equivalent to data in a curl request. There is an all-important under the hood POST which needs a set of params unique to each identity provider to generate access tokens for use with TokenAuthIdpPlugin.
  • Loading branch information
VersusFacit committed Jan 14, 2025
1 parent 5ad2a73 commit afd9d13
Showing 1 changed file with 64 additions and 24 deletions.
88 changes: 64 additions & 24 deletions dbt/adapters/redshift/connections.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import re
import redshift_connector
import requests
import sqlparse

from multiprocessing import Lock
from contextlib import contextmanager
from typing import Any, Callable, Dict, Tuple, Union, Optional, List, TYPE_CHECKING
from dataclasses import dataclass, field

import sqlparse
import redshift_connector
from dbt.adapters.exceptions import FailedToConnectError
from redshift_connector.utils.oids import get_datatype_name

Expand Down Expand Up @@ -37,26 +39,12 @@ def get_message(self) -> str:
logger = AdapterLogger("Redshift")


class IdentityCenterTokenType(StrEnum):
ACCESS_TOKEN = "ACCESS_TOKEN"
EXT_JWT = "EXT_JWT"

@classmethod
def validate(cls, token_type: str):
try:
cls(token_type)
except ValueError:
raise FailedToConnectError(
f"'token_type' must be set to one of {[token.value for token in iter(cls)]}"
)


class RedshiftConnectionMethod(StrEnum):
DATABASE = "database"
IAM = "iam"
IAM_ROLE = "iam_role"
IAM_IDENTITY_CENTER_BROWSER = "browser_identity_center"
IAM_IDENTITY_CENTER_TOKEN = "iam_idc_token"
IAM_IDENTITY_CENTER_TOKEN = "oauth_token_identity_center"

@classmethod
def uses_identity_center(cls, method: str) -> bool:
Expand Down Expand Up @@ -163,9 +151,11 @@ class RedshiftCredentials(Credentials):
idc_client_display_name: Optional[str] = "Amazon Redshift driver"
idp_response_timeout: Optional[int] = None

# token
token: Optional[str] = None
token_type: Optional[str] = None
# token_endpoint
# a field that we expect to be a dictionary of values used to create
# access tokens from an external identity provider integrated with a redshift
# and aws org or account Iam Idc instance
token_endpoint: Optional[Dict[str, str]] = None

_ALIASES = {"dbname": "database", "pass": "password"}

Expand Down Expand Up @@ -338,15 +328,65 @@ def __iam_idc_browser_kwargs(credentials) -> Dict[str, Any]:
return __iam_kwargs(credentials) | idc_kwargs

def __iam_idc_token_kwargs(credentials) -> Dict[str, Any]:
"""
Expand the logic and flow control here to account for other Identity providers
with their specific request configurations. May require new parameters for
token_endpoint extraction to distinguish idp's if the url is insufficient.
We only support token_type=EXT_JWT tokens. token_type=ACCESS_TOKEN has not been
tested or is supported but can be added with a presenting use-case.
"""

def handle_response(response):
"""
Rate limiting of access_token generation has been called out during refinement
and could require special handling, so this method exists to encapsulate that
code with an eye towards future handling. All other failures are thrown as
normal request failures.
"""
# Handle the 429 rate-limiting case first
if response.status_code == 429:
raise DbtRuntimeError(
"Rate limit on identity provider's token dispatch has been reached. "
"Consider increasing your identity provider's refresh token rate or "
"lower dbt's maximum concurrent thread count."
)

# Raise for any other non-success status codes (4xx, 5xx)
response.raise_for_status()

logger.debug("Connecting to Redshift with '{credentials.method}' credentials method")

__validate_required_fields("iam_idc_token", ("method", "token", "token_type"))
IdentityCenterTokenType.validate(credentials.token_type)
__validate_required_fields("iam_idc_token", ("token_endpoint",))

required_keys = {"request_url", "idp_auth_credentials", "request_data"}
if required_keys - credentials.token_endpoint.keys():
raise FailedToConnectError(
"okta requires token_endpoint is provided all three of: \n"
" (1) request_url - the endpoint\n"
" (2) idp_auth_credentials - the base64 encoding of okta client's client_id:client_secret\n"
" (3) request_data field with all necessary parameters in http friendly encoding including refresh_token\n"
)

encoded_idp_client_creds = credentials.token_endpoint["idp_auth_credentials"]

headers = {
"accept": "application/json",
"authorization": f"Basic {encoded_idp_client_creds}",
"content-type": "application/x-www-form-urlencoded",
}

response = requests.post(
credentials.token_endpoint["request_url"],
headers=headers,
data=credentials.token_endpoint["request_data"],
)
handle_response(response)

return __iam_kwargs(credentials) | {
"credentials_provider": "IdpTokenAuthPlugin",
"token": credentials.token,
"token_type": credentials.token_type,
"token": response.json()["access_token"],
"token_type": "EXT_JWT",
}

#
Expand Down

0 comments on commit afd9d13

Please sign in to comment.