Skip to content

Commit

Permalink
Data processing project (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
julien-truffaut authored Dec 15, 2020
1 parent 2e5be5c commit 5a58a09
Show file tree
Hide file tree
Showing 276 changed files with 10,597 additions and 4,146 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/scala.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: build

on: [push, pull_request]

jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
with:
fetch-depth: 100
- uses: coursier/cache-action@v5
- uses: olafurpg/setup-scala@v10
- uses: olafurpg/setup-gpg@v3
- name: Run tests and doc
run: csbt test slides/mdoc
- name: Deploy 🚀
uses: JamesIves/github-pages-deploy-action@releases/v3
with:
ACCESS_TOKEN: ${{ secrets.ACCESS_TOKEN }}
BRANCH: gh-pages # The branch the action should deploy to.
FOLDER: slides/docs # The folder the action should deploy.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ deploy:
skip_cleanup: true
github_token: $GH_TOKEN
on:
branch: master
branch: data-processing-project
condition: $TRAVIS_PULL_REQUEST="false"
5 changes: 4 additions & 1 deletion README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ average temperature.
1. [Introduction](https://blog.fp-tower.com/foundations/data-processing/parallel-data-processing.html#1)
1. [Iteration with for loops](https://fp-tower.github.io/foundations/data-processing/iteration-with-for-loops.html#1)
1. [Recursions](https://fp-tower.github.io/foundations/data-processing/recursions.html#1)
1. [Analysis of global temperature](https://fp-tower.github.io/foundations/data-processing/analysis-of-global-temperature.html#1)
1. [Analysis of global temperature - part 1](https://fp-tower.github.io/foundations/data-processing/analysis-of-global-temperature-part-1.html#1)
1. [Analysis of global temperature - part 2](https://fp-tower.github.io/foundations/data-processing/analysis-of-global-temperature-part-2.html#1)
1. [Analysis of global temperature - part 3](https://fp-tower.github.io/foundations/data-processing/analysis-of-global-temperature-part-3.html#1)
1. [Analysis of global temperature - part 4](https://fp-tower.github.io/foundations/data-processing/analysis-of-global-temperature-part-4.html#1)


## 4. What is functional programming?
Expand Down
52 changes: 42 additions & 10 deletions answers/src/main/scala/answers/dataprocessing/Monoid.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,34 @@
package answers.dataprocessing

trait Monoid[A] extends Semigroup[A] {
// forAll a: A, combine(a, default) == combiner(defaut, a) == a
// forAll a: A, combine(a, default) == combiner(default, a) == a
def default: A
}

// A Monoid where combine is also commutative
// forAll a1, a2: A, combine(a1, a2) == combine(a2, a1)
trait CommutativeMonoid[A] extends Monoid[A]

object CommutativeMonoid {
val sumInt: CommutativeMonoid[Int] = new CommutativeMonoid[Int] {
def default: Int = 0
def combine(first: Int, second: Int): Int = first + second
}

val sumDouble: CommutativeMonoid[Double] = new CommutativeMonoid[Double] {
def default: Double = 0.0
def combine(first: Double, second: Double): Double = first + second
}

def sumNumeric[A](implicit num: Numeric[A]): CommutativeMonoid[A] =
new CommutativeMonoid[A] {
def default: A = num.zero
def combine(first: A, second: A): A = num.plus(first, second)
}
}

object Monoid {

def minOption[A: Ordering]: Monoid[Option[A]] = maxByOption(identity)
def maxOption[A: Ordering]: Monoid[Option[A]] = minByOption(identity)

Expand All @@ -14,17 +37,11 @@ object Monoid {
def maxByOption[From, To: Ordering](zoom: From => To): Monoid[Option[From]] =
option(Semigroup.maxBy(zoom))

def sumNumeric[A](implicit num: Numeric[A]): Monoid[A] =
new Monoid[A] {
def default: A = num.zero
def combine(first: A, second: A): A = num.plus(first, second)
}

def tuple2[A, B](a: Monoid[A], b: Monoid[B]): Monoid[(A, B)] =
def zip[A, B](monoidA: Monoid[A], monoidB: Monoid[B]): Monoid[(A, B)] =
new Monoid[(A, B)] {
def default: (A, B) = (a.default, b.default)
def default: (A, B) = (monoidA.default, monoidB.default)
def combine(first: (A, B), second: (A, B)): (A, B) =
(a.combine(first._1, second._1), b.combine(first._2, second._2))
(monoidA.combine(first._1, second._1), monoidB.combine(first._2, second._2))
}

def option[A](semigroup: Semigroup[A]): Monoid[Option[A]] =
Expand Down Expand Up @@ -53,4 +70,19 @@ object Monoid {
}
}
}

val multiplyInt: Monoid[Int] = new Monoid[Int] {
def default: Int = 1
def combine(first: Int, second: Int): Int = first * second
}

val minInt: Monoid[Int] = new Monoid[Int] {
def default: Int = Int.MaxValue
def combine(first: Int, second: Int): Int = first min second
}

val maxInt: Monoid[Int] = new Monoid[Int] {
def default: Int = Int.MinValue
def combine(first: Int, second: Int): Int = first max second
}
}
24 changes: 21 additions & 3 deletions answers/src/main/scala/answers/dataprocessing/ParList.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ case class ParList[A](executionContext: ExecutionContext, partitions: List[List[
def map[To](update: A => To): ParList[To] =
ParList(executionContext, partitions.map(_.map(update)))

def foldLeft[B](default: B)(combine: (B, A) => A): B =
def foldLeft[To](default: To)(combine: (To, A) => To): To =
sys.error("Impossible")

def foldLeftV2[To](default: To)(combineElement: (To, A) => To)(combinePartition: (To, To) => To): To =
partitions
.map(_.foldLeft(default)(combineElement))
.foldLeft(default)(combinePartition)

// 1st `monoFoldLeft` implementation before introducing `Monoid`
def monoFoldLeftV1(default: A)(combine: (A, A) => A): A =
partitions
Expand All @@ -25,7 +30,7 @@ case class ParList[A](executionContext: ExecutionContext, partitions: List[List[
.foldLeft(monoid.default)(monoid.combine)

def size: Int =
parFoldMap(_ => 1)(Monoid.sumNumeric)
parFoldMap(_ => 1)(CommutativeMonoid.sumNumeric)

def min(implicit ord: Ordering[A]): Option[A] =
minBy(identity)
Expand All @@ -48,7 +53,7 @@ case class ParList[A](executionContext: ExecutionContext, partitions: List[List[
parReduceMap(identity)(Semigroup.maxBy(zoom))

def sum(implicit num: Numeric[A]): A =
fold(Monoid.sumNumeric)
fold(CommutativeMonoid.sumNumeric)

def fold(monoid: Monoid[A]): A =
parFoldMap(identity)(monoid)
Expand All @@ -75,6 +80,19 @@ case class ParList[A](executionContext: ExecutionContext, partitions: List[List[
def parFoldMap[To](update: A => To)(monoid: Monoid[To]): To =
parReduceMap(update)(monoid).getOrElse(monoid.default)

def parFoldMapUnordered[To](update: A => To)(monoid: CommutativeMonoid[To]): To = {
val ref = Ref(monoid.default)

def foldPartition(partition: List[A]): Future[Any] =
Future {
val res = partition.foldLeft(monoid.default)((state, value) => monoid.combine(state, update(value)))
ref.modify(monoid.combine(res, _))
}(executionContext)

partitions.map(foldPartition).foreach(Await.ready(_, Duration.Inf))
ref.get
}

def parReduceMap[To](update: A => To)(semigroup: Semigroup[To]): Option[To] = {
def reducePartition(partition: List[A]): Future[To] =
Future {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package answers.dataprocessing

// A Monoid without default value
trait Semigroup[A] {
// combine is associative (this means you can move parentheses around)
// combine is associative
// forAll a1, a2, a3: A, combine(a1, combine(a2, a3)) == combine(combine(a1, a2), a3)
def combine(first: A, second: A): A
}
Expand All @@ -19,4 +20,9 @@ object Semigroup {
def combine(first: From, second: From): From =
Ordering.by(zoom).max(first, second)
}

def nSmallest[A: Ordering](n: Int): Semigroup[List[A]] = new Semigroup[List[A]] {
def combine(first: List[A], second: List[A]): List[A] =
(first ++ second).sorted.take(n) // can do much better
}
}
8 changes: 4 additions & 4 deletions answers/src/main/scala/answers/dataprocessing/Summary.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ object Summary {
val semigroup: Semigroup[Summary] = new Semigroup[Summary] {
val semigroupMin = Semigroup.minBy((_: Sample).temperatureFahrenheit)
val semigroupMax = Semigroup.maxBy((_: Sample).temperatureFahrenheit)
val monoidSumDouble = Monoid.sumNumeric[Double]
val monoidSumInt = Monoid.sumNumeric[Int]
val monoidSumDouble = CommutativeMonoid.sumNumeric[Double]
val monoidSumInt = CommutativeMonoid.sumNumeric[Int]

def combine(first: Summary, second: Summary): Summary =
Summary(
Expand All @@ -39,8 +39,8 @@ object Summary {
val semigroupDerived: Semigroup[Summary] = new Semigroup[Summary] {
val semigroupMin = Semigroup.minBy((_: Sample).temperatureFahrenheit)
val semigroupMax = Semigroup.maxBy((_: Sample).temperatureFahrenheit)
val monoidSumDouble = Monoid.sumNumeric[Double]
val monoidSumInt = Monoid.sumNumeric[Int]
val monoidSumDouble = CommutativeMonoid.sumNumeric[Double]
val monoidSumInt = CommutativeMonoid.sumNumeric[Int]

def combine(first: Summary, second: Summary): Summary =
Summary(
Expand Down
10 changes: 5 additions & 5 deletions answers/src/main/scala/answers/dataprocessing/SummaryV1.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ case class SummaryV1(min: Option[Sample], max: Option[Sample], sum: Double, size

override def toString: String =
f"Summary(avg = ${average.getOrElse(0.0)}%.2f, " +
f"size = $size,\n " +
f"min = $min,\n " +
f"max = $max\n)"
s"size = $size,\n " +
s"min = $min,\n " +
s"max = $max\n)"
}

object SummaryV1 {
Expand Down Expand Up @@ -53,8 +53,8 @@ object SummaryV1 {
val monoidDerived: Monoid[SummaryV1] = new Monoid[SummaryV1] {
val monoidMin = Monoid.minByOption((_: Sample).temperatureFahrenheit)
val monoidMax = Monoid.maxByOption((_: Sample).temperatureFahrenheit)
val monoidSumDouble = Monoid.sumNumeric[Double]
val monoidSumInt = Monoid.sumNumeric[Int]
val monoidSumDouble = CommutativeMonoid.sumNumeric[Double]
val monoidSumInt = CommutativeMonoid.sumNumeric[Int]

def default: SummaryV1 = SummaryV1(
min = monoidMin.default,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ object TemperatureAnswers {
def sumTemperature(samples: ParList[Sample]): Double =
samples.partitions.map(_.map(_.temperatureFahrenheit).sum).sum

def sumTemperatureV2(samples: ParList[Sample]): Double =
samples.foldLeftV2(0.0)((state, sample) => state + sample.temperatureFahrenheit)(_ + _)

def averageTemperatureV2(samples: ParList[Sample]): Option[Double] = {
val (length, sum) = samples.partitions
.map(
Expand All @@ -36,6 +39,14 @@ object TemperatureAnswers {
Option.unless(length == 0)(sum / length)
}

def summaryList(samples: List[Sample]): SummaryV1 =
SummaryV1(
min = samples.minByOption(_.temperatureFahrenheit),
max = samples.maxByOption(_.temperatureFahrenheit),
sum = samples.foldLeft(0.0)((state, sample) => state + sample.temperatureFahrenheit),
size = samples.size
)

def summaryListOnePass(samples: List[Sample]): SummaryV1 =
samples.foldLeft(
SummaryV1(
Expand All @@ -62,15 +73,18 @@ object TemperatureAnswers {
)
)

def summaryList(samples: List[Sample]): SummaryV1 =
def summaryParList(samples: ParList[Sample]): SummaryV1 =
SummaryV1(
min = samples.minByOption(_.temperatureFahrenheit),
max = samples.maxByOption(_.temperatureFahrenheit),
sum = samples.foldLeft(0.0)((state, sample) => state + sample.temperatureFahrenheit),
min = samples.minBy(_.temperatureFahrenheit),
max = samples.maxBy(_.temperatureFahrenheit),
sum = samples.foldMap(_.temperatureFahrenheit)(CommutativeMonoid.sumNumeric),
size = samples.size
)

def summaryParListOnePass(samples: ParList[Sample]): SummaryV1 =
samples.parFoldMap(SummaryV1.one)(SummaryV1.monoidDerived)
def summaryParListOnePassFoldMap(samples: ParList[Sample]): SummaryV1 =
samples.parFoldMap(SummaryV1.one)(SummaryV1.monoid)

def summaryParListOnePassReduceMap(samples: ParList[Sample]): SummaryV1 =
SummaryV1.fromSummary(samples.parReduceMap(Summary.one)(Summary.semigroup))

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,60 +23,62 @@ object TemperatureNotebookAnswers extends App {
val samplesArray = samples.toArray
val parSamplesArray = ParArray(computeEC, samplesArray, partitionSize)

val minSampleByTemperature: Option[Sample] =
TemperatureAnswers.minSampleByTemperature(parSamples)
val coldestSample = TemperatureAnswers.minSampleByTemperature(parSamples)

println(s"Min sample by temperature is $minSampleByTemperature")
println(s"Min sample by temperature is $coldestSample")
println(s"Max sample by temperature is ${parSamples.maxBy(_.temperatureFahrenheit)}")
println(s"Min sample by date is ${parSamples.minBy(_.localDate)}")
println(s"Max sample by date is ${parSamples.maxBy(_.localDate)}")

val averageTemperature: Option[Double] =
TemperatureAnswers.averageTemperature(parSamples)
val averageTemperature = TemperatureAnswers.averageTemperature(parSamples)

println(s"Average temperature is $averageTemperature")

println(s"Temperature summary is ${parSamples.parFoldMap(SummaryV1.one)(SummaryV1.monoid)}")

val summariesPerCity = aggregatePerLabel(parSamples)(s => List(s.city))
sealed trait Label
case class City(value: String) extends Label
case class Country(value: String) extends Label

summariesPerCity.get("Bordeaux").foreach(println)
summariesPerCity.get("London").foreach(println)
val summariesPerCity = aggregatePerLabel(parSamples)(s => List(City(s.city), Country(s.country)))

summariesPerCity.get(City("Bordeaux")).foreach(println)
summariesPerCity.get(City("London")).foreach(println)
summariesPerCity.get(City("Mexico")).foreach(println)
summariesPerCity.get(Country("London")).foreach(println)

bench("sum")(
Labelled("List foldLeft", () => samples.foldLeft(0.0)((state, sample) => state + sample.temperatureFahrenheit)),
Labelled("List map + sum", () => samples.map(_.temperatureFahrenheit).sum),
Labelled("ParList foldMap", () => parSamples.foldMap(_.temperatureFahrenheit)(Monoid.sumNumeric)),
Labelled("ParList parFoldMap", () => parSamples.parFoldMap(_.temperatureFahrenheit)(Monoid.sumNumeric)),
Labelled("ParArray parFoldMap", () => parSamplesArray.parFoldMap(_.temperatureFahrenheit)(Monoid.sumNumeric)),
Labelled("ParList foldMap", () => parSamples.foldMap(_.temperatureFahrenheit)(CommutativeMonoid.sumNumeric)),
Labelled("ParList parFoldMap", () => parSamples.parFoldMap(_.temperatureFahrenheit)(CommutativeMonoid.sumNumeric)),
Labelled("ParList parFoldMapUnordered",
() => parSamples.parFoldMapUnordered(_.temperatureFahrenheit)(CommutativeMonoid.sumNumeric)),
Labelled("ParArray parFoldMap",
() => parSamplesArray.parFoldMap(_.temperatureFahrenheit)(CommutativeMonoid.sumNumeric)),
Labelled("Array foldLeft",
() => samplesArray.foldLeft(0.0)((state, sample) => state + sample.temperatureFahrenheit)),
)

bench("min")(
Labelled("ParList minBy", () => parSamples.minBy(_.temperatureFahrenheit)),
Labelled("ParList parFoldMap", () => parSamples.parFoldMap(Option(_))(Monoid.minByOption(_.temperatureFahrenheit))),
Labelled("List minByOption", () => samples.minByOption(_.temperatureFahrenheit)),
)

bench("summary")(
Labelled("List 4 passes", () => TemperatureAnswers.summaryList(samples)),
Labelled("List one-pass", () => TemperatureAnswers.summaryListOnePass(samples)),
Labelled("ParList one-pass foldMap hard-coded Monoid",
() => parSamples.parFoldMap(SummaryV1.one)(SummaryV1.monoid)),
Labelled("ParList one-pass foldMap derived Monoid",
() => parSamples.parFoldMap(SummaryV1.one)(SummaryV1.monoidDerived)),
Labelled("ParList one-pass reduceMap hard-coded Semigroup",
() => SummaryV1.fromSummary(parSamples.parReduceMap(Summary.one)(Summary.semigroup))),
Labelled("ParList one-pass reduceMap derived Semigroup",
() => SummaryV1.fromSummary(parSamples.parReduceMap(Summary.one)(Summary.semigroupDerived))),
Labelled("List 4 iterations", () => TemperatureAnswers.summaryList(samples)),
Labelled("List 1 iteration", () => TemperatureAnswers.summaryListOnePass(samples)),
Labelled("ParList 4 iterations", () => TemperatureAnswers.summaryParList(parSamples)),
Labelled("ParList 1 iteration foldMap", () => TemperatureAnswers.summaryParListOnePassFoldMap(parSamples)),
Labelled("ParList 1 iteration reduceMap", () => TemperatureAnswers.summaryParListOnePassReduceMap(parSamples)),
)

bench("aggregatePerLabel")(
Labelled("ParList city", () => aggregatePerLabel(parSamples)(s => List(s.city))),
Labelled("ParList country", () => aggregatePerLabel(parSamples)(s => List(s.country))),
Labelled("ParList Bordeaux", () => aggregatePerLabel(parSamples)(s => List(s.city).filter(_ == "Bordeaux"))),
Labelled("ParList city, country, region",
() => aggregatePerLabel(parSamples)(s => List(s.city, s.country, s.region))),
Labelled("city", () => aggregatePerLabel(parSamples)(s => List(s.city))),
Labelled("country", () => aggregatePerLabel(parSamples)(s => List(s.country))),
Labelled("Bordeaux", () => aggregatePerLabel(parSamples)(s => List(s.city).filter(_ == "Bordeaux"))),
Labelled("city, country, region", () => aggregatePerLabel(parSamples)(s => List(s.city, s.country, s.region))),
)

def aggregatePerLabel[Label](parList: ParList[Sample])(labels: Sample => List[Label]): Map[Label, Summary] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object TimeUtil {
times.foreach(label => println(s" ${label.padName(maxLabelLength)}: ${label.value}"))
}

case class Labelled[A](name: String, value: A) {
case class Labelled[+A](name: String, value: A) {
def map[To](update: A => To): Labelled[To] =
copy(value = update(value))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ class ParArrayTest extends AnyFunSuite with ScalaCheckDrivenPropertyChecks {

test("par sum") {
val parArray = ParArray(ec, 1.to(20).toArray, 10)
val monoid = CommutativeMonoid.sumNumeric[Int]
assert(
parArray.parFoldMap(identity)(Monoid.sumNumeric) == parArray.parFoldMap(identity)(Monoid.sumNumeric)
parArray.parFoldMap(identity)(monoid) == parArray.parFoldMap(identity)(monoid)
)
}

Expand Down
Loading

0 comments on commit 5a58a09

Please sign in to comment.