-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdemo.py
78 lines (62 loc) · 1.94 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import pandas as pd
import requests
from dagster import (
Config,
Definitions,
MetadataValue,
Output,
asset,
job,
op
)
from seafowl.dataframe import dataframe_to_seafowl
from seafowl import SeafowlConnectionParams
from seafowl.types import QualifiedTableName
@asset
def hackernews_top_story_ids():
    """Get top stories from the HackerNews top stories endpoint.

    API Docs: https://github.com/HackerNews/API#new-top-and-best-stories.

    Returns:
        The first 10 entries of the JSON payload returned by the endpoint.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If connecting or reading stalls for more than
            10 seconds.
    """
    response = requests.get(
        "https://hacker-news.firebaseio.com/v0/topstories.json",
        # requests waits forever by default; bound the wait so a stalled
        # connection fails the run instead of hanging it.
        timeout=10,
    )
    # Surface HTTP failures here instead of as a confusing error when an
    # error payload gets sliced below.
    response.raise_for_status()
    return response.json()[:10]
# asset dependencies can be inferred from parameter names
@asset
def hackernews_top_stories(hackernews_top_story_ids):
    """Get items based on story ids from the HackerNews items endpoint.

    Args:
        hackernews_top_story_ids: Iterable of item ids; one request is made
            per id.

    Returns:
        An ``Output`` wrapping a DataFrame with one row per fetched item.
        The record count and a markdown preview of the ``title``, ``by`` and
        ``url`` columns are attached as metadata.

    Raises:
        requests.HTTPError: If any item request answers with an error status.
        requests.Timeout: If connecting or reading stalls for more than
            10 seconds on any request.
    """
    results = []
    for item_id in hackernews_top_story_ids:
        response = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
            # requests has no default timeout; never block the run forever.
            timeout=10,
        )
        response.raise_for_status()
        item = response.json()
        # A JSON `null` body decodes to None and has no fields to turn into
        # a row, so it is skipped.
        # NOTE(review): the API appears to answer `null` for ids without an
        # item -- confirm against the HackerNews API docs.
        if item is not None:
            results.append(item)

    df = pd.DataFrame(results)

    # recorded metadata can be customized
    # reindex() instead of df[[...]]: plain column selection raises KeyError
    # when none of the fetched items has one of these keys (or when nothing
    # was fetched); reindex fills such columns with NaN instead.
    preview = df.reindex(columns=["title", "by", "url"])
    metadata = {
        "num_records": len(df),
        "preview": MetadataValue.md(preview.to_markdown()),
    }
    return Output(value=df, metadata=metadata)
class ExportToSeafowlConfig(Config):
    """Run configuration consumed by the ``export_to_seafowl`` op."""

    # Forwarded as ``url`` to SeafowlConnectionParams.
    url: str
    # Forwarded as ``secret`` to SeafowlConnectionParams.
    secret: str
    # Destination table name; the op qualifies it with the "public" schema.
    table: str
@op
def export_to_seafowl(
    context,
    config: ExportToSeafowlConfig,
    data: pd.DataFrame,
) -> None:
    """Write ``data`` to the Seafowl table named in ``config``.

    Args:
        context: Op execution context; only its logger is used here.
        config: Supplies the Seafowl url and secret plus the target table
            name.
        data: DataFrame to export.
    """
    context.log.info("exporting to seafowl")

    # No database is selected explicitly (database=None).
    connection = SeafowlConnectionParams(
        url=config.url, secret=config.secret, database=None
    )
    dataframe_to_seafowl(
        data,
        connection,
        # The table always lands in the "public" schema.
        QualifiedTableName(schema="public", table=config.table),
    )
@job
def hn_stories_to_seafowl_pipeline():
    """Feed the ``hackernews_top_stories`` asset into ``export_to_seafowl``."""
    # NOTE(review): presumably passing the asset as a source asset makes the
    # op load its stored value rather than recompute it -- confirm against
    # the Dagster docs.
    stories_source = hackernews_top_stories.to_source_asset()
    export_to_seafowl(stories_source)
# Collects both HackerNews assets and the export job in one Definitions object.
defs = Definitions(
    assets=[hackernews_top_story_ids, hackernews_top_stories],
    jobs=[hn_stories_to_seafowl_pipeline],
)