Skip to content

Commit

Permalink
Merge pull request #226 from X-DataInitiative/CNAM-433-LimitedPeriodA…
Browse files Browse the repository at this point in the history
…dder-Strategies

Cnam 433 limited period adder strategies
  • Loading branch information
Youcef Sebiat authored Jan 17, 2020
2 parents 61d4170 + ba682af commit 6e2fa10
Show file tree
Hide file tree
Showing 17 changed files with 405 additions and 115 deletions.
2 changes: 1 addition & 1 deletion src/main/resources/config/fall/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ root {
end_delay = 15 days
end_threshold_gc = 90 days
end_threshold_ngc = 30 days
to_exposure_strategy = "purchase_count_based"
}
}
interaction {
Expand All @@ -29,7 +30,6 @@ root {
run_parameters {
outcome: ["Acts", "Diagnoses", "Outcomes"] // pipeline of calculation of outcome, possible values : Acts, Diagnoses, and Outcomes
exposure: ["Patients", "StartGapPatients", "DrugPurchases", "Exposures"] // pipeline of the calculation of exposure, possible values : Patients, StartGapPatients, DrugPurchases, Exposures
hospital_stay: ["HospitalStay"] //pipeline for hospital stay, possible values : HospitalStay
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/config/fall/paths/cmap.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ env_name = "cmap"

input = {
dcir = "/shared/Observapur/staging/Flattening/flat_table/DCIR"
mco_ce = "/shared/Observapur/staging/Flattening/flat_table/MCO_CE"
mco = "/shared/Observapur/staging/Flattening/flat_table/MCO"
mco_ce = "/user/ds/CNAM360/flattening/flat_table/MCO_CE"
ir_ben = "/shared/Observapur/staging/Flattening/single_table/IR_BEN_R"
ir_imb = "/shared/Observapur/staging/Flattening/single_table/IR_IMB_R"
ir_pha = "/shared/Observapur/staging/Flattening/single_table/IR_PHA_R_MOL"
Expand Down
1 change: 0 additions & 1 deletion src/main/resources/config/fall/template.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# output.root = "target/test/output"
# output.save_mode = "errorIfExists" // Possible values = [overwrite, append, errorIfExists, withTimestamp] Strategy of saving output data. errorIfExists by deault

# exposures.min_purchases: 1 // 1+ (Usually 1 or 2)
# exposures.start_delay: 0 months // 0+ (Usually between 0 and 3). Represents the delay in months between a dispensation and its exposure start date.
# exposures.purchases_window: 0 months // 0+ (Usually 0 or 6) Represents the window size in months. Ignored when min_purchases=1.
# exposures.end_threshold_gc: 90 days // If periodStrategy="limited", represents the period without purchases for an exposure to be considered "finished".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,53 @@

package fr.polytechnique.cmap.cnam.etl.datatypes

import scala.annotation.tailrec
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

trait Addable [A] {
def + (other: A): RemainingPeriod[A]
}

object Addable {
/***
* Transforms a list of `Addable` of type A into a list of A where every element do not intersect with any other
* element of the list. The passed list of elements must ordered from the first to the last.
* To achieve the non intersection property, the algorithm proceedes as the following:
* 1. Takes an element of `RightRemainingPeriod` and add it to head of List of `LeftRemainingPeriod`.
* 1. If the result is `RightRemainingPeriod`, recurse with the new element as the `RightRemainingPeriod`.
* 2. If the result is `LeftRemainingPeriod`, add it to the Accumulator, and recurse with the head of of List of
* `LeftRemainingPeriod` as the new `RightRemainingPeriod`.
* 3. If the result is `DisjointedRemainingPeriod`, add the Left part to the Accumulator,
* and recurse with the right part.
* 4. If the result is `NullRemainingPeriod`, add the element of `RightRemainingPeriod` to the Accumulator,
* and recurse with the head of of List of `LeftRemainingPeriod` as the new `RightRemainingPeriod`
* 2. Recurse until lrs is empty.
* @param rr element to start the add operation with
* @param lrs list of LeftRemainingPeriod to combine. Must be timely ordered with respect to definition of `+` of type A.
* @param acc list to accumulate the results in.
* @tparam A type parameter. A must extend the Addable trait.
* @return List of `LeftRemainingPeriod` of type A.
*/
@tailrec
def combineAddables[A <: Addable[A] : ClassTag : TypeTag](
rr: RightRemainingPeriod[A],
lrs: List[LeftRemainingPeriod[A]],
acc: List[LeftRemainingPeriod[A]]): List[LeftRemainingPeriod[A]] = {
lrs match {
case Nil => rr.toLeft :: acc
case lr :: Nil => rr.e + lr.e match {
case NullRemainingPeriod => acc
case l: LeftRemainingPeriod[A] => l :: acc
case r: RightRemainingPeriod[A] => combineAddables[A](r, List.empty, acc)
case d: DisjointedRemainingPeriod[A] => combineAddables[A](d.r, List.empty, d.l :: acc)
}
case lr :: lr2 :: rest => rr.e + lr.e match {
case NullRemainingPeriod => combineAddables(lr2.toRight, rest, acc)
case l: LeftRemainingPeriod[A] => combineAddables(lr2.toRight, rest, l :: acc)
case r: RightRemainingPeriod[A] => combineAddables[A](r, lr2 :: rest, acc)
case d: DisjointedRemainingPeriod[A] => combineAddables[A](d.r, lr2 :: rest, d.l :: acc)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.sql.Timestamp
import fr.polytechnique.cmap.cnam.etl.transformers.interaction._
import fr.polytechnique.cmap.cnam.util.functions._

case class Period(start: Timestamp, end: Timestamp) extends Remainable[Period] with Addable[Period]{
case class Period(start: Timestamp, end: Timestamp) extends Subtractable[Period] with Addable[Period]{
self =>

def & (other: Period): Boolean = {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

package fr.polytechnique.cmap.cnam.etl.datatypes

import scala.annotation.tailrec
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import cats.Functor


Expand Down Expand Up @@ -37,22 +34,5 @@ object RemainingPeriod {
}
}
}

@tailrec
def delimitPeriods[A <: Remainable[A] : ClassTag : TypeTag](
rr: RightRemainingPeriod[A],
lrs: List[LeftRemainingPeriod[A]],
acc: List[LeftRemainingPeriod[A]]): List[LeftRemainingPeriod[A]] = {
lrs match {
case Nil => rr.toLeft :: acc
case _@LeftRemainingPeriod(null) :: Nil => rr.toLeft :: acc // This happens because of the Left Join
case lr :: rest => rr.e - lr.e match {
case NullRemainingPeriod => acc
case l: LeftRemainingPeriod[A] => l :: acc
case r: RightRemainingPeriod[A] => delimitPeriods[A](r, rest, acc)
case d: DisjointedRemainingPeriod[A] => delimitPeriods[A](d.r, rest, d.l :: acc)
}
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// License: BSD 3 clause

package fr.polytechnique.cmap.cnam.etl.datatypes

import scala.annotation.tailrec
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

trait Subtractable[A] {
def - (other: A): RemainingPeriod[A]
}

object Subtractable {
/***
* Subtract a list of `LeftRemainingPeriod` A from RightRemainingPeriod of type A. It proceedes until it is not
* possible to subtract.
* To achieve the non subtraction property, the algorithm proceeds as the following:
* 1. Takes the element of `RightRemainingPeriod` and subtract the head of List of `LeftRemainingPeriod` from it.
* 1. If the result is `RightRemainingPeriod`, recurse with the remaining element as the `RightRemainingPeriod`.
* 2. If the result is `LeftRemainingPeriod`, add it to the Accumulator, stop.
* 3. If the result is `DisjointedRemainingPeriod`, add the Left part to the Accumulator,
* and recurse with the right part.
* 4. If the result is `NullRemainingPeriod`, return the acc and stop it.
* 2. Recurse until lrs is empty.
* @param rr element to start the add operation with
* @param lrs list of LeftRemainingPeriod to combine. Must be timely ordered with respect to definition of `-` of type A.
* @param acc list to accumulate the results in.
* @tparam A type parameter. A must extend the Subtractable trait.
* @return List of `LeftRemainingPeriod` of type A.
*/
@tailrec
def combineSubtracables[A <: Subtractable[A] : ClassTag : TypeTag](
rr: RightRemainingPeriod[A],
lrs: List[LeftRemainingPeriod[A]],
acc: List[LeftRemainingPeriod[A]]): List[LeftRemainingPeriod[A]] = {
lrs match {
case Nil => rr.toLeft :: acc
case _@LeftRemainingPeriod(null) :: Nil => rr.toLeft :: acc // This happens because of the Left Join
case lr :: rest => rr.e - lr.e match {
case NullRemainingPeriod => acc
case l: LeftRemainingPeriod[A] => l :: acc
case r: RightRemainingPeriod[A] => combineSubtracables[A](r, rest, acc)
case d: DisjointedRemainingPeriod[A] => combineSubtracables[A](d.r, rest, d.l :: acc)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,26 @@ import fr.polytechnique.cmap.cnam.etl.datatypes._
import fr.polytechnique.cmap.cnam.etl.events.{Event, Exposure}


/***
* Internal Data representation that allows LimitedPeriodAdder to transform DrugPurchase into an Exposure.
*
* There is only two ways of creating a Duration Exposure:
* 1. From transforming a `Event[Drug]`. This transformation is the responsibility of the caller.
* 2. By calling `+` of this class.
* @param patientID string representation of patientID
* @param value equivalent of the value of `Event[Drug]`
* @param period represents the `Timestamps` of the beginning and the end of the Exposure.
* @param span long that represents the duration of the Exposure.
*/
case class ExposureDuration(patientID: String, value: String, period: Period, span: Long)
extends Addable[ExposureDuration] {
self =>
/***
* Add two `ExposureDuration` to form a `RemainingPeriod[ExposureDuration]`. For the exact rules, look at the
* test of `+` for `Period` & `ExposureDuration`
* @param other other `ExposureDruration` to be added
* @return result of the addition as `RemainingPeriod[ExposureDuration]`
*/
override def +(other: ExposureDuration): RemainingPeriod[ExposureDuration] =
if ((self.patientID != other.patientID) | (self.value != other.value)) {
RightRemainingPeriod(self)
Expand All @@ -33,7 +50,27 @@ case class ExposureDuration(patientID: String, value: String, period: Period, sp
}
}
}
}

/***
* Defines the strategies to be used to add different ExposureDuration and make them one.
*/
sealed trait ExposureDurationStrategy extends Function1[ExposureDuration, Event[Exposure]] with Serializable

def toExposure: Event[Exposure] =
Exposure(self.patientID, self.value, 1D, self.period.start, self.period.start + Duration(milliseconds = span) get)
/***
* Sets the Exposure end as start + purchase_1*purchase_1_duration + purchase_2*purchase_2_duration ...
*/
object PurchaseCountBased extends ExposureDurationStrategy {
override def apply(v1: ExposureDuration): Event[Exposure] = {
Exposure(v1.patientID, v1.value, 1D, v1.period.start, v1.period.start + Duration(milliseconds = v1.span) get)
}
}

/***
* Sets the Exposure end as being the end of the last Exposure Duration end.
*/
object LatestPurchaseBased extends ExposureDurationStrategy {
override def apply(v1: ExposureDuration): Event[Exposure] = {
Exposure(v1.patientID, "NA", v1.value, 1D, v1.period.start, Some(v1.period.end))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

package fr.polytechnique.cmap.cnam.etl.transformers.exposures

import scala.annotation.tailrec
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import me.danielpes.spark.datetime.implicits._
import me.danielpes.spark.datetime.{Period => Duration}
import org.apache.spark.sql.Dataset
Expand Down Expand Up @@ -40,7 +37,8 @@ final case class LimitedExposureAdder(
override val startDelay: Duration,
endDelay: Duration,
endThresholdGc: Duration,
endThresholdNgc: Duration) extends ExposurePeriodAdder(startDelay) {
endThresholdNgc: Duration,
toExposureStrategy: ExposureDurationStrategy) extends ExposurePeriodAdder(startDelay) {

override def toExposure(followUps: Dataset[Event[FollowUp]])
(drugs: Dataset[Event[Drug]]): Dataset[Event[Exposure]] = {
Expand All @@ -54,12 +52,13 @@ final case class LimitedExposureAdder(
.map(fromDrugToExposureDuration)
.groupByKey(ep => (ep.patientID, ep.value))
.flatMapGroups((_, eds) => combineExposureDurations(eds))
.map(e => e.toExposure)
.map(ed => ed.copy(period = ed.period.copy(end = ed.period.end - endDelay get)))
.map(e => toExposureStrategy(e))
}

def combineExposureDurations(exposureDurations: Iterator[ExposureDuration]): List[ExposureDuration] = {
val sortedExposureDurations = exposureDurations.toList.sortBy(_.period.start).map(LeftRemainingPeriod(_))
combineExposureDurationsRec(sortedExposureDurations.head.toRight, sortedExposureDurations.drop(1), List.empty)
Addable.combineAddables(sortedExposureDurations.head.toRight, sortedExposureDurations.drop(1), List.empty)
.map(_.e)
}

Expand All @@ -73,29 +72,6 @@ final case class LimitedExposureAdder(
)
}


@tailrec
def combineExposureDurationsRec[A <: Addable[A] : ClassTag : TypeTag](
rr: RightRemainingPeriod[A],
lrs: List[LeftRemainingPeriod[A]],
acc: List[LeftRemainingPeriod[A]]): List[LeftRemainingPeriod[A]] = {
lrs match {
case Nil => rr.toLeft :: acc
case lr :: Nil => rr.e + lr.e match {
case NullRemainingPeriod => acc
case l: LeftRemainingPeriod[A] => l :: acc
case r: RightRemainingPeriod[A] => combineExposureDurationsRec[A](r, List.empty, acc)
case d: DisjointedRemainingPeriod[A] => combineExposureDurationsRec[A](d.r, List.empty, d.l :: acc)
}
case lr :: lr2 :: rest => rr.e + lr.e match {
case NullRemainingPeriod => combineExposureDurationsRec(lr2.toRight, rest, acc)
case l: LeftRemainingPeriod[A] => combineExposureDurationsRec(lr2.toRight, rest, l :: acc)
case r: RightRemainingPeriod[A] => combineExposureDurationsRec[A](r, lr2 :: rest, acc)
case d: DisjointedRemainingPeriod[A] => combineExposureDurationsRec[A](d.r, lr2 :: rest, d.l :: acc)
}
}
}

def fromConditioningToDuration(weight: Double): Long = weight match {
case 1 => endThresholdGc.totalMilliseconds
case _ => endThresholdNgc.totalMilliseconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
package fr.polytechnique.cmap.cnam.etl.transformers.interaction

import cats.syntax.functor._
import fr.polytechnique.cmap.cnam.etl.datatypes.{NullRemainingPeriod, Period, Remainable, RemainingPeriod}
import fr.polytechnique.cmap.cnam.etl.datatypes.{NullRemainingPeriod, Period, Subtractable, RemainingPeriod}
import fr.polytechnique.cmap.cnam.etl.events.{Event, Interaction}

case class ExposureN(patientID: String, values: Set[String], period: Period) extends Remainable[ExposureN] {
case class ExposureN(patientID: String, values: Set[String], period: Period) extends Subtractable[ExposureN] {
self =>

def intersect(other: ExposureN): Option[ExposureN] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
package fr.polytechnique.cmap.cnam.etl.transformers.interaction

import org.apache.spark.sql.{Dataset, functions}
import fr.polytechnique.cmap.cnam.etl.datatypes.{LeftRemainingPeriod, Period, RemainingPeriod, RightRemainingPeriod}
import fr.polytechnique.cmap.cnam.etl.datatypes._
import fr.polytechnique.cmap.cnam.etl.events.{Event, Exposure, Interaction}
import fr.polytechnique.cmap.cnam.util.functions._

Expand Down Expand Up @@ -68,7 +68,7 @@ case class NLevelInteractionTransformer(config: InteractionTransformerConfig) ex
.flatMapGroups(
(e, i) => {
val sortedPeriods = i.map(_._2).toList.sortBy(_.period.start)
RemainingPeriod.delimitPeriods(
Subtractable.combineSubtracables(
RightRemainingPeriod(e),
sortedPeriods.map(e => LeftRemainingPeriod[ExposureN](e)),
List.empty[LeftRemainingPeriod[ExposureN]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ object FallConfig extends FallConfigLoader with FractureCodes {
startDelay = 0.days,
15.days,
90.days,
30.days
30.days,
PurchaseCountBased
)
) extends ExposuresTransformerConfig(exposurePeriodAdder = exposurePeriodAdder)

Expand Down
Loading

0 comments on commit 6e2fa10

Please sign in to comment.