Skip to content

Commit

Permalink
move WithActionParametersSupport
Browse files Browse the repository at this point in the history
  • Loading branch information
gskrobisz committed Jan 13, 2025
1 parent 0d7cb6e commit c4a3f99
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 63 deletions.
4 changes: 2 additions & 2 deletions .run/NussknackerApp-dist-config.run.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="NussknackerApp-dist-config" type="Application" factoryName="Application">
<option name="ALTERNATIVE_JRE_PATH" value="corretto-11" />
<option name="ALTERNATIVE_JRE_PATH" value="/usr/lib/jvm/java-1.11.0-openjdk-amd64" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<envs>
<env name="AUTHENTICATION_USERS_FILE" value="../../../nussknacker-dist/src/universal/conf/users.conf" />
Expand Down Expand Up @@ -34,4 +34,4 @@
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
</component>
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,6 @@ trait TestWithParametersSupport[+T] { self: Source =>
def parametersToTestData(params: Map[ParameterName, AnyRef]): T
}

/**
* Used to define Source parameters for each activity
* e.g.
* {"DEPLOY": { "parametername": ...parameter configuration... }
*/
trait WithActionParameters { self: Source =>
def actionParametersDefinition: Map[String, Map[String, ParameterConfig]]
}

/**
* [[pl.touk.nussknacker.engine.api.process.SourceFactory]] has to have method annotated with [[pl.touk.nussknacker.engine.api.MethodToInvoke]]
* that returns [[pl.touk.nussknacker.engine.api.process.Source]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import io.circe.generic.JsonCodec
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.definition.RawParameterEditor
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.definition.activity.ActionInfoProvider
import pl.touk.nussknacker.restmodel.definition.UiActionParameterConfig
import pl.touk.nussknacker.ui.process.newactivity.ActionInfoService.{ActivityName, UiActionNodeParameters}
import pl.touk.nussknacker.ui.process.newactivity.ActionInfoService.UiActionNodeParameters
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.uiresolving.UIProcessResolver

Expand All @@ -20,16 +21,16 @@ class ActionInfoService(activityInfoProvider: ActionInfoProvider, processResolve
isFragment: Boolean
)(
implicit user: LoggedUser
): Map[ActivityName, List[UiActionNodeParameters]] = {
): Map[String, List[UiActionNodeParameters]] = {
val canonical = toCanonicalProcess(scenarioGraph, processVersion, isFragment)
activityInfoProvider
.getActionParameters(processVersion, canonical)
.map { case (activityName, nodeParamsMap) =>
activityName -> nodeParamsMap.map { case (nodeId, params) =>
.map { case (scenarioActionName, nodeParamsMap) =>
scenarioActionName.value -> nodeParamsMap.map { case (nodeId, params) =>
UiActionNodeParameters(
NodeId(nodeId),
nodeId,
params.map { case (name, value) =>
name -> UiActionParameterConfig(
name.value -> UiActionParameterConfig(
value.defaultValue,
value.editor.getOrElse(RawParameterEditor),
value.label,
Expand All @@ -53,6 +54,5 @@ class ActionInfoService(activityInfoProvider: ActionInfoProvider, processResolve
}

object ActionInfoService {
type ActivityName = String
@JsonCodec case class UiActionNodeParameters(nodeId: NodeId, parameters: Map[String, UiActionParameterConfig])
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import pl.touk.nussknacker.test.{NuRestAssureMatchers, RestAssuredVerboseLogging
import pl.touk.nussknacker.engine.spel.SpelExtension._
import pl.touk.nussknacker.test.utils.domain.TestProcessUtil

class ActivityInfoResourcesSpec
class ActionInfoResourcesSpec
extends AnyFreeSpecLike
with NuItTest
with WithSimplifiedDesignerConfig
Expand All @@ -20,8 +20,8 @@ class ActivityInfoResourcesSpec
with NuRestAssureMatchers
with RestAssuredVerboseLoggingIfValidationFails {

"The scenario activity info endpoint when" - {
"return activity parameters when defined" in {
"The scenario action info endpoint when" - {
"return action parameters when defined" in {
val scenario = ScenarioBuilder
.streaming("scenarioWithSourceWithDeployParameters")
.source("sourceWithParametersId", "boundedSourceWithOffset", "elements" -> "{'one', 'two', 'three'}".spel)
Expand All @@ -34,7 +34,7 @@ class ActivityInfoResourcesSpec
.when()
.basicAuthAllPermUser()
.jsonBody(TestProcessUtil.toJson(scenario).noSpaces)
.post(s"$nuDesignerHttpAddress/api/activityInfo/${scenario.name.value}/activityParameters")
.post(s"$nuDesignerHttpAddress/api/actionInfo/${scenario.name.value}/actionParameters")
.Then()
.statusCode(200)
.body(
Expand All @@ -45,7 +45,7 @@ class ActivityInfoResourcesSpec
)
}

"return empty map when no activity parameters" in {
"return empty map when no action parameters" in {
val scenario = ScenarioBuilder
.streaming("scenarioWithoutParameters")
.source("sourceNoParamsId", "boundedSource", "elements" -> "{'one', 'two', 'three'}".spel)
Expand All @@ -58,7 +58,7 @@ class ActivityInfoResourcesSpec
.when()
.basicAuthAllPermUser()
.jsonBody(TestProcessUtil.toJson(scenario).noSpaces)
.post(s"$nuDesignerHttpAddress/api/activityInfo/${scenario.name.value}/activityParameters")
.post(s"$nuDesignerHttpAddress/api/actionInfo/${scenario.name.value}/actionParameters")
.Then()
.statusCode(200)
.equalsJsonBody(
Expand All @@ -76,7 +76,7 @@ class ActivityInfoResourcesSpec
.when()
.basicAuthAllPermUser()
.jsonBody(TestProcessUtil.toJson(scenario).noSpaces)
.post(s"$nuDesignerHttpAddress/api/activityInfo/${scenario.name.value}/activityParameters")
.post(s"$nuDesignerHttpAddress/api/actionInfo/${scenario.name.value}/actionParameters")
.Then()
.statusCode(404)
.equalsPlainBody(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.component.ParameterConfig
import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, Parameter}
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName
import pl.touk.nussknacker.engine.api.deployment.{ScenarioActionName, WithActionParametersSupport}
import pl.touk.nussknacker.engine.api.editor.FixedValuesEditorMode
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.{
ContextInitializer,
TestWithParametersSupport,
TopicName,
WithActionParameters
}
import pl.touk.nussknacker.engine.api.process.{ContextInitializer, TestWithParametersSupport, TopicName}
import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, EngineRuntimeContext}
import pl.touk.nussknacker.engine.api.test.{TestRecord, TestRecordParser}
import pl.touk.nussknacker.engine.flink.api.exception.ExceptionHandler
Expand Down Expand Up @@ -67,7 +62,7 @@ class FlinkKafkaSource[T](
with FlinkSourceTestSupport[T]
with RecordFormatterBaseTestDataGenerator
with TestWithParametersSupport[T]
with WithActionParameters
with WithActionParametersSupport
with LazyLogging {

@silent("deprecated")
Expand All @@ -89,9 +84,9 @@ class FlinkKafkaSource[T](
private val defaultOffsetResetStrategy =
if (kafkaConfig.forceLatestRead.contains(true)) OffsetResetStrategy.Reset else OffsetResetStrategy.Continue

override def actionParametersDefinition: Map[String, Map[String, ParameterConfig]] = {
override def actionParametersDefinition: Map[ScenarioActionName, Map[ParameterName, ParameterConfig]] = {
Map(
ScenarioActionName.Deploy.value -> Map(
ScenarioActionName.Deploy -> Map(
OFFSET_RESET_STRATEGY_PARAM_NAME -> ParameterConfig(
defaultValue = Some(defaultOffsetResetStrategy.toString),
editor = Some(
Expand Down Expand Up @@ -128,7 +123,7 @@ class FlinkKafkaSource[T](
): SourceFunction[T] = {
val offsetResetStrategy =
flinkNodeContext.nodeDeploymentData
.flatMap(_.get(OFFSET_RESET_STRATEGY_PARAM_NAME))
.flatMap(_.get(OFFSET_RESET_STRATEGY_PARAM_NAME.value))
.map(OffsetResetStrategy.withName)
.getOrElse(defaultOffsetResetStrategy)
logger.info(
Expand Down Expand Up @@ -211,7 +206,7 @@ class FlinkKafkaSource[T](
}

object FlinkKafkaSource {
val OFFSET_RESET_STRATEGY_PARAM_NAME = "offsetResetStrategy"
val OFFSET_RESET_STRATEGY_PARAM_NAME: ParameterName = ParameterName("offsetResetStrategy")

object OffsetResetStrategy extends Enumeration {
type OffsetResetStrategy = Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class KafkaSourceWithDeploymentParametersSpec extends KafkaSourceFactoryProcessM
NodesDeploymentData(dataByNodeId =
Map(
NodeId("procSource") -> Map(
FlinkKafkaSource.OFFSET_RESET_STRATEGY_PARAM_NAME -> offsetResetStrategy.toString
FlinkKafkaSource.OFFSET_RESET_STRATEGY_PARAM_NAME.value -> offsetResetStrategy.toString
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.api.component.{ParameterConfig, UnboundedStreamComponent}
import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, RawParameterEditor}
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName
import pl.touk.nussknacker.engine.api.deployment.{ScenarioActionName, WithActionParametersSupport}
import pl.touk.nussknacker.engine.api.editor.FixedValuesEditorMode
import pl.touk.nussknacker.engine.api.process.{SourceFactory, WithActionParameters}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.SourceFactory
import pl.touk.nussknacker.engine.api.typed.typing.Unknown
import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName}
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext
Expand All @@ -25,15 +26,15 @@ object BoundedSource extends SourceFactory with UnboundedStreamComponent {

object BoundedSourceWithOffset extends SourceFactory with UnboundedStreamComponent {

val OFFSET_PARAMETER_NAME = "offset"
val OFFSET_PARAMETER_NAME: ParameterName = ParameterName("offset")

@MethodToInvoke
def source(@ParamName("elements") elements: java.util.List[Any]) =
new CollectionSource[Any](elements.asScala.toList, None, Unknown) with WithActionParameters {
new CollectionSource[Any](elements.asScala.toList, None, Unknown) with WithActionParametersSupport {

override def actionParametersDefinition: Map[String, Map[String, ParameterConfig]] = {
override def actionParametersDefinition: Map[ScenarioActionName, Map[ParameterName, ParameterConfig]] = {
Map(
ScenarioActionName.Deploy.value -> deployActivityParameters
ScenarioActionName.Deploy -> deployActivityParameters
)
}

Expand All @@ -44,7 +45,7 @@ object BoundedSourceWithOffset extends SourceFactory with UnboundedStreamCompone
): DataStreamSource[T] = {
val offsetOpt =
flinkNodeContext.nodeDeploymentData
.flatMap(_.get(OFFSET_PARAMETER_NAME))
.flatMap(_.get(OFFSET_PARAMETER_NAME.value))
.flatMap(s => Try(s.toInt).toOption)
val elementsWithOffset = offsetOpt match {
case Some(offset) => list.drop(offset)
Expand All @@ -55,7 +56,7 @@ object BoundedSourceWithOffset extends SourceFactory with UnboundedStreamCompone

}

private def deployActivityParameters: Map[String, ParameterConfig] = {
private def deployActivityParameters: Map[ParameterName, ParameterConfig] = {
Map(
OFFSET_PARAMETER_NAME -> ParameterConfig(
defaultValue = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package pl.touk.nussknacker.engine.api.deployment
import io.circe.generic.JsonCodec
import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder}
import io.circe.{Decoder, Encoder}
import pl.touk.nussknacker.engine.api.component.ParameterConfig
import pl.touk.nussknacker.engine.api.deployment.ProcessActionState.ProcessActionState
import pl.touk.nussknacker.engine.api.process.{ProcessId, VersionId}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.{ProcessId, Source, VersionId}

import java.time.Instant
import java.util.UUID
Expand Down Expand Up @@ -59,6 +61,13 @@ final case class ScenarioActionName(value: String) extends AnyVal {
override def toString: String = value
}

/**
* Used to define Source parameters for each action
*/
trait WithActionParametersSupport { self: Source =>
def actionParametersDefinition: Map[ScenarioActionName, Map[ParameterName, ParameterConfig]]
}

