From c6e11db3aaeb681f75f357a95411c0081af5db4a Mon Sep 17 00:00:00 2001 From: Kevin G Date: Wed, 17 Aug 2022 22:49:23 -0700 Subject: [PATCH 1/4] Dagster Week 1 --- .gitignore | 2 +- week_1/project/week_1.py | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 4d76f705..406c1da6 100644 --- a/.gitignore +++ b/.gitignore @@ -152,7 +152,7 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +.idea/ # Mac OS .DS_Store \ No newline at end of file diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index ccfe963b..2384c7eb 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -1,4 +1,5 @@ import csv +import logging from datetime import datetime from typing import List @@ -51,15 +52,22 @@ def get_s3_data(context): @op -def process_data(): - pass +def process_data(raw_stocks: List[Stock]) -> Aggregation: + highest_stock = max(raw_stocks, key=lambda x: x.high) + return Aggregation(date=highest_stock.date, high=highest_stock.high) @op -def put_redis_data(): +def put_redis_data(aggregation: Aggregation): + logging.info(f"Put {aggregation} in redis") pass -@job +@job() def week_1_pipeline(): - pass + put_redis_data(process_data(get_s3_data())) + + +week_1_pipeline.execute_in_process( + run_config={'ops': {'get_s3_data': {'config': {'s3_key': "week_1/data/stock.csv"}}}} +) From dda97c6686e685284b7e27afb738372c261f8f77 Mon Sep 17 00:00:00 2001 From: Kevin G Date: Wed, 17 Aug 2022 22:54:27 -0700 Subject: [PATCH 2/4] removing redundant parentheses --- week_1/project/week_1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index 2384c7eb..2e61ffd7 100644 --- a/week_1/project/week_1.py 
+++ b/week_1/project/week_1.py @@ -63,7 +63,7 @@ def put_redis_data(aggregation: Aggregation): pass -@job() +@job def week_1_pipeline(): put_redis_data(process_data(get_s3_data())) From 63f846306200a42746d2afeb4b00b1c1f1affbbe Mon Sep 17 00:00:00 2001 From: Kevin G Date: Thu, 25 Aug 2022 19:42:13 -0700 Subject: [PATCH 3/4] Challenge problem --- week_1/project/config.yaml | 5 ++++- week_1/project/week_1_challenge.py | 25 +++++++++++++++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/week_1/project/config.yaml b/week_1/project/config.yaml index 2fac04e9..c85492e9 100644 --- a/week_1/project/config.yaml +++ b/week_1/project/config.yaml @@ -1,4 +1,7 @@ ops: get_s3_data: config: - s3_key: week_1/data/stock.csv \ No newline at end of file + s3_key: week_1/data/stock.csv + process_data: + config: + n: 3 \ No newline at end of file diff --git a/week_1/project/week_1_challenge.py b/week_1/project/week_1_challenge.py index 7d7abe1f..650e166d 100644 --- a/week_1/project/week_1_challenge.py +++ b/week_1/project/week_1_challenge.py @@ -12,6 +12,7 @@ job, op, usable_as_dagster_type, + get_dagster_logger ) from pydantic import BaseModel @@ -60,16 +61,28 @@ def get_s3_data(context): return output -@op -def process_data(): - pass +@op(out=DynamicOut(), config_schema={"n": int}) +def process_data(context, raw_stocks: List[Stock]) -> DynamicOutput: + n = context.op_config["n"] + top_stocks = nlargest(n, raw_stocks, key=lambda x: x.high) + aggregations = [Aggregation(date=stock.date, high=stock.high) for stock in top_stocks] + get_dagster_logger().info(aggregations) + for idx, stock in enumerate(aggregations): + yield DynamicOutput(stock, mapping_key=str(idx+1)) @op -def put_redis_data(): - pass +def put_redis_data(aggregation: Aggregation): + get_dagster_logger().info(f"Put Aggregation {aggregation} in redis") @job def week_1_pipeline(): - pass + aggregates = process_data(get_s3_data()) + aggregates.map(put_redis_data) + + +week_1_pipeline.execute_in_process( 
+ run_config={'ops': {'get_s3_data': {'config': {'s3_key': "week_1/data/stock.csv"}}, + 'process_data': {'config': {'n': 3}}}} +) From 1a702898ffdaa32bf709aef9f9c3eb73423d7d6e Mon Sep 17 00:00:00 2001 From: Kevin G Date: Sun, 28 Aug 2022 16:25:08 -0500 Subject: [PATCH 4/4] week 2 pipeline --- week_2/dagster_ucr/project/week_2.py | 37 ++++++++++++++++++---------- week_2/dagster_ucr/resources.py | 23 +++++++++++++---- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py index 8e32a715..8a859a5b 100644 --- a/week_2/dagster_ucr/project/week_2.py +++ b/week_2/dagster_ucr/project/week_2.py @@ -1,30 +1,41 @@ from typing import List - +import logging from dagster import In, Nothing, Out, ResourceDefinition, graph, op from dagster_ucr.project.types import Aggregation, Stock from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource -@op -def get_s3_data(): - pass +@op( + required_resource_keys={'s3'}, + config_schema={"s3_key": str}, + out={"stocks": Out(dagster_type=List[Stock])}, + tags={"kind": "s3"}, + description="Get a list of stocks from an S3 file") +def get_s3_data(context): + stocks = [] + for record in context.resources.s3.get_data(context.op_config["s3_key"]): + stocks.append(Stock.from_list(record)) + return stocks @op -def process_data(): - # Use your op from week 1 - pass +def process_data(raw_stocks: List[Stock]) -> Aggregation: + highest_stock = max(raw_stocks, key=lambda x: x.high) + return Aggregation(date=highest_stock.date, high=highest_stock.high) -@op -def put_redis_data(): - pass - +@op( + required_resource_keys={'redis'}, + ins={"aggregation": In(dagster_type=Aggregation)}, + description="Put Aggregation data into Redis", + tags={"kind": "redis"} +) +def put_redis_data(context, aggregation: Aggregation): + context.resources.redis.put_data(aggregation.date, aggregation.high) @graph def week_2_pipeline(): - # Use your graph from week 1 - pass
+ put_redis_data(process_data(get_s3_data())) local = { diff --git a/week_2/dagster_ucr/resources.py b/week_2/dagster_ucr/resources.py index adeb742f..e0ba7b3d 100644 --- a/week_2/dagster_ucr/resources.py +++ b/week_2/dagster_ucr/resources.py @@ -91,13 +91,26 @@ def mock_s3_resource(): return s3_mock -@resource -def s3_resource(): +@resource( + config_schema={"bucket": str, "access_key": str, "secret_key": str, "endpoint_url": str}, + description="S3 resource" +) +def s3_resource(context): """This resource defines a S3 client""" - pass + return S3( + bucket=context.resource_config["bucket"], + access_key=context.resource_config["access_key"], + secret_key=context.resource_config["secret_key"], + endpoint_url=context.resource_config["endpoint_url"] + ) -@resource -def redis_resource(): +@resource( + config_schema={"host": str, "port": int}, description="Redis resource" +) +def redis_resource(context): """This resource defines a Redis client""" + return Redis( + host=context.resource_config["host"], + port=context.resource_config["port"]) pass