Skip to content

Commit

Permalink
Merge pull request #109 from Ortec-Finance/bugfixes
Browse files Browse the repository at this point in the history
[Carbon Aware HPC] - 5 - Bugfixes for Multi Cluster Controller
  • Loading branch information
ZeidH committed Jul 12, 2024
1 parent 5088e2c commit 3ded66e
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 43 deletions.
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"-n rdlabs-experiment-cas-eu-west"
],
"env": {
"PROMETHEUS_TOKEN": "GET ME FROM SERVICE ACCOUNT AND PORT FORWARD THANOS",
"KOPF_RUN_ARGS": "--verbose"
},
},
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## v0.40.0
Bugfixes

## v0.39.0
Demo for GSF

Expand Down
5 changes: 3 additions & 2 deletions k8s/cluster-config/operators/sailfish/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ spec:
activatedTargetCluster:
description: The current best destination based on the trigger variables and cost function
type: string
status:
description: If the operator's decision resulted in a fallback, this will be the reason why
type: string
clusters:
type: array
description: "Defines the Remote Sailfish Clusters"
Expand All @@ -117,8 +120,6 @@ spec:
queue:
type: string
description: "To reference a remote Sailfish Cluster, use this, the Queues will be automatically generated"
query:
type: string
status:
type: string
reason:
Expand Down
85 changes: 53 additions & 32 deletions operator/multi-cluster-controller/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from tolerations import Tolerations
from bridge import Bridge
from score import Score
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Dynamically get the namespace the operator is running in
if os.path.isfile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"):
Expand Down Expand Up @@ -95,8 +98,6 @@ def modify_activemq_artemis(spec, name, namespace, uid, logger, **kwargs):
name, namespace, cluster["name"], owner_reference, logger
)

label = [f"ortec-finance.com/sailfish-cluster: {name}"]

