From 3c5f13556b1d14817800cc99f44e64ba77dd9e85 Mon Sep 17 00:00:00 2001 From: o-mura Date: Fri, 28 Jun 2024 10:55:50 +0900 Subject: [PATCH] - add ingest_revision to daily batch. (#403) - fix small bug. Co-authored-by: Yuu Ohmura --- .../monitoring/workflow_monitoring/README.md | 3 +- .../workflow_monitoring/common/settings.yaml | 1 + .../incremental_ingest.dig | 31 +++++++++++++++++-- .../initial_ingest_session_attempt.dig | 17 ++++++++++ .../workflow_monitoring/manual_ingest.dig | 15 --------- .../scripts/ingest_revision.py | 2 +- .../scripts/update_attempt.py | 3 ++ 7 files changed, 52 insertions(+), 20 deletions(-) diff --git a/scenarios/monitoring/workflow_monitoring/README.md b/scenarios/monitoring/workflow_monitoring/README.md index ca394bc6..f6a84f6f 100644 --- a/scenarios/monitoring/workflow_monitoring/README.md +++ b/scenarios/monitoring/workflow_monitoring/README.md @@ -35,8 +35,9 @@ You should set lower_limit_session_id option (initial_ingest_session task of ini | schedules | [/api/schedules](https://docs.digdag.io/api/) | | sessions | [/api/sessions](https://docs.digdag.io/api/) | | workflows | [/api/workflows](https://docs.digdag.io/api/) | +| revisions | [/api/projects/{id}/revisions](https://docs.digdag.io/api/) | -**projects** , **schedules** and **workflows** table is replaced daily by incremental_ingest.dig. +**projects** , **schedules** , **workflows** and **revisions** table is replaced daily by incremental_ingest.dig. **attempts** and **sessions** table is imported incrementally. 
# Next Step diff --git a/scenarios/monitoring/workflow_monitoring/common/settings.yaml b/scenarios/monitoring/workflow_monitoring/common/settings.yaml index 1f1bfac6..ca8d5292 100644 --- a/scenarios/monitoring/workflow_monitoring/common/settings.yaml +++ b/scenarios/monitoring/workflow_monitoring/common/settings.yaml @@ -11,5 +11,6 @@ td: attempts: attempts tasks: tasks revisions: revisions + revisions_history: revisions_history api_endpoint: api.treasuredata.com workflow_endpoint: api-workflow.treasuredata.com \ No newline at end of file diff --git a/scenarios/monitoring/workflow_monitoring/incremental_ingest.dig b/scenarios/monitoring/workflow_monitoring/incremental_ingest.dig index 25051021..14ad9130 100644 --- a/scenarios/monitoring/workflow_monitoring/incremental_ingest.dig +++ b/scenarios/monitoring/workflow_monitoring/incremental_ingest.dig @@ -17,9 +17,9 @@ schedule: session_unixtime: ${session_unixtime} dest_db: ${td.database} dest_table: ${td.tables.projects} - api_endpoint: ${td.api_endpoint} +api_endpoint: ${td.api_endpoint} workflow_endpoint: ${td.workflow_endpoint} - docker: + docker: image: "digdag/digdag-python:3.10.1" _env: TD_API_KEY: ${secret:td.apikey} @@ -62,6 +62,31 @@ schedule: _env: TD_API_KEY: ${secret:td.apikey} ++incremental_ingest_project_revision: + +append_project_revision_history: + td>: + query: select * from ${td.tables.revisions} + database: ${td.database} + insert_into: ${td.tables.revisions_history} + + +get_all_project: + td>: + query: select ARRAY_JOIN(ARRAY_AGG(id), ',') as project_ids from (select id from projects group by id) + store_last_results: true + + +ingest_project_revision: + py>: scripts.ingest_revision.run + session_unixtime: ${session_unixtime} + dest_db: ${td.database} + dest_table: ${td.tables.revisions} + project_ids: ${td.last_results.project_ids} + api_endpoint: ${td.api_endpoint} + workflow_endpoint: ${td.workflow_endpoint} + docker: + image: "digdag/digdag-python:3.10.1" + _env: + TD_API_KEY: 
${secret:td.apikey} + +ingest_incremental_attempt: +update_old_attempt: +check_old_attempt_with_runnnig: @@ -104,7 +129,7 @@ schedule: +ingest_incremental_session: +check_max_session_id: td>: - query: select max(id) as max_id, max(time) as last_datetime from ${td.tables.sessions} + query: select max(cast(id as INTEGER)) as max_id, max(time) as last_datetime from ${td.tables.sessions} store_last_results: true database: ${td.database} diff --git a/scenarios/monitoring/workflow_monitoring/initial_ingest_session_attempt.dig b/scenarios/monitoring/workflow_monitoring/initial_ingest_session_attempt.dig index 81848522..147cd3c6 100644 --- a/scenarios/monitoring/workflow_monitoring/initial_ingest_session_attempt.dig +++ b/scenarios/monitoring/workflow_monitoring/initial_ingest_session_attempt.dig @@ -35,6 +35,23 @@ _export: session_unixtime: ${session_unixtime} dest_db: ${td.database} dest_table: ${td.tables.schedules} + docker: + image: "digdag/digdag-python:3.10.1" + _env: + TD_API_KEY: ${secret:td.apikey} + ++initial_ingest_project_revision: + +get_all_project: + td>: + query: select ARRAY_JOIN(ARRAY_AGG(id), ',') as project_ids from (select id from projects group by id) + store_last_results: true + + +ingest_project_revision: + py>: scripts.ingest_revision.run + session_unixtime: ${session_unixtime} + dest_db: ${td.database} + dest_table: ${td.tables.revisions} + project_ids: ${td.last_results.project_ids} api_endpoint: ${td.api_endpoint} workflow_endpoint: ${td.workflow_endpoint} docker: diff --git a/scenarios/monitoring/workflow_monitoring/manual_ingest.dig b/scenarios/monitoring/workflow_monitoring/manual_ingest.dig index 9157dccf..26773997 100644 --- a/scenarios/monitoring/workflow_monitoring/manual_ingest.dig +++ b/scenarios/monitoring/workflow_monitoring/manual_ingest.dig @@ -3,9 +3,6 @@ _export: attempt_ids: - 605508773 - 605506079 - project_ids: - - 627610 - - 686558 +manual_ingest_attempt_task: py>: scripts.ingest_task.run @@ -20,15 +17,3 @@ _export: _env: 
TD_API_KEY: ${secret:td.apikey} -+manual_ingest_project_revision: - py>: scripts.ingest_revision.run - session_unixtime: ${session_unixtime} - dest_db: ${td.database} - dest_table: ${td.tables.revisions} - project_ids: ${project_ids} - api_endpoint: ${td.api_endpoint} - workflow_endpoint: ${td.workflow_endpoint} - docker: - image: "digdag/digdag-python:3.10.1" - _env: - TD_API_KEY: ${secret:td.apikey} \ No newline at end of file diff --git a/scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py b/scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py index 5bd5c248..8e6f4b3f 100644 --- a/scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py +++ b/scenarios/monitoring/workflow_monitoring/scripts/ingest_revision.py @@ -26,7 +26,7 @@ def insert_revision_info(import_unixtime, endpoint, apikey, dest_db, dest_table, client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack') def run(session_unixtime, dest_db, dest_table, project_ids, api_endpoint='api.treasuredata.com', workflow_endpoint='api-workflow.treasuredata.com'): - id_list = project_ids[1:-1].split(',') + id_list = project_ids.split(',') if len(id_list) == 0: print('no project id') return diff --git a/scenarios/monitoring/workflow_monitoring/scripts/update_attempt.py b/scenarios/monitoring/workflow_monitoring/scripts/update_attempt.py index c2bb396e..4576b23e 100644 --- a/scenarios/monitoring/workflow_monitoring/scripts/update_attempt.py +++ b/scenarios/monitoring/workflow_monitoring/scripts/update_attempt.py @@ -48,6 +48,9 @@ def run(session_unixtime, dest_db, dest_table, ids, api_endpoint='api.treasureda print('no update record') return id_list = ids.split(',') + if len(id_list) == 0: + print('no update record') + return delete_attempt_info(api_endpoint, os.environ['TD_API_KEY'], dest_db, dest_table, id_list) workflow_url = 'https://%s/api/attempts' % workflow_endpoint + '/%s'