Skip to content

Commit

Permalink
CNAM-154 Added filter to remove patients without outcomes
Browse files Browse the repository at this point in the history
CNAM-154 Added comments to config file
  • Loading branch information
danielpes committed Dec 6, 2016
1 parent 8521f6b commit bccbaa7
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 22 deletions.
29 changes: 15 additions & 14 deletions src/main/resources/config/filtering-default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,23 @@ default = {

}
mlpp_parameters = {
bucket_size = [30] # in days
lag_count = [10]
min_timestamp = ${default.dates.study_start}
max_timestamp = ${default.dates.study_end}
include_death_bucket = false
bucket_size = [30] # Number of days of each bucket of time
lag_count = [10] # Number of lags to be created
min_timestamp = ${default.dates.study_start} # Min timestamp to be considered by the time buckets
max_timestamp = ${default.dates.study_end} # Max timestamp to be considered by the time buckets
include_death_bucket = false # If false, the row corresponding to the death bucket is filled with 0s

exposures = {
min_purchases = 1
start_delay = 0
purchases_window = 0
only_first = false
filter_lost_patients = false
filter_diagnosed_patients = true
diagnosed_patients_threshold = 0
filter_delayed_entries = true
delayed_entry_threshold = 12
min_purchases = 1 # Number of drug purchases within <purchases_window> to form an exposure
start_delay = 0 # Months after the drug purchases for the exposure to start
purchases_window = 0 # Period in months to look for multiple drug purchases
only_first = false # If true, only the first exposure is kept for each <patient, molecule> pair
filter_never_sick_patients = false # If true, patients who never got a target disease are removed
filter_lost_patients = false # If true, patients with a trackloss are removed
filter_diagnosed_patients = true # If true, patients with an early diagnosis are removed
diagnosed_patients_threshold = 0 # Number of months after study start for a diagnosis to be considered "early"
filter_delayed_entries = true # If true, patients who entered the study (i.e. first drug purchase) after a delay are removed
delayed_entry_threshold = 12 # Delay in months for the delayed entries filter
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ object MLPPConfig {
startDelay: Int,
purchasesWindow: Int,
onlyFirst: Boolean,
filterNeverSickPatients: Boolean,
filterLostPatients: Boolean,
filterDiagnosedPatients: Boolean,
filterEarlyDiagnosedPatients: Boolean,
diagnosedPatientsThreshold: Int,
filterDelayedEntries: Boolean,
delayedEntryThreshold: Int
Expand All @@ -33,8 +34,9 @@ object MLPPConfig {
startDelay = conf.getInt("exposures.start_delay"),
purchasesWindow = conf.getInt("exposures.purchases_window"),
onlyFirst = conf.getBoolean("exposures.only_first"),
filterNeverSickPatients = conf.getBoolean("exposures.filter_never_sick_patients"),
filterLostPatients = conf.getBoolean("exposures.filter_lost_patients"),
filterDiagnosedPatients = conf.getBoolean("exposures.filter_diagnosed_patients"),
filterEarlyDiagnosedPatients = conf.getBoolean("exposures.filter_diagnosed_patients"),
diagnosedPatientsThreshold = conf.getInt("exposures.diagnosed_patients_threshold"),
filterDelayedEntries = conf.getBoolean("exposures.filter_delayed_entries"),
delayedEntryThreshold = conf.getInt("exposures.delayed_entry_threshold")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ object MLPPExposuresTransformer extends ExposuresTransformer {
private def filterLostPatients = MLPPConfig.exposureDefinition.filterLostPatients
private def filterDelayedEntries = MLPPConfig.exposureDefinition.filterDelayedEntries
private def delayedEntryThreshold = MLPPConfig.exposureDefinition.delayedEntryThreshold
private def filterDiagnosedPatients = MLPPConfig.exposureDefinition.filterDiagnosedPatients
private def filterEarlyDiagnosedPatients = MLPPConfig.exposureDefinition.filterEarlyDiagnosedPatients
private def diagnosedPatientsThreshold = MLPPConfig.exposureDefinition.diagnosedPatientsThreshold
private def filterNeverSickPatients = MLPPConfig.exposureDefinition.filterNeverSickPatients

val outputColumns = List(
col("patientID"),
Expand All @@ -33,9 +34,9 @@ object MLPPExposuresTransformer extends ExposuresTransformer {
implicit class ExposuresDataFrame(data: DataFrame) {

/**
* Drops patients whose got a target disease before periodStart + delay (default = 0)
* Drops patients who got a target disease before periodStart + delay (default = 0)
*/
def filterDiagnosedPatients(doFilter: Boolean): DataFrame = {
def filterEarlyDiagnosedPatients(doFilter: Boolean): DataFrame = {

if (doFilter) {
val window = Window.partitionBy("patientID")
Expand Down Expand Up @@ -104,6 +105,29 @@ object MLPPExposuresTransformer extends ExposuresTransformer {
}
}

/**
 * Keeps only the patients having at least one target disease event, i.e. a row whose
 * category is "disease", whose eventId is "targetDisease" and whose start is not after
 * MLPPConfig.maxTimestamp. All rows of the remaining patients are dropped.
 *
 * @param doFilter when false, the data is returned untouched
 * @return the rows of the patients with a target disease event, or the original data
 *         if doFilter is false
 */
def filterNeverSickPatients(doFilter: Boolean): DataFrame = {

  if (!doFilter) {
    data
  }
  else {
    // True on the rows that hold a target disease event inside the allowed time range
    val isTargetDiseaseEvent: Column =
      (col("category") === "disease") &&
        (col("eventId") === "targetDisease") &&
        (col("start") <= MLPPConfig.maxTimestamp)

    // Flag each row with 1/0, then take the max over all the rows of the same patient:
    // the result is 1 (true once cast) as soon as the patient has one matching event.
    val patientHasTargetDisease: Column =
      max(when(isTargetDiseaseEvent, lit(1)).otherwise(lit(0)))
        .over(Window.partitionBy("patientID"))
        .cast(BooleanType)

    // "filter" is a temporary helper column, removed before returning
    data
      .withColumn("filter", patientHasTargetDisease)
      .where(col("filter"))
      .drop("filter")
  }
}

def withExposureStart(minPurchases: Int = 1, intervalSize: Int = 6,
startDelay: Int = 0, firstOnly: Boolean = false): DataFrame = {

Expand Down Expand Up @@ -142,8 +166,9 @@ object MLPPExposuresTransformer extends ExposuresTransformer {

input.toDF
.filterDelayedEntries(filterDelayedEntries)
.filterDiagnosedPatients(filterDiagnosedPatients)
.filterEarlyDiagnosedPatients(filterEarlyDiagnosedPatients)
.filterLostPatients(filterLostPatients)
.filterNeverSickPatients(filterNeverSickPatients)
.where(col("category") === "molecule")
.withExposureStart(
minPurchases = minPurchases,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class MLPPExposuresTransformerSuite extends SharedContext {

// When
import MLPPExposuresTransformer.ExposuresDataFrame
val result = input.filterDiagnosedPatients(true).select("patientID", "category")
val result = input.filterEarlyDiagnosedPatients(true).select("patientID", "category")

// Then
import RichDataFrames._
Expand All @@ -100,7 +100,7 @@ class MLPPExposuresTransformerSuite extends SharedContext {

// When
import MLPPExposuresTransformer.ExposuresDataFrame
val result = input.filterDiagnosedPatients(false)
val result = input.filterEarlyDiagnosedPatients(false)

// Then
import RichDataFrames._
Expand Down Expand Up @@ -135,6 +135,37 @@ class MLPPExposuresTransformerSuite extends SharedContext {
assert(result === expected)
}

"filterNeverSickPatients" should "remove patients who never have a target disease" in {
  val sqlCtx = sqlContext
  import sqlCtx.implicits._

  // Given
  val columnNames = Seq("patientID", "category", "eventId", "start")

  // Patient_A is the only patient with a target disease event, so all of its rows must be kept
  val sickPatientEvents = Seq(
    ("Patient_A", "molecule", "", makeTS(2006, 1, 1)),
    ("Patient_A", "molecule", "", makeTS(2006, 3, 1)),
    ("Patient_A", "disease", "targetDisease", makeTS(2006, 6, 1))
  )

  // Patient_B and Patient_C only have molecule events, so they must be removed
  val neverSickPatientsEvents = Seq(
    ("Patient_B", "molecule", "", makeTS(2006, 5, 1)),
    ("Patient_B", "molecule", "", makeTS(2007, 1, 1)),
    ("Patient_C", "molecule", "", makeTS(2006, 11, 1))
  )

  val input = (sickPatientEvents ++ neverSickPatientsEvents).toDF(columnNames: _*)

  val expected = sickPatientEvents.toDF(columnNames: _*)

  // When
  import MLPPExposuresTransformer.ExposuresDataFrame
  val result = input.filterNeverSickPatients(true)

  // Then
  import RichDataFrames._
  result.show
  expected.show
  assert(result === expected)
}

"withExposureStart" should "add a column with the start of the default MLPP exposure definition" in {
val sqlCtx = sqlContext
import sqlCtx.implicits._
Expand Down

0 comments on commit bccbaa7

Please sign in to comment.