Skip to content

Commit

Permalink
Use extra_data to propagate HW Constraints node data (#58)
Browse files Browse the repository at this point in the history
* Refs #22027: Use user_input extra_data to propagate HW Constraints node data

Signed-off-by: eProsima <[email protected]>

* Refs #22027: Adapt python back-end orchestrator node for results requests

Signed-off-by: eProsima <[email protected]>

* Refs #22027: Replace sleeps with condition variable waits

Signed-off-by: eProsima <[email protected]>

* Refs #22027: Fix and adapt python back-end node to Front-end requirements

Signed-off-by: eProsima <[email protected]>

* Refs #22027: Update WP2 submodule

Signed-off-by: eProsima <[email protected]>

* Refs #22027: Properly install sustainml wp4 and 5

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: eProsima <[email protected]>
Signed-off-by: Mario Dominguez <[email protected]>
Co-authored-by: Mario Dominguez <[email protected]>
  • Loading branch information
JesusPoderoso and Mario-DL authored Dec 12, 2024
1 parent 9c6f91f commit d842bc5
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 71 deletions.
2 changes: 2 additions & 0 deletions sustainml_modules/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package_name + '/sustainml-wp1',
package_name + '/sustainml-wp2',
package_name + '/sustainml-wp3',
package_name + '/sustainml-wp5',
package_name + '/sustainml-wp4',
]

setup(
Expand Down
2 changes: 1 addition & 1 deletion sustainml_modules/sustainml_modules/sustainml-wp2
36 changes: 22 additions & 14 deletions sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import threading
import time
import signal
import sustainml_swig
import sys
from orchestrator_node import orchestrator_node, utils
from werkzeug.serving import make_server
Expand All @@ -30,27 +31,28 @@
# Flask server default route
@server.route('/')
def hello_world():
return jsonify({'mesage': 'Hello world! Use "/terminate" route to stop Back-end node.<br>'}), 200
return jsonify({'mesage': 'Hello world! Use "/terminate" route to stop Back-end node.'}), 200

# Send user input data to orchestrator
@server.route('/user_input', methods=['POST'])
def user_input():
data = request.json
ret = orchestrator.send_user_input(data)
if not ret:
task_id = orchestrator.send_user_input(data)
if task_id is None:
return jsonify({'error': 'Invalid input data'}), 400
return jsonify({'message': 'User input data sent successfully.<br>'}), 200
return jsonify({'message': 'User input data sent successfully.',
'task_id': utils.task_json(task_id)}), 200

# Retrieve Node status methods
@server.route('/status', methods=['GET'])
def status():
return jsonify({'status': f'{orchestrator.get_all_status()}'}), 200
return jsonify({'status': orchestrator.get_all_status()}), 200

@server.route('/status', methods=['POST'])
def status_args():
data = request.json
node_id = data.get('node_id')
return jsonify({'status': f'{orchestrator.get_status(node_id)}'}), 200
return jsonify({'status': orchestrator.get_status(node_id)}), 200

# Retrieve Node results methods
@server.route('/results', methods=['GET'])
Expand All @@ -64,20 +66,26 @@ def results():
model = orchestrator.get_results(utils.node_id.ML_MODEL_PROVIDER.value, last_task_id)
hardware = orchestrator.get_results(utils.node_id.HW_PROVIDER.value, last_task_id)
carbontracker = orchestrator.get_results(utils.node_id.CARBONTRACKER.value, last_task_id)
json = {f'{utils.string_node(utils.node_id.APP_REQUIREMENTS.value)}': f'{app_req}',
f'{utils.string_node(utils.node_id.ML_MODEL_METADATA.value)}': f'{metadata}',
f'{utils.string_node(utils.node_id.HW_CONSTRAINTS.value)}': f'{constraints}',
f'{utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)}': f'{model}',
f'{utils.string_node(utils.node_id.HW_PROVIDER.value)}': f'{hardware}',
f'{utils.string_node(utils.node_id.CARBONTRACKER.value)}': f'{carbontracker}'}
task_json = {'problem_id': last_task_id.problem_id(), 'iteration_id': last_task_id.iteration_id()}
json = {utils.string_node(utils.node_id.APP_REQUIREMENTS.value): app_req,
utils.string_node(utils.node_id.ML_MODEL_METADATA.value): metadata,
utils.string_node(utils.node_id.HW_CONSTRAINTS.value): constraints,
utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value): model,
utils.string_node(utils.node_id.HW_PROVIDER.value): hardware,
utils.string_node(utils.node_id.CARBONTRACKER.value): carbontracker,
'task_id': task_json}
return jsonify(json), 200

