Commit
feat: ingest FinnGen UKB meta-analysis data (opentargets#756)
* feat: implement FinnGen UKB meta-analysis ingestion and harmonisation

* chore: remove ot_finngen_ukb_meta.yaml

* chore: rename raw_study_index_path to raw_study_index_path_from_tsv

* fix: use session.write_mode

* style: rename class to FinngenUkbMetaIngestionStep
tskir authored Sep 11, 2024
1 parent d10cc20 commit a49ae9a
Showing 12 changed files with 236 additions and 30 deletions.
2 changes: 1 addition & 1 deletion config/step/ot_ukb_ppp_eur_sumstat_preprocess.yaml
@@ -1,7 +1,7 @@
defaults:
- ukb_ppp_eur_sumstat_preprocess

-raw_study_index_path: ???
+raw_study_index_path_from_tsv: ???
raw_summary_stats_path: ???
variant_annotation_path: ???
tmp_variant_annotation_path: ???
2 changes: 1 addition & 1 deletion src/airflow/dags/ukb_ppp_eur.py
@@ -33,7 +33,7 @@
cluster_name=CLUSTER_NAME,
step_id="ot_ukb_ppp_eur_sumstat_preprocess",
other_args=[
f"step.raw_study_index_path={UKB_PPP_EUR_STUDY_INDEX}",
f"step.raw_study_index_path_from_tsv={UKB_PPP_EUR_STUDY_INDEX}",
f"step.raw_summary_stats_path={UKB_PPP_EUR_SUMMARY_STATS}",
f"step.variant_annotation_path={VARIANT_ANNOTATION}",
f"step.tmp_variant_annotation_path={TMP_VARIANT_ANNOTATION}",
38 changes: 20 additions & 18 deletions src/gentropy/common/harmonise.py
@@ -15,12 +15,12 @@ def harmonise_summary_stats(
colname_position: str,
colname_allele0: str,
colname_allele1: str,
-colname_a1freq: str,
-colname_info: str,
+colname_a1freq: str | None,
+colname_info: str | None,
colname_beta: str,
colname_se: str,
colname_mlog10p: str,
-colname_n: str,
+colname_n: str | None,
) -> DataFrame:
"""Ingest and harmonise the summary stats.
@@ -40,12 +40,12 @@
colname_position (str): Column name for position.
colname_allele0 (str): Column name for allele0.
colname_allele1 (str): Column name for allele1.
-colname_a1freq (str): Column name for allele1 frequency (optional).
-colname_info (str): Column name for INFO, reflecting variant quality (optional).
+colname_a1freq (str | None): Column name for allele1 frequency (optional).
+colname_info (str | None): Column name for INFO, reflecting variant quality (optional).
colname_beta (str): Column name for beta.
colname_se (str): Column name for beta standard error.
colname_mlog10p (str): Column name for -log10(p).
-colname_n (str): Column name for the number of samples.
+colname_n (str | None): Column name for the number of samples (optional).
Returns:
DataFrame: A harmonised summary stats dataframe.
@@ -159,20 +159,22 @@
)

# Prepare the fields according to schema.
+    select_expr = [
+        f.col("studyId"),
+        f.col("chromosome"),
+        f.col("variantId"),
+        f.col("beta"),
+        f.col(colname_position).cast(t.IntegerType()).alias("position"),
+        # Parse p-value into mantissa and exponent.
+        *neglog_pvalue_to_mantissa_and_exponent(f.col(colname_mlog10p).cast(t.DoubleType())),
+        # Add standard error and sample size information.
+        f.col(colname_se).cast("double").alias("standardError"),
+    ]
+    if colname_n:
+        select_expr.append(f.col(colname_n).cast("integer").alias("sampleSize"))
df = (
df
-        .select(
-            f.col("studyId"),
-            f.col("chromosome"),
-            f.col("variantId"),
-            f.col("beta"),
-            f.col(colname_position).cast(t.IntegerType()).alias("position"),
-            # Parse p-value into mantissa and exponent.
-            *neglog_pvalue_to_mantissa_and_exponent(f.col(colname_mlog10p).cast(t.DoubleType())),
-            # Add standard error and sample size information.
-            f.col(colname_se).cast("double").alias("standardError"),
-            f.col(colname_n).cast("integer").alias("sampleSize"),
-        )
+        .select(*select_expr)
# Drop rows which don't have proper position or beta value.
.filter(
f.col("position").cast(t.IntegerType()).isNotNull()
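
A minimal, runnable PySpark sketch (not part of the commit) of the optional-column pattern the hunk above introduces: the select expression list is assembled first, and sampleSize is appended only when a source column name is supplied. The DataFrame, column names and values below are illustrative, not taken from gentropy.

import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Toy summary-stats rows; "n" stands in for an optional per-row sample-size column.
raw = spark.createDataFrame(
    [("STUDY_1", "1_100_A_G", "100", 0.12, 0.01, 5.3, 1000)],
    ["studyId", "variantId", "pos", "beta", "se", "mlog10p", "n"],
)

colname_n = "n"  # set to None for sources without a per-row sample size
select_expr = [
    f.col("studyId"),
    f.col("variantId"),
    f.col("pos").cast(t.IntegerType()).alias("position"),
    f.col("beta").cast("double"),
    f.col("se").cast("double").alias("standardError"),
]
if colname_n:
    select_expr.append(f.col(colname_n).cast("integer").alias("sampleSize"))

# Rows without a parseable position are dropped, mirroring the filter above.
harmonised = raw.select(*select_expr).filter(f.col("position").isNotNull())
harmonised.show()
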
16 changes: 14 additions & 2 deletions src/gentropy/common/per_chromosome.py
@@ -5,6 +5,9 @@
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

+from gentropy.datasource.finngen_ukb_meta.summary_stats import (
+    FinngenUkbMetaSummaryStats,
+)
from gentropy.datasource.ukb_ppp_eur.summary_stats import UkbPppEurSummaryStats


@@ -63,15 +66,23 @@ def prepare_va(session: SparkSession, variant_annotation_path: str, tmp_variant_
)


-def process_summary_stats_per_chromosome(session: SparkSession, ingestion_class: type[UkbPppEurSummaryStats], raw_summary_stats_path: str, tmp_variant_annotation_path: str, summary_stats_output_path: str) -> None:
+def process_summary_stats_per_chromosome(
+    session: SparkSession,
+    ingestion_class: type[UkbPppEurSummaryStats] | type[FinngenUkbMetaSummaryStats],
+    raw_summary_stats_path: str,
+    tmp_variant_annotation_path: str,
+    summary_stats_output_path: str,
+    study_index_path: str,
+) -> None:
"""Processes summary statistics for each chromosome, partitioning and writing results.
Args:
session (SparkSession): The Spark session to use for distributed data processing.
-ingestion_class (type[UkbPppEurSummaryStats]): The class used to handle ingestion of source data. Must have a `from_source` method returning a DataFrame.
+ingestion_class (type[UkbPppEurSummaryStats] | type[FinngenUkbMetaSummaryStats]): The class used to handle ingestion of source data. Must have a `from_source` method returning a DataFrame.
raw_summary_stats_path (str): The path to the raw summary statistics files.
tmp_variant_annotation_path (str): The path to temporary variant annotation data, used for chromosome joins.
summary_stats_output_path (str): The output path to write processed summary statistics as parquet files.
+study_index_path (str): The path to study index, which is necessary in some cases to populate the sample size column.
"""
# Set mode to overwrite for processing the first chromosome.
write_mode = "overwrite"
@@ -85,6 +96,7 @@ def process_summary_stats_per_chromosome(session: SparkSession, ingestion_class:
raw_summary_stats_path=raw_summary_stats_path,
tmp_variant_annotation_path=tmp_variant_annotation_path,
chromosome=str(chromosome),
+study_index_path=study_index_path,
)
.df
.coalesce(1)
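
A minimal sketch (not part of the commit) of the write-mode toggling that process_summary_stats_per_chromosome relies on: the first chromosome overwrites any existing output, and every later chromosome appends to the same location. The chromosome list, output path and toy DataFrame are assumptions for illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
output_path = "/tmp/per_chromosome_demo"  # illustrative output location

write_mode = "overwrite"  # first chromosome replaces any previous run
for chromosome in [str(c) for c in range(1, 23)] + ["X"]:
    # Stand-in for ingestion_class.from_source(...).df for one chromosome.
    chunk = spark.createDataFrame([(chromosome, 0.1)], ["chromosome", "beta"])
    chunk.coalesce(1).write.mode(write_mode).parquet(output_path)
    # Every subsequent chromosome is appended to the same dataset.
    write_mode = "append"
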
15 changes: 14 additions & 1 deletion src/gentropy/config.py
@@ -299,7 +299,7 @@ class PICSConfig(StepConfig):
class UkbPppEurConfig(StepConfig):
"""UKB PPP (EUR) ingestion step configuration."""

-raw_study_index_path: str = MISSING
+raw_study_index_path_from_tsv: str = MISSING
raw_summary_stats_path: str = MISSING
tmp_variant_annotation_path: str = MISSING
variant_annotation_path: str = MISSING
@@ -308,6 +308,19 @@ class UkbPppEurConfig(StepConfig):
_target_: str = "gentropy.ukb_ppp_eur_sumstat_preprocess.UkbPppEurStep"


+@dataclass
+class FinngenUkbMetaConfig(StepConfig):
+    """FinnGen UKB meta-analysis ingestion step configuration."""
+
+    raw_study_index_path_from_tsv: str = MISSING
+    raw_summary_stats_path: str = MISSING
+    tmp_variant_annotation_path: str = MISSING
+    variant_annotation_path: str = MISSING
+    study_index_output_path: str = MISSING
+    summary_stats_output_path: str = MISSING
+    _target_: str = "gentropy.finngen_ukb_meta.FinngenUkbMetaIngestionStep"
+

@dataclass
class GnomadVariantConfig(StepConfig):
"""Gnomad variant ingestion step configuration."""
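
A minimal sketch (not part of the commit) of how a structured step config with MISSING mandatory fields behaves under OmegaConf, which Hydra builds on; DemoStepConfig and the values below are illustrative stand-ins for FinngenUkbMetaConfig.

from dataclasses import dataclass

from omegaconf import MISSING, OmegaConf
from omegaconf.errors import MissingMandatoryValue


@dataclass
class DemoStepConfig:
    """Illustrative stand-in for a gentropy step config."""

    raw_study_index_path_from_tsv: str = MISSING
    summary_stats_output_path: str = MISSING


cfg = OmegaConf.structured(DemoStepConfig)
cfg.raw_study_index_path_from_tsv = "study_index.tsv"

try:
    _ = cfg.summary_stats_output_path  # still '???', so access raises
except MissingMandatoryValue:
    print("summary_stats_output_path must be provided, e.g. as a CLI override")

print(OmegaConf.to_yaml(cfg))
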
3 changes: 3 additions & 0 deletions src/gentropy/datasource/finngen_ukb_meta/__init__.py
@@ -0,0 +1,3 @@
"""FinnGen UKB meta-analysis data source."""

from __future__ import annotations
62 changes: 62 additions & 0 deletions src/gentropy/datasource/finngen_ukb_meta/study_index.py
@@ -0,0 +1,62 @@
"""Study Index for Finngen data source."""
from __future__ import annotations

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

from gentropy.dataset.study_index import StudyIndex


class FinngenUkbMetaStudyIndex(StudyIndex):
"""Study index dataset from FinnGen UKB meta-analysis."""

@classmethod
def from_source(
cls: type[FinngenUkbMetaStudyIndex],
spark: SparkSession,
raw_study_index_path_from_tsv: str,
) -> StudyIndex:
"""This function ingests study level metadata from FinnGen UKB meta-analysis.
Args:
spark (SparkSession): Spark session object.
raw_study_index_path_from_tsv (str): Raw study index path.
Returns:
StudyIndex: Parsed and annotated FinnGen UKB meta-analysis study table.
"""
# Read the raw study index and process.
study_index_df = (
spark.read.csv(raw_study_index_path_from_tsv, sep="\t", header=True)
.select(
f.lit("gwas").alias("studyType"),
f.lit("FINNGEN_R11_UKB_META").alias("projectId"),
f.col("_gentropy_study_id").alias("studyId"),
f.col("name").alias("traitFromSource"),
f.lit(True).alias("hasSumstats"),
f.col("_gentropy_summary_stats_link").alias("summarystatsLocation"),
(f.col("fg_n_cases") + f.col("ukbb_n_cases") + f.col("fg_n_controls") + f.col("ukbb_n_controls")).alias("nSamples")
)
)
# Add population structure.
study_index_df = (
study_index_df
.withColumn(
"discoverySamples",
f.array(
f.struct(
f.col("nSamples").cast("integer").alias("sampleSize"),
f.lit("European").alias("ancestry"),
)
)
)
.withColumn(
"ldPopulationStructure",
cls.aggregate_and_map_ancestries(f.col("discoverySamples")),
)
)

return StudyIndex(
_df=study_index_df,
_schema=StudyIndex.get_schema(),
)
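
A minimal PySpark sketch (not part of the commit) of the two study-index transformations above: summing the FinnGen and UKB case/control counts into nSamples, then wrapping the total in a single-ancestry discoverySamples struct array. Input values are invented, and the gentropy-internal aggregate_and_map_ancestries call is omitted.

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

raw = spark.createDataFrame(
    [("META_PHENO_1", "Asthma", 1200, 3400, 50000, 61000)],
    ["_gentropy_study_id", "name", "fg_n_cases", "ukbb_n_cases", "fg_n_controls", "ukbb_n_controls"],
)

study_index = (
    raw.select(
        f.col("_gentropy_study_id").alias("studyId"),
        f.col("name").alias("traitFromSource"),
        # Total sample size is the sum of cases and controls from both cohorts.
        (
            f.col("fg_n_cases") + f.col("ukbb_n_cases")
            + f.col("fg_n_controls") + f.col("ukbb_n_controls")
        ).alias("nSamples"),
    )
    .withColumn(
        "discoverySamples",
        f.array(
            f.struct(
                f.col("nSamples").cast("integer").alias("sampleSize"),
                f.lit("European").alias("ancestry"),
            )
        ),
    )
)
study_index.show(truncate=False)
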
63 changes: 63 additions & 0 deletions src/gentropy/datasource/finngen_ukb_meta/summary_stats.py
@@ -0,0 +1,63 @@
"""Summary statistics ingestion for FinnGen UKB meta-analysis."""

from __future__ import annotations

from dataclasses import dataclass

from pyspark.sql import SparkSession

from gentropy.common.harmonise import harmonise_summary_stats
from gentropy.dataset.summary_statistics import SummaryStatistics


@dataclass
class FinngenUkbMetaSummaryStats:
"""Summary statistics dataset for FinnGen UKB meta-analysis."""

@classmethod
def from_source(
cls: type[FinngenUkbMetaSummaryStats],
spark: SparkSession,
raw_summary_stats_path: str,
tmp_variant_annotation_path: str,
chromosome: str,
study_index_path: str,
) -> SummaryStatistics:
"""Ingest and harmonise all summary stats for FinnGen UKB meta-analysis data.
Args:
spark (SparkSession): Spark session object.
raw_summary_stats_path (str): Input raw summary stats path.
tmp_variant_annotation_path (str): Input variant annotation dataset path.
chromosome (str): Which chromosome to process.
study_index_path (str): The path to study index, which is necessary in some cases to populate the sample size column.
Returns:
SummaryStatistics: Processed summary statistics dataset for a given chromosome.
"""
# Run the harmonisation steps.
df = harmonise_summary_stats(
spark,
raw_summary_stats_path,
tmp_variant_annotation_path,
chromosome,
colname_position="POS",
colname_allele0="REF",
colname_allele1="ALT",
colname_a1freq=None,
colname_info=None,
colname_beta="all_inv_var_meta_beta",
colname_se="all_inv_var_meta_sebeta",
colname_mlog10p="all_inv_var_meta_mlogp",
colname_n=None,
)

# Populate the sample size column from the study index.
study_index = spark.read.parquet(study_index_path).select("studyId", "nSamples")
df = df.join(study_index, on=["studyId"], how="inner")

# Create the summary statistics object.
return SummaryStatistics(
_df=df,
_schema=SummaryStatistics.get_schema(),
)
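
A minimal PySpark sketch (not part of the commit) of the sample-size back-fill above: because the meta-analysis summary stats carry no per-row N (colname_n=None), nSamples comes from the study index and is attached by an inner join on studyId. The DataFrames and values are illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

sumstats = spark.createDataFrame(
    [("META_PHENO_1", "1_100_A_G", 0.12), ("META_PHENO_1", "1_200_C_T", -0.05)],
    ["studyId", "variantId", "beta"],
)
study_index = spark.createDataFrame([("META_PHENO_1", 115600)], ["studyId", "nSamples"])

# Inner join: rows whose study is absent from the index are dropped.
sumstats_with_n = sumstats.join(study_index, on=["studyId"], how="inner")
sumstats_with_n.show()
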
6 changes: 3 additions & 3 deletions src/gentropy/datasource/ukb_ppp_eur/study_index.py
@@ -14,14 +14,14 @@ class UkbPppEurStudyIndex(StudyIndex):
def from_source(
cls: type[UkbPppEurStudyIndex],
spark: SparkSession,
-raw_study_index_path: str,
+raw_study_index_path_from_tsv: str,
raw_summary_stats_path: str,
) -> StudyIndex:
"""This function ingests study level metadata from UKB PPP (EUR).
Args:
spark (SparkSession): Spark session object.
-raw_study_index_path (str): Raw study index path.
+raw_study_index_path_from_tsv (str): Raw study index path.
raw_summary_stats_path (str): Raw summary stats path.
Returns:
@@ -39,7 +39,7 @@ def from_source(
)
# Now we can read the raw study index and complete the processing.
study_index_df = (
-spark.read.csv(raw_study_index_path, sep="\t", header=True)
+spark.read.csv(raw_study_index_path_from_tsv, sep="\t", header=True)
.select(
f.lit("pqtl").alias("studyType"),
f.lit("UKB_PPP_EUR").alias("projectId"),
2 changes: 2 additions & 0 deletions src/gentropy/datasource/ukb_ppp_eur/summary_stats.py
@@ -21,6 +21,7 @@ def from_source(
raw_summary_stats_path: str,
tmp_variant_annotation_path: str,
chromosome: str,
study_index_path: str,
) -> SummaryStatistics:
"""Ingest and harmonise all summary stats for UKB PPP (EUR) data.
@@ -29,6 +30,7 @@
raw_summary_stats_path (str): Input raw summary stats path.
tmp_variant_annotation_path (str): Input variant annotation dataset path.
chromosome (str): Which chromosome to process.
+study_index_path (str): The path to study index, which is necessary in some cases to populate the sample size column.
Returns:
SummaryStatistics: Processed summary statistics dataset for a given chromosome.
49 changes: 49 additions & 0 deletions src/gentropy/finngen_ukb_meta.py
@@ -0,0 +1,49 @@
"""Step to run FinnGen UKB meta-analysis data ingestion."""

from __future__ import annotations

from gentropy.common.per_chromosome import (
prepare_va,
process_summary_stats_per_chromosome,
)
from gentropy.common.session import Session
from gentropy.datasource.finngen_ukb_meta.study_index import FinngenUkbMetaStudyIndex
from gentropy.datasource.finngen_ukb_meta.summary_stats import (
FinngenUkbMetaSummaryStats,
)


class FinngenUkbMetaIngestionStep:
"""FinnGen UKB meta-analysis data ingestion and harmonisation."""

def __init__(
self, session: Session, raw_study_index_path_from_tsv: str, raw_summary_stats_path: str, variant_annotation_path: str, tmp_variant_annotation_path: str, study_index_output_path: str, summary_stats_output_path: str
) -> None:
"""Data ingestion and harmonisation step for FinnGen UKB meta-analysis.
Args:
session (Session): Session object.
raw_study_index_path_from_tsv (str): Input raw study index path.
raw_summary_stats_path (str): Input raw summary stats path.
variant_annotation_path (str): Input variant annotation dataset path.
tmp_variant_annotation_path (str): Temporary output path for variant annotation dataset.
study_index_output_path (str): Study index output path.
summary_stats_output_path (str): Summary stats output path.
"""
session.logger.info("Pre-compute the direct and flipped variant annotation dataset.")
prepare_va(session, variant_annotation_path, tmp_variant_annotation_path)

session.logger.info("Process study index.")
(
FinngenUkbMetaStudyIndex.from_source(
spark=session.spark,
raw_study_index_path_from_tsv=raw_study_index_path_from_tsv,
)
.df
.write
.mode(session.write_mode)
.parquet(study_index_output_path)
)

session.logger.info("Process and harmonise summary stats.")
process_summary_stats_per_chromosome(session, FinngenUkbMetaSummaryStats, raw_summary_stats_path, tmp_variant_annotation_path, summary_stats_output_path, study_index_output_path)