Add workflow revision and activations of Audience Studio as monitoring objects #389

Closed · wants to merge 3 commits
2 changes: 2 additions & 0 deletions scenarios/monitoring/cdp_monitoring/common/settings.yaml
@@ -7,6 +7,8 @@ td:
    entities_history: entities_history
    journey_statistics: journey_statistics
    journey_statistics_history: journey_statistics_history
    activations: activations
    activations_history: activations_history
  api_endpoint: api.treasuredata.com
  cdp_api_endpoint: api-cdp.treasuredata.com

25 changes: 25 additions & 0 deletions scenarios/monitoring/cdp_monitoring/incremental_ingest.dig
@@ -48,6 +48,31 @@ schedule:
    _env:
      TD_API_KEY: ${secret:td.apikey}

+incremental_ingest_activations:
  +append_activations_history:
    td>:
    query: select * from ${td.tables.activations}
    insert_into: ${td.tables.activations_history}

  +get_current_parent_segment_list:
    td>:
    query: select ARRAY_JOIN(ARRAY_AGG(id), ',') as ids from ${td.tables.parent_segments}
    store_last_results: true
    database: ${td.database}

  +ingest_activations:
    py>: scripts.ingest_activation.run
    session_unixtime: ${session_unixtime}
    dest_db: ${td.database}
    dest_table: ${td.tables.activations}
    ids: ${td.last_results.ids}
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
    _env:
      TD_API_KEY: ${secret:td.apikey}
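Note: ARRAY_JOIN(ARRAY_AGG(id), ',') collapses the parent segment ids into one comma-separated string, which store_last_results exposes to the next task as ${td.last_results.ids}; ingest_activation.run then splits it back into a list. A minimal sketch of that round trip, with hypothetical ids:

    ids = '101,102'           # what ${td.last_results.ids} would contain
    id_list = ids.split(',')  # -> ['101', '102']; one API request per parent segment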

+incremental_ingest_journey_statistics:
  +append_journey_statistics_history:
    td>:
20 changes: 20 additions & 0 deletions scenarios/monitoring/cdp_monitoring/initial_ingest.dig
@@ -40,6 +40,26 @@ _export:
    _env:
      TD_API_KEY: ${secret:td.apikey}

+initial_ingest_activations:
  +get_current_parent_segment_list:
    td>:
    query: select ARRAY_AGG(id) as ids from ${td.tables.parent_segments}
    store_last_results: true
    database: ${td.database}
  +ingest_activations:
    py>: scripts.ingest_activation.run
    session_unixtime: ${session_unixtime}
    dest_db: ${td.database}
    dest_table: ${td.tables.activations}
    ids: ${td.last_results.ids}
    api_endpoint: ${td.api_endpoint}
    cdp_api_endpoint: ${td.cdp_api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
    _env:
      TD_API_KEY: ${secret:td.apikey}
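Unlike the incremental workflow, this query uses plain ARRAY_AGG(id) with no ARRAY_JOIN, so the value handed to ingest_activation.run may be an array rendering rather than a bare comma-separated string. A quick illustration of the difference, with hypothetical ids:

    '101,102'.split(',')     # ARRAY_JOIN form -> ['101', '102']
    '[101, 102]'.split(',')  # a possible ARRAY_AGG rendering -> ['[101', ' 102]'] (brackets survive)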


+initial_ingest_journey_statistics:
  +get_current_journey_list:
    td>:
45 changes: 45 additions & 0 deletions scenarios/monitoring/cdp_monitoring/scripts/ingest_activation.py (new file)
@@ -0,0 +1,45 @@
import requests
import pandas as pd
import pytd
import os
import json

def get_activations_per_audience(base_url, headers, id):
    # Fetch all activations (syndications) defined under one parent segment.
    url = base_url % id
    print(url)
    res = requests.get(url=url, headers=headers)
    if res.status_code != requests.codes.ok:
        res.raise_for_status()
    activations = res.json()
    for a in activations:
        a['ps_id'] = id  # tag each record with its parent segment id
    return activations

def get_all_activations(base_url, headers, id_list):
    l = []
    for i in id_list:
        l.extend(get_activations_per_audience(base_url=base_url, headers=headers, id=i))
    return l

def insert_activations(import_unixtime, endpoint, apikey, dest_db, dest_table, activations):
    df = pd.DataFrame(activations)
    df['time'] = int(import_unixtime)
    # Nested objects do not load cleanly as table columns; serialize them to JSON strings.
    for col in ('columns', 'connectorConfig', 'createdBy', 'updatedBy',
                'executions', 'notifyOn', 'emailRecipients'):
        df[col] = df[col].apply(json.dumps)
    client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')

def run(session_unixtime, dest_db, dest_table, ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'):
    # Guard on the raw string: ''.split(',') would return [''], not an empty list.
    if not ids:
        print('no parent id')
        return
    id_list = ids.split(',')
    cdp_url = 'https://%s/audiences' % cdp_api_endpoint + '/%s/syndications'
    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
    l = get_all_activations(cdp_url, headers, id_list)
    insert_activations(session_unixtime, 'https://%s' % api_endpoint, os.environ['TD_API_KEY'], dest_db, dest_table, l)
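A local smoke test of this script might look as follows; the database, table, and ids are hypothetical placeholders, the module path is assumed from the py> reference above, and TD_API_KEY must be set in the environment:

    import os, time
    from scripts import ingest_activation  # assumed module layout

    os.environ.setdefault('TD_API_KEY', '<apikey>')           # placeholder
    ingest_activation.run(session_unixtime=str(int(time.time())),
                          dest_db='cdp_monitoring',           # hypothetical database
                          dest_table='activations',
                          ids='101,102')                      # hypothetical parent segment ids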
(new file — journey activation ingest script)
@@ -0,0 +1,51 @@
import requests
import pandas as pd
import pytd
import os
import json

def get_journey_activation(base_url, headers, id):
    # Fetch the activations attached to one journey.
    url = base_url + '/entities/journeys/' + str(id) + '/activations'
    print(url)
    res = requests.get(url=url, headers=headers)
    if res.status_code != requests.codes.ok:
        res.raise_for_status()
    data = res.json()['data']
    if data is None or len(data) == 0:
        return None

    for d in data:
        d['journey_id'] = id  # tag each activation with its journey id

    return data

def get_all_journey_activation(base_url, headers, ids):
    l = []
    for i in ids:
        d = get_journey_activation(base_url, headers, i)
        if d is not None:
            l.extend(d)
    return l

def run(session_unixtime, dest_db, dest_table, journey_ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'):
    print('ingest journey activation')
    if len(journey_ids) == 0:
        print('no journey id')
        return
    id_list = journey_ids.split(',')
    print('count of target journeys: ' + str(len(id_list)))
    base_url = 'https://%s' % cdp_api_endpoint
    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
    l = get_all_journey_activation(base_url, headers, id_list)
    if len(l) == 0:
        print('no import record')
        return
    df = pd.DataFrame(l)
    df['time'] = int(session_unixtime)
    # Nested objects do not load cleanly as table columns; serialize them to JSON strings.
    df['attributes'] = df['attributes'].apply(json.dumps)
    df['relationships'] = df['relationships'].apply(json.dumps)
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
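As in ingest_activation.py, the dict-valued attributes and relationships columns are serialized before loading, since nested objects do not map onto flat table columns. A minimal, self-contained illustration:

    import json
    import pandas as pd

    df = pd.DataFrame([{'attributes': {'name': 'j1'}, 'relationships': {'stages': []}}])
    df['attributes'] = df['attributes'].apply(json.dumps)    # dict -> JSON string
    df['relationships'] = df['relationships'].apply(json.dumps)
    print(df.loc[0, 'attributes'])                           # {"name": "j1"}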
(new file — journey summary ingest script)
@@ -0,0 +1,51 @@
import requests
import pandas as pd
import pytd
import os
import json

def get_journey_summary(base_url, headers, id):
    # Fetch the summary object for one journey.
    url = base_url + '/entities/journeys/' + str(id)
    print(url)
    res = requests.get(url=url, headers=headers)
    if res.status_code != requests.codes.ok:
        res.raise_for_status()
    data = res.json()['data']
    if data is None or len(data) == 0:
        return None

    # Serialize dict-valued fields in place so they load as JSON strings.
    for k in data:
        if type(data[k]) is dict:
            data[k] = json.dumps(data[k])
    data['journey_id'] = id

    return data

def get_all_journey_summary(base_url, headers, ids):
    l = []
    for i in ids:
        d = get_journey_summary(base_url, headers, i)
        if d is not None:
            l.append(d)
    return l

def run(session_unixtime, dest_db, dest_table, journey_ids, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'):
    print('ingest journey summary')
    if len(journey_ids) == 0:
        print('no journey id')
        return
    id_list = journey_ids.split(',')
    print('count of target journeys: ' + str(len(id_list)))
    base_url = 'https://%s' % cdp_api_endpoint
    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
    l = get_all_journey_summary(base_url, headers, id_list)
    if len(l) == 0:
        print('no import record')
        return
    df = pd.DataFrame(l)
    df['time'] = int(session_unixtime)
    client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack')
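The structural difference from the activation script: the summary endpoint returns one object per journey, so results are collected with append, whereas the activations endpoint returns a list per journey, collected with extend. Illustrative shapes (hypothetical values):

    summary = {'id': '1', 'attributes': {'name': 'j1'}}  # one dict -> l.append(summary)
    activations = [{'id': 'a1'}, {'id': 'a2'}]           # list     -> l.extend(activations)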
(workflow_monitoring settings.yaml)
@@ -10,5 +10,6 @@ td:
    sessions: sessions
    attempts: attempts
    tasks: tasks
    revisions: revisions
  api_endpoint: api.treasuredata.com
  workflow_endpoint: api-workflow.treasuredata.com
14 changes: 14 additions & 0 deletions scenarios/monitoring/workflow_monitoring/manual_ingest.dig
@@ -3,6 +3,9 @@ _export:
  attempt_ids:
    - 605508773
    - 605506079
  project_ids:
    - 627610
    - 686558

+manual_ingest_attempt_task:
  py>: scripts.ingest_task.run
@@ -14,3 +17,14 @@ _export:
    image: "digdag/digdag-python:3.9"
  _env:
    TD_API_KEY: ${secret:td.apikey}

+manual_ingest_project_revision:
  py>: scripts.ingest_revision.run
  session_unixtime: ${session_unixtime}
  dest_db: ${td.database}
  dest_table: ${td.tables.revisions}
  project_ids: ${project_ids}
  docker:
    image: "digdag/digdag-python:3.9"
  _env:
    TD_API_KEY: ${secret:td.apikey}
40 changes: 40 additions & 0 deletions scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py (new file)
@@ -0,0 +1,40 @@
import requests
import os
import pytd
import pandas as pd
import json

def get_revision_info(base_url, headers, ids):
    l = []
    for i in ids:
        url = base_url % i
        print(url)
        res = requests.get(url=url, headers=headers)
        if res.status_code != requests.codes.ok:
            res.raise_for_status()
        revisions = res.json()['revisions']
        for r in revisions:
            r['projectid'] = i  # tag each revision with its project id
        l.extend(revisions)
    return l

def insert_revision_info(import_unixtime, endpoint, apikey, dest_db, dest_table, revisions):
    df = pd.DataFrame(revisions)
    df['time'] = int(import_unixtime)
    df['userInfo'] = df['userInfo'].apply(json.dumps)  # nested object -> JSON string
    client = pytd.Client(apikey=apikey, endpoint=endpoint, database=dest_db)
    client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')

def run(session_unixtime, dest_db, dest_table, project_ids, api_endpoint='api.treasuredata.com', workflow_endpoint='api-workflow.treasuredata.com'):
    # digdag interpolates the project_ids list as a bracketed string
    # (e.g. "[627610,686558]"), so strip the brackets before splitting.
    id_list = project_ids[1:-1].split(',')
    if len(id_list) == 0:
        print('no project id')
        return

    workflow_url = 'https://%s/api/projects' % workflow_endpoint + '/%s/revisions'
    headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']}
    l = get_revision_info(workflow_url, headers, id_list)
    if len(l) == 0:
        print('no insert record')
        return
    insert_revision_info(session_unixtime, 'https://%s' % api_endpoint, os.environ['TD_API_KEY'], dest_db, dest_table, l)
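A quick check of the bracket-stripping logic in run(), assuming digdag interpolates the project_ids list as a compact bracketed string (ids taken from manual_ingest.dig above):

    project_ids = '[627610,686558]'         # as rendered from ${project_ids}
    id_list = project_ids[1:-1].split(',')  # -> ['627610', '686558']
    print('https://api-workflow.treasuredata.com/api/projects/%s/revisions' % id_list[0])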