handle multiple data_mutation files
inodb committed Jan 2, 2025
1 parent 5aa92f3 commit 0abb371
Showing 4 changed files with 100 additions and 182 deletions.
23 changes: 2 additions & 21 deletions src/cbiohub/analyze.py
@@ -1,30 +1,11 @@
from pathlib import Path
from typing import Union
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import duckdb
import pyarrow as pa
import pandas as pd
from dynaconf import settings

from cbiohub.variant import GenomicVariant, ProteinVariant

from cbiohub.data_commands import MUTATION_COLUMNS


def get_combined_df(directory=None):
90 changes: 0 additions & 90 deletions src/cbiohub/data_access.py

This file was deleted.

60 changes: 42 additions & 18 deletions src/cbiohub/data_commands.py
@@ -1,8 +1,31 @@
import time
import click
from tqdm import tqdm
from pathlib import Path

from cbiohub.study import Study
import pyarrow.parquet as pq
import pyarrow as pa
from dynaconf import (
settings,
) # Assuming settings is a module with PROCESSED_PATH defined

from cbiohub.study import Study, MUTATION_DATA_FILES

MUTATION_COLUMNS = {
    "Chromosome": pa.string(),
    "Start_Position": pa.string(),
    "End_Position": pa.string(),
    "Reference_Allele": pa.string(),
    "Tumor_Seq_Allele1": pa.string(),
    "Tumor_Seq_Allele2": pa.string(),
    "t_ref_count": pa.string(),
    "t_alt_count": pa.string(),
    "n_ref_count": pa.string(),
    "n_alt_count": pa.string(),
    "Hugo_Symbol": pa.string(),
    "HGVSp_Short": pa.string(),
    "Tumor_Sample_Barcode": pa.string(),
    "study_id": pa.string(),
}
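Every column is typed as a string so heterogeneous MAF files parse consistently across studies; consumers that need numbers can cast on read. A minimal downstream sketch (the combined-output filename is an assumption):

import pandas as pd

df = pd.read_parquet("data_mutations.parquet")  # hypothetical combined output
for col in ["t_ref_count", "t_alt_count", "n_ref_count", "n_alt_count"]:
    # non-numeric placeholders become NaN instead of raising
    df[col] = pd.to_numeric(df[col], errors="coerce")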


@click.group()
@@ -136,29 +159,30 @@ def combine(output_dir):
pbar.update(1)
continue

clinical_patient_file = (
study.processed_path / "data_clinical_patient.parquet"
)
clinical_sample_file = (
study.processed_path / "data_clinical_sample.parquet"
)

for mutation_file_name in MUTATION_DATA_FILES:
    mutation_file = study.processed_path / mutation_file_name.replace(
        ".txt", ".parquet"
    )
    if mutation_file.exists():
        table = pq.read_table(mutation_file)
        # Select only the shared mutation columns and adjust their types
        columns_to_include = MUTATION_COLUMNS

        # Filter out columns that do not exist in the table schema
        existing_columns = {
            col: dtype
            for col, dtype in columns_to_include.items()
            if col in table.schema.names
        }

        table = table.select(list(existing_columns.keys()))
        table = table.cast(pa.schema(existing_columns))
        mutation_tables.append(table)
if clinical_patient_file.exists():
clinical_patient_tables.append(pq.read_table(clinical_patient_file))
if clinical_sample_file.exists():
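The rest of combine is collapsed above; assuming it concatenates the collected per-study tables, a minimal sketch of that step could be:

# promote_options="default" unifies schemas, since studies may carry
# different subsets of MUTATION_COLUMNS (pyarrow >= 14)
combined = pa.concat_tables(mutation_tables, promote_options="default")
pq.write_table(combined, Path(output_dir) / "data_mutations.parquet")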
109 changes: 56 additions & 53 deletions src/cbiohub/study.py
@@ -5,6 +5,7 @@
settings,
) # Assuming settings is a module with PROCESSED_PATH defined

MUTATION_DATA_FILES = [
    "data_mutations.txt",
    "data_mutations_extended.txt",
    "data_nonsignedout_mutations.txt",
]

class Study:
def __init__(self, study_path: Path):
@@ -13,7 +14,12 @@ def __init__(self, study_path: Path):
self.processed_path = Path(settings.PROCESSED_PATH) / "studies" / self.name
self.sample_data_file = "data_clinical_sample.txt"
self.patient_data_file = "data_clinical_patient.txt"
# mutation data can live under several different file names
self.mutation_data_files = [
    mut_file
    for mut_file in MUTATION_DATA_FILES
    if (self.study_path / mut_file).exists()
]
# all mutation data files are combined into this file
self.mutation_parquet_file = "data_mutations.parquet"
self.sample_df = None
self.patient_df = None
self.mutation_df = None
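A quick sketch of what discovery now yields for a checked-out study (the path is illustrative):

study = Study(Path("/data/datahub/public/acc_tcga"))  # hypothetical checkout
print(study.mutation_data_files)
# e.g. ["data_mutations.txt"], or ["data_mutations_extended.txt"] for
# studies that still use the older file name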
@@ -28,9 +34,12 @@ def check_integrity(self):
required_files = [
    self.sample_data_file,
    self.patient_data_file,
    "meta_study.txt",
]
# Check that at least one mutation data file exists; initialize the
# list here so the check below can append to it
missing_files = []
if len(self.mutation_data_files) == 0:
    missing_files.append("at least one mutation data file")

missing_files += [
    file_name
    for file_name in required_files
@@ -65,80 +74,74 @@ def list_files(self, file_type: str):

return [file_path]

def get_file(self, file_name):
"""Get a specific file in the study directory."""
file_path = self.study_path / file_name
if not file_path.exists():
raise FileNotFoundError(
f"File {file_name} not found in study {self.study_path}."
)
return file_path

def create_parquet(self, file_type: str):
    """Create Parquet file(s) for the specified file type in the PROCESSED_PATH folder."""
    if file_type == "sample":
        file_names = [self.sample_data_file]
    elif file_type == "patient":
        file_names = [self.patient_data_file]
    elif file_type == "mutation":
        file_names = self.mutation_data_files
    else:
        raise ValueError(f"Unknown file type: {file_type}")

    for file_name in file_names:
        file_path = self.study_path / file_name
        if not file_path.exists():
            raise FileNotFoundError(
                f"File {file_name} not found in study {self.study_path}."
            )

        try:
            # Read the data file into a DataFrame
            df = pd.read_csv(
                file_path, sep="\t", comment="#", low_memory=False, dtype=str
            )
            # add study_id as a column
            df["study_id"] = self.name

            # Define the output directory and file path
            output_dir = self.processed_path
            output_dir.mkdir(parents=True, exist_ok=True)
            output_file = output_dir / file_name.replace(".txt", ".parquet")

            # Write the DataFrame to a Parquet file
            df.to_parquet(output_file)
        except pd.errors.ParserError as e:
            print(f"Parse error in study {self.name} for file {file_name}: {e}")
            return False
    return True
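Each source file keeps its own name in the processed directory, so data_mutations_extended.txt becomes data_mutations_extended.parquet; only the combine step merges studies. Continuing the sketch above:

# writes one parquet per discovered mutation file
if study.create_parquet("mutation"):
    print([f.replace(".txt", ".parquet") for f in study.mutation_data_files])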

def get_parquet(self, file_type: str):
    """Get the DataFrame for the specified file type, generating the Parquet file(s) if necessary."""
    if file_type == "sample":
        file_names = [self.sample_data_file]
        df_attr = "sample_df"
    elif file_type == "patient":
        file_names = [self.patient_data_file]
        df_attr = "patient_df"
    elif file_type == "mutation":
        file_names = self.mutation_data_files
        df_attr = "mutation_df"
    else:
        raise ValueError(f"Unknown file type: {file_type}")

    output_files = []
    for file_name in file_names:
        file_path = self.study_path / file_name
        # look in the same directory create_parquet writes to
        output_dir = self.processed_path
        output_file = output_dir / file_name.replace(".txt", ".parquet")
        output_files.append(output_file)

        # Check if the Parquet file needs to be created or updated
        if (
            not output_file.exists()
            or file_path.stat().st_mtime > output_file.stat().st_mtime
        ):
            self.create_parquet(file_type)

    # Load the DataFrame from the Parquet file(s) if not already loaded;
    # a study can ship several mutation files, so concatenate them
    if getattr(self, df_attr) is None:
        setattr(
            self,
            df_attr,
            pd.concat(
                [pd.read_parquet(f) for f in output_files], ignore_index=True
            ),
        )

    return getattr(self, df_attr)
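The accessor memoizes the result on the instance, so the concatenated frame is built once per Study object. A usage sketch (column name from MUTATION_COLUMNS):

mutations = study.get_parquet("mutation")  # builds the parquet(s) on first call
print(mutations["Tumor_Sample_Barcode"].nunique())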

@@ -177,7 +180,7 @@ def is_processed(self):
source_files = [
    self.study_path / self.sample_data_file,
    self.study_path / self.patient_data_file,
    *[
        self.study_path / mutation_data_file
        for mutation_data_file in self.mutation_data_files
    ],
    self.study_path / "meta_study.txt",
]

