def get_struct_field_schema(schema: t.StructType, name: str) -> t.DataType:
    """Get the data type of a top-level field from a struct schema.

    Only the direct fields of `schema` are inspected; nested structs are not
    searched. If several fields share the same name, the first one wins.

    Args:
        schema (t.StructType): Provided schema where the name should be looked in.
        name (str): Name of the field to look for in the schema.

    Returns:
        t.DataType: Data type of the StructField with provided name.

    Raises:
        ValueError: If provided name is not present in the input schema.

    Examples:
        >>> get_struct_field_schema(t.StructType([t.StructField("a", t.StringType())]), "a")
        StringType()

        >>> get_struct_field_schema(t.StructType([t.StructField("a", t.StringType())]), "b")
        Traceback (most recent call last):
            ...
        ValueError: Provided name b is not present in the schema.

    """
    for field in schema.fields:
        if field.name == name:
            return field.dataType
    # Exceptions do not interpolate logging-style "%s" arguments, so the message
    # has to be formatted eagerly; otherwise str(exc) is a (template, name) tuple.
    raise ValueError(f"Provided name {name} is not present in the schema.")
@pytest.mark.parametrize(
    [
        "finngen_susie_finemapping_snp_files",
        "finngen_susie_finemapping_cs_summary_files",
    ],
    [
        pytest.param(
            "tests/gentropy/data_samples/finngen_R9_AB1_EBV.SUSIE.snp.gz",
            "tests/gentropy/data_samples/finngen_credset_summary_sample.tsv",
            id="non block compressed files",
        ),
    ],
)
@pytest.mark.step_test
def test_finngen_finemapping_ingestion_step(
    session: Session,
    finngen_susie_finemapping_snp_files: str,
    finngen_susie_finemapping_cs_summary_files: str,
    tmp_path: Path,
) -> None:
    """Run the FinnGen fine-mapping ingestion step end to end on sample files.

    The step is instantiated with the sample SNP and credible-set summary
    files and writes into a fresh directory under `tmp_path`. The test then
    checks that the output directory and its `_SUCCESS` marker exist, and
    that reading the output back as a StudyLocus yields exactly one row.
    """
    destination = tmp_path / "output"
    destination_str = str(destination)

    # Instantiating the step is what triggers the ingestion.
    FinnGenFinemappingIngestionStep(
        session=session,
        finngen_susie_finemapping_snp_files=finngen_susie_finemapping_snp_files,
        finngen_susie_finemapping_cs_summary_files=finngen_susie_finemapping_cs_summary_files,
        finngen_finemapping_lead_pvalue_threshold=1e-5,
        finngen_finemapping_out=destination_str,
    )

    success_marker = destination / "_SUCCESS"
    assert destination.is_dir()
    assert success_marker.exists()

    credible_sets = StudyLocus.from_parquet(session=session, path=destination_str)
    row_count = credible_sets.df.count()
    assert row_count == 1