diff --git a/src/neurocaas_contrib/Interface_S3.py b/src/neurocaas_contrib/Interface_S3.py
index 14c5714..d729d64 100755
--- a/src/neurocaas_contrib/Interface_S3.py
+++ b/src/neurocaas_contrib/Interface_S3.py
@@ -91,6 +91,43 @@ def download(s3path,localpath,display = False):
         else:
             raise
 
+def download_multi(s3path,localpath,force = False,display = False):
+    """Download function. Takes an s3 path to a "folder" (a key prefix ending in a slash) and a local directory path as input. Will attempt to download all objects under the given prefix to the local directory.
+    :param s3path: full path to a prefix in s3. Assumes the s3://bucketname/key syntax.
+    :param localpath: full path to the local directory to download into. Must already exist.
+    :param force: (optional) if True, will redownload even if data of the same name already exists at the destination. Defaults to False.
+    :param display: (optional) Defaults to False. If True, displays a progress bar.
+    :return: bool (truthy if all files were downloaded, falsy if any were skipped)
+
+    """
+    assert s3path.startswith("s3://")
+    bucketname,keyname = s3path.split("s3://")[-1].split("/",1)
+
+    try:
+        transfer = S3Transfer(s3_client)
+
+        # adapted from https://stackoverflow.com/questions/49772151/download-a-folder-from-s3-using-boto3
+        bucket = s3.Bucket(bucketname)
+        no_duplicate = 1
+        for obj in bucket.objects.filter(Prefix = keyname):
+            obj_keyname = obj.key
+            if (os.path.basename(obj_keyname) in os.listdir(localpath)) and (not force):
+                print("Data already exists at this location. Set force = True to overwrite")
+                no_duplicate = 0
+            else:
+                progress = ProgressPercentage_d(transfer._manager._client,bucketname,obj_keyname,display = display)
+                transfer.download_file(bucketname,obj_keyname,os.path.join(localpath,os.path.basename(obj_keyname)),callback = progress)
+        return no_duplicate
+
+    except botocore.exceptions.ClientError as e:
+        if e.response['Error']['Code'] == "404":
+            print("The object does not exist.")
+            raise
+        else:
+            raise
+
 def upload(localpath,s3path,display = False):
     """Upload function. Takes a local object path and an s3 path to the desired key as input.
     :param localpath: full path to the object name locally (i.e. with basename attached).
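Note: a minimal usage sketch for download_multi, assuming the module-level s3_client/s3 handles are configured (e.g. patched to a localstack session as in the test fixtures below); the bucket and prefix names are illustrative. Objects are written under their basenames only, so a nested prefix is flattened into the target directory and colliding basenames will overwrite one another.

    import os
    from neurocaas_contrib import Interface_S3

    local_dir = "/tmp/inputs"
    os.makedirs(local_dir, exist_ok=True)  # download_multi lists this directory, so it must exist

    # Download every object under the prefix into local_dir; returns falsy if any
    # file was skipped because it already existed locally and force was not set.
    ok = Interface_S3.download_multi("s3://testinterface/user/inputs", local_dir, force=False)
    if not ok:
        print("some files already existed locally; rerun with force=True to overwrite")
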
+ + """ + path = obj["storage"]["path"] + ncsm = NeuroCAASScriptManager.from_registration(path) + kwargs = {} + if outputpath is not None: + kwargs["path"] = outputpath + kwargs["force"] = force + kwargs["display"] = display + ncsm.get_data_multi(**kwargs) + + @workflow.command(help = "get a registered config from S3") @click.option("-o", "--outputpath", diff --git a/src/neurocaas_contrib/local_envs/autolfads-dev/stack_config_template.json b/src/neurocaas_contrib/local_envs/autolfads-dev/stack_config_template.json new file mode 100644 index 0000000..85721f6 --- /dev/null +++ b/src/neurocaas_contrib/local_envs/autolfads-dev/stack_config_template.json @@ -0,0 +1,42 @@ +{ + "PipelineName": "autolfads-torch", + "REGION": "us-east-1", + "STAGE": "websubstack", + "Lambda": { + "CodeUri": "../../protocols", + "Handler": "submit_start.handler_multisession", + "Launch": true, + "LambdaConfig": { + "AMI": "ami-08f59b6fdb19d0344", + "INSTANCE_TYPE": "p2.8xlarge", + "REGION": "us-east-1", + "IAM_ROLE": "SSMRole", + "KEY_NAME": "testkeystack-custom-dev-key-pair", + "WORKING_DIRECTORY": "~/bin", + "COMMAND": "cd /home/ubuntu; sudo -u ubuntu neurocaas_contrib/run_main_cli.sh \"{}\" \"{}\" \"{}\" \"{}\" \"lfads-torch/run_main.sh\"; . neurocaas_contrib/ncap_utils/workflow.sh;", + "EXECUTION_TIMEOUT": 900, + "SSM_TIMEOUT": 172000 + } + }, + "UXData": { + "Affiliates": [ + { + "AffiliateName": "traviscipermagroup", + "UserNames": [ + "cipermauser1", + "cipermauser2" + ], + "UserInput": true, + "ContactEmail": "NOTE: KEEP THIS AFFILIATE TO ENABLE EASY TESTING" + }, + { + "AffiliateName": "systemsneura1639759179", + "UserNames": [ + "systemsneura1639759178" + ], + "UserInput": true, + "ContactEmail": "NOTE: KEEP THIS AFFILIATE TO ENABLE EASY TESTING" + } + ] + } +} \ No newline at end of file diff --git a/src/neurocaas_contrib/scripting.py b/src/neurocaas_contrib/scripting.py index a6cedce..4257f47 100755 --- a/src/neurocaas_contrib/scripting.py +++ b/src/neurocaas_contrib/scripting.py @@ -10,7 +10,7 @@ import json import zipfile from .log import NeuroCAASCertificate,NeuroCAASDataStatus,NeuroCAASDataStatusLegacy -from .Interface_S3 import download,upload +from .Interface_S3 import download,upload,download_multi dir_loc = os.path.abspath(os.path.dirname(__file__)) @@ -166,7 +166,7 @@ def __init__(self,path,write = True): self.path = path ## The subdirectories to expect/create at the given location. self.subdirs = {"data":"inputs","config":"configs","results":"results","logs":"logs"} - #self.pathtemplate = {"s3":None,"localsource":None,"local":None} + #self.pathtemplate = {"s3":None,"local":None,"local":None} self.registration = { "data":{}, "config":{}, @@ -178,6 +178,7 @@ def __init__(self,path,write = True): self.write() def write(self): + print("\n\n\n\n" + "Registering at {}".format(str(self.path)) + "\n\n\n\n") with open(os.path.join(self.path,"registration.json"),"w") as reg: json.dump(self.registration,reg) @@ -205,7 +206,7 @@ def register_data(self,s3path): ## canc check existence later. assert str(s3path).startswith("s3://"), "must be given in s3 form" self.registration["data"]["s3"] = str(s3path) - self.registration["data"].pop("localsource","False") + self.registration["data"].pop("local","False") self.registration["data"].pop("local","False") self.write() @@ -215,7 +216,7 @@ def register_data_local(self,localpath): """ ## canc check existence later. 
- self.registration["data"]["localsource"] = str(localpath) + self.registration["data"]["local"] = str(localpath) self.registration["data"].pop("s3","False") self.registration["data"].pop("local","False") self.write() @@ -228,7 +229,7 @@ def register_config(self,s3path): ## canc check existence later. assert str(s3path).startswith("s3://"), "must be given in s3 form" self.registration["config"]["s3"] = str(s3path) - self.registration["config"].pop("localsource","False") + self.registration["config"].pop("local","False") self.registration["config"].pop("local","False") self.write() @@ -238,7 +239,7 @@ def register_config_local(self,localpath): """ ## canc check existence later. - self.registration["config"]["localsource"] = str(localpath) + self.registration["config"]["local"] = str(localpath) self.registration["config"].pop("s3","False") self.registration["config"].pop("local","False") self.write() @@ -255,7 +256,7 @@ def register_file(self,name,s3path): self.registration["additional_files"][name] = {} ## populate self.registration["additional_files"][name]["s3"] = str(s3path) - self.registration["additional_files"][name].pop("localsource","False") + self.registration["additional_files"][name].pop("local","False") self.registration["additional_files"][name].pop("local","False") self.write() @@ -269,7 +270,7 @@ def register_file_local(self,name,localpath): #self.registration["additional_files"][name] = {k:v for k,v in self.pathtemplate.items()} self.registration["additional_files"][name] = {} ## populate - self.registration["additional_files"][name]["localsource"] = str(localpath) + self.registration["additional_files"][name]["local"] = str(localpath) self.registration["additional_files"][name].pop("s3","False") self.registration["additional_files"][name].pop("local","False") self.write() @@ -280,14 +281,14 @@ def register_resultpath(self,s3path): """ assert s3path.startswith("s3://"), "must be given in s3 form" self.registration["resultpath"]["s3"] = str(s3path) - self.registration["resultpath"].pop("localsource","False") + self.registration["resultpath"].pop("local","False") self.write() def register_resultpath_local(self,localpath): """Given an local path, registers that as the location where we will upload job data. Give a folder, where you want to generate two subdirectories, "logs", and "process_results". Logs and analysis results will be sent to these respective locations. """ - self.registration["resultpath"]["localsource"] = str(localpath) + self.registration["resultpath"]["local"] = str(localpath) self.registration["resultpath"].pop("s3","False") self.write() @@ -305,8 +306,8 @@ def get_data(self,path = None,force = False,display = False): source = "s3" except KeyError: try: - data_localsource = self.registration["data"]["localsource"] - data_name = os.path.basename(data_localsource) + data_local = self.registration["data"]["local"] + data_name = os.path.basename(data_local) source = "local" except: raise AssertionError("Data not registered. Run register_data first.") @@ -325,10 +326,52 @@ def get_data(self,path = None,force = False,display = False): if source == "s3": download(data_s3path,data_localpath,display) elif source == "local": - shutil.copy(data_localsource,data_localpath) + shutil.copy(data_local,data_localpath) self.registration["data"]["local"] = data_localpath self.write() return 1 + + def get_data_multi(self,path = None,force = False,display = False): + """Get currently registered data. If desired, you can pass a path where you would like data to be moved. 
diff --git a/src/neurocaas_contrib/scripting.py b/src/neurocaas_contrib/scripting.py
index a6cedce..4257f47 100755
--- a/src/neurocaas_contrib/scripting.py
+++ b/src/neurocaas_contrib/scripting.py
@@ -10,7 +10,7 @@ import json
 import zipfile
 
 from .log import NeuroCAASCertificate,NeuroCAASDataStatus,NeuroCAASDataStatusLegacy
-from .Interface_S3 import download,upload
+from .Interface_S3 import download,upload,download_multi
 
 dir_loc = os.path.abspath(os.path.dirname(__file__))
@@ -166,7 +166,7 @@ def __init__(self,path,write = True):
         self.path = path
         ## The subdirectories to expect/create at the given location.
         self.subdirs = {"data":"inputs","config":"configs","results":"results","logs":"logs"}
-        #self.pathtemplate = {"s3":None,"localsource":None,"local":None}
+        #self.pathtemplate = {"s3":None,"local":None}
         self.registration = {
                 "data":{},
                 "config":{},
@@ -178,6 +178,7 @@ def __init__(self,path,write = True):
             self.write()
 
     def write(self):
+        print("Registering at {}".format(str(self.path)))
         with open(os.path.join(self.path,"registration.json"),"w") as reg:
             json.dump(self.registration,reg)
 
@@ -205,7 +206,6 @@ def register_data(self,s3path):
         ## can check existence later.
         assert str(s3path).startswith("s3://"), "must be given in s3 form"
         self.registration["data"]["s3"] = str(s3path)
-        self.registration["data"].pop("localsource","False")
         self.registration["data"].pop("local","False")
         self.write()
 
@@ -215,7 +215,6 @@ def register_data_local(self,localpath):
 
         """
         ## can check existence later.
-        self.registration["data"]["localsource"] = str(localpath)
+        self.registration["data"]["local"] = str(localpath)
         self.registration["data"].pop("s3","False")
-        self.registration["data"].pop("local","False")
         self.write()
 
@@ -228,7 +227,6 @@ def register_config(self,s3path):
         ## can check existence later.
         assert str(s3path).startswith("s3://"), "must be given in s3 form"
         self.registration["config"]["s3"] = str(s3path)
-        self.registration["config"].pop("localsource","False")
         self.registration["config"].pop("local","False")
         self.write()
 
@@ -238,7 +236,6 @@ def register_config_local(self,localpath):
 
         """
         ## can check existence later.
-        self.registration["config"]["localsource"] = str(localpath)
+        self.registration["config"]["local"] = str(localpath)
         self.registration["config"].pop("s3","False")
-        self.registration["config"].pop("local","False")
         self.write()
 
@@ -255,7 +252,6 @@ def register_file(self,name,s3path):
         self.registration["additional_files"][name] = {}
         ## populate
         self.registration["additional_files"][name]["s3"] = str(s3path)
-        self.registration["additional_files"][name].pop("localsource","False")
         self.registration["additional_files"][name].pop("local","False")
         self.write()
 
@@ -269,7 +265,6 @@ def register_file_local(self,name,localpath):
         #self.registration["additional_files"][name] = {k:v for k,v in self.pathtemplate.items()}
         self.registration["additional_files"][name] = {}
         ## populate
-        self.registration["additional_files"][name]["localsource"] = str(localpath)
+        self.registration["additional_files"][name]["local"] = str(localpath)
         self.registration["additional_files"][name].pop("s3","False")
-        self.registration["additional_files"][name].pop("local","False")
         self.write()
 
@@ -280,14 +275,14 @@ def register_resultpath(self,s3path):
         """
         assert s3path.startswith("s3://"), "must be given in s3 form"
         self.registration["resultpath"]["s3"] = str(s3path)
-        self.registration["resultpath"].pop("localsource","False")
+        self.registration["resultpath"].pop("local","False")
         self.write()
 
     def register_resultpath_local(self,localpath):
         """Given a local path, registers that as the location where we will upload job data. Give a folder where you want to generate two subdirectories, "logs" and "process_results". Logs and analysis results will be sent to these respective locations.
 
         """
-        self.registration["resultpath"]["localsource"] = str(localpath)
+        self.registration["resultpath"]["local"] = str(localpath)
         self.registration["resultpath"].pop("s3","False")
         self.write()
 
@@ -305,8 +300,8 @@ def get_data(self,path = None,force = False,display = False):
             source = "s3"
         except KeyError:
             try:
-                data_localsource = self.registration["data"]["localsource"]
-                data_name = os.path.basename(data_localsource)
+                data_local = self.registration["data"]["local"]
+                data_name = os.path.basename(data_local)
                 source = "local"
             except:
                 raise AssertionError("Data not registered. Run register_data first.")
@@ -325,10 +320,52 @@ def get_data(self,path = None,force = False,display = False):
         if source == "s3":
             download(data_s3path,data_localpath,display)
         elif source == "local":
-            shutil.copy(data_localsource,data_localpath)
+            shutil.copy(data_local,data_localpath)
         self.registration["data"]["local"] = data_localpath
         self.write()
         return 1
+
+    def get_data_multi(self,path = None,force = False,display = False):
+        """Get all files under the currently registered data location. If desired, you can pass a path where you would like data to be moved. Otherwise, it will be moved to self.path/self.subdirs[data]
+        :param path: (optional) the location you want to write data to.
+        :param force: (optional) by default, will not redownload if data of the same name already lives here. Can override with force = True.
+        :param display: (optional) by default, will not display download progress.
+        :return: bool (True if downloaded, False if not)
+
+        """
+        try:
+            data_s3path = self.registration["data"]["s3"]
+            source = "s3"
+        except KeyError:
+            try:
+                data_local = self.registration["data"]["local"]
+                source = "local"
+            except:
+                raise AssertionError("Data not registered. Run register_data first.")
+
+        if path is None:
+            path = os.path.join(self.path,self.subdirs["data"])
+        mkdir_notexists(path)
+        ## pass the local directory instead of a filename -- will populate with all files in the remote dir
+        data_localpath = path
+
+        if source == "s3":
+            download_success = download_multi(data_s3path,data_localpath,force,display)
+            if not download_success:
+                return 0
+        elif source == "local":
+            for filename in os.listdir(data_local):
+                source_name = os.path.join(data_local,filename)
+                dest_name = os.path.join(data_localpath,filename)
+
+                if os.path.exists(dest_name) and not force:
+                    print(f"{filename} already exists at this location. Set force = True to overwrite")
+                    return 0
+                ## don't copy a file onto itself (source and destination can coincide after re-registration)
+                if os.path.abspath(source_name) != os.path.abspath(dest_name):
+                    shutil.copy(source_name,dest_name)
+        self.registration["data"]["local"] = str(data_localpath)
+        self.write()
+        return 1
 
     def get_config(self,path = None,force = False,display = False):
         """Get currently registered config. If desired, you can pass a path where you would like config to be moved. Otherwise, it will be moved to self.path/self.subdirs[config]
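Note: get_data_multi is meant to mirror get_data, but for a whole registered prefix rather than a single key; a sketch of the intended call sequence (working directory and bucket names are illustrative).

    import os
    from neurocaas_contrib.scripting import NeuroCAASScriptManager

    os.makedirs("/tmp/workdir", exist_ok=True)
    ncsm = NeuroCAASScriptManager("/tmp/workdir")         # writes registration.json
    ncsm.register_data("s3://mybucket/groupname/inputs")  # register a prefix, not a single key
    ncsm.get_data_multi(display=True)                     # populates /tmp/workdir/inputs
    print(ncsm.registration["data"]["local"])             # the directory that was just filled
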
Run register_file first.") @@ -405,7 +448,7 @@ def get_file(self,varname,path = None,force = False,display = False): if source == "s3": download(file_s3path,file_localpath,display) elif source == "local": - shutil.copy(file_localsource,file_localpath) + shutil.copy(file_local,file_localpath) self.registration["additional_files"][varname]["local"] = file_localpath self.write() return 1 @@ -422,7 +465,7 @@ def put_result(self,localfile,display = False): upload(localfile,fullpath,display) except KeyError: try: - fullpath = os.path.join(self.registration["resultpath"]["localsource"],"process_results",filename) + fullpath = os.path.join(self.registration["resultpath"]["local"],"process_results",filename) os.makedirs(os.path.dirname(fullpath),exist_ok = True) shutil.copy(localfile,fullpath) except: @@ -521,7 +564,7 @@ def get_resultpath(self,filepath): resultpath = os.path.join(self.registration["resultpath"]["s3"],"process_results",basename) except KeyError: try: - resultpath = os.path.join(self.registration["resultpath"]["localsource"],"process_results",basename) + resultpath = os.path.join(self.registration["resultpath"]["local"],"process_results",basename) except KeyError: raise KeyError("Not registered.") return resultpath diff --git a/tests/test_Interface_S3.py b/tests/test_Interface_S3.py index 1291022..a18d5c8 100644 --- a/tests/test_Interface_S3.py +++ b/tests/test_Interface_S3.py @@ -16,7 +16,8 @@ def setup_simple_bucket(monkeypatch): username = "user" contents = { "file.json":{"data":"element"}, - "config.json":{"param1":1} + "config.json":{"param1":1}, + "another.json":{"please":"help"} } session = localstack_client.session.Session() @@ -32,13 +33,95 @@ def setup_simple_bucket(monkeypatch): writeobj.put(Body = content) return bucketname,username,contents,s3_client,s3_resource +@pytest.fixture +def setup_complex_bucket(monkeypatch): + """Makes a simple bucket in localstack named testinterface with the following internal structure: + s3://testinterface + |- user + |-file.json + |-config.json + """ + bucketname = "testinterface" + username = "user" + data_dirname = "inputs" + config_dirname = "configs" + contents_datasets = { + "file.json":{"data":"element"}, + "another.json":{"please":"help"} + } + contents_configs = { + "config.json":{"param1":1}, + } + + session = localstack_client.session.Session() + s3_client = session.client("s3") + s3_resource = session.resource("s3") + monkeypatch.setattr(Interface_S3, "s3_client", session.client("s3")) ## TODO I don't think these are scoped correctly w/o a context manager. 
+ monkeypatch.setattr(Interface_S3, "s3", session.resource("s3")) + s3_client.create_bucket(Bucket = bucketname) + for name,content in contents_datasets.items(): + key = os.path.join(username,data_dirname,name) + writeobj = s3_resource.Object(bucketname,key) + content = bytes(json.dumps(content).encode("UTF-8")) + writeobj.put(Body = content) + for name,content in contents_configs.items(): + key = os.path.join(username,config_dirname,name) + writeobj = s3_resource.Object(bucketname,key) + content = bytes(json.dumps(content).encode("UTF-8")) + writeobj.put(Body = content) + return bucketname,username,data_dirname,config_dirname,s3_client,s3_resource + + def test_download(setup_simple_bucket,tmp_path): download_loc = tmp_path / "downloc" download_loc.mkdir() bucket,username,contents,s3_client,s3_resource = setup_simple_bucket s3path = f"s3://{bucket}/{username}/file.json" - Interface_S3.download(s3path,str(download_loc / "file.json")) - Interface_S3.download(s3path,str(download_loc / "file.json"),display = True) + Interface_S3.download(s3path,str(download_loc / "config.json")) + Interface_S3.download(s3path,str(download_loc / "config.json"),display = True) + for obj in s3_resource.Bucket(bucket).objects.all(): + print("\n" + str(obj)) + print("\n\n\n\n") + for item in os.listdir(download_loc): + print("\n" + str(download_loc) + "/" + str(item)) + s3_resource.Bucket("testinterface").objects.all().delete() + + +def test_download_multi_simple(setup_simple_bucket,tmp_path): + download_loc = tmp_path / "downloc" + download_loc.mkdir() + bucket,username,contents,s3_client,s3_resource = setup_simple_bucket + s3path = f"s3://{bucket}/{username}" + Interface_S3.download_multi(s3path,str(download_loc)) + # Interface_S3.download_multi(s3path,str(download_loc),display = True) + for obj in s3_resource.Bucket(bucket).objects.all(): + print("\n" + str(obj)) + print("\n\n\n\n") + for item in os.listdir(download_loc): + print("\n" + str(download_loc) + "/" + str(item)) + s3_resource.Bucket("testinterface").objects.all().delete() + +def test_download_multi_complex(setup_complex_bucket,tmp_path): + download_loc = tmp_path / "downloc" + download_loc.mkdir() + bucket,username,data_dir,config_dir,s3_client,s3_resource = setup_complex_bucket + s3path = f"s3://{bucket}/{username}/{data_dir}" + assert Interface_S3.download_multi(s3path,str(download_loc)) + for item in download_loc.iterdir(): + item.unlink() + # for item in os.listdir(download_loc): + # print("\n" + str(download_loc) + "/" + str(item)) + assert Interface_S3.download_multi(s3path,str(download_loc),display = True) + assert not Interface_S3.download_multi(s3path,str(download_loc)) + + # for obj in s3_resource.Bucket(bucket).objects.all(): + # print("\n" + str(obj)) + # print("\n\n\n\n") + # for item in os.listdir(download_loc): + # print("\n" + str(download_loc) + "/" + str(item)) + + s3_resource.Bucket("testinterface").objects.all().delete() + def test_upload(setup_simple_bucket,tmp_path): upload_loc = tmp_path / "uploc" diff --git a/tests/test_mats/log/DATASTATUS.json b/tests/test_mats/log/DATASTATUS.json index f75a51a..6fb3476 100644 --- a/tests/test_mats/log/DATASTATUS.json +++ b/tests/test_mats/log/DATASTATUS.json @@ -4,16 +4,17 @@ "input": "groupname/inputs/dataset.ext", "status": "SUCCESS", "reason": 0, - "memory_usage": "8192 MB", - "cpu_usage": "12.3 %", - "job_start": "2021-05-20T13:09:14Z", - "job_finish": "2021-05-20T13:09:15Z", + "memory_usage": "385604 MB", + "cpu_usage": "11.7 %", + "job_start": "2023-08-25T12:14:59Z", + "job_finish": 
"2023-08-25T12:15:00Z", "std": { - "0": "Thu May 20 13:09:14 EDT 2021\n", - "1": "Thu May 20 13:09:15 EDT 2021\n", - "2": "Thu May 20 13:09:15 EDT 2021\n", - "3": "Thu May 20 13:09:15 EDT 2021\n", - "4": "Thu May 20 13:09:15 EDT 2021\n", - "5": "Setting up network. This could take a while. \n" + "0": "/home/cbwash2/neurocaas_contrib/tests/test_mats/sendtime.sh: line 4: activate: No such file or directory\n", + "1": "Fri 25 Aug 2023 12:14:59 PM EDT\n", + "2": "Fri 25 Aug 2023 12:14:59 PM EDT\n", + "3": "Fri 25 Aug 2023 12:14:59 PM EDT\n", + "4": "Fri 25 Aug 2023 12:14:59 PM EDT\n", + "5": "Fri 25 Aug 2023 12:14:59 PM EDT\n", + "6": "Setting up network. This could take a while. \n" } } \ No newline at end of file diff --git a/tests/test_mats/log/certificate.txt b/tests/test_mats/log/certificate.txt index 49603a0..88f9ca2 100644 --- a/tests/test_mats/log/certificate.txt +++ b/tests/test_mats/log/certificate.txt @@ -1,6 +1,6 @@ PER ENVIRONMENT MONITORING: ================ -DATANAME: groupname/inputs/dataset.ext | STATUS: SUCCESS | TIME: 2021_05_20_13_09_16 (finished) | LAST COMMAND: Thu May 20 13:09:14 EDT 2021 Thu May 20 13:09:15 EDT 2021 Thu May 20 13:09:15 EDT 2021 Thu May 20 13:09:15 EDT 2021 Thu May 20 13:09:15 EDT 2021 Setting up network. This could take a while. | CPU_USAGE: 12.3 % +DATANAME: groupname/inputs/dataset.ext | STATUS: SUCCESS | TIME: 2023_08_25_12_15_00 (finished) | LAST COMMAND: /home/cbwash2/neurocaas_contrib/tests/test_mats/sendtime.sh: line 4: activate: No such file or directory Fri 25 Aug 2023 12:14:59 PM EDT Fri 25 Aug 2023 12:14:59 PM EDT Fri 25 Aug 2023 12:14:59 PM EDT Fri 25 Aug 2023 12:14:59 PM EDT Fri 25 Aug 2023 12:14:59 PM EDT Setting up network. This could take a while. | CPU_USAGE: 11.7 % ================ Once jobs start, these logs will be updated regularly. Allow some time [~1 minute] after all jobs finish for results to appear. For more information, see DATASET_NAME: files for stdout and stderr output. diff --git a/tests/test_mats/log/logfile.txt b/tests/test_mats/log/logfile.txt index 0446673..68581f5 100644 --- a/tests/test_mats/log/logfile.txt +++ b/tests/test_mats/log/logfile.txt @@ -1,6 +1,7 @@ -Thu May 20 13:09:14 EDT 2021 -Thu May 20 13:09:15 EDT 2021 -Thu May 20 13:09:15 EDT 2021 -Thu May 20 13:09:15 EDT 2021 -Thu May 20 13:09:15 EDT 2021 +/home/cbwash2/neurocaas_contrib/tests/test_mats/sendtime.sh: line 4: activate: No such file or directory +Fri 25 Aug 2023 12:14:59 PM EDT +Fri 25 Aug 2023 12:14:59 PM EDT +Fri 25 Aug 2023 12:14:59 PM EDT +Fri 25 Aug 2023 12:14:59 PM EDT +Fri 25 Aug 2023 12:14:59 PM EDT Setting up network. This could take a while. 
diff --git a/tests/test_scripting.py b/tests/test_scripting.py
index 2b5890a..9f99b44 100755
--- a/tests/test_scripting.py
+++ b/tests/test_scripting.py
@@ -513,4 +513,418 @@ def test_cleanup(self,tmp_path,setup_full_bucket):
         config = s3_client.download_file(bucketname,f"{username}/results/job__test/process_results/config.json",str(subdir / "config.json"))
 
 
+# Modified tests: assumes an input directory (rather than a single file) can be registered as data;
+# get_data still uses download as usual, while get_data_multi uses download_multi.
+class Test_NeuroCAASScriptManager():
+    def test_init(self,tmp_path):
+        subdir = tmp_path / "subdir"
+        with pytest.raises(AssertionError):
+            ncsm = scripting.NeuroCAASScriptManager(subdir)
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir,write = False)
+        assert not os.path.exists(os.path.join(subdir,"registration.json"))
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        assert os.path.exists(os.path.join(subdir,"registration.json"))
+
+    @pytest.mark.parametrize("input_path",["s3://bucket/groupname/inputs/filename.txt",
+        "s3://bucket/groupname/inputs"])
+    def test_register_dataset(self,tmp_path,input_path):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        ncsm.register_data(input_path)
+        with open(os.path.join(subdir,"registration.json"),"r") as fp:
+            data = json.load(fp)
+        assert data["data"]["s3"] == input_path
+
+    @pytest.mark.parametrize("input_path",["groupname/inputs/filename.txt",
+        "groupname/inputs"])
+    def test_register_dataset_local(self,tmp_path,input_path):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        ncsm.register_data_local(input_path)
+        with open(os.path.join(subdir,"registration.json"),"r") as fp:
+            data = json.load(fp)
+        assert data["data"]["local"] == input_path
+
+    def test_register_config(self,tmp_path):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        ncsm.register_config("s3://bucket/groupname/configs/filename.txt")
+        with open(os.path.join(subdir,"registration.json"),"r") as fp:
+            config = json.load(fp)
+        assert config["config"]["s3"] == "s3://bucket/groupname/configs/filename.txt"
+
+    def test_register_config_local(self,tmp_path):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        ncsm.register_config_local("groupname/configs/filename.txt")
+        with open(os.path.join(subdir,"registration.json"),"r") as fp:
+            config = json.load(fp)
+        assert config["config"]["local"] == "groupname/configs/filename.txt"
+
+    def test_register_file(self,tmp_path):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        ncsm.register_file("addfile","s3://bucket/groupname/configs/addfile.txt")
+        with open(os.path.join(subdir,"registration.json"),"r") as fp:
+            fi = json.load(fp)
+        assert fi["additional_files"]["addfile"]["s3"] == "s3://bucket/groupname/configs/addfile.txt"
+
+    def test_register_file_local(self,tmp_path):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        ncsm.register_file_local("addfile","groupname/configs/addfile.txt")
+        with open(os.path.join(subdir,"registration.json"),"r") as fp:
+            fi = json.load(fp)
+        assert fi["additional_files"]["addfile"]["local"] == "groupname/configs/addfile.txt"
+
+    def test_register_resultpath(self,tmp_path):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        ncsm.register_resultpath("s3://bucket/groupname/results/job__test/")
+
+        with open(os.path.join(subdir,"registration.json"),"r") as fp:
+            fi = json.load(fp)
+        assert fi["resultpath"]["s3"] == "s3://bucket/groupname/results/job__test/"
+
+    def test_register_resultpath_local(self,tmp_path):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        ncsm.register_resultpath_local("groupname/results/job__test/")
+
+        with open(os.path.join(subdir,"registration.json"),"r") as fp:
+            fi = json.load(fp)
+        assert fi["resultpath"]["local"] == "groupname/results/job__test/"
+
+    def test_from_registration(self,tmp_path):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        ncsm2 = scripting.NeuroCAASScriptManager.from_registration(subdir)
+        assert ncsm.registration == ncsm2.registration
+
+    @pytest.mark.parametrize("source",["s3","local"])
+    def test_get_data(self,tmp_path,setup_full_bucket,source):
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        contentkey = "inputs/file.json"
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        with pytest.raises(AssertionError):
+            ncsm.get_data()
+        if source == "s3":
+            sourcepath = f"s3://{bucketname}/{username}/{contentkey}"
+            ncsm.register_data(sourcepath)
+        elif source == "local":
+            sourcepath = tmp_path / f"{username}/{contentkey}"
+            os.makedirs(os.path.dirname(sourcepath))
+            with open(str(sourcepath),"w") as f:
+                json.dump(contents[contentkey],f)
+            ncsm.register_data_local(sourcepath)
+        # assert ncsm.get_data()
+        # assert ncsm.registration["data"]["local"] == str(subdir / "inputs" / "file.json")
+        # assert not ncsm.get_data()
+        # assert ncsm.get_data(force = True)
+        assert ncsm.get_data(path = tmp_path)
+        assert ncsm.registration["data"]["local"] == str(tmp_path / "file.json")
+        assert not ncsm.get_data(path = tmp_path)
+        s3_resource.Bucket("testinterface").objects.all().delete()
+
+
+    # registers the data via the input dir path instead of a file path
+    @pytest.mark.parametrize("source",["local","s3"])
+    def test_get_data_multi(self,tmp_path,setup_full_bucket,source):
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        contentdirkey = "inputs"
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        with pytest.raises(AssertionError):
+            ncsm.get_data_multi()
+        if source == "s3":
+            sourcepath = f"s3://{bucketname}/{username}/{contentdirkey}"
+            ncsm.register_data(sourcepath)
+        elif source == "local":
+            for key in contents.keys():
+                if key.split("/")[0] == "inputs":
+                    sourcepath = tmp_path / f"{username}/{key}"
+                    if not os.path.exists(os.path.dirname(sourcepath)):
+                        os.makedirs(os.path.dirname(sourcepath))
+                    with open(str(sourcepath),"w") as f:
+                        json.dump(contents[str(key)],f)
+            ncsm.register_data_local(os.path.dirname(sourcepath))
+        assert ncsm.get_data_multi()
+        assert ncsm.registration["data"]["local"] == str(subdir / "inputs")
+        assert not ncsm.get_data_multi()
+        assert ncsm.get_data_multi(force = True)
+        assert ncsm.get_data_multi(path = tmp_path)
+        assert ncsm.registration["data"]["local"] == str(tmp_path)
+        assert not ncsm.get_data_multi(path = tmp_path)
+
+
+    @pytest.mark.parametrize("source",["s3","local"])
+    def test_get_config(self,tmp_path,setup_full_bucket,source):
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        contentkey = "configs/config.json"
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        with pytest.raises(AssertionError):
+            ncsm.get_config()
+        if source == "s3":
+            sourcepath = f"s3://{bucketname}/{username}/{contentkey}"
+            ncsm.register_config(sourcepath)
+        elif source == "local":
+            sourcepath = tmp_path / f"{username}/{contentkey}"
+            os.makedirs(os.path.dirname(sourcepath))
+            with open(str(sourcepath),"w") as f:
+                json.dump(contents[contentkey],f)
+            ncsm.register_config_local(sourcepath)
+        assert ncsm.get_config()
+        assert ncsm.registration["config"]["local"] == str(subdir / "configs" / "config.json")
+        assert not ncsm.get_config()
+        assert ncsm.get_config(force = True)
+        assert ncsm.get_config(path = tmp_path)
+        assert ncsm.registration["config"]["local"] == str(tmp_path / "config.json")
+        assert not ncsm.get_config(path = tmp_path)
+
+    @pytest.mark.parametrize("source",["s3","local"])
+    def test_get_file(self,tmp_path,setup_full_bucket,source):
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        contentkey = "inputs/extra.json"
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        filename = "extra"
+        with pytest.raises(AssertionError):
+            ncsm.get_file(filename)
+        if source == "s3":
+            sourcepath = f"s3://{bucketname}/{username}/{contentkey}"
+            ncsm.register_file(filename,sourcepath)
+        elif source == "local":
+            sourcepath = tmp_path / f"{username}/{contentkey}"
+            os.makedirs(os.path.dirname(sourcepath))
+            with open(str(sourcepath),"w") as f:
+                json.dump(contents[contentkey],f)
+            ncsm.register_file_local(filename,sourcepath)
+        assert ncsm.get_file(filename)
+        assert ncsm.registration["additional_files"][filename]["local"] == str(subdir / "inputs" / "extra.json")
+        assert not ncsm.get_file(filename)
+        assert ncsm.get_file(filename,force = True)
+        assert ncsm.get_file(filename,path = tmp_path)
+        assert ncsm.registration["additional_files"][filename]["local"] == str(tmp_path / "extra.json")
+        assert not ncsm.get_file(filename,path = tmp_path)
+
+    @pytest.mark.parametrize("source",["s3","local"])
+    def test_put_result(self,tmp_path,setup_full_bucket,source):
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        fullpath = subdir / "file.txt"
+        fullpath.open("w")
+        writepath = tmp_path / "outdir"
+        writepath.mkdir()
+        s3path = f"s3://{bucketname}/{username}/results/job__test"
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        with pytest.raises(AssertionError):
+            ncsm.put_result(str(fullpath))
+        if source == "s3":
+            ncsm.register_resultpath(s3path)
+            ncsm.put_result(str(fullpath))
+            loc = s3_client.download_file(bucketname,f"{username}/results/job__test/process_results/file.txt",str(subdir / "file2.txt"))
+        elif source == "local":
+            ncsm.register_resultpath_local(writepath)
+            ncsm.put_result(str(fullpath))
+            assert os.path.exists(os.path.join(writepath,"process_results",os.path.basename(fullpath)))
+
+    def test_get_name(self,tmp_path):
+        contents_empty = {"s3":None,"local":None}
+        contents_remote = {"s3":"s3://bucket/group/inputs/key.txt","local":None}
+        contents_full = {"s3":"s3://bucket/group/inputs/key.txt","local":"here/key.txt"}
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        with pytest.raises(AssertionError):
+            ncsm.get_name(contents_empty)
+        with pytest.raises(AssertionError):
+            nr = ncsm.get_name(contents_remote)
+        nf = ncsm.get_name(contents_full)
+        assert nf == "key.txt"
+
+    def test_get_group(self,tmp_path):
+        contents_empty = {"s3":None,"local":None}
+        contents_remote = {"s3":"s3://bucket/group/inputs/key.txt","local":None}
+        contents_full = {"s3":"s3://bucket/group/inputs/key.txt","local":"here/key.txt"}
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        with pytest.raises(AssertionError):
+            ncsm.get_group(contents_empty)
+        gr = ncsm.get_group(contents_remote)
+        gf = ncsm.get_group(contents_full)
+        assert gr == gf == "group"
+
+    def test_get_path(self,tmp_path):
+        contents_empty = {"s3":None,"local":None}
+        contents_remote = {"s3":"s3://bucket/group/inputs/key.txt","local":None}
+        contents_full = {"s3":"s3://bucket/group/inputs/key.txt","local":"here/key.txt"}
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        with pytest.raises(AssertionError):
+            ncsm.get_path(contents_empty)
+        with pytest.raises(AssertionError):
+            ncsm.get_path(contents_remote)
+        pf = ncsm.get_path(contents_full)
+        assert pf == "here/key.txt"
+
+    @pytest.mark.parametrize("filetype,datacontentkey",[("data","inputs"),("data","inputs/file.json"),("config","inputs"),("file","inputs")])
+    def test_get_names(self,tmp_path,filetype,setup_full_bucket,datacontentkey):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        if filetype == "data":
+            contentkey = datacontentkey
+            s3path = f"s3://{bucketname}/{username}/{contentkey}"
+            ncsm.register_data(s3path)
+
+            if contentkey == "inputs/file.json":
+                ncsm.get_data()
+                assert ncsm.get_dataname() == os.path.basename(contentkey)
+            else:
+                ncsm.get_data_multi()
+                assert ncsm.get_dataname() == "inputs"
+        if filetype == "config":
+            contentkey = "configs/config.json"
+            s3path = f"s3://{bucketname}/{username}/{contentkey}"
+            ncsm.register_config(s3path)
+            ncsm.get_config()
+            assert ncsm.get_configname() == os.path.basename(contentkey)
+        if filetype == "file":
+            contentkey = "inputs/extra.json"
+            s3path = f"s3://{bucketname}/{username}/{contentkey}"
+            name = "extra"
+            ncsm.register_file(name,s3path)
+            ncsm.get_file(name)
+            assert ncsm.get_filename(name) == os.path.basename(contentkey)
+
+    @pytest.mark.parametrize("filetype,datacontentkey",[("data","inputs"),("data","inputs/file.json"),("config","inputs"),("file","inputs")])
+    def test_get_paths(self,tmp_path,filetype,setup_full_bucket,datacontentkey):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        if filetype == "data":
+            contentkey = datacontentkey
+            s3path = f"s3://{bucketname}/{username}/{contentkey}"
+            ncsm.register_data(s3path)
+            if contentkey == "inputs/file.json":
+                ncsm.get_data()
+                assert ncsm.get_datapath() == os.path.join(subdir,"inputs",os.path.basename(contentkey))
+            else:
+                ncsm.get_data_multi()
+                assert ncsm.get_datapath() == os.path.join(subdir,"inputs")
+
+        if filetype == "config":
+            contentkey = "configs/config.json"
+            s3path = f"s3://{bucketname}/{username}/{contentkey}"
+            ncsm.register_config(s3path)
+            ncsm.get_config()
+            assert ncsm.get_configpath() == os.path.join(subdir,"configs",os.path.basename(contentkey))
+        if filetype == "file":
+            contentkey = "inputs/extra.json"
+            s3path = f"s3://{bucketname}/{username}/{contentkey}"
+            name = "extra"
+            ncsm.register_file(name,s3path)
+            ncsm.get_file(name)
+            assert ncsm.get_filepath(name) == os.path.join(subdir,"inputs",os.path.basename(contentkey))
+
+    @pytest.mark.parametrize("contentkey",["inputs","inputs/file.json"])
+    def test_get_bucket_name(self,tmp_path,setup_full_bucket,contentkey):
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        s3path = f"s3://{bucketname}/{username}/{contentkey}"
+        ncsm.register_data(s3path)
+        if contentkey == "inputs/file.json":
+            ncsm.get_data()
+        else:
+            ncsm.get_data_multi()
+        assert ncsm.get_bucket_name() == bucketname
+
+    def test_get_resultpath(self,tmp_path,setup_full_bucket):
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        s3path = f"s3://{bucketname}/{username}/results/job__test"
+        ncsm.register_resultpath(s3path)
+        s3filepath = ncsm.get_resultpath("path/to/file.txt")
+        s3dirpath = ncsm.get_resultpath("path/to/dir")
+        assert s3filepath == os.path.join(s3path,"process_results","file.txt")
+        assert s3dirpath == os.path.join(s3path,"process_results","dir")
+
+    @pytest.mark.parametrize("logging",["local","s3"])
+    def test_log_command(self,tmp_path,setup_full_bucket,logging):
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+
+        badscript = os.path.join(loc,"test_mats","sendtime_br.sh")
+        goodscript = os.path.join(loc,"test_mats","sendtime.sh")
+        logpath = os.path.join(loc,"test_mats","log")
+
+        if logging == "local":
+            brcode = ncsm.log_command(shlex.split(badscript),"s3://fake/fake/results/job__test/logs/DATASET_NAME-file.json_STATUS.txt.json")
+            gdcode = ncsm.log_command(shlex.split(goodscript),"s3://fake/fake/results/job__test/logs/DATASET_NAME-file.json_STATUS.txt.json")
+            assert "log.txt" in os.listdir(subdir / "logs")
+            assert "certificate.txt" in os.listdir(subdir / "logs")
+            with open(os.path.join(subdir,"logs","certificate.txt"),"r") as f:
+                lines = f.readlines()
+                assert lines[2].startswith("DATANAME: groupname/inputs/dataset.ext | STATUS: SUCCESS")
+
+        if logging == "s3":
+            brcode = ncsm.log_command(shlex.split(badscript),f"s3://{bucketname}/{username}/results/job__test/logs/DATASET_NAME-file.json_STATUS.txt.json")
+            status = s3_client.download_file(bucketname,f"{username}/results/job__test/logs/DATASET_NAME-file.json_STATUS.txt.json",os.path.join(subdir,"status.json"))
+            certificate = s3_client.download_file(bucketname,f"{username}/results/job__test/logs/certificate.txt",os.path.join(subdir,"certificate.txt"))
+
+            with open(os.path.join(subdir,"certificate.txt"),"r") as f:
+                lines = f.readlines()
+                assert lines[2].startswith("DATANAME: groupname/inputs/dataset.ext | STATUS: FAILED")
+                assert lines[10].startswith("WARNING: this is a template certificate")
+            with open(os.path.join(subdir,"status.json"),"r") as f:
+                status = json.load(f)
+            gdcode = ncsm.log_command(shlex.split(goodscript),f"s3://{bucketname}/{username}/results/job__test/logs/DATASET_NAME-file.json_STATUS.txt.json")
+            status = s3_client.download_file(bucketname,f"{username}/results/job__test/logs/DATASET_NAME-file.json_STATUS.txt.json",os.path.join(subdir,"status.json"))
+            certificate = s3_client.download_file(bucketname,f"{username}/results/job__test/logs/certificate.txt",os.path.join(subdir,"certificate.txt"))
+            with open(os.path.join(subdir,"certificate.txt"),"r") as f:
+                lines = f.readlines()
+                assert lines[2].startswith("DATANAME: groupname/inputs/dataset.ext | STATUS: SUCCESS")
+                assert lines[10].startswith("WARNING: this is a template certificate")
+            with open(os.path.join(subdir,"status.json"),"r") as f:
+                status = json.load(f)
+
+    def test_cleanup(self,tmp_path,setup_full_bucket):
+        bucketname,username,contents,s3_client,s3_resource = setup_full_bucket
+        s3path = f"s3://{bucketname}/{username}/results/job__test"
+        contentkey = "configs/config.json"
+        subdir = tmp_path / "subdir"
+        subdir.mkdir()
+        ncsm = scripting.NeuroCAASScriptManager(subdir)
+        ncsm.register_config(f"s3://{bucketname}/{username}/{contentkey}")
+        ncsm.register_resultpath(s3path)
+        ncsm.cleanup()
+        update = s3_client.download_file(bucketname,f"{username}/results/job__test/process_results/update.txt",str(subdir / "update.txt"))
+        config = s3_client.download_file(bucketname,f"{username}/results/job__test/process_results/config.json",str(subdir / "config.json"))