From 547663d19bb06034ad2e2325eb2d09d5b1b84ab4 Mon Sep 17 00:00:00 2001 From: Yuu Ohmura Date: Fri, 26 Jan 2024 11:49:08 +0900 Subject: [PATCH 1/3] add the function for ingesting revision data --- .../workflow_monitoring/common/settings.yaml | 1 + .../workflow_monitoring/manual_ingest.dig | 14 +++++++ .../scripts/ingest_revision.py | 40 +++++++++++++++++++ 3 files changed, 55 insertions(+) create mode 100644 scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py diff --git a/scenarios/monitoring/workflow_monitoring/common/settings.yaml b/scenarios/monitoring/workflow_monitoring/common/settings.yaml index 7c35dc12..1f1bfac6 100644 --- a/scenarios/monitoring/workflow_monitoring/common/settings.yaml +++ b/scenarios/monitoring/workflow_monitoring/common/settings.yaml @@ -10,5 +10,6 @@ td: sessions: sessions attempts: attempts tasks: tasks + revisions: revisions api_endpoint: api.treasuredata.com workflow_endpoint: api-workflow.treasuredata.com \ No newline at end of file diff --git a/scenarios/monitoring/workflow_monitoring/manual_ingest.dig b/scenarios/monitoring/workflow_monitoring/manual_ingest.dig index 57c8b186..b8fc7bd6 100644 --- a/scenarios/monitoring/workflow_monitoring/manual_ingest.dig +++ b/scenarios/monitoring/workflow_monitoring/manual_ingest.dig @@ -3,6 +3,9 @@ _export: attempt_ids: - 605508773 - 605506079 + project_ids: + - 627610 + - 686558 +manual_ingest_attempt_task: py>: scripts.ingest_task.run @@ -14,3 +17,14 @@ _export: image: "digdag/digdag-python:3.9" _env: TD_API_KEY: ${secret:td.apikey} + ++manual_ingest_project_revision: + py>: scripts.ingest_revision.run + session_unixtime: ${session_unixtime} + dest_db: ${td.database} + dest_table: ${td.tables.revisions} + project_ids: ${project_ids} + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} \ No newline at end of file diff --git a/scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py b/scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py new file mode 100644 index 00000000..5bd5c248 --- /dev/null +++ b/scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py @@ -0,0 +1,40 @@ +import requests +import os +import pytd +import pandas as pd +import json + +def get_revision_info(base_url, headers, ids): + l = [] + for i in ids: + url = base_url % i + print(url) + res = requests.get(url=url, headers=headers) + if res.status_code != requests.codes.ok: + res.raise_for_status() + revisions = res.json()['revisions'] + for r in revisions: + r['projectid'] = i + l.extend(revisions) + return l + +def insert_revision_info(import_unixtime, endpoint, apikey, dest_db, dest_table, revisions): + df = pd.DataFrame(revisions) + df['time'] = int(import_unixtime) + df['userInfo'] = df['userInfo'].apply(json.dumps) + client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db) + client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack') + +def run(session_unixtime, dest_db, dest_table, project_ids, api_endpoint='api.treasuredata.com', workflow_endpoint='api-workflow.treasuredata.com'): + id_list = project_ids[1:-1].split(',') + if len(id_list) == 0: + print('no project id') + return + + workflow_url = 'https://%s/api/projects' % workflow_endpoint + '/%s/revisions' + headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']} + l = get_revision_info(workflow_url, headers, id_list) + if len(l) == 0: + print('no insert record') + return + insert_revision_info(session_unixtime, 'https://%s' % api_endpoint, os.environ['TD_API_KEY'], dest_db, dest_table, l) \ No newline at end of file From eebceaf347f8803845fdaae0d4bd55895a1e8ad6 Mon Sep 17 00:00:00 2001 From: Yuu Ohmura Date: Mon, 29 Jan 2024 16:20:58 +0900 Subject: [PATCH 2/3] add activations list to data --- .../cdp_monitoring/common/settings.yaml | 2 + .../cdp_monitoring/incremental_ingest.dig | 25 +++++++++++ .../cdp_monitoring/initial_ingest.dig | 20 +++++++++ .../scripts/ingest_activation.py | 45 +++++++++++++++++++ 4 files changed, 92 insertions(+) create mode 100644 scenarios/monitoring/cdp_monitoring/scripts/ingest_activation.py diff --git a/scenarios/monitoring/cdp_monitoring/common/settings.yaml b/scenarios/monitoring/cdp_monitoring/common/settings.yaml index 3a17a162..b78087e6 100644 --- a/scenarios/monitoring/cdp_monitoring/common/settings.yaml +++ b/scenarios/monitoring/cdp_monitoring/common/settings.yaml @@ -7,6 +7,8 @@ td: entities_history: entities_history journey_statistics: journey_statistics journey_statistics_history: journey_statistics_history + activations: activations + activations_history: activations_history api_endpoint: api.treasuredata.com cdp_api_endpoint: api-cdp.treasuredata.com diff --git a/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig b/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig index 21e1434b..12c65233 100644 --- a/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig +++ b/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig @@ -48,6 +48,31 @@ schedule: _env: TD_API_KEY: ${secret:td.apikey} ++incremental_ingest_activations: + +append_activations_history: + td>: + query: select * from ${td.tables.activations} + insert_into: ${td.tables.activations_history} + + +get_current_parent_segment_list: + td>: + query: select ARRAY_JOIN(ARRAY_AGG(id), ',') as ids from ${td.tables.parent_segments} + store_last_results: true + database: ${td.database} + + +ingest_activations: + py>: scripts.ingest_activation.run + session_unixtime: ${session_unixtime} + dest_db: ${td.database} + dest_table: ${td.tables.activations} + ids: ${td.last_results.ids} + api_endpoint: ${td.api_endpoint} + cdp_api_endpoint: ${td.cdp_api_endpoint} + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} + +incremental_ingest_journey_statistics: +append_journey_statistics_history: td>: diff --git a/scenarios/monitoring/cdp_monitoring/initial_ingest.dig b/scenarios/monitoring/cdp_monitoring/initial_ingest.dig index 02e36245..2529dbdd 100644 --- a/scenarios/monitoring/cdp_monitoring/initial_ingest.dig +++ b/scenarios/monitoring/cdp_monitoring/initial_ingest.dig @@ -40,6 +40,26 @@ _export: _env: TD_API_KEY: ${secret:td.apikey} ++initial_ingest_activations: + +get_current_parent_segment_list: + td>: + query: select ARRAY_AGG(id) as ids from ${td.tables.parent_segments} + store_last_results: true + database: ${td.database} + +ingest_activations: + py>: scripts.ingest_activation.run + session_unixtime: ${session_unixtime} + dest_db: ${td.database} + dest_table: ${td.tables.activations} + ids: ${td.last_results.ids} + api_endpoint: ${td.api_endpoint} + cdp_api_endpoint: ${td.cdp_api_endpoint} + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} + + +initial_ingest_journey_statistics: +get_current_journey_list: td>: diff --git a/scenarios/monitoring/cdp_monitoring/scripts/ingest_activation.py b/scenarios/monitoring/cdp_monitoring/scripts/ingest_activation.py new file mode 100644 index 00000000..240f0398 --- /dev/null +++ b/scenarios/monitoring/cdp_monitoring/scripts/ingest_activation.py @@ -0,0 +1,45 @@ +import requests +import pandas as pd +import pytd +import os +import json + +def get_activations_per_audience(base_url, headers, id): + url = base_url % id + print(url) + res = requests.get(url=url, headers=headers) + if res.status_code != requests.codes.ok: + res.raise_for_status() + activations = res.json() + for a in activations: + a['ps_id'] = id + return activations + +def get_all_activations(base_url, headers, id_list): + l = [] + for i in id_list: + l.extend(get_activations_per_audience(base_url=base_url, headers=headers, id=i)) + return l + +def insert_activations(import_unixtime, endpoint, apikey, dest_db, dest_table, activations): + df = pd.DataFrame(activations) + df['time'] = int(import_unixtime) + df['columns'] = df['columns'].apply(json.dumps) + df['connectorConfig'] = df['connectorConfig'].apply(json.dumps) + df['createdBy'] = df['createdBy'].apply(json.dumps) + df['updatedBy'] = df['updatedBy'].apply(json.dumps) + df['executions'] = df['executions'].apply(json.dumps) + df['notifyOn'] = df['notifyOn'].apply(json.dumps) + df['emailRecipients'] = df['emailRecipients'].apply(json.dumps) + client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db) + client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack') + +def run(session_unixtime, dest_db, dest_table, ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'): + id_list = ids.split(',') + if len(id_list) == 0: + print('no parent id') + return + cdp_url = 'https://%s/audiences' % cdp_api_endpoint + '/%s/syndications' + headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']} + l = get_all_activations(cdp_url, headers, id_list) + insert_activations(session_unixtime, 'https://%s' % api_endpoint, os.environ['TD_API_KEY'], dest_db, dest_table, l) From ea7b1c79a8b02c43e428526d5a019b05676d7ea3 Mon Sep 17 00:00:00 2001 From: Yuu Ohmura Date: Wed, 31 Jan 2024 09:15:29 +0900 Subject: [PATCH 3/3] - add journey_activation and journey_summary - bug fix --- .../scripts/ingest_journey_activation.py | 51 +++++++++++++++++++ .../scripts/ingest_journey_summary.py | 51 +++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_activation.py create mode 100644 scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_summary.py diff --git a/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_activation.py b/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_activation.py new file mode 100644 index 00000000..0ec811f7 --- /dev/null +++ b/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_activation.py @@ -0,0 +1,51 @@ +import requests +import pandas as pd +import pytd +import os +import json + +def get_journey_activation(base_url, headers, id): + url = base_url + '/entities/journeys/' + str(id) + '/activations' + print(url) + res = requests.get(url=url, headers=headers) + if res.status_code != requests.codes.ok: + res.raise_for_status() + data = res.json()['data'] + if data == None or len(data) == 0: + return None + + for d in data: + d['journey_id'] = id + + return data + +def get_all_journey_activation(base_url, headers, ids): + l = [] + for i in ids: + d = get_journey_activation(base_url, headers, i) + if d != None: + l.extend(d) + return l + +def run(session_unixtime, dest_db, dest_table, journey_ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'): + print('ingest journey activation') + if len(journey_ids) == 0: + print('no jouney id') + return + id_list = journey_ids.split(',') + if len(id_list) == 0: + print('no jouney id') + return + print('count of target jouney: ' + str(len(id_list))) + base_url = 'https://%s' % cdp_api_endpoint + headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']} + l = get_all_journey_activation(base_url, headers, id_list) + if len(l) == 0: + print('no import record') + return + df = pd.DataFrame(l) + df['time'] = int(session_unixtime) + df['attributes'] = df['attributes'].apply(json.dumps) + df['relationships'] = df['relationships'].apply(json.dumps) + client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db) + client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack') diff --git a/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_summary.py b/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_summary.py new file mode 100644 index 00000000..444888d3 --- /dev/null +++ b/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_summary.py @@ -0,0 +1,51 @@ +import requests +import pandas as pd +import pytd +import os +import json + +def get_journey_summary(base_url, headers, id): + url = base_url + '/entities/journeys/' + str(id) + print(url) + res = requests.get(url=url, headers=headers) + if res.status_code != requests.codes.ok: + res.raise_for_status() + data = res.json()['data'] + if data == None or len(data) == 0: + return None + + for k in data: + if type(data[k]) is dict: + data[k] = json.dumps(data[k]) + data['journey_id'] = id + + return data + +def get_all_journey_summary(base_url, headers, ids): + l = [] + for i in ids: + d = get_journey_summary(base_url, headers, i) + if d != None: + l.append(d) + return l + +def run(session_unixtime, dest_db, dest_table, journey_ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'): + print('ingest journey summary') + if len(journey_ids) == 0: + print('no jouney id') + return + id_list = journey_ids.split(',') + if len(id_list) == 0: + print('no jouney id') + return + print('count of target jouney: ' + str(len(id_list))) + base_url = 'https://%s' % cdp_api_endpoint + headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']} + l = get_all_journey_summary(base_url, headers, id_list) + if len(l) == 0: + print('no import record') + return + df = pd.DataFrame(l) + df['time'] = int(session_unixtime) + client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db) + client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')