diff --git a/scenarios/monitoring/workflow_monitoring/common/settings.yaml b/scenarios/monitoring/workflow_monitoring/common/settings.yaml index 7c35dc12..1f1bfac6 100644 --- a/scenarios/monitoring/workflow_monitoring/common/settings.yaml +++ b/scenarios/monitoring/workflow_monitoring/common/settings.yaml @@ -10,5 +10,6 @@ td: sessions: sessions attempts: attempts tasks: tasks + revisions: revisions api_endpoint: api.treasuredata.com workflow_endpoint: api-workflow.treasuredata.com \ No newline at end of file diff --git a/scenarios/monitoring/workflow_monitoring/manual_ingest.dig b/scenarios/monitoring/workflow_monitoring/manual_ingest.dig index 57c8b186..b8fc7bd6 100644 --- a/scenarios/monitoring/workflow_monitoring/manual_ingest.dig +++ b/scenarios/monitoring/workflow_monitoring/manual_ingest.dig @@ -3,6 +3,9 @@ _export: attempt_ids: - 605508773 - 605506079 + project_ids: + - 627610 + - 686558 +manual_ingest_attempt_task: py>: scripts.ingest_task.run @@ -14,3 +17,14 @@ _export: image: "digdag/digdag-python:3.9" _env: TD_API_KEY: ${secret:td.apikey} + ++manual_ingest_project_revision: + py>: scripts.ingest_revision.run + session_unixtime: ${session_unixtime} + dest_db: ${td.database} + dest_table: ${td.tables.revisions} + project_ids: ${project_ids} + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} \ No newline at end of file diff --git a/scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py b/scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py new file mode 100644 index 00000000..5bd5c248 --- /dev/null +++ b/scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py @@ -0,0 +1,40 @@ +import requests +import os +import pytd +import pandas as pd +import json + +def get_revision_info(base_url, headers, ids): + l = [] + for i in ids: + url = base_url % i + print(url) + res = requests.get(url=url, headers=headers) + if res.status_code != requests.codes.ok: + res.raise_for_status() + revisions = res.json()['revisions'] + for r in revisions: + r['projectid'] = i + l.extend(revisions) + return l + +def insert_revision_info(import_unixtime, endpoint, apikey, dest_db, dest_table, revisions): + df = pd.DataFrame(revisions) + df['time'] = int(import_unixtime) + df['userInfo'] = df['userInfo'].apply(json.dumps) + client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db) + client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack') + +def run(session_unixtime, dest_db, dest_table, project_ids, api_endpoint='api.treasuredata.com', workflow_endpoint='api-workflow.treasuredata.com'): + id_list = project_ids[1:-1].split(',') + if len(id_list) == 0: + print('no project id') + return + + workflow_url = 'https://%s/api/projects' % workflow_endpoint + '/%s/revisions' + headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']} + l = get_revision_info(workflow_url, headers, id_list) + if len(l) == 0: + print('no insert record') + return + insert_revision_info(session_unixtime, 'https://%s' % api_endpoint, os.environ['TD_API_KEY'], dest_db, dest_table, l) \ No newline at end of file