Skip to content

Commit

Permalink
Merge pull request #570 from scipion-em/fix-gpu-assignment
Browse files Browse the repository at this point in the history
V3.10.1: hotfix: Avoid double Gpu assignment in scipion parallelize protocols with concurrent GPU steps
  • Loading branch information
fonsecareyna82 authored Oct 14, 2024
2 parents 9f739aa + b8202bb commit 45096b2
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
V3.10.1: hotfix: Avoid double Gpu assignment in scipion parallelize protocols with concurrent GPU steps
V3.10.0
developers:
- stepsExecutionMode = STEPS_SERIAL at class level. Should be a class attribute in future versions. Now both approaches are valid.
Expand Down
2 changes: 1 addition & 1 deletion pyworkflow/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
VERSION_1_1 = '1.1.0'
VERSION_1_2 = '1.2.0'
VERSION_2_0 = '2.0.0'
VERSION_3_0 = '3.10.0'
VERSION_3_0 = '3.10.1'

# For a new release, define a new constant and assign it to LAST_VERSION
# The existing one has to be added to OLD_VERSIONS list.
Expand Down
20 changes: 10 additions & 10 deletions pyworkflow/protocol/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ def runSteps(self, steps,

class StepThread(threading.Thread):
""" Thread to run Steps in parallel. """
def __init__(self, thId, step, lock):
def __init__(self, step, lock):
threading.Thread.__init__(self)
self.thId = thId
self.thId = step.getObjId()
self.step = step
self.lock = lock

Expand Down Expand Up @@ -262,7 +262,7 @@ def getGpuList(self):
return []
else:
return gpus
def getFreeGpuSlot(self, nodeId=None):
def getFreeGpuSlot(self, stepId=None):
""" Returns a free gpu slot available or None. If node is passed it also reserves it for that node
:param node: node to make the reserve of Gpus
Expand All @@ -272,10 +272,10 @@ def getFreeGpuSlot(self, nodeId=None):
if node < 0:
gpus = self.gpuDict[node]

if nodeId is not None:
if stepId is not None:
self.gpuDict.pop(node)
self.gpuDict[nodeId] = gpus
logger.info("GPUs %s assigned to thread %s" % (gpus, nodeId))
self.gpuDict[stepId] = gpus
logger.info("GPUs %s assigned to step %s" % (gpus, stepId))
else:
logger.info("Free gpu slot found at %s" % node)
return gpus
Expand All @@ -288,14 +288,14 @@ def freeGpusSlot(self, node):
if gpus is not None:
self.gpuDict.pop(node)
self.gpuDict[-node-1] = gpus
logger.info("GPUs %s freed from thread %s" % (gpus, node))
logger.info("GPUs %s freed from step %s" % (gpus, node))
else:
logger.debug("node %s not found in GPU slots" % node)
logger.debug("step id %s not found in GPU slots" % node)

def _isStepRunnable(self, step):
""" Overwrite this method to check GPUs availability"""

if self.gpuList and step.needsGPU() and self.getFreeGpuSlot() is None:
if self.gpuList and step.needsGPU() and self.getFreeGpuSlot(step.getObjId()) is None:
logger.info("Can't run step %s. Needs gpus and there are no free gpu slots" % step)
return False

Expand Down Expand Up @@ -361,7 +361,7 @@ def runSteps(self, steps,
node = freeNodes.pop(0) # take an available node
runningSteps[node] = step
logger.debug("Running step %s on node %s" % (step, node))
t = StepThread(node, step, sharedLock)
t = StepThread(step, sharedLock)
# won't keep process up if main thread ends
t.daemon = True
t.start()
Expand Down
21 changes: 15 additions & 6 deletions pyworkflow/protocol/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,18 @@ class MyOutput(enum.Enum):
# prerequisites of steps, but is easier to keep it
stepsExecutionMode = STEPS_SERIAL

@classmethod
def modeSerial(cls):
    """Return True when this protocol runs its steps one after another.

    The decision is driven by the class-level ``stepsExecutionMode``
    flag, compared against the ``STEPS_SERIAL`` constant. NOTE: this
    could perhaps be inferred from step prerequisites, but keeping an
    explicit flag is simpler.
    """
    is_serial = (STEPS_SERIAL == cls.stepsExecutionMode)
    return is_serial

@classmethod
def modeParallel(cls):
    """Return True when steps may run concurrently.

    Defined as the logical negation of :meth:`modeSerial`, so the two
    predicates can never disagree.
    """
    runs_serially = cls.modeSerial()
    return not runs_serially

def __init__(self, **kwargs):
Step.__init__(self, **kwargs)
self._size = None
Expand Down Expand Up @@ -1316,7 +1328,7 @@ def _stepFinished(self, step):
prot_id=self.getObjId(),
prot_name=self.getClassName(),
step_id=step._index))
if step.isFailed() and self.stepsExecutionMode == STEPS_PARALLEL:
if step.isFailed() and self.modeParallel():
# In parallel mode the executor will exit to close
# all working threads, so we need to close
self._endRun()
Expand Down Expand Up @@ -2434,7 +2446,7 @@ def runProtocolMain(projectPath, protDbPath, protId):
executor = None
nThreads = max(protocol.numberOfThreads.get(), 1)

if protocol.stepsExecutionMode == STEPS_PARALLEL and nThreads > 1:
if protocol.modeParallel() and nThreads > 1:
if protocol.useQueueForSteps():
executor = QueueStepExecutor(hostConfig,
protocol.getSubmitDict(),
Expand Down Expand Up @@ -2531,10 +2543,7 @@ class ProtStreamingBase(Protocol):
Minimum number of threads is 3 and should run in parallel mode.
"""

def __init__(self, **kwargs):

super().__init__()
self.stepsExecutionMode = STEPS_PARALLEL
stepsExecutionMode = STEPS_PARALLEL
def _insertAllSteps(self):
# Insert the step that generates the steps
self._insertFunctionStep(self.resumableStepGeneratorStep, str(datetime.now()), needsGPU=False)
Expand Down

0 comments on commit 45096b2

Please sign in to comment.