Skip to content

Commit

Permalink
- add ingest_revision to daily batch. (#403)
Browse files Browse the repository at this point in the history
- fix small bug.

Co-authored-by: Yuu Ohmura <[email protected]>
  • Loading branch information
o-mura and Yuu Ohmura authored Jun 28, 2024
1 parent 68ef90f commit 3c5f135
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 20 deletions.
3 changes: 2 additions & 1 deletion scenarios/monitoring/workflow_monitoring/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ You should set lower_limit_session_id option (initial_ingest_session task of ini
| schedules | [/api/schedules](https://docs.digdag.io/api/) |
| sessions | [/api/sessions](https://docs.digdag.io/api/) |
| workflows | [/api/workflows](https://docs.digdag.io/api/) |
| revisions | [/api/projects/{id}/revisions](https://docs.digdag.io/api/) |

**projects** , **schedules** and **workflows** table is replaced daily by incremental_ingest.dig.
**projects** , **schedules** , **workflows** and **revisions**table is replaced daily by incremental_ingest.dig.
**attempts** and **sessions** table is imported incrementally.

# Next Step
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ td:
attempts: attempts
tasks: tasks
revisions: revisions
revisions_history: revisions_history
api_endpoint: api.treasuredata.com
workflow_endpoint: api-workflow.treasuredata.com
31 changes: 28 additions & 3 deletions scenarios/monitoring/workflow_monitoring/incremental_ingest.dig
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ schedule:
session_unixtime: ${session_unixtime}
dest_db: ${td.database}
dest_table: ${td.tables.projects}
api_endpoint: ${td.api_endpoint}
api_endpoint: ${td.api_endpoint}
workflow_endpoint: ${td.workflow_endpoint}
docker:
docker:
image: "digdag/digdag-python:3.10.1"
_env:
TD_API_KEY: ${secret:td.apikey}
Expand Down Expand Up @@ -62,6 +62,31 @@ schedule:
_env:
TD_API_KEY: ${secret:td.apikey}

+incremental_ingest_project_revision:
+append_project_revision_history:
td>:
query: select * from ${td.tables.revisions}
database: ${td.database}
insert_into: ${td.tables.revisions_history}

+get_all_project:
td>:
query: select ARRAY_JOIN(ARRAY_AGG(id), ',') as project_ids from (select id from projects group by id)
store_last_results: true

+ingest_project_revision:
py>: scripts.ingest_revision.run
session_unixtime: ${session_unixtime}
dest_db: ${td.database}
dest_table: ${td.tables.revisions}
project_ids: ${td.last_results.project_ids}
api_endpoint: ${td.api_endpoint}
workflow_endpoint: ${td.workflow_endpoint}
docker:
image: "digdag/digdag-python:3.10.1"
_env:
TD_API_KEY: ${secret:td.apikey}

+ingest_incremental_attempt:
+update_old_attempt:
+check_old_attempt_with_runnnig:
Expand Down Expand Up @@ -104,7 +129,7 @@ schedule:
+ingest_incremental_session:
+check_max_session_id:
td>:
query: select max(id) as max_id, max(time) as last_datetime from ${td.tables.sessions}
query: select max(cast(id as INTEGER)) as max_id, max(time) as last_datetime from ${td.tables.sessions}
store_last_results: true
database: ${td.database}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@ _export:
session_unixtime: ${session_unixtime}
dest_db: ${td.database}
dest_table: ${td.tables.schedules}
docker:
image: "digdag/digdag-python:3.10.1"
_env:
TD_API_KEY: ${secret:td.apikey}

+initial_ingest_project_revision:
+get_all_project:
td>:
query: select ARRAY_JOIN(ARRAY_AGG(id), ',') as project_ids from (select id from projects group by id)
store_last_results: true

+ingest_project_revision:
py>: scripts.ingest_revision.run
session_unixtime: ${session_unixtime}
dest_db: ${td.database}
dest_table: ${td.tables.revisions}
project_ids: ${td.last_results.project_ids}
api_endpoint: ${td.api_endpoint}
workflow_endpoint: ${td.workflow_endpoint}
docker:
Expand Down
15 changes: 0 additions & 15 deletions scenarios/monitoring/workflow_monitoring/manual_ingest.dig
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ _export:
attempt_ids:
- 605508773
- 605506079
project_ids:
- 627610
- 686558

+manual_ingest_attempt_task:
py>: scripts.ingest_task.run
Expand All @@ -20,15 +17,3 @@ _export:
_env:
TD_API_KEY: ${secret:td.apikey}

+manual_ingest_project_revision:
py>: scripts.ingest_revision.run
session_unixtime: ${session_unixtime}
dest_db: ${td.database}
dest_table: ${td.tables.revisions}
project_ids: ${project_ids}
api_endpoint: ${td.api_endpoint}
workflow_endpoint: ${td.workflow_endpoint}
docker:
image: "digdag/digdag-python:3.10.1"
_env:
TD_API_KEY: ${secret:td.apikey}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def insert_revision_info(import_unixtime, endpoint, apikey, dest_db, dest_table,
client.load_table_from_dataframe(df, dest_table, if_exists='append', fmt='msgpack')

def run(session_unixtime, dest_db, dest_table, project_ids, api_endpoint='api.treasuredata.com', workflow_endpoint='api-workflow.treasuredata.com'):
id_list = project_ids[1:-1].split(',')
id_list = project_ids.split(',')
if len(id_list) == 0:
print('no project id')
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def run(session_unixtime, dest_db, dest_table, ids, api_endpoint='api.treasureda
print('no update record')
return
id_list = ids.split(',')
if len(id_list) == 0:
print('no update record')
return
delete_attempt_info(api_endpoint, os.environ['TD_API_KEY'], dest_db, dest_table, id_list)

workflow_url = 'https://%s/api/attempts' % workflow_endpoint + '/%s'
Expand Down

0 comments on commit 3c5f135

Please sign in to comment.