From 79a0914106e9c99e1e05475b5b3511c7b2bbedc4 Mon Sep 17 00:00:00 2001 From: Shreshth Tuli Date: Wed, 1 Dec 2021 18:49:22 +0000 Subject: [PATCH] Bug fixes in layer splits --- .gitignore | 1 + decider/Decider.py | 14 ++++++-------- main.py | 25 +++++-------------------- serverless/Serverless.py | 15 +++++++++++---- utils/Utils.py | 6 ++++++ 5 files changed, 29 insertions(+), 32 deletions(-) diff --git a/.gitignore b/.gitignore index 81bb517..a468d21 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ __pycache__/ *.py[cod] *$py.class temp/ +logs/ # C extensions *.so diff --git a/decider/Decider.py b/decider/Decider.py index f51e553..d34dd75 100644 --- a/decider/Decider.py +++ b/decider/Decider.py @@ -16,24 +16,22 @@ def decision(self, workflowlist): def getLayerInputs(self, cid, app, i): paths = [] for d in DSET: - d2 = DSET[0].split('.')[0] + d2 = d.split('.')[0] paths.append(f'./temp/{cid}_{i}_{app}_{d2}_L.jpg') return paths def createTasks(self, cid, interval, SLA, app, choice): tasklist = [] if choice == 'semantic': - tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 0, [], [self.dataset[0]])) - tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 1, [], [self.dataset[1]])) - tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 2, [], [self.dataset[2]])) - tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 3, [], [self.dataset[3]])) + tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 0, [], self.dataset)) + tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 1, [], self.dataset)) + tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 2, [], self.dataset)) + tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 3, [], self.dataset)) elif choice == 'layer': tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 0, [], self.dataset)) tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 1, [0], self.getLayerInputs(cid, app, 0))) - tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 2, [0, 1], self.getLayerInputs(cid, app, 1))) - tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 3, [0, 1, 2], self.getLayerInputs(cid, app, 2))) elif choice == 'compression': - tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 0, [], self.dataset*2)) + tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 0, [], self.dataset)) return tasklist diff --git a/main.py b/main.py index cb74564..a6837e7 100644 --- a/main.py +++ b/main.py @@ -43,15 +43,9 @@ # Global constants NUM_SIM_STEPS = 100 -HOSTS = 10 * 5 if opts.env == '' else 10 -CONTAINERS = HOSTS -TOTAL_POWER = 1000 -ROUTER_BW = 10000 +HOSTS = 10 INTERVAL_TIME = 5 # seconds -NEW_CONTAINERS = 0 if HOSTS == 10 else 5 -DB_NAME = '' -DB_HOST = '' -DB_PORT = 0 +NEW_CONTAINERS = 0 HOSTS_IP = [] def initalizeEnvironment(environment, mode): @@ -111,21 +105,12 @@ def saveStats(stats, datacenter, workload, env, end=True): dirname += "_" + workload.__class__.__name__ dirname += "_" + str(NUM_SIM_STEPS) dirname += "_" + str(HOSTS) - dirname += "_" + str(CONTAINERS) - dirname += "_" + str(TOTAL_POWER) - dirname += "_" + str(ROUTER_BW) dirname += "_" + str(INTERVAL_TIME) dirname += "_" + str(NEW_CONTAINERS) if not os.path.exists("logs"): os.mkdir("logs") if os.path.exists(dirname): shutil.rmtree(dirname, ignore_errors=True) os.mkdir(dirname) stats.generateDatasets(dirname) - if 'Datacenter' in datacenter.__class__.__name__: - saved_env, saved_workload, saved_datacenter, saved_scheduler, saved_sim_scheduler = stats.env, stats.workload, stats.datacenter, stats.scheduler, stats.simulated_scheduler - stats.env, stats.workload, stats.datacenter, stats.scheduler, stats.simulated_scheduler = None, None, None, None, None - with open(dirname + '/' + dirname.split('/')[1] +'.pk', 'wb') as handle: - pickle.dump(stats, handle) - stats.env, stats.workload, stats.datacenter, stats.scheduler, stats.simulated_scheduler = saved_env, saved_workload, saved_datacenter, saved_scheduler, saved_sim_scheduler if not end: return stats.generateGraphs(dirname) stats.generateCompleteDatasets(dirname) @@ -137,10 +122,10 @@ def saveStats(stats, datacenter, workload, env, end=True): datacenter, workload, scheduler, decider, env, stats = initalizeEnvironment(opts.env, int(opts.mode)) for step in range(NUM_SIM_STEPS): - print(color.BOLD+"Simulation Interval:", step, color.ENDC) + print(color.GREEN+"Execution Interval:", step, color.ENDC) stepSimulation(workload, scheduler, decider, env, stats) - if step % 10 == 0: saveStats(stats, datacenter, workload, env, end = False) + # if step % 10 == 0: saveStats(stats, datacenter, workload, env, end = False) datacenter.cleanup() - saveStats(stats, datacenter, workload, env) + # saveStats(stats, datacenter, workload, env) diff --git a/serverless/Serverless.py b/serverless/Serverless.py index 9cfa88e..0eb0ae5 100644 --- a/serverless/Serverless.py +++ b/serverless/Serverless.py @@ -1,6 +1,7 @@ from .node.Node import * from .task.Task import * from .datacenter.server.controller import * +from utils.Utils import * from time import time, sleep import multiprocessing, os from joblib import Parallel, delayed @@ -81,14 +82,19 @@ def canRun(self, task): return np.all(done) def destroyCompletedTasks(self): - destroyed = 0 + destroyed = 0; toremove = [] for i, task in enumerate(self.activetasklist): outputexist = [os.path.exists(path) for path in task.output_imgs] if np.all(outputexist): destroyed += 1 task.destroy() - self.activetasklist.remove(task) + toremove.append(task) self.completedtasklist.append(task) + if task.choice in ['semantic', 'compression']: + delfiles(task.creationID, task.taskID) + if task.choice == 'layer' and task.taskID == 1: + delfiles(task.creationID) + for task in toremove: self.activetasklist.remove(task) return destroyed def allocateInit(self, newtasklist, decision): @@ -108,13 +114,13 @@ def allocateInit(self, newtasklist, decision): def simulationStep(self, newtasklist, decision): self.interval += 1 - start = time(); deployed = 0 + start = time(); deployed = 0; toremove = [] decisionwaiting, decisionnew = decision[:len(self.waitinglist)], decision[len(self.waitinglist):] for i, hid in enumerate(decisionwaiting): task = self.waitinglist[i] if self.canRun(task): task.hostid = hid; task.startAt = self.interval; task.runTask(self.getHostByID(hid).ip) - self.activetasklist.append(task); deployed += 1 + toremove.append(task); self.activetasklist.append(task); deployed += 1 for i, hid in enumerate(decisionnew): task = newtasklist[i] if self.canRun(task): @@ -122,6 +128,7 @@ def simulationStep(self, newtasklist, decision): self.activetasklist.append(task); deployed += 1 else: self.waitinglist.append(task) + for task in toremove: self.waitinglist.remove(task) self.visualSleep(self.intervaltime) for host in self.hostlist: host.updateUtilizationMetrics() diff --git a/utils/Utils.py b/utils/Utils.py index 05e8720..070a8c9 100644 --- a/utils/Utils.py +++ b/utils/Utils.py @@ -2,6 +2,7 @@ import logging import json import re +from glob import glob from subprocess import call, run, PIPE from .ColorUtils import * @@ -25,6 +26,11 @@ def unixify(paths): if '.py' in file or '.sh' in file: _ = os.system("bash -c \"dos2unix "+path+file+" 2&> /dev/null\"") +def delfiles(creationID, taskID=None): + fileList = glob(f'./temp/{creationID}_**' if taskID is None else f'./temp/{creationID}_{taskID}_**') + for filePath in fileList: + os.remove(filePath) + def getdigit(string): for s in string: if s.isdigit():