Skip to content

Commit

Permalink
add the function for ingesting revision data
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuu Ohmura committed Jan 26, 2024
1 parent 0ce91c8 commit 547663d
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ td:
sessions: sessions
attempts: attempts
tasks: tasks
revisions: revisions
api_endpoint: api.treasuredata.com
workflow_endpoint: api-workflow.treasuredata.com
14 changes: 14 additions & 0 deletions scenarios/monitoring/workflow_monitoring/manual_ingest.dig
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ _export:
attempt_ids:
- 605508773
- 605506079
project_ids:
- 627610
- 686558

+manual_ingest_attempt_task:
py>: scripts.ingest_task.run
Expand All @@ -14,3 +17,14 @@ _export:
image: "digdag/digdag-python:3.9"
_env:
TD_API_KEY: ${secret:td.apikey}

+manual_ingest_project_revision:
py>: scripts.ingest_revision.run
session_unixtime: ${session_unixtime}
dest_db: ${td.database}
dest_table: ${td.tables.revisions}
project_ids: ${project_ids}
docker:
image: "digdag/digdag-python:3.9"
_env:
TD_API_KEY: ${secret:td.apikey}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import requests
import os
import pytd
import pandas as pd
import json

def get_revision_info(base_url, headers, ids):
l = []
for i in ids:
url = base_url % i
print(url)
res = requests.get(url=url, headers=headers)
if res.status_code != requests.codes.ok:
res.raise_for_status()
revisions = res.json()['revisions']
for r in revisions:
r['projectid'] = i
l.extend(revisions)
return l

def insert_revision_info(import_unixtime, endpoint, apikey, dest_db, dest_table, revisions):
df = pd.DataFrame(revisions)
df['time'] = int(import_unixtime)
df['userInfo'] = df['userInfo'].apply(json.dumps)
client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')

def run(session_unixtime, dest_db, dest_table, project_ids, api_endpoint='api.treasuredata.com', workflow_endpoint='api-workflow.treasuredata.com'):
id_list = project_ids[1:-1].split(',')
if len(id_list) == 0:
print('no project id')
return

workflow_url = 'https://%s/api/projects' % workflow_endpoint + '/%s/revisions'
headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
l = get_revision_info(workflow_url, headers, id_list)
if len(l) == 0:
print('no insert record')
return
insert_revision_info(session_unixtime, 'https://%s' % api_endpoint, os.environ['TD_API_KEY'], dest_db, dest_table, l)

0 comments on commit 547663d

Please sign in to comment.