Merge pull request #8 from RENCI/Remove-ASGS
updating to use the new unified database (apsviz + asgs_dashboard -> …
PhillipsOwen authored Feb 2, 2024
2 parents aecda09 + 0affcfb commit 5acc2ad
Showing 4 changed files with 26 additions and 49 deletions.
39 changes: 13 additions & 26 deletions src/common/pg_impl.py
@@ -58,7 +58,7 @@ def get_job_defs(self):
sql: str = 'SELECT public.get_supervisor_job_defs_json()'

# get the data
- ret_val = self.exec_sql('asgs', sql)
+ ret_val = self.exec_sql('apsviz', sql)

# return the data
return ret_val
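
Note: these helpers all delegate to Postgres stored functions through exec_sql. A minimal standalone sketch of that call pattern, assuming psycopg2 and hypothetical connection details for the unified database (the real exec_sql lives in the DB utility base class and is not part of this diff):

    import psycopg2

    # hypothetical standalone equivalent of self.exec_sql('apsviz', sql)
    conn = psycopg2.connect(dbname='apsviz', host='localhost', user='postgres')
    with conn.cursor() as cur:
        cur.execute('SELECT public.get_supervisor_job_defs_json()')
        job_defs = cur.fetchone()[0]  # the stored function's name suggests a JSON payload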
@@ -73,7 +73,7 @@ def get_job_order(self, workflow_type: str):
sql: str = f"SELECT public.get_supervisor_job_order('{workflow_type}')"

# get the data
- ret_val = self.exec_sql('asgs', sql)
+ ret_val = self.exec_sql('apsviz', sql)

# return the data
return ret_val
@@ -87,27 +87,14 @@ def reset_job_order(self, workflow_type_name: str) -> bool:

# declare an array of the job id and next job type id in sequence
workflow_job_types: dict = {
- 'ASGS': [
-     # record id, next job type
-     # -------------------------
-     '1, 23',  # staging
-     '15, 30',  # adcirc2cog-tiff
-     '21, 27',  # adcirc-to-kalpana-cog
-     '19, 25',  # ast-run-harvester
-     '17, 24',  # obs-mod-ast
-     # '22, 21, # timeseries_ingest
-     '16, 19',  # geotiff2cog
-     '11, 20',  # load-geo-server
-     '14, 21'  # final-staging
- ],
'ECFLOW': [
# record id, next job type
# -------------------------
'101, 23', # staging
'104, 30', # adcirc2cog-tiff
'111, 25', # adcirc-to-kalpana-cog
'106, 24', # obs-mod-ast
- # '112, 21', # timeseries_ingest
+ '112, 21', # timeseries_ingest
'105, 19', # geotiff2cog
'102, 29', # load-geo-server
'110, 20', # collab-data-sync
@@ -127,7 +114,7 @@ def reset_job_order(workflow_type_name: str) -> bool:
sql = f"SELECT public.update_next_job_for_job({item}, '{workflow_type_name}')"

# and execute it
- ret_val = self.exec_sql('asgs', sql)
+ ret_val = self.exec_sql('apsviz', sql)

# anything other than a list returned is an error
if ret_val != 0:
@@ -136,7 +123,7 @@ def reset_job_order(workflow_type_name: str) -> bool:

# if there were no errors, commit the updates
if not failed:
- self.commit('asgs')
+ self.commit('apsviz')

# return to the caller
return failed
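
Worth noting: each workflow_job_types entry is a pre-joined 'record id, next job type' string, so a single f-string interpolation hands the stored procedure two arguments at once. A quick illustration using one of the ECFLOW entries from the dict above:

    item = '101, 23'  # record id 101 -> next job type 23 (staging)
    workflow_type_name = 'ECFLOW'
    sql = f"SELECT public.update_next_job_for_job({item}, '{workflow_type_name}')"
    # sql == "SELECT public.update_next_job_for_job(101, 23, 'ECFLOW')"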
@@ -152,7 +139,7 @@ def get_run_list(self):
sql: str = 'SELECT public.get_supervisor_run_list()'

# return the data
- return self.exec_sql('asgs', sql)
+ return self.exec_sql('apsviz', sql)

def update_next_job_for_job(self, job_name: str, next_process_id: int, workflow_type_name: str):
"""
@@ -168,11 +155,11 @@ def update_next_job_for_job(self, job_name: str, next_process_id: int, workflow_
sql = f"SELECT public.update_next_job_for_job('{job_name}', {next_process_id}, '{workflow_type_name}')"

# run the SQL
- ret_val = self.exec_sql('asgs', sql)
+ ret_val = self.exec_sql('apsviz', sql)

# if there were no errors, commit the updates
if ret_val > -1:
- self.commit('asgs')
+ self.commit('apsviz')

def update_job_image_version(self, job_name: str, image: str):
"""
@@ -187,11 +174,11 @@ def update_job_image_version(self, job_name: str, image: str):
sql = f"SELECT public.update_job_image('{job_name}', '{image}')"

# run the SQL
- ret_val = self.exec_sql('asgs', sql)
+ ret_val = self.exec_sql('apsviz', sql)

# if there were no errors, commit the updates
if ret_val > -1:
- self.commit('asgs')
+ self.commit('apsviz')

def update_run_status(self, instance_id: int, uid: str, status: str):
"""
@@ -208,11 +195,11 @@ def update_run_status(self, instance_id: int, uid: str, status: str):
sql = f"SELECT public.set_config_item({instance_id}, '{uid}', 'supervisor_job_status', '{status}')"

# run the SQL
- ret_val = self.exec_sql('asgs', sql)
+ ret_val = self.exec_sql('apsviz', sql)

# if there were no errors, commit the updates
if ret_val > -1:
- self.commit('asgs')
+ self.commit('apsviz')

def get_run_props(self, instance_id: int, uid: str):
"""
@@ -224,7 +211,7 @@ def get_run_props(self, instance_id: int, uid: str):
sql: str = f"SELECT * FROM public.get_run_prop_items_json({instance_id}, '{uid}')"

# get the data
- ret_val = self.exec_sql('asgs', sql)
+ ret_val = self.exec_sql('apsviz', sql)

# check the result
if ret_val == -1:
1 change: 0 additions & 1 deletion src/common/utils.py
@@ -101,7 +101,6 @@ class WorkflowTypeName(str, Enum):
"""
Class enums for the supervisor workflow names
"""
- ASGS = 'ASGS'
ECFLOW = 'ECFLOW'
HECRAS = 'HECRAS'

31 changes: 11 additions & 20 deletions src/server.py
@@ -44,7 +44,7 @@

# specify the DB to get a connection
# note the extra comma makes this single item a singleton tuple
- db_names: tuple = ('asgs',)
+ db_names: tuple = ('apsviz',)

# create a DB connection object
db_info: PGImplementation = PGImplementation(db_names, _logger=logger)
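
The trailing comma called out in the comment is what makes the literal a tuple rather than a parenthesized string; a quick illustration:

    >>> type(('apsviz',))
    <class 'tuple'>
    >>> type(('apsviz'))    # without the comma, the parentheses do nothing
    <class 'str'>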
@@ -230,17 +230,12 @@ async def reset_job_order(workflow_type_name: WorkflowTypeName) -> json:
"""
Resets the job process order to the default for the workflow selected.
- The normal sequence of ASGS jobs are:
- staging -> adcirc to COGs -> adcirc Kalpana to COGs -> ast run harvester -> adcirc Time to COGs -> obs-mod-ast -> compute COGs geo-tiffs ->
- load geoserver -> final staging -> complete
- The normal sequence of ECFLOW jobs are:
- staging -> adcirc to COGs -> adcirc Kalpana to COGs -> adcirc Time to COGs -> obs-mod-ast -> compute COGs geo-tiffs -> load geoserver ->
- collaborator data sync -> final staging -> complete
- The normal sequence of HECRAS jobs are
- load geoserver from S3 -> complete
+ The normal sequence of ECFLOW jobs is:
+ staging -> ADCIRC to COG TIFFs -> ADCIRC Kalpana to COGs -> AST run harvester -> Obs/Mod AST -> Compute COGs to geo-TIFFs -> Timeseries ingest ->
+ Load geoserver -> Collaborator data sync -> Final staging -> Complete
+ The normal sequence of HECRAS jobs is
+ Load geoserver from S3 -> Complete
"""

# init the returned html status code
@@ -550,16 +545,12 @@ async def set_the_supervisor_job_order(workflow_type_name: WorkflowTypeName, job
Modifies the supervisor component's linked list of jobs. Select the workflow type, then select the job process name and the next job
process name.
- The normal sequence of ASGS jobs are:
- staging -> adcirc to COGs -> adcirc Kalpana to COGs -> ast run harvester -> adcirc Time to COGs -> obs-mod-ast -> compute COGs geo-tiffs ->
- load geoserver -> final staging -> complete
- The normal sequence of ECFLOW jobs are:
- staging -> adcirc to COGs -> adcirc Kalpana to COGs -> adcirc Time to COGs -> obs-mod-ast -> compute COGs geo-tiffs -> load geoserver ->
- collaborator data sync -> final staging -> complete
+ The normal sequence of ECFLOW jobs is:
+ staging -> ADCIRC to COG TIFFs -> ADCIRC Kalpana to COGs -> AST run harvester -> Obs/Mod AST -> Compute COGs to geo-TIFFs -> Timeseries ingest ->
+ Load geoserver -> Collaborator data sync -> Final staging -> Complete
- The normal sequence of HECRAS jobs are
- load geoserver from S3 -> complete
+ The normal sequence of HECRAS jobs is
+ Load geoserver from S3 -> Complete
"""
# init the returned html status code
status_code = 200
4 changes: 2 additions & 2 deletions src/test/test_security.py
@@ -87,7 +87,7 @@ def test_access():
auth_header: dict = {'Content-Type': 'application/json', 'Authorization': f'Bearer {token["access_token"]}'}

# execute the post
- ret_val = requests.get('http://localhost:4000/get_job_order/ASGS', headers=auth_header, timeout=10)
+ ret_val = requests.get('http://localhost:4000/get_job_order/ECFLOW', headers=auth_header, timeout=10)

- # was the call unsuccessful
+ # was the call unsuccessful?
assert ret_val.status_code == 200
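
With ASGS gone from WorkflowTypeName (see src/common/utils.py above), a request for /get_job_order/ASGS should now be rejected during path-parameter validation before reaching the handler. A sketch of the enum behavior this relies on:

    from enum import Enum

    class WorkflowTypeName(str, Enum):
        """Class enums for the supervisor workflow names"""
        ECFLOW = 'ECFLOW'
        HECRAS = 'HECRAS'

    WorkflowTypeName('ECFLOW')      # valid member
    try:
        WorkflowTypeName('ASGS')    # no longer a member
    except ValueError as err:
        print(err)                  # 'ASGS' is not a valid WorkflowTypeName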
