Skip to content

Commit

Permalink
Bug fixes in layer splits
Browse files Browse the repository at this point in the history
  • Loading branch information
shreshthtuli committed Dec 1, 2021
1 parent edd2f8e commit 79a0914
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 32 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ __pycache__/
*.py[cod]
*$py.class
temp/
logs/

# C extensions
*.so
Expand Down
14 changes: 6 additions & 8 deletions decider/Decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,22 @@ def decision(self, workflowlist):
def getLayerInputs(self, cid, app, i):
paths = []
for d in DSET:
d2 = DSET[0].split('.')[0]
d2 = d.split('.')[0]
paths.append(f'./temp/{cid}_{i}_{app}_{d2}_L.jpg')
return paths

def createTasks(self, cid, interval, SLA, app, choice):
tasklist = []
if choice == 'semantic':
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 0, [], [self.dataset[0]]))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 1, [], [self.dataset[1]]))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 2, [], [self.dataset[2]]))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 3, [], [self.dataset[3]]))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 0, [], self.dataset))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 1, [], self.dataset))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 2, [], self.dataset))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 3, [], self.dataset))
elif choice == 'layer':
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 0, [], self.dataset))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 1, [0], self.getLayerInputs(cid, app, 0)))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 2, [0, 1], self.getLayerInputs(cid, app, 1)))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 3, [0, 1, 2], self.getLayerInputs(cid, app, 2)))
elif choice == 'compression':
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 0, [], self.dataset*2))
tasklist.append(Task(cid, interval, SLA, app, choice, self.env, 0, [], self.dataset))
return tasklist


Expand Down
25 changes: 5 additions & 20 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,9 @@

# Global constants
NUM_SIM_STEPS = 100
HOSTS = 10 * 5 if opts.env == '' else 10
CONTAINERS = HOSTS
TOTAL_POWER = 1000
ROUTER_BW = 10000
HOSTS = 10
INTERVAL_TIME = 5 # seconds
NEW_CONTAINERS = 0 if HOSTS == 10 else 5
DB_NAME = ''
DB_HOST = ''
DB_PORT = 0
NEW_CONTAINERS = 0
HOSTS_IP = []

def initalizeEnvironment(environment, mode):
Expand Down Expand Up @@ -111,21 +105,12 @@ def saveStats(stats, datacenter, workload, env, end=True):
dirname += "_" + workload.__class__.__name__
dirname += "_" + str(NUM_SIM_STEPS)
dirname += "_" + str(HOSTS)
dirname += "_" + str(CONTAINERS)
dirname += "_" + str(TOTAL_POWER)
dirname += "_" + str(ROUTER_BW)
dirname += "_" + str(INTERVAL_TIME)
dirname += "_" + str(NEW_CONTAINERS)
if not os.path.exists("logs"): os.mkdir("logs")
if os.path.exists(dirname): shutil.rmtree(dirname, ignore_errors=True)
os.mkdir(dirname)
stats.generateDatasets(dirname)
if 'Datacenter' in datacenter.__class__.__name__:
saved_env, saved_workload, saved_datacenter, saved_scheduler, saved_sim_scheduler = stats.env, stats.workload, stats.datacenter, stats.scheduler, stats.simulated_scheduler
stats.env, stats.workload, stats.datacenter, stats.scheduler, stats.simulated_scheduler = None, None, None, None, None
with open(dirname + '/' + dirname.split('/')[1] +'.pk', 'wb') as handle:
pickle.dump(stats, handle)
stats.env, stats.workload, stats.datacenter, stats.scheduler, stats.simulated_scheduler = saved_env, saved_workload, saved_datacenter, saved_scheduler, saved_sim_scheduler
if not end: return
stats.generateGraphs(dirname)
stats.generateCompleteDatasets(dirname)
Expand All @@ -137,10 +122,10 @@ def saveStats(stats, datacenter, workload, env, end=True):
datacenter, workload, scheduler, decider, env, stats = initalizeEnvironment(opts.env, int(opts.mode))

for step in range(NUM_SIM_STEPS):
print(color.BOLD+"Simulation Interval:", step, color.ENDC)
print(color.GREEN+"Execution Interval:", step, color.ENDC)
stepSimulation(workload, scheduler, decider, env, stats)
if step % 10 == 0: saveStats(stats, datacenter, workload, env, end = False)
# if step % 10 == 0: saveStats(stats, datacenter, workload, env, end = False)

datacenter.cleanup()
saveStats(stats, datacenter, workload, env)
# saveStats(stats, datacenter, workload, env)

15 changes: 11 additions & 4 deletions serverless/Serverless.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .node.Node import *
from .task.Task import *
from .datacenter.server.controller import *
from utils.Utils import *
from time import time, sleep
import multiprocessing, os
from joblib import Parallel, delayed
Expand Down Expand Up @@ -81,14 +82,19 @@ def canRun(self, task):
return np.all(done)

def destroyCompletedTasks(self):
destroyed = 0
destroyed = 0; toremove = []
for i, task in enumerate(self.activetasklist):
outputexist = [os.path.exists(path) for path in task.output_imgs]
if np.all(outputexist):
destroyed += 1
task.destroy()
self.activetasklist.remove(task)
toremove.append(task)
self.completedtasklist.append(task)
if task.choice in ['semantic', 'compression']:
delfiles(task.creationID, task.taskID)
if task.choice == 'layer' and task.taskID == 1:
delfiles(task.creationID)
for task in toremove: self.activetasklist.remove(task)
return destroyed

def allocateInit(self, newtasklist, decision):
Expand All @@ -108,20 +114,21 @@ def allocateInit(self, newtasklist, decision):

def simulationStep(self, newtasklist, decision):
self.interval += 1
start = time(); deployed = 0
start = time(); deployed = 0; toremove = []
decisionwaiting, decisionnew = decision[:len(self.waitinglist)], decision[len(self.waitinglist):]
for i, hid in enumerate(decisionwaiting):
task = self.waitinglist[i]
if self.canRun(task):
task.hostid = hid; task.startAt = self.interval; task.runTask(self.getHostByID(hid).ip)
self.activetasklist.append(task); deployed += 1
toremove.append(task); self.activetasklist.append(task); deployed += 1
for i, hid in enumerate(decisionnew):
task = newtasklist[i]
if self.canRun(task):
task.hostid = hid; task.startAt = self.interval; task.runTask(self.getHostByID(hid).ip)
self.activetasklist.append(task); deployed += 1
else:
self.waitinglist.append(task)
for task in toremove: self.waitinglist.remove(task)
self.visualSleep(self.intervaltime)
for host in self.hostlist:
host.updateUtilizationMetrics()
Expand Down
6 changes: 6 additions & 0 deletions utils/Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import json
import re
from glob import glob
from subprocess import call, run, PIPE
from .ColorUtils import *

Expand All @@ -25,6 +26,11 @@ def unixify(paths):
if '.py' in file or '.sh' in file:
_ = os.system("bash -c \"dos2unix "+path+file+" 2&> /dev/null\"")

def delfiles(creationID, taskID=None):
fileList = glob(f'./temp/{creationID}_**' if taskID is None else f'./temp/{creationID}_{taskID}_**')
for filePath in fileList:
os.remove(filePath)

def getdigit(string):
for s in string:
if s.isdigit():
Expand Down

0 comments on commit 79a0914

Please sign in to comment.