Skip to content

Commit

Permalink
multiple migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrz committed Jan 27, 2025
1 parent 0f1ccfb commit fae152b
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import pl.touk.nussknacker.engine.migration.{ProcessMigration, ProcessMigrations

class DefaultModelMigrations extends ProcessMigrations {

override def processMigrations: Map[Int, ProcessMigration] = ProcessMigrations
.listOf(
GroupByMigration,
SinkExpressionMigration,
RequestResponseSinkValidationModeMigration,
DecisionTableParameterNamesMigration,
PeriodicToSampleGeneratorMigration
)
.processMigrations
override def processMigrations: Map[Int, ProcessMigration] = Map(
1 -> GroupByMigration,
2 -> SinkExpressionMigration,
3 -> RequestResponseSinkValidationModeMigration,
4 -> DecisionTableParameterNamesMigration,
5 -> PeriodicToSampleGeneratorMigration,
// 100 -> NewMigration,
// Newly added migrations should be in the hundreds: 100, 200, 300 and so on. We do this because
// many ProcessMigrations can be loaded using SPI, and we want to avoid duplicate numbers when merging.
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,9 @@ trait ProcessMigration {

}

object ProcessMigrations {

def empty: ProcessMigrations = new ProcessMigrations {
override def processMigrations: Map[Int, ProcessMigration] = Map()
}

def listOf(migrations: ProcessMigration*): ProcessMigrations = new ProcessMigrations {

override def processMigrations: Map[Int, ProcessMigration] = migrations.zipWithIndex.map {
case (processMigration, index) => index + 1 -> processMigration
}.toMap

}

}

trait ProcessMigrations extends Serializable {

def processMigrations: Map[Int, ProcessMigration]

// we assume 0 is minimal version
def version: Int = (processMigrations.keys.toSet + 0).max

}

/**
* It migrates data of each node in process without changing the structure of process graph.
*/
* It migrates data of each node in process without changing the structure of process graph.
*/
trait NodeMigration extends ProcessMigration {

def migrateNode(metaData: MetaData): PartialFunction[NodeData, NodeData]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package pl.touk.nussknacker.engine.migration

import cats.data.NonEmptyList
import cats.syntax.functor._
import pl.touk.nussknacker.engine.migration.ProcessMigrations.CombineError.OverlappingMigrations
import pl.touk.nussknacker.engine.migration.ProcessMigrations.MigrationNumber

object ProcessMigrations {

type MigrationNumber = Int

def empty: ProcessMigrations = new ProcessMigrations {
override def processMigrations: Map[MigrationNumber, ProcessMigration] = Map()
}

def combine(processMigrationsList: List[ProcessMigrations]): Either[CombineError, ProcessMigrations] = {
processMigrationsList match {
case Nil => Right(empty)
case one :: Nil => Right(one)
case many => combineAtLeastTwoListOfMigrations(many)
}
}

private def combineAtLeastTwoListOfMigrations(processMigrationsList: List[ProcessMigrations]) = {
val migrationNumberToOrigins: Map[MigrationNumber, List[ProcessMigrations]] =
processMigrationsList
.flatMap(pm => pm.processMigrations.keys.toList.tupleRight(pm))
.groupBy(_._1)
.map { case (migrationNumber, migrationNumberAndOriginList) =>
migrationNumber -> migrationNumberAndOriginList.map(_._2)
}
val overlappingMigrationNumbers = migrationNumberToOrigins.filter { case (_, origins) => origins.size >= 2 }
if (overlappingMigrationNumbers.nonEmpty) {
Left(OverlappingMigrations(overlappingMigrationNumbers))
} else {
val combined = processMigrationsList
.map(_.processMigrations)
.reduceLeft((combinedMigrations, migrations) => combinedMigrations ++ migrations)
Right(new ProcessMigrations {
override def processMigrations: Map[MigrationNumber, ProcessMigration] = combined
})
}
}

sealed trait CombineError

object CombineError {
case class OverlappingMigrations(migrationNumberToOrigins: Map[MigrationNumber, List[ProcessMigrations]])
extends CombineError
}

}

trait ProcessMigrations extends Serializable {

def processMigrations: Map[MigrationNumber, ProcessMigration]

// we assume 0 is minimal version
def version: MigrationNumber = (processMigrations.keys.toSet + 0).max

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package pl.touk.nussknacker.engine.migration

import cats.data.NonEmptyList
import org.scalatest.Inside.inside
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.test.EitherValuesDetailedMessage

class ProcessMigrationsTest extends AnyFunSuite with Matchers with EitherValuesDetailedMessage {

test("should combine non overlapping migrations") {
val migrations1 = createMigrations(100, 200)
val migrations2 = createMigrations(101, 102, 103)
val migrations3 = createMigrations(110, 120)

val combined = ProcessMigrations.combine(migrations1 :: migrations2 :: migrations3 :: Nil)

combined.rightValue.processMigrations.keySet should contain only (100, 101, 102, 103, 110, 120, 200)
}

test("should combine single migrations list") {
val migrations = createMigrations(100, 200)

val combined = ProcessMigrations.combine(migrations :: Nil)

combined.rightValue shouldBe theSameInstanceAs(migrations)
}

test("should combine empty migrations list") {
val combined = ProcessMigrations.combine(Nil)

combined.rightValue.version shouldBe 0
combined.rightValue.processMigrations shouldBe Symbol("empty")
}

test("should return error with overlapping migrations") {
val migrations1 = createMigrations(100, 200, 300, 400, 500)
val migrations2 = createMigrations(101, 200, 201, 202, 203, 300, 301)
val migrations3 = createMigrations(200, 210)

val combined = ProcessMigrations.combine(migrations1 :: migrations2 :: migrations3 :: Nil)

inside(combined.leftValue) { case ProcessMigrations.CombineError.OverlappingMigrations(overlappingMigrations) =>
overlappingMigrations.keySet should contain only (200, 300)
overlappingMigrations(200) should contain only (migrations1, migrations2, migrations3)
overlappingMigrations(300) should contain only (migrations1, migrations2)
}
}

private def createMigrations(migrationIds: Int*): ProcessMigrations = {
new ProcessMigrations {
override def processMigrations: Map[Int, ProcessMigration] = {
migrationIds.map(id => id -> EmptyMigration).toMap
}
}
}

private object EmptyMigration extends ProcessMigration {
override def description: String = ???

override def migrateProcess(canonicalProcess: CanonicalProcess, category: String): CanonicalProcess = ???
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@ import pl.touk.nussknacker.engine.dict.DictServicesFactoryLoader
import pl.touk.nussknacker.engine.migration.ProcessMigrations
import pl.touk.nussknacker.engine.modelconfig._
import pl.touk.nussknacker.engine.util.ThreadUtils
import pl.touk.nussknacker.engine.util.loader.{
DeploymentManagersClassLoader,
ModelClassLoader,
ProcessConfigCreatorLoader,
ScalaServiceLoader
}
import pl.touk.nussknacker.engine.util.loader.{ModelClassLoader, ProcessConfigCreatorLoader, ScalaServiceLoader}
import pl.touk.nussknacker.engine.util.multiplicity.{Empty, Many, Multiplicity, One}

import java.net.URL
Expand Down Expand Up @@ -163,7 +158,12 @@ case class ClassLoaderModelData private (
case Empty() => ProcessMigrations.empty
case One(migrationsDef) => migrationsDef
case Many(moreThanOne) =>
throw new IllegalArgumentException(s"More than one ProcessMigrations instance found: $moreThanOne")
ProcessMigrations
.combine(moreThanOne)
.fold(
error => throw new IllegalArgumentException(s"Cannot combine many migrations list because of: $error"),
identity
)
}
}

Expand Down

0 comments on commit fae152b

Please sign in to comment.