From 5871342cd9b91fc0206680d8c782abe6308378cf Mon Sep 17 00:00:00 2001 From: Shreshth Tuli Date: Wed, 1 Dec 2021 21:56:05 +0530 Subject: [PATCH] Bug fixes --- .gitignore | 1 + decider/Compression_Only.py | 13 ++ decider/Decider.py | 34 +++- decider/Layer_Only.py | 7 +- decider/Random.py | 9 +- decider/Semantic_Only.py | 7 +- main.py | 78 ++++---- scheduler/IQR_MMT_Random.py | 21 --- scheduler/LR_MMT_Random.py | 23 --- scheduler/MAD_MC_Random.py | 29 --- scheduler/MAD_MMT_Random.py | 24 --- scheduler/RLR_MMT_Random.py | 23 --- .../{Random_Random_Random.py => Random.py} | 8 +- scheduler/Random_Random_FirstFit.py | 14 -- scheduler/Random_Random_LeastFull.py | 15 -- scheduler/Scheduler.py | 166 +----------------- scheduler/Threshold_MC_Random.py | 23 --- serverless/Serverless.py | 127 ++++++++++++++ serverless/datacenter/server/controller.py | 14 ++ serverless/function/Task.py | 37 ++++ serverless/node/node.py | 65 +++++++ serverless/workload/AIBenchWorkload.py | 22 +++ serverless/workload/Workload.py | 5 + utils/Utils.py | 2 + 24 files changed, 371 insertions(+), 396 deletions(-) create mode 100644 decider/Compression_Only.py delete mode 100644 scheduler/IQR_MMT_Random.py delete mode 100644 scheduler/LR_MMT_Random.py delete mode 100644 scheduler/MAD_MC_Random.py delete mode 100644 scheduler/MAD_MMT_Random.py delete mode 100644 scheduler/RLR_MMT_Random.py rename scheduler/{Random_Random_Random.py => Random.py} (50%) delete mode 100644 scheduler/Random_Random_FirstFit.py delete mode 100644 scheduler/Random_Random_LeastFull.py delete mode 100644 scheduler/Threshold_MC_Random.py create mode 100644 serverless/function/Task.py create mode 100644 serverless/node/node.py create mode 100644 serverless/workload/Workload.py diff --git a/.gitignore b/.gitignore index be875a0..81bb517 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__/ *.py[cod] *$py.class +temp/ # C extensions *.so diff --git a/decider/Compression_Only.py b/decider/Compression_Only.py new file mode 100644 index 0000000..0ffd610 --- /dev/null +++ b/decider/Compression_Only.py @@ -0,0 +1,13 @@ +from .Decider import * + +class CompressionOnlyDecider(Decider): + def __init__(self): + super().__init__() + + def decision(self, workflowlist): + results = [] + for CreationID, interval, SLA, application in workflowlist: + choice = self.choices[2] + tasklist = self.createTasks(CreationID, interval, SLA, application, choice) + results += tasklist + return results \ No newline at end of file diff --git a/decider/Decider.py b/decider/Decider.py index 41c4d6d..9b94220 100644 --- a/decider/Decider.py +++ b/decider/Decider.py @@ -1,18 +1,40 @@ -import math -from utils.MathUtils import * -from utils.MathConstants import * -import pandas as pd -from statistics import median -import numpy as np +from serverless.function.Task import * class Decider(): def __init__(self): self.env = None self.choices = ['layer', 'semantic', 'compression'] + self.dataset = list(filter(lambda k: '.md' not in k, os.listdir(SAMPLE_PATH))) + self.dataset = [os.path.join(SAMPLE_PATH, i) for i in self.dataset] def setEnvironment(self, env): self.env = env def decision(self, workflowlist): pass + + def getLayerInputs(self, cid, i): + paths = [] + for d in DSET: + d2 = DSET[0].split('.')[0] + paths.append(f'./temp/{d2}_L_{cid}_{i}.jpg') + return paths + + def createTasks(cid, interval, SLA, application, choice): + tasklist = [] + if choice == 'semantic': + tasklist.append(Task(cid, interval, SLA, application, choice, self.env, 0, [], [self.dataset[0]])) + tasklist.append(Task(cid, interval, SLA, application, choice, self.env, 1, [], [self.dataset[1]])) + tasklist.append(Task(cid, interval, SLA, application, choice, self.env, 2, [], [self.dataset[2]])) + tasklist.append(Task(cid, interval, SLA, application, choice, self.env, 3, [], [self.dataset[3]])) + elif choice == 'layer': + tasklist.append(Task(cid, interval, SLA, application, choice, self.env, 0, [], self.dataset)) + tasklist.append(Task(cid, interval, SLA, application, choice, self.env, 1, [0], self.getLayerInputs(cid, 0))) + tasklist.append(Task(cid, interval, SLA, application, choice, self.env, 2, [0, 1], self.getLayerInputs(cid, 1))) + tasklist.append(Task(cid, interval, SLA, application, choice, self.env, 3, [0, 1, 2], self.getLayerInputs(cid, 2))) + elif choice == 'compression': + tasklist.append(Task(cid, interval, SLA, application, choice, self.env, 0, [], self.dataset*2)) + return tasklist + + \ No newline at end of file diff --git a/decider/Layer_Only.py b/decider/Layer_Only.py index 3439b89..5cf7021 100644 --- a/decider/Layer_Only.py +++ b/decider/Layer_Only.py @@ -5,4 +5,9 @@ def __init__(self): super().__init__() def decision(self, workflowlist): - return [self.choices[0]] * len(workflowlist) \ No newline at end of file + results = [] + for CreationID, interval, SLA, application in workflowlist: + choice = self.choices[0] + tasklist = self.createTasks(CreationID, interval, SLA, application, choice) + results += tasklist + return results \ No newline at end of file diff --git a/decider/Random.py b/decider/Random.py index 06b009f..ce132d5 100644 --- a/decider/Random.py +++ b/decider/Random.py @@ -1,9 +1,14 @@ from .Decider import * -import random +import numpy as np class RandomDecider(Decider): def __init__(self): super().__init__() def decision(self, workflowlist): - return random.choices(self.choices, k = len(workflowlist)) \ No newline at end of file + results = [] + for CreationID, interval, SLA, application in workflowlist: + choice = np.random.choice(self.choices) + tasklist = self.createTasks(CreationID, interval, SLA, application, choice) + results += tasklist + return results \ No newline at end of file diff --git a/decider/Semantic_Only.py b/decider/Semantic_Only.py index 594b146..9502304 100644 --- a/decider/Semantic_Only.py +++ b/decider/Semantic_Only.py @@ -5,4 +5,9 @@ def __init__(self): super().__init__() def decision(self, workflowlist): - return [self.choices[1]] * len(workflowlist) \ No newline at end of file + results = [] + for CreationID, interval, SLA, application in workflowlist: + choice = self.choices[1] + tasklist = self.createTasks(CreationID, interval, SLA, application, choice) + results += tasklist + return results \ No newline at end of file diff --git a/main.py b/main.py index 15df291..adfe7c3 100644 --- a/main.py +++ b/main.py @@ -23,17 +23,10 @@ from decider.Random import RandomDecider from decider.Layer_Only import LayerOnlyDecider from decider.Semantic_Only import SemanticOnlyDecider +from decider.Compression_Only import CompressionOnlyDecider # Scheduler imports -from scheduler.IQR_MMT_Random import IQRMMTRScheduler -from scheduler.MAD_MMT_Random import MADMMTRScheduler -from scheduler.MAD_MC_Random import MADMCRScheduler -from scheduler.LR_MMT_Random import LRMMTRScheduler -from scheduler.Random_Random_FirstFit import RFScheduler -from scheduler.Random_Random_LeastFull import RLScheduler -from scheduler.RLR_MMT_Random import RLRMMTRScheduler -from scheduler.Threshold_MC_Random import TMCRScheduler -from scheduler.Random_Random_Random import RandomScheduler +from scheduler.Random import RandomScheduler # Auxiliary imports from stats.Stats import * @@ -55,7 +48,7 @@ CONTAINERS = HOSTS TOTAL_POWER = 1000 ROUTER_BW = 10000 -INTERVAL_TIME = 300 # seconds +INTERVAL_TIME = 5 # seconds NEW_CONTAINERS = 0 if HOSTS == 10 else 5 DB_NAME = '' DB_HOST = '' @@ -71,37 +64,32 @@ def initalizeEnvironment(environment, mode): ''' Can be AzureDatacenter ''' datacenter = eval(environment+'Datacenter(mode)') hostlist = datacenter.generateHosts() - exit() # Initialize workload - ''' Can be SWSD, BWGD2, Azure2017Workload, Azure2019Workload // DFW, AIoTW ''' - if environment != '': - workload = DFW(NEW_CONTAINERS, 1.5, db) - else: - workload = BWGD2(NEW_CONTAINERS, 1.5) - + ''' Can be AIBench ''' + workload = AIBenchWorkload(NEW_CONTAINERS, 1.5) + + # Initialize decider + ''' Can be Random ''' + decider = RandomDecider() + # Initialize scheduler - ''' Can be LRMMTR, RF, RL, RM, Random, RLRMMTR, TMCR, TMMR, TMMTR, GA, GOBI (arg = 'energy_latency_'+str(HOSTS)) ''' - scheduler = GOBIScheduler('energy_latency_'+str(HOSTS)) # GOBIScheduler('energy_latency_'+str(HOSTS)) + ''' Can be Random ''' + scheduler = RandomScheduler() # Initialize Environment - if environment != '': - env = Framework(scheduler, CONTAINERS, INTERVAL_TIME, hostlist, db, environment, logger) - else: - env = Simulator(TOTAL_POWER, ROUTER_BW, scheduler, CONTAINERS, INTERVAL_TIME, hostlist) + env = Serverless(scheduler, decider, INTERVAL_TIME, hostlist, environment) # Execute first step - newcontainerinfos = workload.generateNewContainers(env.interval) # New containers info - deployed = env.addContainersInit(newcontainerinfos) # Deploy new containers and get container IDs - start = time() - decision = scheduler.placement(deployed) # Decide placement using container ids - schedulingTime = time() - start - migrations = env.allocateInit(decision) # Schedule containers - workload.updateDeployedContainers(env.getCreationIDs(migrations, deployed)) # Update workload allocated using creation IDs - print("Deployed containers' creation IDs:", env.getCreationIDs(migrations, deployed)) + workloadlist = workload.generateNewContainers(env.interval) # New containers info + newtasklist = decider.decision(workloadlist) + decision, schedulingTime = scheduler.placement(newtasklist) # Decide placement using task objects + numdep = env.allocateInit(newtasklist, decision) # Schedule functions + print("New Tasks Size:", len(newtasklist)) + print("Waiting List Size:", len(env.waitinglist)) print("Containers in host:", env.getContainersInHosts()) - print("Schedule:", env.getActiveContainerList()) - printDecisionAndMigrations(decision, migrations) + print("Deployed:", numdep, "of", len(env.waitinglist + newtasklist)) + print("Decision:", decision) # Initialize stats stats = Stats(env, workload, datacenter, scheduler) @@ -109,22 +97,16 @@ def initalizeEnvironment(environment, mode): return datacenter, workload, scheduler, env, stats def stepSimulation(workload, scheduler, env, stats): - newcontainerinfos = workload.generateNewContainers(env.interval) # New containers info - if opts.env != '': print(newcontainerinfos) - deployed, destroyed = env.addContainers(newcontainerinfos) # Deploy new containers and get container IDs - start = time() - selected = scheduler.selection() # Select container IDs for migration - decision = scheduler.filter_placement(scheduler.placement(selected+deployed)) # Decide placement for selected container ids - schedulingTime = time() - start - migrations = env.simulationStep(decision) # Schedule containers - workload.updateDeployedContainers(env.getCreationIDs(migrations, deployed)) # Update workload deployed using creation IDs - print("Deployed containers' creation IDs:", env.getCreationIDs(migrations, deployed)) - print("Deployed:", len(env.getCreationIDs(migrations, deployed)), "of", len(newcontainerinfos), [i[0] for i in newcontainerinfos]) - print("Destroyed:", len(destroyed), "of", env.getNumActiveContainers()) + workloadlist = workload.generateNewContainers(env.interval) # New containers info + newtasklist = decider.decision(workloadlist) + decision, schedulingTime = scheduler.placement(env.waitinglist + newtasklist) # Decide placement using task objects + numdep = env.simulationStep(newtasklist, decision) # Schedule containers + print("New Tasks Size:", len(newtasklist)) + print("Waiting List Size:", len(env.waitinglist)) print("Containers in host:", env.getContainersInHosts()) - print("Num active containers:", env.getNumActiveContainers()) - print("Host allocation:", [(c.getHostID() if c else -1)for c in env.containerlist]) - printDecisionAndMigrations(decision, migrations) + print("Deployed:", numdep, "of", len(env.waitinglist + newtasklist)) + print("Destroyed:", numdes, "of", len(env.activetasklist)) + print("Decision:", decision) stats.saveStats(deployed, migrations, destroyed, selected, decision, schedulingTime) diff --git a/scheduler/IQR_MMT_Random.py b/scheduler/IQR_MMT_Random.py deleted file mode 100644 index 0bfce16..0000000 --- a/scheduler/IQR_MMT_Random.py +++ /dev/null @@ -1,21 +0,0 @@ -from .Scheduler import * - -class IQRMMTRScheduler(Scheduler): - def __init__(self): - super().__init__() - self.utilHistory = [] - - def updateUtilHistory(self): - hostUtils = [] - for host in self.env.hostlist: - hostUtils.append(host.getCPU()) - self.utilHistory.append(hostUtils) - - def selection(self): - self.updateUtilHistory() - selectedHostIDs = self.IQRSelection(self.utilHistory) - selectedVMIDs = self.MMTContainerSelection(selectedHostIDs) - return selectedVMIDs - - def placement(self, containerIDs): - return self.RandomPlacement(containerIDs) diff --git a/scheduler/LR_MMT_Random.py b/scheduler/LR_MMT_Random.py deleted file mode 100644 index 492e055..0000000 --- a/scheduler/LR_MMT_Random.py +++ /dev/null @@ -1,23 +0,0 @@ -from .Scheduler import * -import numpy as np -from copy import deepcopy - -class LRMMTRScheduler(Scheduler): - def __init__(self): - super().__init__() - self.utilHistory = [] - - def updateUtilHistory(self): - hostUtils = [] - for host in self.env.hostlist: - hostUtils.append(host.getCPU()) - self.utilHistory.append(hostUtils) - - def selection(self): - self.updateUtilHistory() - selectedHostIDs = self.LRSelection(self.utilHistory) - selectedVMIDs = self.MMTContainerSelection(selectedHostIDs) - return selectedVMIDs - - def placement(self, containerIDs): - return self.RandomPlacement(containerIDs) \ No newline at end of file diff --git a/scheduler/MAD_MC_Random.py b/scheduler/MAD_MC_Random.py deleted file mode 100644 index dbb4c30..0000000 --- a/scheduler/MAD_MC_Random.py +++ /dev/null @@ -1,29 +0,0 @@ -from .Scheduler import * -import numpy as np -from copy import deepcopy - - -class MADMCRScheduler(Scheduler): - def __init__(self): - super().__init__() - self.utilHistory = [] - self.utilHistoryContainer= [] - - def updateUtilHistoryContainer(self): - containerUtil = [(cid.getBaseIPS() if cid else 0) for cid in self.env.containerlist] - self.utilHistoryContainer.append(containerUtil) - - def updateUtilHistory(self): - hostUtils = [] - for host in self.env.hostlist: - hostUtils.append(host.getCPU()) - self.utilHistory.append(hostUtils) - - def selection(self): - self.updateUtilHistoryContainer() - selectedHostIDs = self.ThresholdHostSelection() - selectedVMIDs = self.MaxCorContainerSelection(selectedHostIDs,self.utilHistoryContainer) - return selectedVMIDs - - def placement(self, containerIDs): - return self.RandomPlacement(containerIDs) diff --git a/scheduler/MAD_MMT_Random.py b/scheduler/MAD_MMT_Random.py deleted file mode 100644 index 2c5bf13..0000000 --- a/scheduler/MAD_MMT_Random.py +++ /dev/null @@ -1,24 +0,0 @@ -from .Scheduler import * -import numpy as np -from copy import deepcopy - - -class MADMMTRScheduler(Scheduler): - def __init__(self): - super().__init__() - self.utilHistory = [] - - def updateUtilHistory(self): - hostUtils = [] - for host in self.env.hostlist: - hostUtils.append(host.getCPU()) - self.utilHistory.append(hostUtils) - - def selection(self): - self.updateUtilHistory() - selectedHostIDs = self.MADSelection(self.utilHistory) - selectedVMIDs = self.MMTContainerSelection(selectedHostIDs) - return selectedVMIDs - - def placement(self, containerIDs): - return self.RandomPlacement(containerIDs) diff --git a/scheduler/RLR_MMT_Random.py b/scheduler/RLR_MMT_Random.py deleted file mode 100644 index e762c8d..0000000 --- a/scheduler/RLR_MMT_Random.py +++ /dev/null @@ -1,23 +0,0 @@ -from .Scheduler import * -import numpy as np -from copy import deepcopy - -class RLRMMTRScheduler(Scheduler): - def __init__(self): - super().__init__() - self.utilHistory = [] - - def updateUtilHistory(self): - hostUtils = [] - for host in self.env.hostlist: - hostUtils.append(host.getCPU()) - self.utilHistory.append(hostUtils) - - def selection(self): - self.updateUtilHistory() - selectedHostIDs = self.RLRSelection(self.utilHistory) - selectedVMIDs = self.MMTContainerSelection(selectedHostIDs) - return selectedVMIDs - - def placement(self, containerIDs): - return self.RandomPlacement(containerIDs) \ No newline at end of file diff --git a/scheduler/Random_Random_Random.py b/scheduler/Random.py similarity index 50% rename from scheduler/Random_Random_Random.py rename to scheduler/Random.py index 659e5bb..1fc4f88 100644 --- a/scheduler/Random_Random_Random.py +++ b/scheduler/Random.py @@ -6,8 +6,6 @@ class RandomScheduler(Scheduler): def __init__(self): super().__init__() - def selection(self): - return self.RandomContainerSelection() - - def placement(self, containerIDs): - return self.RandomPlacement(containerIDs) \ No newline at end of file + def placement(self, tasks): + start = time() + return self.RandomPlacement(tasks), time() - start \ No newline at end of file diff --git a/scheduler/Random_Random_FirstFit.py b/scheduler/Random_Random_FirstFit.py deleted file mode 100644 index 391fab0..0000000 --- a/scheduler/Random_Random_FirstFit.py +++ /dev/null @@ -1,14 +0,0 @@ -from .Scheduler import * -import numpy as np -from copy import deepcopy - - -class RFScheduler(Scheduler): - def __init__(self): - super().__init__() - - def selection(self): - return self.RandomContainerSelection() - - def placement(self, containerIDs): - return self.FirstFitPlacement(containerIDs) diff --git a/scheduler/Random_Random_LeastFull.py b/scheduler/Random_Random_LeastFull.py deleted file mode 100644 index 678ad83..0000000 --- a/scheduler/Random_Random_LeastFull.py +++ /dev/null @@ -1,15 +0,0 @@ -from .Scheduler import * -import numpy as np -from copy import deepcopy - - -class RLScheduler(Scheduler): - def __init__(self): - super().__init__() - - def selection(self): - return self.RandomContainerSelection() - - def placement(self, containerIDs): - return self.MaxFullPlacement(containerIDs) - diff --git a/scheduler/Scheduler.py b/scheduler/Scheduler.py index fd24752..a20aa7e 100644 --- a/scheduler/Scheduler.py +++ b/scheduler/Scheduler.py @@ -4,6 +4,7 @@ import pandas as pd from statistics import median import numpy as np +from time import time class Scheduler(): def __init__(self): @@ -19,169 +20,12 @@ def placement(self, containerlist): pass def filter_placement(self, decision): - filtered_decision = [] - for cid, hid in decision: - if self.env.getContainerByID(cid).getHostID() != hid: - filtered_decision.append((cid, hid)) - return filtered_decision - - def getMigrationFromHost(self, hostID, decision): - containerIDs = [] - for (cid, _) in decision: - hid = self.env.getContainerByID(cid).getHostID() - if hid == hostID: - containerIDs.append(cid) - return containerIDs - - def getMigrationToHost(self, hostID, decision): - containerIDs = [] - for (cid, hid) in decision: - if hid == hostID: - containerIDs.append(cid) - return containerIDs - - # Host selection - - def ThresholdHostSelection(self): - selectedHostIDs = [] - for i, host in enumerate(self.env.hostlist): - if host.getCPU() > 70: - selectedHostIDs.append(i) - return selectedHostIDs - - def LRSelection(self, utilHistory): - if (len(utilHistory) < LOCAL_REGRESSION_BANDWIDTH): - return self.ThresholdHostSelection() - selectedHostIDs = []; x = list(range(LOCAL_REGRESSION_BANDWIDTH)) - for i,host in enumerate(self.env.hostlist): - hostL = [utilHistory[j][i] for j in range(len(utilHistory))] - _, estimates = loess(x, hostL[-LOCAL_REGRESSION_BANDWIDTH:], poly_degree=1, alpha=0.6) - weights = estimates['b'].values[-1] - predictedCPU = weights[0] + weights[1] * (LOCAL_REGRESSION_BANDWIDTH + 1) - if LOCAL_REGRESSION_CPU_MULTIPLIER * predictedCPU >= 100: - selectedHostIDs.append(i) - return selectedHostIDs - - def RLRSelection(self, utilHistory): - if (len(utilHistory) < LOCAL_REGRESSION_BANDWIDTH): - return self.ThresholdHostSelection() - selectedHostIDs = []; x = list(range(LOCAL_REGRESSION_BANDWIDTH)) - for i,host in enumerate(self.env.hostlist): - hostL = [utilHistory[j][i] for j in range(len(utilHistory))] - _, estimates = loess(x, hostL[-LOCAL_REGRESSION_BANDWIDTH:], poly_degree=1, alpha=0.6, robustify=True) - weights = estimates['b'].values[-1] - predictedCPU = weights[0] + weights[1] * (LOCAL_REGRESSION_BANDWIDTH + 1) - if LOCAL_REGRESSION_CPU_MULTIPLIER * predictedCPU >= 100: - selectedHostIDs.append(i) - return selectedHostIDs - - def MADSelection(self, utilHistory): - selectedHostIDs = [] - for i, host in enumerate(self.env.hostlist): - hostL = [utilHistory[j][i] for j in range(len(utilHistory))] - median_hostL = np.median(np.array(hostL)) - mad = np.median([abs(Utilhst-median_hostL) for Utilhst in hostL]) - ThresholdCPU = 100-LOCAL_REGRESSION_CPU_MULTIPLIER * mad - UtilizedCPU = host.getCPU() - if UtilizedCPU > ThresholdCPU: - selectedHostIDs.append(i) - return selectedHostIDs - - def IQRSelection(self, utilHistory): - selectedHostIDs = [] - for i, host in enumerate(self.env.hostlist): - hostL = [utilHistory[j][i] for j in range(len(utilHistory))] - q1, q3 = np.percentile(np.array(hostL), [25, 75]) - IQR = q3-q1 - ThresholdCPU = 100-LOCAL_REGRESSION_CPU_MULTIPLIER * IQR - UtilizedCPU = host.getCPU() - if UtilizedCPU > ThresholdCPU: - selectedHostIDs.append(i) - return selectedHostIDs - - # Container Selection - - def RandomContainerSelection(self): - selectableIDs = self.env.getSelectableContainers() - if selectableIDs == []: return [] - selectedCount = np.random.randint(0, len(selectableIDs)) + 1 - selectedIDs = []; - while len(selectedIDs) < selectedCount: - idChoice = np.random.choice(selectableIDs) - if self.env.containerlist[idChoice]: - selectedIDs.append(idChoice) - selectableIDs.remove(idChoice) - return selectedIDs - - def MMTContainerSelection(self, selectedHostIDs): - selectedContainerIDs = [] - for hostID in selectedHostIDs: - containerIDs = self.env.getContainersOfHost(hostID) - ramSize = [self.env.containerlist[cid].getContainerSize() for cid in containerIDs] - if ramSize: - mmtContainerID = containerIDs[ramSize.index(min(ramSize))] - selectedContainerIDs.append(mmtContainerID) - return selectedContainerIDs - - def MaxUseContainerSelection(self, selectedHostIDs): - selectedContainerIDs = [] - for hostID in selectedHostIDs: - containerIDs = self.env.getContainersOfHost(hostID) - if len(containerIDs): - containerIPS = [self.env.containerlist[cid].getBaseIPS() for cid in containerIDs] - selectedContainerIDs.append(containerIDs[containerIPS.index(max(containerIPS))]) - return selectedContainerIDs - - def MaxCorContainerSelection(self, selectedHostIDs,utilHistoryContainer): - selectedContainerIDs = [] - for hostID in selectedHostIDs: - containerIDs = self.env.getContainersOfHost(hostID) - if len(containerIDs): - hostL = [[utilHistoryContainer[j][cid] for j in range(len(utilHistoryContainer))] for cid in containerIDs] - data = pd.DataFrame(hostL) - data = data.T; RSquared = [] - for i in range(data.shape[1]): - x = np.array(data.drop(data.columns[i],axis=1)) - y = np.array(data.iloc[:,i]) - X1 = np.c_[x, np.ones(x.shape[0])] - y_pred = np.dot(X1, np.dot(np.linalg.pinv(np.dot(np.transpose(X1), X1)), np.dot(np.transpose(X1), y))) - corr = np.corrcoef(np.column_stack((y,y_pred)), rowvar=False) - RSquared.append(corr[0][1] if not np.isnan(corr).any() else 0) - selectedContainerIDs.append(containerIDs[RSquared.index(max(RSquared))]) - return selectedContainerIDs - - # Container placement - - def RandomPlacement(self, containerIDs): - decision = [] - for cid in containerIDs: - decision.append((cid, np.random.randint(0, len(self.env.hostlist)))) return decision - def FirstFitPlacement(self, containerIDs): - decision = [] - for cid in containerIDs: - for hostID in range(len(self.env.hostlist)): - if self.env.getPlacementPossible(cid, hostID): - decision.append((cid, hostID)); break - return decision + # Task placement - def LeastFullPlacement(self, containerIDs): + def RandomPlacement(self, tasks): decision = [] - hostIPSs = [(self.env.hostlist[i].getCPU(), i) for i in range(len(self.env.hostlist))] - for cid in containerIDs: - leastFullHost = min(hostIPSs) - decision.append((cid, leastFullHost[1])) - if len(hostIPSs) > 1: - hostIPSs.remove(leastFullHost) + for task in tasks: + decision.append(np.random.randint(0, len(self.env.hostlist))) return decision - - def MaxFullPlacement(self, containerIDs): - decision = [] - hostIPSs = [(self.env.hostlist[i].getCPU(), i) for i in range(len(self.env.hostlist))] - for cid in containerIDs: - maxFullHost = max(hostIPSs) - decision.append((cid, maxFullHost[1])) - if len(hostIPSs) > 1: - hostIPSs.remove(leastFullHost) - return decision \ No newline at end of file diff --git a/scheduler/Threshold_MC_Random.py b/scheduler/Threshold_MC_Random.py deleted file mode 100644 index a352d41..0000000 --- a/scheduler/Threshold_MC_Random.py +++ /dev/null @@ -1,23 +0,0 @@ -from .Scheduler import * -import numpy as np -from copy import deepcopy - - -class TMCRScheduler(Scheduler): - def __init__(self): - super().__init__() - self.utilHistoryContainer= [] - - def updateUtilHistoryContainer(self): - containerUtil = [(cid.getBaseIPS() if cid else 0) for cid in self.env.containerlist] - self.utilHistoryContainer.append(containerUtil) - - - def selection(self): - self.updateUtilHistoryContainer() - selectedHostIDs = self.ThresholdHostSelection() - selectedVMIDs = self.MaxCorContainerSelection(selectedHostIDs,self.utilHistoryContainer) - return selectedVMIDs - - def placement(self, containerIDs): - return self.RandomPlacement(containerIDs) diff --git a/serverless/Serverless.py b/serverless/Serverless.py index e69de29..12fe723 100644 --- a/serverless/Serverless.py +++ b/serverless/Serverless.py @@ -0,0 +1,127 @@ +from .node.Node import * +from .task.Task import * +from .datacenter.server.controller import * +from time import time, sleep +import multiprocessing, os +from joblib import Parallel, delayed +from copy import deepcopy + +num_cores = multiprocessing.cpu_count() + +class Serverless(): + # Total power in watt + # Total Router Bw + # Interval Time in seconds + def __init__(self, Scheduler, Decider, IntervalTime, hostinit, database, env): + self.hostlimit = len(hostinit) + self.scheduler = Scheduler + self.scheduler.setEnvironment(self) + self.decider = Decider + self.decider.setEnvironment(self) + self.hostlist = [] + self.completedtasklist = [] + self.activetasklist = [] + self.waitinglist = [] + self.intervaltime = IntervalTime + self.interval = 0 + self.inactiveContainers = [] + self.stats = None + self.environment = env + self.addHostlistInit(hostinit) + self.globalStartTime = time() + self.intervalAllocTimings = [] + os.makedirs('./temp', exist_ok = True) + + def addHostInit(self, IP, IPS, RAM, Disk, Bw, Powermodel): + assert len(self.hostlist) < self.hostlimit + host = Node(len(self.hostlist),IP,IPS, RAM, Disk, Bw, Powermodel, self) + self.hostlist.append(host) + + def addHostlistInit(self, hostList): + assert len(hostList) == self.hostlimit + for IP, IPS, RAM, Disk, Bw, Powermodel in hostList: + self.addHostInit(IP, IPS, RAM, Disk, Bw, Powermodel) + + def getTasksOfHost(self, hostID): + tasks = [] + for task in self.activetasklist: + if task.hostid == hostID: + tasks.append(task.id) + return tasks + + def getTaskByCID(self, creationID): + return self.functions[creationID] + + def getHostByID(self, hostID): + return self.hostlist[hostID] + + def getNumActiveTasks(self): + return len(self.activetasklist()) + + def getTasksofHost(self, hostID): + return [t.creationID() if t.hostid == hostID for t in self.activetasklist] + + def getTasksInHosts(self): + return [len(self.getContainersOfHost(host)) for host in range(self.hostlimit)] + + def visualSleep(self, t): + total = str(int(t//60))+" min, "+str(t%60)+" sec" + for i in range(int(t)): + print("\r>> Interval timer "+str(i//60)+" min, "+str(i%60)+" sec of "+total, end=' ') + sleep(1) + sleep(t % 1) + print() + + def canRun(self, task): + done = [False] * len(task.precendence) + for t in self.completedtasklist: + if t.creationID == task.creationID and t.taskID in task.precendence: + done[t.taskID] = True + return np.all(done) + + def destroyCompletedTasks(self): + destroyed = 0 + for i, task in enumerate(self.activetasklist): + outputexist = [os.path.exists(path) for path in task.output_imgs] + if np.all(outputexist): + destroyed += 1 + task.destroy() + self.activetasklist.remove(task) + self.completedtasklist.append(task) + return destroyed + + def allocateInit(self, newtasklist, decision): + self.interval += 1 + start = time(); deployed = 0 + for i, hid in enumerate(decision): + task = newtasklist[i] + if self.canRun(task): + task.hostid = hid; task.startAt = self.interval; task.runTask(self.getHostByID(hid).ip) + deployed += 1 + else: + self.waitinglist.append(task) + self.visualSleep(self.intervaltime) + for host in self.hostlist: + host.updateUtilizationMetrics() + return deployed + + def simulationStep(self, decision): + self.interval += 1 + start = time(); deployed = 0 + decisionwaiting, decisionnew = decision[:len(newtasklist)], decision[len(newtasklist):] + for i, hid in enumerate(decisionwaiting): + task = self.waitinglist[i] + if self.canRun(task): + task.hostid = hid; task.startAt = self.interval; task.runTask(self.getHostByID(hid).ip) + deployed += 1 + for i, hid in enumerate(decisionnew): + task = newtasklist[i] + if self.canRun(task): + task.hostid = hid; task.startAt = self.interval; task.runTask(self.getHostByID(hid).ip) + deployed += 1 + else: + self.waitinglist.append(task) + self.visualSleep(self.intervaltime) + for host in self.hostlist: + host.updateUtilizationMetrics() + return deployed diff --git a/serverless/datacenter/server/controller.py b/serverless/datacenter/server/controller.py index 31a6097..19fff59 100644 --- a/serverless/datacenter/server/controller.py +++ b/serverless/datacenter/server/controller.py @@ -19,4 +19,18 @@ def runFunctions(ip, funcname, input_img, output_img=None): if output_img: if os.path.getsize(output_img) == 0: return False + return True + +def runFunctionsAll(ip, funcname, input_imgs, output_imgs=None): + ''' Runs a function and return True if successful and false otherwise ''' + cmd = '' + for i in range(len(input_imgs)): + cmd += f'http http://{ip}:7071/api/{funcname} @{input_imgs[i]}' + if output_imgs: cmd += f' > {output_imgs[i]}' + if i < len(input_imgs) - 1: + cmd += ' && ' + subprocess.Popen(cmd, shell=True) + if output_imgs: + if os.path.getsize(output_imgs[0]) == 0: + return False return True \ No newline at end of file diff --git a/serverless/function/Task.py b/serverless/function/Task.py new file mode 100644 index 0000000..eb8d269 --- /dev/null +++ b/serverless/function/Task.py @@ -0,0 +1,37 @@ +from utils.utils import * + +class Task(): + def __init__(self, creationID, creationInterval, sla, application, choice, Framework, taskID, precedence, input_imgs, HostID = -1): + self.creationID = creationID + self.taskID = taskID + self.precedence = precedence + self.choice = choice + self.sla = sla + self.env = Framework + self.createAt = creationInterval + self.application = application + self.input_imgs = input_imgs + self.hostid = HostID + self.startAt = -1 + self.destroyAt = -1 + self.getOutputImgs() + + def getHostID(self): + return self.hostid + + def getOutputImgs(self): + ch = self.choice[0].upper() + self.output_imgs = [] + for img in self.input_imgs: + for d in DSET: + d2 = d.split('.')[0] + if d2 in img: + self.output_imgs.append(f'./temp/{d2}_{ch}_{self.creationID}_{self.taskID}.jpg') + break + + def runTask(self, ip): + runFunctions(ip, self.application, self.input_imgs, self.output_imgs) + + def destroy(self): + self.destroyAt = self.env.interval + self.hostid = -1 \ No newline at end of file diff --git a/serverless/node/node.py b/serverless/node/node.py new file mode 100644 index 0000000..d072a81 --- /dev/null +++ b/serverless/node/node.py @@ -0,0 +1,65 @@ +from metrics.Disk import * +from metrics.RAM import * +from metrics.Bandwidth import * +from serverless.datacenter.server.controller import * + +class Node(): + # IPS = Million Instructions per second capacity + # RAM = Ram in MB capacity + # Disk = Disk characteristics capacity + # Bw = Bandwidth characteristics capacity + def __init__(self, ID, IP, IPS, RAM_, Disk_, Bw, Powermodel, Serverless): + self.id = ID + self.ip = IP + self.ipsCap = IPS + self.ramCap = RAM_ + self.diskCap = Disk_ + self.bwCap = Bw + # Initialize utilization metrics + self.ips = 0 + self.ram = RAM(0, 0, 0) + self.bw = Bandwidth(0, 0) + self.disk = Disk(0, 0, 0) + self.powermodel = Powermodel + self.powermodel.allocHost(self) + self.powermodel.host = self + self.env = Serverless + + def getPower(self): + return self.powermodel.power() + + def getPowerFromIPS(self, ips): + return self.powermodel.powerFromCPU(min(100, 100 * (ips / self.ipsCap))) + + def getCPU(self): + # 0 - 100 last interval + return min(100, 100 * (self.ips / self.ipsCap)) + + def getBaseIPS(self): + return self.ips + + def getApparentIPS(self): + return self.ips + + def getIPSAvailable(self): + return self.ipsCap - self.ips + + def getCurrentRAM(self): + return self.ram.size, self.ram.read, self.ram.write + + def getRAMAvailable(self): + size, read, write = self.getCurrentRAM() + return max(0, (0.6 if self.ramCap.size < 4000 else 0.8) * self.ramCap.size - size), self.ramCap.read - read, self.ramCap.write - write + + def getCurrentDisk(self): + return self.disk.size, self.disk.read, self.disk.write + + def getDiskAvailable(self): + size, read, write = self.getCurrentDisk() + return self.diskCap.size - size, self.diskCap.read - read, self.diskCap.write - write + + def updateUtilizationMetrics(self): + host_data = gethostStat(self.ip) + self.ips = host_data['cpu'] * self.ipsCap / 100 + self.ram.size = host_data['memory'] + self.disk.size = host_data['disk'] \ No newline at end of file diff --git a/serverless/workload/AIBenchWorkload.py b/serverless/workload/AIBenchWorkload.py index e69de29..828d073 100644 --- a/serverless/workload/AIBenchWorkload.py +++ b/serverless/workload/AIBenchWorkload.py @@ -0,0 +1,22 @@ +from .Workload import * +from utils.Utils import * +from random import gauss, choices + +class AIBenchWorkload(Workload): + def __init__(self, num_workloads, std_dev): + super().__init__() + self.num_workloads = num_workloads + self.std_dev = std_dev + + def generateNewContainers(self, interval): + workloadlist = [] + applications = [name for name in os.listdir(FN_PATH) if os.path.isdir(FN_PATH+name)] + multiplier = np.array([2, 2, 2, 2, 2, 2, 2]) + weights = 1 - (multiplier / np.sum(multiplier)) + for i in range(max(1,int(gauss(self.num_workloads, self.std_dev)))): + CreationID = self.creation_id + SLA = np.random.randint(5,8) ## Update this based on intervals taken + application = choices(applications, weights=weights)[0] + workloadlist.append((CreationID, interval, SLA, application)) + self.creation_id += 1 + return workloadlist \ No newline at end of file diff --git a/serverless/workload/Workload.py b/serverless/workload/Workload.py new file mode 100644 index 0000000..499869b --- /dev/null +++ b/serverless/workload/Workload.py @@ -0,0 +1,5 @@ +import numpy as np + +class Workload(): + def __init__(self): + self.creation_id = 0 diff --git a/utils/Utils.py b/utils/Utils.py index dcb115f..966cb95 100644 --- a/utils/Utils.py +++ b/utils/Utils.py @@ -8,6 +8,8 @@ FN_PATH = './functions/' IPS_PATH = './serverless/datacenter/ips.json' SAMPLE_PATH = './samples/' +DSET = list(filter(lambda k: '.md' not in k, os.listdir(SAMPLE_PATH))) +DSET = [os.path.join(SAMPLE_PATH, i) for i in self.dataset] def printDecisionAndMigrations(decision, migrations): print('Decision: [', end='')