diff --git a/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/DefaultModelMigrations.scala b/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/DefaultModelMigrations.scala index ad977a76978..19cded18509 100644 --- a/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/DefaultModelMigrations.scala +++ b/defaultModel/src/main/scala/pl/touk/nussknacker/defaultmodel/DefaultModelMigrations.scala @@ -5,14 +5,15 @@ import pl.touk.nussknacker.engine.migration.{ProcessMigration, ProcessMigrations class DefaultModelMigrations extends ProcessMigrations { - override def processMigrations: Map[Int, ProcessMigration] = ProcessMigrations - .listOf( - GroupByMigration, - SinkExpressionMigration, - RequestResponseSinkValidationModeMigration, - DecisionTableParameterNamesMigration, - PeriodicToSampleGeneratorMigration - ) - .processMigrations + override def processMigrations: Map[Int, ProcessMigration] = Map( + 1 -> GroupByMigration, + 2 -> SinkExpressionMigration, + 3 -> RequestResponseSinkValidationModeMigration, + 4 -> DecisionTableParameterNamesMigration, + 5 -> PeriodicToSampleGeneratorMigration, + // 100 -> NewMigration, + // Newly added migrations should be in the hundreds: 100, 200, 300 and so on. We do this because + // many ProcessMigrations can be loaded using SPI, and we want to avoid duplicate numbers when merging. + ) } diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/migration/ProcessMigration.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/migration/ProcessMigration.scala index 57db65e98ab..6bc4377af1e 100644 --- a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/migration/ProcessMigration.scala +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/migration/ProcessMigration.scala @@ -15,34 +15,9 @@ trait ProcessMigration { } -object ProcessMigrations { - - def empty: ProcessMigrations = new ProcessMigrations { - override def processMigrations: Map[Int, ProcessMigration] = Map() - } - - def listOf(migrations: ProcessMigration*): ProcessMigrations = new ProcessMigrations { - - override def processMigrations: Map[Int, ProcessMigration] = migrations.zipWithIndex.map { - case (processMigration, index) => index + 1 -> processMigration - }.toMap - - } - -} - -trait ProcessMigrations extends Serializable { - - def processMigrations: Map[Int, ProcessMigration] - - // we assume 0 is minimal version - def version: Int = (processMigrations.keys.toSet + 0).max - -} - /** - * It migrates data of each node in process without changing the structure of process graph. - */ + * It migrates data of each node in process without changing the structure of process graph. + */ trait NodeMigration extends ProcessMigration { def migrateNode(metaData: MetaData): PartialFunction[NodeData, NodeData] diff --git a/extensions-api/src/main/scala/pl/touk/nussknacker/engine/migration/ProcessMigrations.scala b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/migration/ProcessMigrations.scala new file mode 100644 index 00000000000..816a4aa967d --- /dev/null +++ b/extensions-api/src/main/scala/pl/touk/nussknacker/engine/migration/ProcessMigrations.scala @@ -0,0 +1,61 @@ +package pl.touk.nussknacker.engine.migration + +import cats.data.NonEmptyList +import cats.syntax.functor._ +import pl.touk.nussknacker.engine.migration.ProcessMigrations.CombineError.OverlappingMigrations +import pl.touk.nussknacker.engine.migration.ProcessMigrations.MigrationNumber + +object ProcessMigrations { + + type MigrationNumber = Int + + def empty: ProcessMigrations = new ProcessMigrations { + override def processMigrations: Map[MigrationNumber, ProcessMigration] = Map() + } + + def combine(processMigrationsList: List[ProcessMigrations]): Either[CombineError, ProcessMigrations] = { + processMigrationsList match { + case Nil => Right(empty) + case one :: Nil => Right(one) + case many => combineAtLeastTwoListOfMigrations(many) + } + } + + private def combineAtLeastTwoListOfMigrations(processMigrationsList: List[ProcessMigrations]) = { + val migrationNumberToOrigins: Map[MigrationNumber, List[ProcessMigrations]] = + processMigrationsList + .flatMap(pm => pm.processMigrations.keys.toList.tupleRight(pm)) + .groupBy(_._1) + .map { case (migrationNumber, migrationNumberAndOriginList) => + migrationNumber -> migrationNumberAndOriginList.map(_._2) + } + val overlappingMigrationNumbers = migrationNumberToOrigins.filter { case (_, origins) => origins.size >= 2 } + if (overlappingMigrationNumbers.nonEmpty) { + Left(OverlappingMigrations(overlappingMigrationNumbers)) + } else { + val combined = processMigrationsList + .map(_.processMigrations) + .reduceLeft((combinedMigrations, migrations) => combinedMigrations ++ migrations) + Right(new ProcessMigrations { + override def processMigrations: Map[MigrationNumber, ProcessMigration] = combined + }) + } + } + + sealed trait CombineError + + object CombineError { + case class OverlappingMigrations(migrationNumberToOrigins: Map[MigrationNumber, List[ProcessMigrations]]) + extends CombineError + } + +} + +trait ProcessMigrations extends Serializable { + + def processMigrations: Map[MigrationNumber, ProcessMigration] + + // we assume 0 is minimal version + def version: MigrationNumber = (processMigrations.keys.toSet + 0).max + +} diff --git a/extensions-api/src/test/scala/pl/touk/nussknacker/engine/migration/ProcessMigrationsTest.scala b/extensions-api/src/test/scala/pl/touk/nussknacker/engine/migration/ProcessMigrationsTest.scala new file mode 100644 index 00000000000..6876ababab0 --- /dev/null +++ b/extensions-api/src/test/scala/pl/touk/nussknacker/engine/migration/ProcessMigrationsTest.scala @@ -0,0 +1,65 @@ +package pl.touk.nussknacker.engine.migration + +import cats.data.NonEmptyList +import org.scalatest.Inside.inside +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess +import pl.touk.nussknacker.test.EitherValuesDetailedMessage + +class ProcessMigrationsTest extends AnyFunSuite with Matchers with EitherValuesDetailedMessage { + + test("should combine non overlapping migrations") { + val migrations1 = createMigrations(100, 200) + val migrations2 = createMigrations(101, 102, 103) + val migrations3 = createMigrations(110, 120) + + val combined = ProcessMigrations.combine(migrations1 :: migrations2 :: migrations3 :: Nil) + + combined.rightValue.processMigrations.keySet should contain only (100, 101, 102, 103, 110, 120, 200) + } + + test("should combine single migrations list") { + val migrations = createMigrations(100, 200) + + val combined = ProcessMigrations.combine(migrations :: Nil) + + combined.rightValue shouldBe theSameInstanceAs(migrations) + } + + test("should combine empty migrations list") { + val combined = ProcessMigrations.combine(Nil) + + combined.rightValue.version shouldBe 0 + combined.rightValue.processMigrations shouldBe Symbol("empty") + } + + test("should return error with overlapping migrations") { + val migrations1 = createMigrations(100, 200, 300, 400, 500) + val migrations2 = createMigrations(101, 200, 201, 202, 203, 300, 301) + val migrations3 = createMigrations(200, 210) + + val combined = ProcessMigrations.combine(migrations1 :: migrations2 :: migrations3 :: Nil) + + inside(combined.leftValue) { case ProcessMigrations.CombineError.OverlappingMigrations(overlappingMigrations) => + overlappingMigrations.keySet should contain only (200, 300) + overlappingMigrations(200) should contain only (migrations1, migrations2, migrations3) + overlappingMigrations(300) should contain only (migrations1, migrations2) + } + } + + private def createMigrations(migrationIds: Int*): ProcessMigrations = { + new ProcessMigrations { + override def processMigrations: Map[Int, ProcessMigration] = { + migrationIds.map(id => id -> EmptyMigration).toMap + } + } + } + + private object EmptyMigration extends ProcessMigration { + override def description: String = ??? + + override def migrateProcess(canonicalProcess: CanonicalProcess, category: String): CanonicalProcess = ??? + } + +} diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala index 5f65b3754bc..ae2c9f0fb84 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/ModelData.scala @@ -24,12 +24,7 @@ import pl.touk.nussknacker.engine.dict.DictServicesFactoryLoader import pl.touk.nussknacker.engine.migration.ProcessMigrations import pl.touk.nussknacker.engine.modelconfig._ import pl.touk.nussknacker.engine.util.ThreadUtils -import pl.touk.nussknacker.engine.util.loader.{ - DeploymentManagersClassLoader, - ModelClassLoader, - ProcessConfigCreatorLoader, - ScalaServiceLoader -} +import pl.touk.nussknacker.engine.util.loader.{ModelClassLoader, ProcessConfigCreatorLoader, ScalaServiceLoader} import pl.touk.nussknacker.engine.util.multiplicity.{Empty, Many, Multiplicity, One} import java.net.URL @@ -163,7 +158,12 @@ case class ClassLoaderModelData private ( case Empty() => ProcessMigrations.empty case One(migrationsDef) => migrationsDef case Many(moreThanOne) => - throw new IllegalArgumentException(s"More than one ProcessMigrations instance found: $moreThanOne") + ProcessMigrations + .combine(moreThanOne) + .fold( + error => throw new IllegalArgumentException(s"Cannot combine many migrations list because of: $error"), + identity + ) } }