@server.route('/results', methods=['POST'])
def results_args():
data = request.json
node_id = data.get('node_id')
task_id = data.get('task_id')
return jsonify({f'{utils.string_node(node_id)}': f'{orchestrator.get_results(node_id, task_id)}'}), 200
json_task = data.get('task_id')
if json_task is not None:
task_id = sustainml_swig.set_task_id(json_task.get('problem_id', 0), json_task.get('iteration_id', 0))
else:
task_id = None
return jsonify({utils.string_node(node_id): orchestrator.get_results(node_id, task_id)}), 200

# Flask server shutdown route
@server.route('/shutdown', methods=['GET'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@
"""SustainML Orchestrator Node API specification."""

from . import utils
import numpy as np

from sustainml_swig import OrchestratorNodeHandle as cpp_OrchestratorNodeHandle
from sustainml_swig import OrchestratorNode as cpp_OrchestratorNode
from sustainml_swig import NodeStatus
import sustainml_swig
import threading

class OrchestratorNodeHandle(cpp_OrchestratorNodeHandle):

def __init__(self):

self.node_status_ = {}
self.condition = threading.Condition()
self.last_task_id = None
self.node_status_ = {}
self.result_status = {}
# Parent class constructor
super().__init__()

Expand All @@ -46,14 +50,37 @@ def on_new_node_output(
self,
id : int,
data):
task = sustainml_swig.get_task_id(id, data)
if (self.last_task_id is None and task is not None) or (
self.last_task_id is not None and task is not None and task > self.last_task_id):
self.last_task_id = task
if task is None:
task_id = sustainml_swig.get_task_id(id, data)
if task_id is None:
print(utils.string_node(id), "node output received.")
else:
print(utils.string_node(id), "node output received from task", utils.string_task(task))
print(utils.string_node(id), "node output received from task", utils.string_task(task_id))
self.register_result(task_id, id)

def register_task(self, task_id):
with self.condition:
if (self.last_task_id is None and task_id is not None) or (
self.last_task_id is not None and task_id is not None and task_id > self.last_task_id):
self.last_task_id = task_id
self.result_status[utils.string_task(task_id)] = {
utils.node_id.APP_REQUIREMENTS.value: False,
utils.node_id.CARBONTRACKER.value: False,
utils.node_id.HW_CONSTRAINTS.value: False,
utils.node_id.HW_PROVIDER.value: False,
utils.node_id.ML_MODEL_METADATA.value: False,
utils.node_id.ML_MODEL_PROVIDER.value: False
}

def register_result(self, task_id, node_id):
with self.condition:
if utils.string_task(task_id) not in self.result_status:
self.register_task(task_id)
self.result_status[utils.string_task(task_id)][node_id] = True
self.condition.notify_all()

def results_available(self, task_id, node_id):
with self.condition:
return self.result_status[utils.string_task(task_id)].get(node_id, False)

class Orchestrator:

Expand All @@ -76,110 +103,150 @@ def get_last_task_id(self):
return self.handler_.last_task_id

def get_all_status(self):
output = ""
json_output = {}
for key, value in self.handler_.node_status_.items():
output += utils.string_node(key) + " node status " + utils.string_status(value) + "<br>"
if output == "":
output = "No nodes have reported their status yet.\n"
return output
json_output[utils.string_node(key)] = utils.string_status(value)
return json_output

def get_status(self, node_id):
if node_id in self.handler_.node_status_:
return utils.string_status(self.handler_.node_status_[node_id])
if node_id is None:
return self.get_all_status()
else:
return utils.string_status(utils.node_status.INACTIVE.value)
if node_id in self.handler_.node_status_:
return utils.string_status(self.handler_.node_status_[node_id])
else:
return utils.string_status(utils.node_status.INACTIVE.value)

def get_app_requirements(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.APP_REQUIREMENTS.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_app_requirements(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.APP_REQUIREMENTS.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.APP_REQUIREMENTS.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
app_requirements_str_list = node_data.app_requirements()
json_output = {'app_requirements': f'{utils.string_std_vector(app_requirements_str_list)}<br>'}
json_output = {'task_id': task_json,
'app_requirements': utils.string_std_vector(app_requirements_str_list)}
return json_output

def get_model_metadata(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.ML_MODEL_METADATA.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_model_metadata(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_METADATA.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_METADATA.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
keywords_str_list = node_data.keywords()
metadata_str_list = node_data.ml_model_metadata()
json_output = {'keywords': f'{utils.string_std_vector(keywords_str_list)}<br>',
'metadata': f'{utils.string_std_vector(metadata_str_list)}<br>'}
json_output = {'task_id': task_json,
'keywords': utils.string_std_vector(keywords_str_list),
'metadata': utils.string_std_vector(metadata_str_list)}
return json_output

def get_hw_constraints(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.HW_CONSTRAINTS.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_hw_constraints(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_CONSTRAINTS.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_CONSTRAINTS.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
max_value = node_data.max_memory_footprint()
required_hardware = node_data.hardware_required()
json_output = {'max_memory_footprint': f'{max_value}<br>',
'hardware_required': f'{utils.string_std_vector(required_hardware)}<br>'}
json_output = {'task_id': task_json,
'max_memory_footprint': max_value,
'hardware_required': utils.string_std_vector(required_hardware)}
return json_output

def get_ml_model_provider(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.ML_MODEL_PROVIDER.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_model_provider(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
model = node_data.model()
model_path = node_data.model_path()
model_properties = node_data.model_properties()
model_properties_path = node_data.model_properties_path()
input_batch = node_data.input_batch()
target_latency = node_data.target_latency()
json_output = {'model': f'{model}<br>',
'model_path': f'{model_path}<br>',
'model_properties': f'{model_properties}<br>',
'model_properties_path': f'{model_properties_path}<br>',
'input_batch': f'{utils.string_std_vector(input_batch)}<br>',
'target_latency': f'{target_latency}<br>'}
json_output = {'task_id': task_json,
'model': model,
'model_path': model_path,
'model_properties': model_properties,
'model_properties_path': model_properties_path,
'input_batch': utils.string_std_vector(input_batch),
'target_latency': target_latency}
return json_output

def get_hw_provider(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.HW_PROVIDER.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_hw_provider(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_PROVIDER.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_PROVIDER.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
hw_description = node_data.hw_description()
power_consumption = node_data.power_consumption()
latency = node_data.latency()
memory_footprint_of_ml_model = node_data.memory_footprint_of_ml_model()
json_output = {'hw_description': f'{hw_description}<br>',
'power_consumption': f'{power_consumption}<br>',
'latency': f'{latency}<br>',
'memory_footprint_of_ml_model': f'{memory_footprint_of_ml_model}<br>'}
json_output = {'task_id': task_json,
'hw_description': hw_description,
'power_consumption': power_consumption,
'latency': latency,
'memory_footprint_of_ml_model': memory_footprint_of_ml_model}
return json_output

def get_carbontracker(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.CARBONTRACKER.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_carbontracker(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.CARBONTRACKER.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.CARBONTRACKER.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
carbon_footprint = node_data.carbon_footprint()
energy_consumption = node_data.energy_consumption()
carbon_intensity = node_data.carbon_intensity()
json_output = {'carbon_footprint': f'{carbon_footprint}<br>',
'energy_consumption': f'{energy_consumption}<br>',
'carbon_intensity': f'{carbon_intensity}<br>'}
json_output = {'task_id': task_json,
'carbon_footprint': carbon_footprint,
'energy_consumption': energy_consumption,
'carbon_intensity': carbon_intensity}
return json_output

def get_results(self, node_id, task_id):
if task_id is None:
task_id = self.get_last_task_id()

if node_id == utils.node_id.APP_REQUIREMENTS.value:
return self.get_app_requirements(task_id)
elif node_id == utils.node_id.ML_MODEL_METADATA.value:
Expand All @@ -193,19 +260,40 @@ def get_results(self, node_id, task_id):
elif node_id == utils.node_id.CARBONTRACKER.value:
return self.get_carbontracker(task_id)
else:
return utils.string_node(node_id) + " node does not have any results to show.<br>"
message = utils.string_node(node_id) + " node does not have any results to show."
return {'message': message, 'task_id': utils.task_json(task_id)}

def send_user_input(self, json_data):
pair = self.node_.prepare_new_task()
task_id = pair[0]
user_input = pair[1]
self.handler_.register_task(task_id)

user_input.task_id(task_id)
if (json_data.get('modality') is not None):
user_input.modality(json_data.get('modality'))
if (json_data.get('problem_type') is not None):
user_input.problem_definition(json_data.get('problem_type'))
if (json_data.get('problem_short_description') is not None):
user_input.problem_short_description(json_data.get('problem_short_description'))
#user_input.evaluation_metrics(evaluation_metrics)
#user_input.model(model)

return self.node_.start_task(task_id, user_input)
# TODO add missing fields

# Prepare extra data
hw_req = utils.default_hw_requirement
mem_footprint = utils.default_mem_footprint
if (json_data.get('hardware_required') is not None):
hw_req = json_data.get('hardware_required')
if (json_data.get('max_memory_footprint') is not None):
mem_footprint = json_data.get('max_memory_footprint')

# Add extra data to user user_input
extra_data = {'hardware_required': hw_req,
'max_memory_footprint': mem_footprint}
json_obj = utils.json_dict(extra_data)
data_array = np.frombuffer(json_obj.encode(), dtype=np.uint8)
user_input.extra_data(sustainml_swig.uint8_t_vector(data_array.tolist()))

if self.node_.start_task(task_id, user_input):
return task_id
else:
return None
Loading

0 comments on commit d842bc5

Please sign in to comment.