diff --git a/src/cbiohub/analyze.py b/src/cbiohub/analyze.py
index 43407d5..9db0965 100644
--- a/src/cbiohub/analyze.py
+++ b/src/cbiohub/analyze.py
@@ -1,30 +1,11 @@
 from pathlib import Path
 from typing import Union
-import pyarrow.parquet as pq
+import pandas as pd
 import pyarrow.dataset as ds
 import duckdb
-import pyarrow as pa
-import pandas as pd
-from dynaconf import settings
 from cbiohub.variant import GenomicVariant, ProteinVariant
-
-MUTATION_COLUMNS = {
-    "Chromosome": pa.string(),
-    "Start_Position": pa.string(),
-    "End_Position": pa.string(),
-    "Reference_Allele": pa.string(),
-    "Tumor_Seq_Allele1": pa.string(),
-    "Tumor_Seq_Allele2": pa.string(),
-    "t_ref_count": pa.string(),
-    "t_alt_count": pa.string(),
-    "n_ref_count": pa.string(),
-    "n_alt_count": pa.string(),
-    "Hugo_Symbol": pa.string(),
-    "HGVSp_Short": pa.string(),
-    "Tumor_Sample_Barcode": pa.string(),
-    "study_id": pa.string(),
-}
+from cbiohub.data_commands import MUTATION_COLUMNS
 
 
 def get_combined_df(directory=None):
diff --git a/src/cbiohub/data_access.py b/src/cbiohub/data_access.py
deleted file mode 100644
index c079b57..0000000
--- a/src/cbiohub/data_access.py
+++ /dev/null
@@ -1,90 +0,0 @@
-"TODO: This file is not really used anymore - just inspiration"
-import pandas as pd
-import glob
-import os
-from typing import Literal
-from tqdm import tqdm
-import pyarrow.parquet as pq
-import pyarrow as pa
-from dynaconf import settings
-
-
-def get_local_all_combined_data_from_folders(folder):
-    return {
-        "maf": get_local_combined_data_from_folders(folder, "mutations"),
-        "clinp": get_local_combined_data_from_folders(folder, "patient"),
-        "clins": get_local_combined_data_from_folders(folder, "sample"),
-    }
-
-
-def get_local_combined_data_from_folders(
-    folder, type: Literal["sample", "patient", "mutations"]
-):
-    if type == "sample" or type == "patient":
-        basefilenames = ["data_clinical_{}.txt".format(type)]
-        skiprows = 4
-        comment = "#"
-        dtype = str
-        INCLUDE_COLUMNS = None
-    else:
-        basefilenames = ["data_mutations.txt", "data_mutations_extended.txt"]
-        skiprows = 0
-        comment = "#"
-        dtype = str
-        INCLUDE_COLUMNS = None
-        # INCLUDE_COLUMNS=[
-        #     'Hugo_Symbol',
-        #     'Chromosome',
-        #     'Start_Position',
-        #     'End_Position',
-        #     'Reference_Allele',
-        #     'Tumor_Seq_Allele1',
-        #     'Tumor_Seq_Allele2',
-        #     'STUDY_ID'
-        # ]
-        # dtype={
-        #     'Chromosome':str,
-        #     'Tumor_Seq_Allele1':str,
-        #     'Tumor_Seq_Allele2':str,
-        #     't_ref_count':'Int64',
-        #     't_alt_count':'Int64',
-        #     'n_ref_count':'Int64',
-        #     'n_alt_count':'Int64',
-        #     'Validation_Status':str,
-        # }
-
-    files = []
-    for basefilename in basefilenames:
-        files += glob.glob(os.path.expanduser(folder + "**/" + basefilename))
-        print("Found {} {} files".format(len(files), basefilename))
-
-    tables = []
-
-    for f in tqdm(files):
-        parquet_filename = os.path.splitext(f)[0] + ".parquet"
-
-        if os.path.exists(parquet_filename) and os.path.getmtime(
-            parquet_filename
-        ) >= os.path.getmtime(f):
-            # Use the existing Parquet file
-            table = pq.read_table(parquet_filename)
-        else:
-            try:
-                df = pd.read_csv(
-                    f, sep="\t", comment="#", low_memory=False, dtype=dtype
-                )
-                df["STUDY_ID"] = f.split("/")[-2]
-                if INCLUDE_COLUMNS is not None:
-                    df = df[INCLUDE_COLUMNS]
-            except:
-                print("Error parsing {}: ignoring".format(f))
-                continue
-
-            # Write the DataFrame to a Parquet file
-            table = pa.Table.from_pandas(df)
-            pq.write_table(table, parquet_filename)
-
-        tables.append(table)
-
-    combined_table = pa.concat_tables(tables, promote=True)
-    return combined_table.to_pandas()
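
Not part of the patch, but useful context: analyze.py keeps its duckdb import, so the combined output can be queried directly with SQL. A minimal sketch, assuming the combine command writes its result to combined/data_mutations.parquet (that path is an assumption, not from this diff):

    import duckdb

    # ASSUMPTION: combined/data_mutations.parquet is where `combine` writes its output
    top_genes = duckdb.sql(
        """
        SELECT Hugo_Symbol, COUNT(*) AS n_mutations
        FROM 'combined/data_mutations.parquet'
        GROUP BY Hugo_Symbol
        ORDER BY n_mutations DESC
        LIMIT 10
        """
    ).df()
    print(top_genes)
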
diff --git a/src/cbiohub/data_commands.py b/src/cbiohub/data_commands.py
index 9d048e5..8e3a0ab 100644
--- a/src/cbiohub/data_commands.py
+++ b/src/cbiohub/data_commands.py
@@ -1,8 +1,31 @@
+import time
 import click
 from tqdm import tqdm
 from pathlib import Path
-
-from cbiohub.study import Study
+import pyarrow.parquet as pq
+import pyarrow as pa
+from dynaconf import (
+    settings,
+)  # Assuming settings is a module with PROCESSED_PATH defined
+
+from cbiohub.study import Study, MUTATION_DATA_FILES
+
+MUTATION_COLUMNS = {
+    "Chromosome": pa.string(),
+    "Start_Position": pa.string(),
+    "End_Position": pa.string(),
+    "Reference_Allele": pa.string(),
+    "Tumor_Seq_Allele1": pa.string(),
+    "Tumor_Seq_Allele2": pa.string(),
+    "t_ref_count": pa.string(),
+    "t_alt_count": pa.string(),
+    "n_ref_count": pa.string(),
+    "n_alt_count": pa.string(),
+    "Hugo_Symbol": pa.string(),
+    "HGVSp_Short": pa.string(),
+    "Tumor_Sample_Barcode": pa.string(),
+    "study_id": pa.string(),
+}
 
 
 @click.group()
@@ -136,7 +159,6 @@ def combine(output_dir):
                 pbar.update(1)
                 continue
 
-            mutation_file = study.processed_path / "data_mutations.parquet"
             clinical_patient_file = (
                 study.processed_path / "data_clinical_patient.parquet"
            )
@@ -144,21 +166,23 @@ def combine(output_dir):
                 study.processed_path / "data_clinical_sample.parquet"
             )
 
-            if mutation_file.exists():
-                table = pq.read_table(mutation_file)
-                # Select only specific columns and adjust their types
-                columns_to_include = MUTATION_COLUMNS
-
-                # Filter out columns that do not exist in the table schema
-                existing_columns = {
-                    col: dtype
-                    for col, dtype in columns_to_include.items()
-                    if col in table.schema.names
-                }
-
-                table = table.select(list(existing_columns.keys()))
-                table = table.cast(pa.schema(existing_columns))
-                mutation_tables.append(table)
+            for mutation_file in MUTATION_DATA_FILES:
+                mutation_parquet = study.processed_path / mutation_file.replace(".txt", ".parquet")
+                if mutation_parquet.exists():
+                    table = pq.read_table(mutation_parquet)
+                    # Select only specific columns and adjust their types
+                    columns_to_include = MUTATION_COLUMNS
+
+                    # Filter out columns that do not exist in the table schema
+                    existing_columns = {
+                        col: dtype
+                        for col, dtype in columns_to_include.items()
+                        if col in table.schema.names
+                    }
+
+                    table = table.select(list(existing_columns.keys()))
+                    table = table.cast(pa.schema(existing_columns))
+                    mutation_tables.append(table)
             if clinical_patient_file.exists():
                 clinical_patient_tables.append(pq.read_table(clinical_patient_file))
             if clinical_sample_file.exists():
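
The select-then-cast pattern in the combine loop above is what makes per-study tables concatenable. A self-contained sketch of the same pattern on a toy table (nothing below is from the repo):

    import pyarrow as pa

    MUTATION_COLUMNS = {"Chromosome": pa.string(), "Start_Position": pa.string()}

    # Toy stand-in for one study's table: wrong dtypes plus an unknown extra column
    table = pa.table(
        {"Chromosome": [1, 2], "Start_Position": [100, 200], "extra": ["a", "b"]}
    )

    # Keep only the known columns, then cast them all to string so that tables
    # from different studies end up with an identical schema
    existing = {c: t for c, t in MUTATION_COLUMNS.items() if c in table.schema.names}
    table = table.select(list(existing.keys())).cast(pa.schema(existing))

    combined = pa.concat_tables([table, table])  # combine does this across studies
    print(combined.schema)
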
diff --git a/src/cbiohub/study.py b/src/cbiohub/study.py
index 4a9b095..5d18eea 100644
--- a/src/cbiohub/study.py
+++ b/src/cbiohub/study.py
@@ -5,6 +5,7 @@
     settings,
 )  # Assuming settings is a module with PROCESSED_PATH defined
 
+MUTATION_DATA_FILES = ["data_mutations.txt", "data_mutations_extended.txt", "data_nonsignedout_mutations.txt"]
 
 class Study:
     def __init__(self, study_path: Path):
@@ -13,7 +14,12 @@ def __init__(self, study_path: Path):
         self.processed_path = Path(settings.PROCESSED_PATH) / "studies" / self.name
         self.sample_data_file = "data_clinical_sample.txt"
         self.patient_data_file = "data_clinical_patient.txt"
-        self.mutation_data_file = "data_mutations.txt"
+        # mutation data can have multiple names
+        self.mutation_data_files = [
+            mut_file for mut_file in MUTATION_DATA_FILES if (self.study_path / mut_file).exists()
+        ]
+        # all mutation data files are combined into this file
+        self.mutation_parquet_file = "data_mutations.parquet"
         self.sample_df = None
         self.patient_df = None
         self.mutation_df = None
@@ -28,9 +34,13 @@ def check_integrity(self):
         required_files = [
             self.sample_data_file,
             self.patient_data_file,
-            self.mutation_data_file,
             "meta_study.txt",
         ]
-        missing_files = [
+        missing_files = []
+        # Check that at least one mutation data file exists
+        if len(self.mutation_data_files) == 0:
+            missing_files.append("at least one mutation data file")
+
+        missing_files += [
             file_name
             for file_name in required_files
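
The detection logic added to __init__ and check_integrity can be traced by hand; a hedged walk-through of the same logic outside the class (the study directory is made up):

    from pathlib import Path

    MUTATION_DATA_FILES = [
        "data_mutations.txt",
        "data_mutations_extended.txt",
        "data_nonsignedout_mutations.txt",
    ]

    study_path = Path("studies/acc_tcga")  # hypothetical study folder
    found = [f for f in MUTATION_DATA_FILES if (study_path / f).exists()]
    if not found:
        print("missing: at least one mutation data file")
    else:
        print(f"mutation data files: {found}")
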
@@ -65,80 +75,76 @@ def list_files(self, file_type: str):
 
         return [file_path]
 
-    def get_file(self, file_name):
-        """Get a specific file in the study directory."""
-        file_path = self.study_path / file_name
-        if not file_path.exists():
-            raise FileNotFoundError(
-                f"File {file_name} not found in study {self.study_path}."
-            )
-        return file_path
-
     def create_parquet(self, file_type: str):
         """Create a Parquet file for the specified file type in the PROCESSED_PATH folder."""
         if file_type == "sample":
-            file_name = self.sample_data_file
+            file_names = [self.sample_data_file]
         elif file_type == "patient":
-            file_name = self.patient_data_file
+            file_names = [self.patient_data_file]
         elif file_type == "mutation":
-            file_name = self.mutation_data_file
+            file_names = self.mutation_data_files
         else:
             raise ValueError(f"Unknown file type: {file_type}")
 
-        file_path = self.study_path / file_name
-        if not file_path.exists():
-            raise FileNotFoundError(
-                f"File {file_name} not found in study {self.study_path}."
-            )
-
-        try:
-            # Read the data file into a DataFrame
-            df = pd.read_csv(
-                file_path, sep="\t", comment="#", low_memory=False, dtype=str
-            )
-            # add study_id as a column
-            df["study_id"] = self.name
-
-            # Define the output directory and file path
-            output_dir = self.processed_path
-            output_dir.mkdir(parents=True, exist_ok=True)
-            output_file = output_dir / file_name.replace(".txt", ".parquet")
-
-            # Write the DataFrame to a Parquet file
-            df.to_parquet(output_file)
-        except pd.errors.ParserError as e:
-            print(f"Parse error in study {self.name} for file {file_name}: {e}")
-            return False
+        for file_name in file_names:
+            file_path = self.study_path / file_name
+            if not file_path.exists():
+                raise FileNotFoundError(
+                    f"File {file_name} not found in study {self.study_path}."
+                )
+
+            try:
+                # Read the data file into a DataFrame
+                df = pd.read_csv(
+                    file_path, sep="\t", comment="#", low_memory=False, dtype=str
+                )
+                # add study_id as a column
+                df["study_id"] = self.name
+
+                # Define the output directory and file path
+                output_dir = self.processed_path
+                output_dir.mkdir(parents=True, exist_ok=True)
+                output_file = output_dir / file_name.replace(".txt", ".parquet")
+
+                # Write the DataFrame to a Parquet file
+                df.to_parquet(output_file)
+            except pd.errors.ParserError as e:
+                print(f"Parse error in study {self.name} for file {file_name}: {e}")
+                return False
         return True
 
     def get_parquet(self, file_type: str):
         """Get the DataFrame for the specified file type, generating the Parquet file if necessary."""
         if file_type == "sample":
-            file_name = self.sample_data_file
+            file_names = [self.sample_data_file]
             df_attr = "sample_df"
         elif file_type == "patient":
-            file_name = self.patient_data_file
+            file_names = [self.patient_data_file]
             df_attr = "patient_df"
         elif file_type == "mutation":
-            file_name = self.mutation_data_file
+            file_names = self.mutation_data_files
             df_attr = "mutation_df"
         else:
             raise ValueError(f"Unknown file type: {file_type}")
 
-        file_path = self.study_path / file_name
-        output_dir = Path(settings.PROCESSED_PATH) / self.name
-        output_file = output_dir / file_name.replace(".txt", ".parquet")
+        frames = []
+        for file_name in file_names:
+            file_path = self.study_path / file_name
+            output_dir = Path(settings.PROCESSED_PATH) / self.name
+            output_file = output_dir / file_name.replace(".txt", ".parquet")
 
-        # Check if the Parquet file needs to be created or updated
-        if (
-            not output_file.exists()
-            or file_path.stat().st_mtime > output_file.stat().st_mtime
-        ):
-            self.create_parquet(file_type)
+            # Check if the Parquet file needs to be created or updated
+            if (
+                not output_file.exists()
+                or file_path.stat().st_mtime > output_file.stat().st_mtime
+            ):
+                self.create_parquet(file_type)
+            frames.append(pd.read_parquet(output_file))
 
-        # Load the DataFrame from the Parquet file if not already loaded
-        if getattr(self, df_attr) is None:
-            setattr(self, df_attr, pd.read_parquet(output_file))
+        # Cache the concatenation of all files of this type if not already loaded
+        if getattr(self, df_attr) is None and frames:
+            setattr(self, df_attr, pd.concat(frames, ignore_index=True))
 
         return getattr(self, df_attr)
@@ -177,7 +183,7 @@ def is_processed(self):
         source_files = [
             self.study_path / self.sample_data_file,
             self.study_path / self.patient_data_file,
-            self.study_path / self.mutation_data_file,
+            *[self.study_path / mutation_data_file for mutation_data_file in self.mutation_data_files],
             self.study_path / "meta_study.txt",
         ]
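
Taken together, an end-to-end sketch of the multi-file flow (the study directory is hypothetical, and it assumes dynaconf's settings.PROCESSED_PATH is configured; everything else is named in this patch):

    from pathlib import Path
    from cbiohub.study import Study

    study = Study(Path("studies/msk_impact_2017"))  # hypothetical study folder
    print(study.mutation_data_files)   # whichever of MUTATION_DATA_FILES exist on disk
    study.create_parquet("mutation")   # writes one parquet per mutation file found
    mutations = study.get_parquet("mutation")  # cached concatenation across those files
    print(mutations["Hugo_Symbol"].value_counts().head())
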