From b4733e8d19e4ad632449cb3a9cce1efe6d254e5d Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sat, 20 Aug 2022 16:35:06 +0100 Subject: [PATCH 01/16] Add process_data logic --- week_1/project/week_1.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index ccfe963b..626b94bc 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -50,9 +50,15 @@ def get_s3_data(context): return output -@op -def process_data(): - pass +@op( + ins={"stocks": In(dagster_type=List[Stock])}, + out={"agg_max": Out(dagster_type=Aggregation)}, + description="Determine the stock with the highest value" +) +def process_data(stocks: List[Stock]) -> Aggregation: + # Find the stock with the highest value + max_stock = max(stocks, key=lambda x: x.high) + return Aggregation(date=max_stock.date, high=max_stock.high) @op From 8785918bacfff467daa3488a1217336ce1f43dae Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sat, 20 Aug 2022 17:35:32 +0100 Subject: [PATCH 02/16] Add stub for put_redis_data --- week_1/project/week_1.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index 626b94bc..9e07be25 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -2,7 +2,7 @@ from datetime import datetime from typing import List -from dagster import In, Nothing, Out, job, op, usable_as_dagster_type +from dagster import In, Nothing, Out, job, op, usable_as_dagster_type, get_dagster_logger from pydantic import BaseModel @@ -61,8 +61,15 @@ def process_data(stocks: List[Stock]) -> Aggregation: return Aggregation(date=max_stock.date, high=max_stock.high) -@op -def put_redis_data(): +@op( + ins={"agg_max": In(dagster_type=Aggregation)}, + out=None, + description="Write to Redis" +) +def put_redis_data(agg_max: Aggregation) -> None: + # Log the output + logger = get_dagster_logger() + logger.info(f"Write {agg_max} to Redis.") 
pass From 60b321a0c7d587bb470138a68580346a73a688fa Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sat, 20 Aug 2022 17:37:00 +0100 Subject: [PATCH 03/16] Add ops to a job --- week_1/project/week_1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index 9e07be25..8c15e69c 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -75,4 +75,4 @@ def put_redis_data(agg_max: Aggregation) -> None: @job def week_1_pipeline(): - pass + put_redis_data(process_data(get_s3_data())) From 3dbe4e09bb96a02d4b06e719770df5956d05f748 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Wed, 24 Aug 2022 11:26:07 +0100 Subject: [PATCH 04/16] Add resources for S3 and Redis --- week_2/dagster_ucr/resources.py | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/week_2/dagster_ucr/resources.py b/week_2/dagster_ucr/resources.py index adeb742f..b4829561 100644 --- a/week_2/dagster_ucr/resources.py +++ b/week_2/dagster_ucr/resources.py @@ -91,13 +91,34 @@ def mock_s3_resource(): return s3_mock -@resource -def s3_resource(): +@resource( + config_schema={ + "bucket": Field(String), + "access_key": Field(String), + "secret_key": Field(String), + "endpoint_url": Field(String), + }, + description="A resource that can connect to S3." 
+) +def s3_resource(context) -> S3: """This resource defines a S3 client""" - pass + return S3( + bucket=context.resource_config["bucket"], + access_key=context.resource_config["access_key"], + secret_key=context.resource_config["secret_key"], + endpoint_url=context.resource_config["endpoint_url"], + ) -@resource -def redis_resource(): +@resource( + config_schema={ + "host": Field(String), + "port": Field(Int), + } +) +def redis_resource(context) -> Redis: """This resource defines a Redis client""" - pass + return Redis( + host=context.resource_config["host"], + port=context.resource_config["port"], + ) From 439c1940ca61575be81547f0fe5a2695726b2a59 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sun, 28 Aug 2022 19:11:54 +0100 Subject: [PATCH 05/16] Complete ops and graph --- week_2/dagster_ucr/project/week_2.py | 45 ++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py index 3313403d..f59b1491 100644 --- a/week_2/dagster_ucr/project/week_2.py +++ b/week_2/dagster_ucr/project/week_2.py @@ -5,26 +5,47 @@ from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource -@op -def get_s3_data(): - pass +@op( + config_schema={"s3_key": str}, + required_resource_keys={"s3"}, + out={"stocks": Out(dagster_type=List[Stock])}, + tags={"kind": "s3"}, + description="Get a list of stocks from an S3 file", +) +def get_s3_data(context) -> List[Stock]: + stocks = context.resources.s3.get_data(context.op_config["s3_key"]) + return [Stock.from_list(stock) for stock in stocks] -@op -def process_data(): - # Use your op from week 1 - pass +@op( + ins={"stocks": In(dagster_type=List[Stock])}, + out={"agg_max": Out(dagster_type=Aggregation)}, + tags={"kind": "transformation"}, + description="Determine the stock with the highest value" +) +def process_data(stocks: List[Stock]) -> Aggregation: + # Find the stock with the highest value + max_stock = max(stocks, 
key=lambda x: x.high) + return Aggregation(date=max_stock.date, high=max_stock.high) -@op -def put_redis_data(): - pass +@op( + ins={"agg_max": In(dagster_type=Aggregation)}, + required_resource_keys={"redis"}, + out=Out(Nothing), + tags={"kind": "redis"}, + description="Write to Redis" +) +def put_redis_data(context, agg_max: Aggregation) -> None: + data = context.resources.redis.put_data(str(agg_max.date), str(agg_max.high)) + context.log.info(f"Write {data} to Redis.") @graph def week_2_pipeline(): - # Use your graph from week 1 - pass + s3_data = get_s3_data() + highest_stock = process_data(s3_data) + put_redis_data(highest_stock) local = { From 069a6043ed61a59eaa84b3c7056d9f4755d8abb7 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Mon, 29 Aug 2022 13:47:11 +0100 Subject: [PATCH 06/16] Add description to Redis resource --- week_2/dagster_ucr/resources.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/week_2/dagster_ucr/resources.py b/week_2/dagster_ucr/resources.py index b4829561..e00f36e2 100644 --- a/week_2/dagster_ucr/resources.py +++ b/week_2/dagster_ucr/resources.py @@ -114,7 +114,8 @@ def s3_resource(context) -> S3: config_schema={ "host": Field(String), "port": Field(Int), - } + }, + description="A resource that connects to Redis.", ) def redis_resource(context) -> Redis: """This resource defines a Redis client""" From 3586228c1baf30c1ff54463c6ea8a6d85126e57b Mon Sep 17 00:00:00 2001 From: Ian Young Date: Mon, 29 Aug 2022 13:48:51 +0100 Subject: [PATCH 07/16] Use Dagster Type & Add logging example without context. 
--- week_2/dagster_ucr/project/week_2.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py index f59b1491..18c0665c 100644 --- a/week_2/dagster_ucr/project/week_2.py +++ b/week_2/dagster_ucr/project/week_2.py @@ -1,6 +1,6 @@ from typing import List -from dagster import In, Nothing, Out, ResourceDefinition, graph, op +from dagster import In, Nothing, Out, ResourceDefinition, graph, op, get_dagster_logger from dagster_ucr.project.types import Aggregation, Stock from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource @@ -26,13 +26,16 @@ def get_s3_data(context) -> List[Stock]: def process_data(stocks: List[Stock]) -> Aggregation: # Find the stock with the highest value max_stock = max(stocks, key=lambda x: x.high) + # Log the output + logger = get_dagster_logger() + logger.info(f"Higest stock is: {max_stock}") return Aggregation(date=max_stock.date, high=max_stock.high) @op( ins={"agg_max": In(dagster_type=Aggregation)}, required_resource_keys={"redis"}, - out=Out(Nothing), + out=Out(dagster_type=Nothing), tags={"kind": "redis"}, description="Write to Redis" ) From 5a0ea7a61a751be7e66da0f568b6e254ae255ae8 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sun, 4 Sep 2022 14:11:06 +0100 Subject: [PATCH 08/16] Copy ops and graph from week 2 --- week_3/project/week_3.py | 55 ++++++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py index f4753753..9410321e 100644 --- a/week_3/project/week_3.py +++ b/week_3/project/week_3.py @@ -13,34 +13,57 @@ op, sensor, static_partitioned_config, + get_dagster_logger, ) from project.resources import mock_s3_resource, redis_resource, s3_resource from project.sensors import get_s3_keys from project.types import Aggregation, Stock -@op -def get_s3_data(): - # Use your ops from week 2 - pass - - -@op -def process_data(): - # 
Use your ops from week 2 - pass +@op( + config_schema={"s3_key": str}, + required_resource_keys={"s3"}, + out={"stocks": Out(dagster_type=List[Stock])}, + tags={"kind": "s3"}, + description="Get a list of stocks from an S3 file", +) +def get_s3_data(context) -> List[Stock]: + stocks = context.resources.s3.get_data(context.op_config["s3_key"]) + return [Stock.from_list(stock) for stock in stocks] -@op -def put_redis_data(): - # Use your ops from week 2 - pass +@op( + ins={"stocks": In(dagster_type=List[Stock])}, + out={"agg_max": Out(dagster_type=Aggregation)}, + tags={"kind": "transformation"}, + description="Determine the stock with the highest value" +) +def process_data(stocks: List[Stock]) -> Aggregation: + # Find the stock with the highest value + max_stock = max(stocks, key=lambda x: x.high) + # Log the output + logger = get_dagster_logger() + logger.info(f"Higest stock is: {max_stock}") + return Aggregation(date=max_stock.date, high=max_stock.high) + + +@op( + ins={"agg_max": In(dagster_type=Aggregation)}, + required_resource_keys={"redis"}, + out=Out(dagster_type=Nothing), + tags={"kind": "redis"}, + description="Write to Redis" +) +def put_redis_data(context, agg_max: Aggregation) -> None: + data = context.resources.redis.put_data(str(agg_max.date), str(agg_max.high)) + context.log.info(f"Write {data} to Redis.") @graph def week_3_pipeline(): - # Use your graph from week 2 - pass + s3_data = get_s3_data() + highest_stock = process_data(s3_data) + put_redis_data(highest_stock) local = { From 132fd2e302f31ebb86d6d4e4371738e98cac5ae1 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sun, 4 Sep 2022 14:12:07 +0100 Subject: [PATCH 09/16] Add schedules --- week_3/project/week_3.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py index 9410321e..cd4eaed2 100644 --- a/week_3/project/week_3.py +++ b/week_3/project/week_3.py @@ -114,10 +114,11 @@ def docker_config(): }, ) +# Schedule for local: 
Every 15 minutes +local_week_3_schedule = ScheduleDefinition(job=local_week_3_pipeline, cron_schedule="*/15 * * * *") -local_week_3_schedule = None # Add your schedule - -docker_week_3_schedule = None # Add your schedule +# Schedule for docker: Start of every hour +docker_week_3_schedule = ScheduleDefinition(job=docker_week_3_pipeline, cron_schedule="0 * * * *") @sensor From 0213ded7180b283ab1afd5c5b4590af3a425e81f Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sun, 4 Sep 2022 14:46:49 +0100 Subject: [PATCH 10/16] Add docker_week_3_sensor --- week_3/project/week_3.py | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py index cd4eaed2..3e5ccad7 100644 --- a/week_3/project/week_3.py +++ b/week_3/project/week_3.py @@ -121,6 +121,33 @@ def docker_config(): docker_week_3_schedule = ScheduleDefinition(job=docker_week_3_pipeline, cron_schedule="0 * * * *") -@sensor -def docker_week_3_sensor(): - pass +@sensor( + job=docker_week_3_pipeline, + minimum_interval_seconds=30, + description="Check S3 bucket for new files every 30 seconds." +) +def docker_week_3_sensor(context): + + # Check for new files in the S3 bucket. + new_files = get_s3_keys( + bucket="dagster", + prefix="prefix", + endpoint_url="http://host.docker.internal:4566" + ) + + if not new_files: + yield SkipReason("No new s3 files found in bucket.") + return + + log = get_dagster_logger() + log.info(f"RunRequest for {new_files}") + + # RunRequest for every new file. 
+ for new_file in new_files: + yield RunRequest( + run_key=new_file, + run_config={ + "resources": {**docker["resources"]}, + "ops": {"get_s3_data": {"config": {"s3_key": new_file}}}, + } + ) From 0edb0244a9c2b791337aebdbd07dde43ca95cff8 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sun, 4 Sep 2022 14:53:37 +0100 Subject: [PATCH 11/16] Add op retry policy to job --- week_3/project/week_3.py | 1 + 1 file changed, 1 insertion(+) diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py index 3e5ccad7..cb83297e 100644 --- a/week_3/project/week_3.py +++ b/week_3/project/week_3.py @@ -112,6 +112,7 @@ def docker_config(): "s3": s3_resource, "redis": redis_resource, }, + op_retry_policy=RetryPolicy(max_retries=10, delay=1) ) # Schedule for local: Every 15 minutes From 9e4757604f445d65db0a736a8d8512bd0e64e5d0 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sun, 4 Sep 2022 15:38:21 +0100 Subject: [PATCH 12/16] Add static partitioned config for docker --- week_3/project/week_3.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py index cb83297e..4ff3170f 100644 --- a/week_3/project/week_3.py +++ b/week_3/project/week_3.py @@ -92,8 +92,16 @@ def week_3_pipeline(): } -def docker_config(): - pass +# Create a fixed number of partitions (1-10) +partition_keys = [str(i) for i in range(1, 11)] + +@static_partitioned_config(partition_keys=partition_keys) +def docker_config(partition_key: str): + key = f'prefix/stock_{partition_key}.csv' + return { + "resources": {**docker["resources"]}, + "ops": {"get_s3_data": {"config": {"s3_key": key }}} + } local_week_3_pipeline = week_3_pipeline.to_job( From aa9ed44ea256939446a6a1ecb42c1c004917078c Mon Sep 17 00:00:00 2001 From: Ian Young Date: Mon, 5 Sep 2022 12:13:35 +0100 Subject: [PATCH 13/16] Fix passing docker config to job --- week_3/project/week_3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/week_3/project/week_3.py 
b/week_3/project/week_3.py index 4ff3170f..52c6c83a 100644 --- a/week_3/project/week_3.py +++ b/week_3/project/week_3.py @@ -115,7 +115,7 @@ def docker_config(partition_key: str): docker_week_3_pipeline = week_3_pipeline.to_job( name="docker_week_3_pipeline", - config=docker_config, + config=docker, resource_defs={ "s3": s3_resource, "redis": redis_resource, From eefb2976eac8cb8018d79ade0c8b39f43c53c4cd Mon Sep 17 00:00:00 2001 From: Ian Young Date: Mon, 5 Sep 2022 12:14:16 +0100 Subject: [PATCH 14/16] Simplify run_config for sensor. We can copy the full config settings and override only the relevant parts. --- week_3/project/week_3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py index 52c6c83a..d5b0ca11 100644 --- a/week_3/project/week_3.py +++ b/week_3/project/week_3.py @@ -156,7 +156,7 @@ def docker_week_3_sensor(context): yield RunRequest( run_key=new_file, run_config={ - "resources": {**docker["resources"]}, - "ops": {"get_s3_data": {"config": {"s3_key": new_file}}}, + **docker, + "ops": {"get_s3_data": {"config": {"s3_key": new_file}}} } ) From 700f72dff57e80efef97a15352190ff29c3afaea Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sun, 11 Sep 2022 18:35:55 +0100 Subject: [PATCH 15/16] Simplified docker_config with arg unpacking --- week_3/project/week_3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py index d5b0ca11..31878823 100644 --- a/week_3/project/week_3.py +++ b/week_3/project/week_3.py @@ -99,7 +99,7 @@ def week_3_pipeline(): def docker_config(partition_key: str): key = f'prefix/stock_{partition_key}.csv' return { - "resources": {**docker["resources"]}, + **docker, "ops": {"get_s3_data": {"config": {"s3_key": key }}} } From a8c4b3533cd0943cd1b63f81d1d0d23df39a4d15 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sun, 11 Sep 2022 18:38:17 +0100 Subject: [PATCH 16/16] Fix schedule for docker so that it works 
with the static partition --- week_3/project/week_3.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/week_3/project/week_3.py b/week_3/project/week_3.py index 31878823..f6b82e52 100644 --- a/week_3/project/week_3.py +++ b/week_3/project/week_3.py @@ -127,7 +127,16 @@ def docker_config(partition_key: str): local_week_3_schedule = ScheduleDefinition(job=local_week_3_pipeline, cron_schedule="*/15 * * * *") # Schedule for docker: Start of every hour -docker_week_3_schedule = ScheduleDefinition(job=docker_week_3_pipeline, cron_schedule="0 * * * *") +@schedule( + cron_schedule="0 * * * *", + job=docker_week_3_pipeline, + tags={"kind": "schedule"}, + description="Run scheduled for the start of every hour" +) +def docker_week_3_schedule(): + for partition_key in partition_keys: + request = docker_week_3_pipeline.run_request_for_partition(partition_key=partition_key, run_key=partition_key) + yield request @sensor(