logger.info(f"Applying BrokerConfiguration to {SAILFISH_BROKER_NAME}")
bridgeConfig.update_activemq_artemis(
SAILFISH_BROKER_NAME, namespace, patch, logger
Expand All @@ -106,28 +107,30 @@ def modify_activemq_artemis(spec, name, namespace, uid, logger, **kwargs):
logger.warning(f"ActiveMQArtemis {name} not found in namespace {namespace}")


@kopf.on.timer("ortec-finance.com", "v1alpha1", "sailfishclusters", interval=2)
@kopf.on.update("ortec-finance.com", "v1alpha1", "sailfishclusters")
@kopf.on.timer("ortec-finance.com", "v1alpha1", "sailfishclusters", interval=60)
def poll_sailfish_clusters_status(spec, patch, logger, **kwargs):
logger.info("Polling Sailfish Clusters Status")
cluster_statuses = []

## Append Remote Sailfish Clusters
for cluster in spec.get("clusters", []):
queue_name = cluster.get("queue", f"sailfish{cluster['name']}")
query = next(
(
item
for item in spec.get("triggers")
if item["clusterRef"] == cluster["name"]
),
None,
)["query"]
if query:
trigger = None
if "triggers" in spec:
trigger = next(
(
item
for item in spec.get("triggers")
if item["clusterRef"] == cluster["name"]
),
None,
)
if trigger and "query" in trigger:
cluster_statuses.append(
{
"name": cluster["name"],
"queue": queue_name,
"query": query,
"status": "active",
}
)
Expand Down Expand Up @@ -158,29 +161,34 @@ def get_active_sailfish_clusters(sailfish_cluster):
return activeClusters


@kopf.on.timer("ortec-finance.com", "v1alpha1", "sailfishclusters", interval=2)
@kopf.on.update("ortec-finance.com", "v1alpha1", "sailfishclusters")
@kopf.on.timer("ortec-finance.com", "v1alpha1", "sailfishclusters", interval=60)
def poll_sailfish_cluster_best_destination(spec, patch, status, logger, **kwargs):
evaluator = PrometheusEvaluator()
cluster_statuses = []

for cluster in spec.get("clusters", []):
for cluster in status.get("clusters", []):
if cluster["status"] == "inactive":
logger.info(f"Skipping Inactive cluster {cluster['name']}")
continue
trigger_statuses = []
for trigger in spec.get("triggers"):
if trigger["clusterRef"] == cluster["name"]:
value = evaluator.evaluate_query(trigger["query"])
if value is None:
logger.error(
f"Was not able to get a value from the query: {trigger['query']}"
if "triggers" in spec:
for trigger in spec.get("triggers"):
if trigger["clusterRef"] == cluster["name"]:
value = evaluator.evaluate_query(trigger["query"])
if value is None:
logger.error(
f"Was not able to get a value from the query: {trigger['query']}"
)
raise kopf.PermanentError("Value is None. Cannot proceed.")
scaled_value = variableScore.apply_scaler(value, trigger["scaler"])

trigger_statuses.append(
{
"name": trigger["name"],
"value": scaled_value,
}
)
raise kopf.PermanentError("Value is None. Cannot proceed.")
scaled_value = variableScore.apply_scaler(value, trigger["scaler"])

trigger_statuses.append(
{
"name": trigger["name"],
"value": scaled_value,
}
)

score = variableScore.sum_trigger_values(trigger_statuses)

Expand All @@ -203,9 +211,22 @@ def poll_sailfish_cluster_best_destination(spec, patch, status, logger, **kwargs
}
)

reward = variableScore.cost_function(logger, cluster_statuses)
if cluster_statuses:
reward = variableScore.cost_function(logger, cluster_statuses)
scheduler = {
"clusters": cluster_statuses,
"activatedTargetCluster": reward["name"],
"status": "Healthy",
}

else: # Fallback to Local cluster if no triggers are set.
for cluster in spec.get("clusters", []):
if "queue" in cluster:
scheduler = {
"activatedTargetCluster": cluster["name"],
"status": "Unhealthy configuration: No triggers set. Fallback to Local Cluster",
}

scheduler = {"clusters": cluster_statuses, "activatedTargetCluster": reward["name"]}
if "scheduler" in status and status["scheduler"] == scheduler:
logger.info("No change in scheduler status")
else:
Expand Down
6 changes: 3 additions & 3 deletions operator/multi-cluster-controller/prometheus.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import re

from prometheus_api_client import PrometheusConnect


Expand All @@ -20,7 +22,7 @@ def __init__(self):
) as file:
self.namespace = file.read().strip()
else:
TOKEN = "GET ME FROM SERVICE ACCOUNT AND PORT FORWARD THANOS"
TOKEN = os.getenv("PROMETHEUS_TOKEN")

self.prom = PrometheusConnect(
url=url, headers={"Authorization": f"Bearer {TOKEN}"}, disable_ssl=True
Expand Down Expand Up @@ -60,8 +62,6 @@ def evaluate_expression(self, expression):
return self.compare(result_value, operator, float(threshold))

def parse_expression(self, expression):
import re

"""Parse the expression into a query, operator, and threshold."""
# This regex allows spaces around the operator and handles complex queries
match = re.search(
Expand Down
10 changes: 4 additions & 6 deletions operator/multi-cluster-controller/score.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ def sum_trigger_values(self, trigger_statuses):
result += trigger["value"]
return result

def cost_function(self, logger, cluster_results, operator="MIN"):
if operator == "MIN":
winner = min(cluster_results, key=lambda x: x["score"])
elif operator == "MAX":
winner = max(cluster_results, key=lambda x: x["score"])
else:
def cost_function(self, logger, cluster_results, operator=min):
try:
winner = operator(cluster_results, key=lambda x: x["score"])
except Exception as e:
logger.error("Operator not supported")
raise kopf.PermanentError("Operator not supported")
logger.info(
Expand Down

0 comments on commit 3ded66e

Please sign in to comment.