From ea7b1c79a8b02c43e428526d5a019b05676d7ea3 Mon Sep 17 00:00:00 2001
From: Yuu Ohmura
Date: Wed, 31 Jan 2024 09:15:29 +0900
Subject: [PATCH] - add journey_activation and journey_summary - bug fix

---
 .../scripts/ingest_journey_activation.py      | 75 +++++++++++++++++++
 .../scripts/ingest_journey_summary.py         | 73 ++++++++++++++++++
 2 files changed, 148 insertions(+)
 create mode 100644 scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_activation.py
 create mode 100644 scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_summary.py

diff --git a/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_activation.py b/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_activation.py
new file mode 100644
index 00000000..0ec811f7
--- /dev/null
+++ b/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_activation.py
@@ -0,0 +1,75 @@
+import requests
+import pandas as pd
+import pytd
+import os
+import json
+
+# Per-request timeout in seconds; without one, requests can wait indefinitely.
+REQUEST_TIMEOUT = 60
+
+
+def get_journey_activation(base_url, headers, id):
+    """Fetch the activations of one journey from the CDP API.
+
+    Returns the list under the response's 'data' key with 'journey_id' set
+    on every element, or None when that value is missing or empty. Raises
+    requests.HTTPError on a 4xx/5xx response.
+    """
+    url = base_url + '/entities/journeys/' + str(id) + '/activations'
+    print(url)
+    res = requests.get(url=url, headers=headers, timeout=REQUEST_TIMEOUT)
+    # raise_for_status() only raises on 4xx/5xx, so no status pre-check.
+    res.raise_for_status()
+    data = res.json()['data']
+    if not data:
+        return None
+
+    for d in data:
+        d['journey_id'] = id
+
+    return data
+
+
+def get_all_journey_activation(base_url, headers, ids):
+    """Collect the activations of every journey in ids into one flat list."""
+    activations = []
+    for i in ids:
+        d = get_journey_activation(base_url, headers, i)
+        if d is not None:
+            activations.extend(d)
+    return activations
+
+
+def run(session_unixtime, dest_db, dest_table, journey_ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'):
+    """Ingest the activations of the given journeys into dest_table.
+
+    journey_ids is a comma-separated string of journey IDs. The API key is
+    read from the TD_API_KEY environment variable, and the load uses
+    if_exists='overwrite'.
+    """
+    print('ingest journey activation')
+    if not journey_ids:
+        print('no jouney id')
+        return
+    # str.split(',') never returns an empty list and keeps blank or padded
+    # items ('1, 2,' -> ['1', ' 2', '']), so strip and drop empties here.
+    id_list = [i.strip() for i in journey_ids.split(',') if i.strip()]
+    if not id_list:
+        print('no jouney id')
+        return
+    print('count of target jouney: ' + str(len(id_list)))
+    base_url = 'https://%s' % cdp_api_endpoint
+    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
+    records = get_all_journey_activation(base_url, headers, id_list)
+    if not records:
+        print('no import record')
+        return
+    df = pd.DataFrame(records)
+    df['time'] = int(session_unixtime)
+    # Store these columns as JSON strings; skip one that is absent from the
+    # fetched records instead of failing with KeyError.
+    for col in ('attributes', 'relationships'):
+        if col in df.columns:
+            df[col] = df[col].apply(json.dumps)
+    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
+    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
diff --git a/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_summary.py b/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_summary.py
new file mode 100644
index 00000000..444888d3
--- /dev/null
+++ b/scenarios/monitoring/cdp_monitoring/scripts/ingest_journey_summary.py
@@ -0,0 +1,73 @@
+import requests
+import pandas as pd
+import pytd
+import os
+import json
+
+# Per-request timeout in seconds; without one, requests can wait indefinitely.
+REQUEST_TIMEOUT = 60
+
+
+def get_journey_summary(base_url, headers, id):
+    """Fetch one journey entity from the CDP API as a flat record.
+
+    Returns the object under the response's 'data' key with every dict value
+    serialized to a JSON string and 'journey_id' added, or None when that
+    object is missing or empty. Raises requests.HTTPError on a 4xx/5xx
+    response.
+    """
+    url = base_url + '/entities/journeys/' + str(id)
+    print(url)
+    res = requests.get(url=url, headers=headers, timeout=REQUEST_TIMEOUT)
+    # raise_for_status() only raises on 4xx/5xx, so no status pre-check.
+    res.raise_for_status()
+    data = res.json()['data']
+    if not data:
+        return None
+
+    for k in data:
+        if isinstance(data[k], dict):
+            data[k] = json.dumps(data[k])
+    data['journey_id'] = id
+
+    return data
+
+
+def get_all_journey_summary(base_url, headers, ids):
+    """Collect the summary record of every journey in ids into a list."""
+    summaries = []
+    for i in ids:
+        d = get_journey_summary(base_url, headers, i)
+        if d is not None:
+            summaries.append(d)
+    return summaries
+
+
+def run(session_unixtime, dest_db, dest_table, journey_ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'):
+    """Ingest the summaries of the given journeys into dest_table.
+
+    journey_ids is a comma-separated string of journey IDs. The API key is
+    read from the TD_API_KEY environment variable, and the load uses
+    if_exists='overwrite'.
+    """
+    print('ingest journey summary')
+    if not journey_ids:
+        print('no jouney id')
+        return
+    # str.split(',') never returns an empty list and keeps blank or padded
+    # items ('1, 2,' -> ['1', ' 2', '']), so strip and drop empties here.
+    id_list = [i.strip() for i in journey_ids.split(',') if i.strip()]
+    if not id_list:
+        print('no jouney id')
+        return
+    print('count of target jouney: ' + str(len(id_list)))
+    base_url = 'https://%s' % cdp_api_endpoint
+    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
+    records = get_all_journey_summary(base_url, headers, id_list)
+    if not records:
+        print('no import record')
+        return
+    df = pd.DataFrame(records)
+    df['time'] = int(session_unixtime)
+    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
+    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')