object ScenarioActionName {

implicit val encoder: Encoder[ScenarioActionName] = deriveUnwrappedEncoder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package pl.touk.nussknacker.engine.definition.activity

import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.{NodeId, ProcessVersion}
import pl.touk.nussknacker.engine.api.component.ParameterConfig
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess

trait ActionInfoProvider {

def getActionParameters(processVersion: ProcessVersion, scenario: CanonicalProcess): Map[String, Map[String, Map[String, ParameterConfig]]]
def getActionParameters(
processVersion: ProcessVersion,
scenario: CanonicalProcess
): Map[ScenarioActionName, Map[NodeId, Map[ParameterName, ParameterConfig]]]

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package pl.touk.nussknacker.engine.definition.activity
import cats.data.Validated.Valid
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.component.ParameterConfig
import pl.touk.nussknacker.engine.api.process.WithActionParameters
import pl.touk.nussknacker.engine.api.deployment.{ScenarioActionName, WithActionParametersSupport}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.{JobData, MetaData, NodeId, ProcessVersion}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.graph.node.SourceNodeData
Expand All @@ -16,37 +17,37 @@ class ModelDataActionInfoProvider(modelData: ModelData)
override def getActionParameters(
processVersion: ProcessVersion,
scenario: CanonicalProcess
): Map[String, Map[String, Map[String, ParameterConfig]]] = {
): Map[ScenarioActionName, Map[NodeId, Map[ParameterName, ParameterConfig]]] = {
val jobData = JobData(scenario.metaData, processVersion)
modelData.withThisAsContextClassLoader {
val nodeToActivityToParameters = collectAllSources(scenario)
.map(source => source.id -> getActivityParameters(source, jobData))
.map(source => NodeId(source.id) -> getActionParameters(source, jobData))
.toMap
groupByAction(nodeToActivityToParameters)
}
}

private def groupByAction(
nodeToActionToParameters: Map[String, Map[String, Map[String, ParameterConfig]]]
): Map[String, Map[String, Map[String, ParameterConfig]]] = {
val activityToNodeToParameters = for {
(node, activityToParams) <- nodeToActionToParameters.toList
(activity, params) <- activityToParams.toList
} yield (activity, node -> params)
activityToNodeToParameters
nodeToActionToParameters: Map[NodeId, Map[ScenarioActionName, Map[ParameterName, ParameterConfig]]]
): Map[ScenarioActionName, Map[NodeId, Map[ParameterName, ParameterConfig]]] = {
val actionToNodeToParameters = for {
(node, actionToParams) <- nodeToActionToParameters.toList
(actionName, params) <- actionToParams.toList
} yield (actionName, node -> params)
actionToNodeToParameters
.groupBy(_._1)
.mapValuesNow(_.map(_._2).toMap)
}

private def getActivityParameters(
private def getActionParameters(
source: SourceNodeData,
jobData: JobData
): Map[String, Map[String, ParameterConfig]] = {
): Map[ScenarioActionName, Map[ParameterName, ParameterConfig]] = {
modelData.withThisAsContextClassLoader {
val compiledSource = prepareSourceObj(source)(jobData, NodeId(source.id))
compiledSource match {
case Valid(s: WithActionParameters) => s.actionParametersDefinition
case _ => Map.empty
case Valid(s: WithActionParametersSupport) => s.actionParametersDefinition
case _ => Map.empty
}
}
}
Expand Down

0 comments on commit c4a3f99

Please sign in to comment.