Skip to content

Commit

Permalink
Merge pull request #1233 from flatironinstitute/dev_remove_slurm_integration
Browse files Browse the repository at this point in the history

Remove slurm integration
  • Loading branch information
pgunn authored Nov 21, 2023
2 parents 650ca37 + 0fbf7df commit 637b7ac
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 301 deletions.
11 changes: 0 additions & 11 deletions SLURM/README

This file was deleted.

135 changes: 0 additions & 135 deletions SLURM/demo_slurm.py

This file was deleted.

33 changes: 0 additions & 33 deletions SLURM/slurmStart.sh

This file was deleted.

164 changes: 43 additions & 121 deletions caiman/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,72 +100,38 @@ def extract_patch_coordinates(dims: tuple,

return list(map(np.sort, coords_flat)), shapes

def start_server(slurm_script: str = None, ipcluster: str = "ipcluster", ncpus: int = None) -> None:
def start_server(ipcluster: str = "ipcluster", ncpus: int = None) -> None:
"""
programmatically start the ipyparallel server
Args:
ncpus: int
ncpus
number of processors
ipcluster : str
ipcluster
ipcluster binary file name; requires 4 path separators on Windows. ipcluster="C:\\\\Anaconda3\\\\Scripts\\\\ipcluster.exe"
Default: "ipcluster"
"""
logger.info("Starting cluster...")
if ncpus is None:
ncpus = psutil.cpu_count()

if slurm_script is None:

if ipcluster == "ipcluster":
subprocess.Popen(f"ipcluster start -n {ncpus}", shell=True, close_fds=(os.name != 'nt'))
else:
subprocess.Popen(shlex.split(f"{ipcluster} start -n {ncpus}"),
shell=True,
close_fds=(os.name != 'nt'))
time.sleep(1.5)
# Check that all processes have started
client = ipyparallel.Client()
time.sleep(1.5)
while len(client) < ncpus:
sys.stdout.write(".") # Give some visual feedback of things starting
sys.stdout.flush() # (de-buffered)
time.sleep(0.5)
logger.debug('Making sure everything is up and running')
client.direct_view().execute('__a=1', block=True) # when done on all, we're set to go
if ipcluster == "ipcluster":
subprocess.Popen(f"ipcluster start -n {ncpus}", shell=True, close_fds=(os.name != 'nt'))
else:
shell_source(slurm_script)
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']
logger.debug([pdir, profile])
c = Client(ipython_dir=pdir, profile=profile)
ee = c[:]
ne = len(ee)
logger.info(f'Running on {ne} engines.')
c.close()
sys.stdout.write("start_server: done\n")


def shell_source(script: str) -> None:
""" Run a source-style bash script, copy resulting env vars to current process. """
# XXX This function is weird and maybe not a good idea. People easily might expect
# it to handle conditionals. Maybe just make them provide a key-value file
#introduce echo to indicate the end of the output
pipe = subprocess.Popen(f". {script}; env; echo 'FINISHED_CLUSTER'", stdout=subprocess.PIPE, shell=True)

env = dict()
while True:
line = pipe.stdout.readline().decode('utf-8').rstrip()
if 'FINISHED_CLUSTER' in line: # find the keyword set above to determine the end of the output stream
break
logger.debug("shell_source parsing line[" + str(line) + "]")
lsp = str(line).split("=", 1)
if len(lsp) > 1:
env[lsp[0]] = lsp[1]

os.environ.update(env)
pipe.stdout.close()

subprocess.Popen(shlex.split(f"{ipcluster} start -n {ncpus}"),
shell=True,
close_fds=(os.name != 'nt'))
time.sleep(1.5)
# Check that all processes have started
client = ipyparallel.Client()
time.sleep(1.5)
while len(client) < ncpus:
sys.stdout.write(".") # Give some visual feedback of things starting
sys.stdout.flush() # (de-buffered)
time.sleep(0.5)
logger.debug('Making sure everything is up and running')
client.direct_view().execute('__a=1', block=True) # when done on all, we're set to go

def stop_server(ipcluster: str = 'ipcluster', pdir: str = None, profile: str = None, dview=None) -> None:
"""
Expand All @@ -185,61 +151,33 @@ def stop_server(ipcluster: str = 'ipcluster', pdir: str = None, profile: str = N
dview.terminate()
else:
logger.info("Stopping cluster...")
try:
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']
is_slurm = True
except:
logger.debug('stop_server: not a slurm cluster')
is_slurm = False

if is_slurm:
if pdir is None and profile is None:
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']
c = Client(ipython_dir=pdir, profile=profile)
ee = c[:]
ne = len(ee)
logger.info(f'Shutting down {ne} engines.')
c.close()
c.shutdown(hub=True)
shutil.rmtree('profile_' + str(profile))
try:
shutil.rmtree('./log/')
except:
logger.info('creating log folder') # FIXME Not what this means

files = glob.glob('*.log')
os.mkdir('./log')

for fl in files:
shutil.move(fl, './log/')

if ipcluster == "ipcluster":
proc = subprocess.Popen("ipcluster stop",
shell=True,
stderr=subprocess.PIPE,
close_fds=(os.name != 'nt'))
else:
if ipcluster == "ipcluster":
proc = subprocess.Popen("ipcluster stop",
shell=True,
stderr=subprocess.PIPE,
close_fds=(os.name != 'nt'))
else:
proc = subprocess.Popen(shlex.split(ipcluster + " stop"),
shell=True,
stderr=subprocess.PIPE,
close_fds=(os.name != 'nt'))

line_out = proc.stderr.readline()
if b'CRITICAL' in line_out:
logger.info("No cluster to stop...")
elif b'Stopping' in line_out:
st = time.time()
logger.debug('Waiting for cluster to stop...')
while (time.time() - st) < 4:
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(1)
else:
logger.error(line_out)
logger.error('**** Unrecognized syntax in ipcluster output, waiting for server to stop anyways ****')
proc = subprocess.Popen(shlex.split(ipcluster + " stop"),
shell=True,
stderr=subprocess.PIPE,
close_fds=(os.name != 'nt'))

line_out = proc.stderr.readline()
if b'CRITICAL' in line_out:
logger.info("No cluster to stop...")
elif b'Stopping' in line_out:
st = time.time()
logger.debug('Waiting for cluster to stop...')
while (time.time() - st) < 4:
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(1)
else:
logger.error(line_out)
logger.error('**** Unrecognized syntax in ipcluster output, waiting for server to stop anyways ****')

proc.stderr.close()
proc.stderr.close()

logger.info("stop_cluster(): done")

Expand All @@ -256,7 +194,6 @@ def setup_cluster(backend:str = 'multiprocessing',
'multiprocessing' - Use multiprocessing library
'ipyparallel' - Use ipyparallel instead (better on Windows?)
'single' - Don't be parallel (good for debugging, slow)
'SLURM' - Try to use SLURM batch system (untested, involved).
Most backends will try, by default, to stop a running cluster if
it is running before setting up a new one, or throw an error if
they find one.
Expand All @@ -273,7 +210,7 @@ def setup_cluster(backend:str = 'multiprocessing',
Returns:
c: ipyparallel.Client object; only used for ipyparallel and SLURM backends, else None
c: ipyparallel.Client object; only used for ipyparallel backends, else None
dview: multicore processing engine that is used for parallel processing.
If backend is 'multiprocessing' then dview is Pool object.
If backend is 'ipyparallel' then dview is a DirectView object.
Expand Down Expand Up @@ -325,21 +262,6 @@ def setup_cluster(backend:str = 'multiprocessing',
c = None
n_processes = 1

elif backend == 'SLURM':
# Override n_processes from above because with slurm you're using cluster resources, not machine-local resources
# Warning: This code may no longer work; it has not been tested in a very, very long time
n_processes = int(os.environ.get('SLURM_NPROCS'))
try:
stop_server()
except:
logger.debug('Nothing to stop')
slurm_script = os.environ.get('SLURMSTART_SCRIPT') # An example of this is in the source repo under 'SLURM/slurmStart.sh'
logger.info([str(n_processes), slurm_script])
start_server(slurm_script=slurm_script, ncpus=n_processes)
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']
logger.info([pdir, profile])
c = Client(ipython_dir=pdir, profile=profile)
dview = c[:]
else:
raise Exception('Unknown Backend')

Expand Down
Loading

0 comments on commit 637b7ac

Please sign in to comment.