Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Construct): Bedrock Data Automation construct #954

Open
wants to merge 37 commits into
base: main
Choose a base branch
from

Conversation

dineshSajwan
Copy link
Contributor

Fixes #

  1. Deploy lambda function for blueprint CRUD, project CRUD, data invocation status and get result from bedrock data automation client.
  2. Create custom blueprint with user friendly schema definition or upload your existing schema.
  3. Create input/output bucket which can be accessed from lambda function with AWS best practices.
  4. Support validation and strongly typed function.
  5. The lambda functions can be integrated with amazon Eventbridge.
  6. Pick and choose between blueprint, project or invocation features.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of the project license.

Copy link

@leandrodamascena leandrodamascena left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi guys! Thanks for adding Powertools here, I left some comments. If you think it makes sense to schedule a meeting to see how Powertools can help with this challenge, please let me know.

Comment on lines 13 to 15
logger = Logger(service="BEDROCK_DATA_AUTOMATION")
tracer = Tracer(service="BEDROCK_DATA_AUTOMATION")
metrics = Metrics(namespace="CREATE_BLUEPRINT", service="BEDROCK_DATA_AUTOMATION")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want, you can set the environment variable POWERTOOLS_SERVICE_NAME and remove setting it explicitly.

This is also true for the other files.

Reference: https://docs.powertools.aws.dev/lambda/python/latest/#environment-variables

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done b73eb4b

Comment on lines +47 to +51
s3_client = boto3.client('s3')
response = s3_client.get_object(
Bucket=bucket_name,
Key=schema_key
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the Streaming utility in Powertools that is optimized for getting larges objects from S3. Idk if this is the case, but worth it to check: https://docs.powertools.aws.dev/lambda/python/latest/utilities/streaming/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we don't have streaming requirement as of now. But good to know may use it in future.

Comment on lines 78 to 86
if 'detail' not in event:
logger.error("Event detail section is missing")
return {
'statusCode': 400,
'body': json.dumps({
'message': 'Invalid event format',
'error': 'detail section is missing'
})
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you're getting an event from Event Bridge here and want to enforce data parsing and validation using Parser utility. You can use event_parser decorator and fail fast if the event that arrives in Lambda is not an EB event. Or you can use parse function to have better control over exception mechanism.

It also provides a fully typed variable with auto-completion in the IDE.

A pseudo-code is something like this:

from aws_lambda_powertools.utilities.parser import parse
from aws_lambda_powertools.utilities.parser.models import EventBridgeModel

def lambda_handler(event: dict, context):
    try:
        # Manually parse the incoming event into the custom model
        parsed_event: EventBridgeModel = parse(model=EventBridgeModel, event=event)

        return {"statusCode": 200, "body": f"Event from {parsed_event.source}, type: {parsed_event.detail_type}"}
    except ValidationError as e:
        return {"statusCode": 400, "body": f"Validation error: {str(e)}"}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. May implement it in next build . For now I have already added the specific validations.

@dineshSajwan dineshSajwan marked this pull request as ready for review February 19, 2025 19:24
@dineshSajwan dineshSajwan requested a review from a team as a code owner February 19, 2025 19:24
@krokoko
Copy link
Collaborator

krokoko commented Feb 20, 2025

Refactored the construct, next steps:

  • need to review the python code, didn't get there yet. Refactored the index.ts for now
  • check if all the permissions can be scoped
  • deploy in a sample and check for cdk nag suppressions to add
  • [OK] deploy two instances of the construct in the same stack and ensure no collisions

"title": None,
"json_schema_extra": {
# Specify JSON Schema version and default metadata
"$schema": "http://json-schema.org/draft-07/schema#",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Will the 07 change in the future? I assume this is dependent on Pydantic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the Json schema draft which I tested to create the blueprints. Once we get the official Json schema support announcement for BDA we can update it to latest template.

if operation_type not in [stage.value for stage in OperationType]:
raise ValueError(f"Invalid operation type: {operation_type}. Must be one of {[stage.value for stage in OperationType]}")

if operation_type.lower() == 'delete':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We can use a switch case, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done 964c3b9


if operation == 'create':
response = create_project(project_config)
return {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: DRY code.

  def CRU_response(response_msg: string, response: Any ):
    
              return {
                  'statusCode': 200,
                  'body': json.dumps({
                      'message': response_msg,
                      'response': response
                  })
              }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done 964c3b9

'extraction': {
'category': {
'state': config.get('extraction', {}).get('category', {}).get('state', State.DISABLED.value),
'types': (lambda x: [x] if not isinstance(x, list) else x)(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: DRY

It looks lke we use this function a few times:

(lambda x: [x] if not isinstance(x, list) else x

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

self.client = boto3.client("bedrock-data-automation-runtime")
self.input_bucket = input_bucket
self.output_bucket = output_bucket
# self.max_retries = 60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Delete? Or future feature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done 964c3b9

validate_configs(blueprint_config, data_automation_config)

# Create S3 URIs
#s3://cb-output-documents/noa.json
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

def __init__(self, s3_client=None, bda_client=None):
self.s3 = boto3.client('s3')
self.bda_client = boto3.client("bedrock-data-automation-runtime")
self.max_retries = 60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. Should we uncomment the above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need that now, so deleted it. 964c3b9

@MichaelWalker-git
Copy link
Contributor

This is really great work. Please ignore my comments / NITs if you don't find them useful. Thank you all!

@dineshSajwan
Copy link
Contributor Author

Tested the construct

[OK] Create Blueprint
Screenshot 2025-02-21 at 10 38 26 AM
[OK] Create Project
Screenshot 2025-02-21 at 12 10 19 PM
[OK] Invoke BDA
Screenshot 2025-02-21 at 12 50 45 PM
[OK] Get result
Screenshot 2025-02-21 at 1 12 00 PM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants