From 0bb8554ab6ff2b7bd5607df7d307a0ef5bd93be0 Mon Sep 17 00:00:00 2001 From: Shettland Date: Tue, 16 Jul 2024 11:32:33 +0200 Subject: [PATCH 1/4] Better sftp cleaning and download logging --- relecov_tools/sftp_handle.py | 145 +++++++++++++++++++++-------------- 1 file changed, 86 insertions(+), 59 deletions(-) diff --git a/relecov_tools/sftp_handle.py b/relecov_tools/sftp_handle.py index c485149e..0160fb17 100755 --- a/relecov_tools/sftp_handle.py +++ b/relecov_tools/sftp_handle.py @@ -189,9 +189,6 @@ def recursive_list(folder_name): log.error("Invalid folder at remote sftp %s", e) raise for attribute in attribute_list: - # Messy workaround for corrupted folder - if "ion_torrent" in attribute.filename: - continue if stat.S_ISDIR(attribute.st_mode): abspath = os.path.join(folder_name, attribute.filename) directory_list.append(abspath) @@ -266,9 +263,9 @@ def create_local_folder(self, folder, date): log.info("Creating folder %s to download files", folder) platform_storage_folder = self.platform_storage_folder if platform_storage_folder == folder: - local_folder_path = os.path.join(platform_storage_folder, date) + local_folder_path = platform_storage_folder else: - local_folder_path = os.path.join(platform_storage_folder, folder, date) + local_folder_path = os.path.join(platform_storage_folder, folder) os.makedirs(local_folder_path, exist_ok=True) log.info("created the folder to download files %s", local_folder_path) return local_folder_path @@ -332,7 +329,6 @@ def verify_md5_checksum(self, local_folder, fetched_files, fetched_md5): hash_dict = relecov_tools.utils.read_md5_checksum(fetched_md5, avoid_chars) if not hash_dict: error_text = "md5sum file could not be read, md5 hashes won't be validated" - log.warning(error_text) self.include_warning(error_text) return fetched_files, False # check md5 checksum for each file @@ -362,7 +358,8 @@ def create_files_with_metadata_info( metadata_file (str): Name of the downloaded metadata file to rename it """ samples_to_delete = [] - prefix_file_name = "_".join(local_folder.split("/")[-3:-1]) + prefix_file_name = "_".join(local_folder.split("/")[-2:]) + prefix_file_name = prefix_file_name.replace("_tmp_processing", "") # TODO: Move these prefixes to configuration.json new_metadata_file = "metadata_lab_" + prefix_file_name + ".xlsx" sample_data_file = "samples_data_" + prefix_file_name + ".json" @@ -373,7 +370,6 @@ def create_files_with_metadata_info( data = copy.deepcopy(samples_dict) for sample, values in data.items(): if not all(val for val in values): - log.error(str(error_text % sample)) self.include_error(str(error_text % sample), sample) samples_to_delete.append(sample) continue @@ -416,7 +412,6 @@ def remove_duplicated_values(self, sample_file_dict): clean_sample_dict = {key: sample_file_dict[key] for key in non_duplicated_keys} if dup_samples_list: error_text = "Multiple samples in metadata pointing to the same file: %s" - log.warning(error_text % duplicated_dict) self.include_warning(error_text % duplicated_dict) stderr.print(f"[Orange]{error_text}") stderr.print("[Orange]These samples won't be processed: ", dup_samples_list) @@ -517,12 +512,10 @@ def get_sample_fastq_file_names(self, local_folder, meta_f_path): continue if row[index_layout] == "paired" and row[index_fastq_r2] is None: error_text = "Sample %s is paired-end, but no R2 given" - log.error(str(error_text % str(row[index_sampleID]))) self.include_error(error_text % str(row[index_sampleID]), s_name) row_complete = False if row[index_layout] == "single" and row[index_fastq_r2] is not None: error_text = "Sample %s is single-end, but R1&R2 given" - log.error(str(error_text % str(row[index_sampleID]))) self.include_error(error_text % str(row[index_sampleID]), s_name) row_complete = False if row_complete: @@ -537,9 +530,9 @@ def get_sample_fastq_file_names(self, local_folder, meta_f_path): ].strip() else: log_text = "Fastq_R1 not defined in Metadata for sample %s" - log.error(str(log_text % s_name)) stderr.print(f"[red]{str(log_text % s_name)}") self.include_error(entry=str(log_text % s_name), sample=s_name) + del sample_file_dict[s_name] else: self.include_warning(entry=f"Row {counter} skipped. No sample ID given") # Remove duplicated files @@ -596,7 +589,6 @@ def download_remote_metafile(target_meta_file): meta_df_list.append(loc_meta_df) except (ParserError, EmptyDataError, MetadataError, KeyError) as e: error_text = f"Could not process {os.path.basename(loc_meta)}: {e}" - log.error(error_text) self.include_error(error_text) os.remove(loc_meta) if meta_df_list: @@ -653,9 +645,8 @@ def validate_remote_files(self, remote_folder, local_folder): metafiles_list = sorted( sum([list(fi.values()) for _, fi in sample_files_dict.items()], []) ) - if all(file in filtered_files_list for file in metafiles_list): + if sorted(filtered_files_list) == sorted(metafiles_list): log.info("Files in %s match with metadata file", remote_folder) - stderr.print("Successfully validated files based on metadata") else: log_text = "Some files in %s do not match the ones described in metadata" log.warning(log_text % remote_folder) @@ -664,12 +655,12 @@ def validate_remote_files(self, remote_folder, local_folder): mismatch_files = [fi for fi in filtered_files_list if fi not in set_list] mismatch_rev = [fi for fi in set_list if fi not in filtered_files_list] - error_text1 = "Files in folder missing in metadata %s" - log.warning(error_text1 % str(mismatch_files)) - self.include_warning(error_text1 % str(mismatch_files)) - error_text2 = "Files in metadata missing in folder %s" - log.warning(error_text2 % str(mismatch_rev)) - self.include_warning(error_text2 % str(mismatch_rev)) + if mismatch_files: + error_text1 = "Files in folder missing in metadata %s" + self.include_warning(error_text1 % str(mismatch_files)) + if mismatch_rev: + error_text2 = "Files in metadata missing in folder %s" + self.include_warning(error_text2 % str(mismatch_rev)) # Try to check if the metadata filename lacks the proper extension log.info("Trying to match files without proper file extension") sample_files_dict = self.process_filedict( @@ -679,33 +670,63 @@ def validate_remote_files(self, remote_folder, local_folder): raise FileNotFoundError( "No files from metadata found in %s" % remote_folder ) + stderr.print("[blue]Finished validating files based on metadata") return sample_files_dict, local_meta_file - def delete_remote_files(self, remote_folder, files): + def delete_remote_files(self, remote_folder, files=None, skip_seqs=False): """Delete files from remote folder Args: remote_folder (str): path to folder in remote repository - files (list(str)): list of file basenames in remote repository + files (list(str), optional): list of target filenames in remote repository + skip_seqs (bool, optional): Skip sequencing files based on extension """ stderr.print(f"[blue]Deleting files in {remote_folder}...") - empty_folder = True - for file in files: + if files is None: + files_to_remove = self.get_file_list(remote_folder) + else: + files_to_remove = files + if any(file.endswith(tuple(self.allowed_file_ext)) for file in files_to_remove): + if skip_seqs is True: + log_text = f"Folder {remote_folder} has sequencing files. Not removed." + self.include_warning(log_text) + return + for file in files_to_remove: try: self.client.remove(os.path.join(remote_folder, os.path.basename(file))) log.info("%s Deleted from remote server", file) except (IOError, PermissionError) as e: - log.error("Could not delete file %s.", str(e)) - stderr.print(f"Could not delete file {file}. Error: {e}") - empty_folder = False - continue - if empty_folder: + self.include_warning(f"Could not delete remote file {file}: {e}") + stderr.print(f"Could not delete remote file {file}. Error: {e}") + return + + def delete_remote_folder(self, remote_folder): + """Delete a folder from remote sftp, check if it is empty or not first. + + Args: + remote_folder (str): path to folder in remote repository + """ + + def remove_client_dir(remote_folder): + # Never remove a folder in the top level if len(remote_folder.replace("./", "").split("/")) >= 2: + log.info("Trying to remove %s", remote_folder) try: self.client.rmdir(remote_folder) + log.info("Successfully removed %s", remote_folder) except (OSError, PermissionError) as e: - log.error("Could not delete folder %s.", str(e)) - stderr.print(f"Could not delete folder {remote_folder}. Error: {e}") + log_text = f"Could not delete remote {remote_folder}. Error: {e}" + self.include_warning(log_text) + stderr.print(log_text) + else: + log.info("%s is a top-level folder. Not removed", remote_folder) + + remote_folder_files = self.get_file_list(remote_folder) + if remote_folder_files: + log_text = f"Remote folder {remote_folder} not empty. Not removed" + self.include_warning(log_text) + else: + remove_client_dir(remote_folder) return def move_processing_fastqs(self, folders_with_metadata): @@ -733,11 +754,9 @@ def move_processing_fastqs(self, folders_with_metadata): except OSError: if file in folders_with_metadata[folder]: error_text = "File named %s already in %s. Skipped" - log.warning(error_text % (file, self.current_folder)) self.include_warning(error_text % (file, self.current_folder)) else: error_text = "Error while moving file %s" - log.error(error_text % file) self.include_error(error_text % file) folders_with_metadata[folder] = successful_files return folders_with_metadata @@ -757,8 +776,6 @@ def merge_md5sums(self, folders_with_metadata): folders_with_metadata: Same dict updated with the merged md5sum file """ output_location = self.platform_storage_folder - log.info("Merging md5sum files from remote folders...") - stderr.print("[blue]Merging md5sum files from remote folders...") # TODO: Include this function in relecov_tools.utils def md5_merger(md5_filelist, avoid_chars=None): @@ -802,12 +819,13 @@ def md5_handler(md5sumlist, output_location): for folder, files in folders_with_metadata.items(): self.current_folder = folder.split("/")[0] + log.info("Merging md5sum files from %s...", self.current_folder) + stderr.print(f"[blue]Merging md5sum files from {self.current_folder}...") md5flags = [".md5", "md5sum", "md5checksum"] md5sumlist = [fi for fi in files if any(flag in fi for flag in md5flags)] if not md5sumlist: - error_text = "No md5sum could be found in remote folder" - log.warning(error_text) - stderr.print(f"[yellow]{error_text}") + error_text = "No md5sum could be found in remote folder %s" + stderr.print(f"[yellow]{error_text % folder}") self.include_warning(error_text) continue folders_with_metadata[folder] = [fi for fi in files if fi not in md5sumlist] @@ -815,7 +833,6 @@ def md5_handler(md5sumlist, output_location): uploaded_md5 = md5_handler(md5sumlist, output_location) except (FileNotFoundError, OSError, PermissionError, CsvError) as e: error_text = "Could not merge md5files for %s. Reason: %s" - log.warning(error_text % (self.current_folder, str(e))) stderr.print(f"[yellow]{error_text % (self.current_folder, str(e))}") self.include_warning(error_text % (self.current_folder, str(e))) continue @@ -931,6 +948,7 @@ def pre_validate_folder(folder, folder_files): return downloaded_metadata folders_with_metadata = {} + processed_folders = [] merged_df = merged_excel_path = last_main_folder = excel_name = None log.info("Setting %s remote folders...", str(len(target_folders.keys()))) stderr.print(f"[blue]Setting {len(target_folders.keys())} remote folders...") @@ -964,7 +982,6 @@ def pre_validate_folder(folder, folder_files): folders_with_metadata[last_main_folder].append(excel_name) except OSError: error_text = "Error uploading merged metadata back to sftp: %s" - log.error(error_text % last_main_folder) self.include_error(error_text % last_main_folder) del folders_with_metadata[last_main_folder] try: @@ -972,8 +989,8 @@ def pre_validate_folder(folder, folder_files): except (ParserError, EmptyDataError, MetadataError, KeyError) as e: meta_name = os.path.basename(downloaded_metadata) error_text = "%s skipped. Error while processing excel %s: %s" - log.error(error_text % (main_folder, meta_name, str(e))) self.include_error(error_text % (main_folder, meta_name, str(e))) + os.remove(local_meta) continue folders_with_metadata[temp_folder] = [] folders_with_metadata[temp_folder].extend(filelist) @@ -988,6 +1005,8 @@ def pre_validate_folder(folder, folder_files): folders_with_metadata[temp_folder].extend(filelist) new_df = self.excel_to_df(local_meta, metadata_ws, header_flag) merged_df = self.merge_metadata(metadata_ws, merged_df, new_df) + os.remove(local_meta) + processed_folders.append(folder) # End of loop # Write last dataframe to file once loop is finished @@ -1003,7 +1022,7 @@ def pre_validate_folder(folder, folder_files): log_text = "Remote folders merged into %s folders. Proceed with processing" log.info(log_text % len(clean_target_folders.keys())) stderr.print(f"[green]{log_text % len(clean_target_folders.keys())}") - return clean_target_folders + return clean_target_folders, processed_folders def select_target_folders(self): """Find the selected folders in remote if given, else select every folder @@ -1068,7 +1087,6 @@ def compress_and_update(self, fetched_files, files_to_compress, local_folder): compressed = relecov_tools.utils.compress_file(f_path) if not compressed: error_text = "Could not compress file %s, file not found" % str(file) - log.error(error_text) self.include_error(error_text, f_path) continue # Remove file after compression is completed @@ -1168,7 +1186,6 @@ def download(self, target_folders, option="download"): ) if not fetched_files: error_text = "No files could be downloaded in folder %s" % str(folder) - log.warning(error_text) stderr.print(f"{error_text}") self.include_error(error_text) continue @@ -1197,9 +1214,8 @@ def download(self, target_folders, option="download"): corr_fold = os.path.join(local_folder, "corrupted") os.mkdir(corr_fold) error_text = "Found corrupted files: %s. Moved to: %s" - log.warning(error_text % (str(corrupted), corr_fold)) stderr.print(f"[red]{error_text % (str(corrupted), corr_fold)}") - self.include_warning(error_text % (str(corrupted), "/corrupt/")) + self.include_warning(error_text % (str(corrupted), corr_fold)) for corr_file in corrupted: path = os.path.join(local_folder, corr_file) try: @@ -1214,7 +1230,6 @@ def download(self, target_folders, option="download"): ) if self.abort_if_md5_mismatch: error_text = "Stop processing %s due to corrupted files." - log.error(error_text % folder) stderr.print(f"[red]{error_text % folder}") self.include_error(error_text % "folder") relecov_tools.utils.delete_local_folder(local_folder) @@ -1227,9 +1242,8 @@ def download(self, target_folders, option="download"): else: corrupted = [] error_text = "No single md5sum file could be found in %s" % folder - log.warning(error_text) stderr.print(f"[red]{error_text}") - self.include_error(error_text) + self.include_warning(error_text) clean_fetchlist = [ fi for fi in fetched_files if fi.endswith(tuple(self.allowed_file_ext)) @@ -1279,7 +1293,10 @@ def download(self, target_folders, option="download"): # If download_option is "download_clean", remove # sftp folder content after download is finished if option == "clean": - self.delete_remote_files(folder, files_to_download) + self.delete_remote_files(folder, files=files_to_download) + self.delete_remote_files(folder, skip_seqs=True) + self.delete_remote_folder(folder) + stderr.print(f"Delete process finished in {folder}") stderr.print(f"[green]Finished processing {folder}") return @@ -1302,20 +1319,30 @@ def execute_process(self): stderr.print("[red]Unable to establish sftp connection") sys.exit(1) target_folders = self.select_target_folders() - target_folders = self.merge_subfolders(target_folders) - if self.download_option == "download_only": - self.download(target_folders, option="download") - if self.download_option == "download_clean": - self.download(target_folders, option="clean") if self.download_option == "delete_only": log.info("Initiating delete_only process") - for folder, files in target_folders.items(): - self.delete_remote_files(folder, files) + for folder in target_folders.keys(): + self.current_folder = folder + self.delete_remote_files(folder) + self.delete_remote_folder(folder) stderr.print(f"Delete process finished in {folder}") + else: + target_folders, processed_folders = self.merge_subfolders(target_folders) + if self.download_option == "download_only": + self.download(target_folders, option="download") + if self.download_option == "download_clean": + self.download(target_folders, option="clean") + for folder in processed_folders: + self.current_folder = folder + self.delete_remote_files(folder, skip_seqs=True) + self.delete_remote_folder(folder) + stderr.print(f"Delete process finished in {folder}") + self.close_connection() + stderr.print(f"Processed {len(processed_folders)} folders: {processed_folders}") if self.logsum.logs: log.info("Printing process summary to %s", self.platform_storage_folder) - self.logsum.create_error_summary() + self.logsum.create_error_summary(called_module="download") else: log.info("Process log summary was empty. Not generated.") stderr.print("Finished execution") From f7338ff62e6d0a8dd58adc45c9a9f1310d27ec69 Mon Sep 17 00:00:00 2001 From: Shettland Date: Tue, 16 Jul 2024 11:58:16 +0200 Subject: [PATCH 2/4] Better remote cleaning. Fix1 --- relecov_tools/sftp_handle.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/relecov_tools/sftp_handle.py b/relecov_tools/sftp_handle.py index 0160fb17..db9a4b03 100755 --- a/relecov_tools/sftp_handle.py +++ b/relecov_tools/sftp_handle.py @@ -1321,7 +1321,8 @@ def execute_process(self): target_folders = self.select_target_folders() if self.download_option == "delete_only": log.info("Initiating delete_only process") - for folder in target_folders.keys(): + processed_folders = target_folders.keys() + for folder in processed_folders: self.current_folder = folder self.delete_remote_files(folder) self.delete_remote_folder(folder) From 41f6fa2cc75ece0cb485ed55e15a2ac8f36b6618 Mon Sep 17 00:00:00 2001 From: Shettland Date: Thu, 18 Jul 2024 16:30:11 +0200 Subject: [PATCH 3/4] Removed tmp_processing from downloaded folder name --- relecov_tools/sftp_handle.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/relecov_tools/sftp_handle.py b/relecov_tools/sftp_handle.py index db9a4b03..8911ae8a 100755 --- a/relecov_tools/sftp_handle.py +++ b/relecov_tools/sftp_handle.py @@ -250,12 +250,11 @@ def get_from_sftp(self, file, destination, exist_ok=False): log.error("Unable to fetch file %s ", e) return False - def create_local_folder(self, folder, date): + def create_local_folder(self, folder): """Create folder to download files in local path using date Args: folder (str): name of remote folder to be downloaded - date (datetime.datetime): datetime in YYYYMMDD format Returns: local_folder_path(str): path to the new created folder @@ -265,6 +264,7 @@ def create_local_folder(self, folder, date): if platform_storage_folder == folder: local_folder_path = platform_storage_folder else: + folder = folder.strip("_tmp_processing") local_folder_path = os.path.join(platform_storage_folder, folder) os.makedirs(local_folder_path, exist_ok=True) log.info("created the folder to download files %s", local_folder_path) @@ -681,7 +681,7 @@ def delete_remote_files(self, remote_folder, files=None, skip_seqs=False): files (list(str), optional): list of target filenames in remote repository skip_seqs (bool, optional): Skip sequencing files based on extension """ - stderr.print(f"[blue]Deleting files in {remote_folder}...") + stderr.print(f"[blue]Deleting files in remote {remote_folder}...") if files is None: files_to_remove = self.get_file_list(remote_folder) else: @@ -1167,7 +1167,7 @@ def download(self, target_folders, option="download"): stderr.print("[blue]Processing folder " + folder) # Validate that the files are the ones described in metadata. - local_folder = self.create_local_folder(folder, date) + local_folder = self.create_local_folder(folder) try: valid_filedict, meta_file = self.validate_remote_files( folder, local_folder @@ -1296,7 +1296,7 @@ def download(self, target_folders, option="download"): self.delete_remote_files(folder, files=files_to_download) self.delete_remote_files(folder, skip_seqs=True) self.delete_remote_folder(folder) - stderr.print(f"Delete process finished in {folder}") + stderr.print(f"Delete process finished in remote {folder}") stderr.print(f"[green]Finished processing {folder}") return From 6d592222ef384889b8d91f9c747cd96fe4325332 Mon Sep 17 00:00:00 2001 From: Shettland Date: Thu, 18 Jul 2024 16:32:05 +0200 Subject: [PATCH 4/4] updated sftp_handle. Linting --- relecov_tools/sftp_handle.py | 1 - 1 file changed, 1 deletion(-) diff --git a/relecov_tools/sftp_handle.py b/relecov_tools/sftp_handle.py index 8911ae8a..4437afbb 100755 --- a/relecov_tools/sftp_handle.py +++ b/relecov_tools/sftp_handle.py @@ -1153,7 +1153,6 @@ def download(self, target_folders, option="download"): log.error("You do not have permissions to create folder %s", e) sys.exit(1) folders_to_download = target_folders - date = datetime.today().strftime("%Y%m%d") for folder in folders_to_download.keys(): self.current_folder = folder.split("/")[0] # Close previously open connection to avoid timeouts