Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NU-1960] API for messages preview in source #7510

Open
wants to merge 25 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.graph.{ProcessProperties, ScenarioGraph}
import pl.touk.nussknacker.engine.api.process.{ProcessName, ProcessingType}
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.graph.node.SourceNodeData
import pl.touk.nussknacker.engine.spel.ExpressionSuggestion
import pl.touk.nussknacker.restmodel.definition.UIValueParameter
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
Expand All @@ -16,10 +17,16 @@ import pl.touk.nussknacker.ui.api.BaseHttpService.CustomAuthorizationError
import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints
import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos
import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos.NodesError.{
InvalidNodeType,
MalformedTypingResult,
NoDataGenerated,
NoPermission,
NoProcessingType,
NoScenario
NoScenario,
NoSourcesWithTestDataGeneration,
Serialization,
SourceCompilation,
UnsupportedSourcePreview
}
import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos.{
ExpressionSuggestionDto,
Expand All @@ -34,11 +41,20 @@ import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos.{
decodeVariableTypes,
prepareTypingResultDecoder
}
import pl.touk.nussknacker.ui.api.utils.ScenarioHttpServiceExtensions
import pl.touk.nussknacker.ui.api.utils.ScenarioDetailsOps._
import pl.touk.nussknacker.ui.api.utils.ScenarioHttpServiceExtensions
import pl.touk.nussknacker.ui.process.ProcessService
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
import pl.touk.nussknacker.ui.process.repository.ProcessDBQueryRepository.ProcessNotFoundError
import pl.touk.nussknacker.ui.process.test.ScenarioTestService
import pl.touk.nussknacker.ui.process.test.ScenarioTestService.ScenarioTestError
import pl.touk.nussknacker.ui.process.test.ScenarioTestService.ScenarioTestError.{
NoDataGeneratedError,
NoSourcesWithTestDataGenerationError,
SourceCompilationError,
UnsupportedSourcePreviewError,
_
}
import pl.touk.nussknacker.ui.security.api.{AuthManager, LoggedUser}
import pl.touk.nussknacker.ui.suggester.ExpressionSuggester
import pl.touk.nussknacker.ui.validation.{NodeValidator, ParametersValidator, UIProcessValidator}
Expand All @@ -52,6 +68,7 @@ class NodesApiHttpService(
processingTypeToNodeValidator: ProcessingTypeDataProvider[NodeValidator, _],
processingTypeToExpressionSuggester: ProcessingTypeDataProvider[ExpressionSuggester, _],
processingTypeToParametersValidator: ProcessingTypeDataProvider[ParametersValidator, _],
processingTypeToScenarioTestServices: ProcessingTypeDataProvider[ScenarioTestService, _],
protected override val scenarioService: ProcessService
)(override protected implicit val executionContext: ExecutionContext)
extends BaseHttpService(authManager)
Expand Down Expand Up @@ -144,6 +161,48 @@ class NodesApiHttpService(
}
}

expose {
nodesApiEndpoints.recordsEndpoint
.serverSecurityLogic(authorizeKnownUser[NodesError])
.serverLogicEitherT { implicit loggedUser =>
{ case (scenarioName, numberOfRecords, recordsRequestDto) =>
for {
scenarioWithDetails <- getScenarioWithDetailsByName(scenarioName)
scenarioTestService = processingTypeToScenarioTestServices.forProcessingTypeUnsafe(
scenarioWithDetails.processingType
)
sourceNodeData <- EitherT.fromEither[Future](recordsRequestDto.nodeData match {
case source: SourceNodeData => Right(source)
case other =>
Left(InvalidNodeType("SourceNodeData", other.getClass.getSimpleName))
})
parametersDefinition <- EitherT[Future, NodesError, String](
// The service is expected to limit the number of returned records if the response is too large.
// If it does not, we may need to implement client-side pagination or request size limits.
scenarioTestService.getDataFromSource(
recordsRequestDto.processProperties.toMetaData(scenarioName),
sourceNodeData,
numberOfRecords
) match {
case Left(SourceCompilationError(nodeId, errors)) =>
Future(Left(SourceCompilation(nodeId, errors)))
case Left(UnsupportedSourcePreviewError(nodeId)) =>
Future(Left(UnsupportedSourcePreview(nodeId)))
case Left(NoDataGeneratedError) =>
Future(Left(NoDataGenerated))
case Left(NoSourcesWithTestDataGenerationError) =>
Future(Left(NoSourcesWithTestDataGeneration))
case Left(SerializationError(message)) =>
Future(Left(Serialization(message)))
case Right(rawScenarioTestData) =>
Future(Right(rawScenarioTestData.content))
}
)
} yield parametersDefinition
}
}
}

expose {
nodesApiEndpoints.parametersValidationEndpoint
.serverSecurityLogic(authorizeKnownUser[NodesError])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import io.circe.{Decoder, Encoder, Json, KeyDecoder, KeyEncoder}
import org.springframework.util.ClassUtils
import pl.touk.nussknacker.engine.additionalInfo.{AdditionalInfo, MarkdownAdditionalInfo}
import pl.touk.nussknacker.engine.api.CirceUtil._
import pl.touk.nussknacker.engine.api.{LayoutData, ProcessAdditionalFields}
import pl.touk.nussknacker.engine.api.{LayoutData, ProcessAdditionalFields, StreamMetaData}
import pl.touk.nussknacker.engine.api.definition.{
FixedExpressionValue,
FixedExpressionValueWithIcon,
Expand Down Expand Up @@ -54,9 +54,15 @@ import pl.touk.nussknacker.security.AuthCredentials
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioGraphCodec._
import pl.touk.nussknacker.ui.api.TapirCodecs.ScenarioNameCodec._
import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos.NodesError.{
InvalidNodeType,
MalformedTypingResult,
NoDataGenerated,
NoProcessingType,
NoScenario
NoScenario,
NoSourcesWithTestDataGeneration,
Serialization,
SourceCompilation,
UnsupportedSourcePreview
}
import pl.touk.nussknacker.ui.api.BaseHttpService.CustomAuthorizationError
import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos.NodeDataSchemas.nodeDataSchema
Expand Down Expand Up @@ -222,8 +228,12 @@ class NodesApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseEndpoi
)
.errorOut(
oneOf[NodesError](
noScenarioExample,
malformedTypingResultExample
sourceCompilationExample,
unsupportedSourcePreviewExample,
noDataGeneratedExample,
noSourcesWithTestDataGenerationExample,
serializationExample,
noScenarioExample
)
)
.withSecurity(auth)
Expand Down Expand Up @@ -349,6 +359,61 @@ class NodesApiEndpoints(auth: EndpointInput[AuthCredentials]) extends BaseEndpoi
.withSecurity(auth)
}

lazy val recordsEndpoint: SecuredEndpoint[
(ProcessName, Int, RecordsRequestDto),
NodesError,
String,
Any
] = {
baseNuApiEndpoint
.summary("Fetch records for specific node")
.tag("Nodes")
.post
.in("nodes" / path[ProcessName]("scenarioName") / "records")
.in(
query[Int]("limit")
.default(10)
.description("Limit the number of records returned")
)
.in(
jsonBody[RecordsRequestDto]
.example(
Example.of(
summary = Some("Basic fetch request"),
value = RecordsRequestDto(
ProcessProperties(StreamMetaData()),
Source("sourceId", SourceRef("source", List.empty), None)
)
)
)
)
.out(
statusCode(Ok).and(
stringBody
.examples(
List(
Example.of(
summary = Some("Simple scenario test data in json stringify form"),
value = "{name: John}"
)
)
)
)
)
.errorOut(
oneOf[NodesError](
sourceCompilationExample,
unsupportedSourcePreviewExample,
noDataGeneratedExample,
noSourcesWithTestDataGenerationExample,
serializationExample,
noScenarioExample,
invalidNodeTypeExample
)
)
.withSecurity(auth)
}

lazy val parametersValidationEndpoint: SecuredEndpoint[
(ProcessingType, ParametersValidationRequestDto),
NodesError,
Expand Down Expand Up @@ -587,6 +652,78 @@ object NodesApiEndpoints {
)
)

val sourceCompilationExample: EndpointOutput.OneOfVariant[SourceCompilation] =
oneOfVariantFromMatchType(
BadRequest,
plainBody[SourceCompilation]
.example(
Example.of(
summary = Some("Source compilation error"),
value = SourceCompilation("sourceId", List("Invalid source configuration", "Missing required parameter"))
)
)
)

val unsupportedSourcePreviewExample: EndpointOutput.OneOfVariant[UnsupportedSourcePreview] =
oneOfVariantFromMatchType(
NotFound,
plainBody[UnsupportedSourcePreview]
.example(
Example.of(
summary = Some("Source preview not supported"),
value = UnsupportedSourcePreview("sourceId")
)
)
)

val noDataGeneratedExample: EndpointOutput.OneOfVariant[NoDataGenerated.type] =
oneOfVariantFromMatchType(
NotFound,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More than one output variant with the same status code won't work correctly. There is a long discussion in Tapirs repo why it works that way. The unpleasant thing is that Tapir don't event report an Exception in that case. It justs ignore some of variants. To make it work with Tapir, you need to have one output variant with given status code with many examples. Take a look at for example DeploymentApiEndpoints.runDeploymentEndpoint

plainBody[NoDataGenerated.type]
.example(
Example.of(
summary = Some("No test data generated"),
value = NoDataGenerated
)
)
)

val noSourcesWithTestDataGenerationExample: EndpointOutput.OneOfVariant[NoSourcesWithTestDataGeneration.type] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one won't happen because it is a valid error for test generation based on scenario, not a node. Maybe we could another sealed trait level below ScenarioTestError - SourceTestError. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to make class hierarchy where ScenarioError extends SourceError, but it didn't really work too well with Eithers. This is why I duplicated classes a little bit and I made it a flat structure for both cases. I don't have any better ideas for now. WDYT?

oneOfVariantFromMatchType(
NotFound,
plainBody[NoSourcesWithTestDataGeneration.type]
.example(
Example.of(
summary = Some("No sources support test data"),
value = NoSourcesWithTestDataGeneration
)
)
)

val serializationExample: EndpointOutput.OneOfVariant[Serialization] =
oneOfVariantFromMatchType(
BadRequest,
plainBody[Serialization]
.example(
Example.of(
summary = Some("Serialization error"),
value = Serialization("Failed to serialize test data")
)
)
)

val invalidNodeTypeExample: EndpointOutput.OneOfVariant[InvalidNodeType] =
oneOfVariantFromMatchType(
BadRequest,
plainBody[InvalidNodeType]
.example(
Example.of(
summary = Some("Invalid node type error"),
value = InvalidNodeType("Filter", "Source")
)
)
)

}

object Dtos {
Expand Down Expand Up @@ -1472,10 +1609,53 @@ object NodesApiEndpoints {
sealed trait NodesError

object NodesError {
final case class NoScenario(scenarioName: ProcessName) extends NodesError
final case class NoProcessingType(processingType: ProcessingType) extends NodesError
final case object NoPermission extends NodesError with CustomAuthorizationError
final case class MalformedTypingResult(msg: String) extends NodesError
final case class SourceCompilation(nodeId: String, errors: List[String]) extends NodesError
final case class UnsupportedSourcePreview(nodeId: String) extends NodesError
final case object NoDataGenerated extends NodesError
final case object NoSourcesWithTestDataGeneration extends NodesError
final case class Serialization(msg: String) extends NodesError
final case class InvalidNodeType(expectedType: String, actualType: String) extends NodesError
final case class NoScenario(scenarioName: ProcessName) extends NodesError
final case class NoProcessingType(processingType: ProcessingType) extends NodesError
final case object NoPermission extends NodesError with CustomAuthorizationError
final case class MalformedTypingResult(msg: String) extends NodesError

implicit val sourceCompilationCodec: Codec[String, SourceCompilation, CodecFormat.TextPlain] = {
BaseEndpointDefinitions.toTextPlainCodecSerializationOnly[SourceCompilation](e =>
s"Cannot compile source '${e.nodeId}'. Errors: ${e.errors.mkString(", ")}"
)
}

implicit val unsupportedSourcePreviewCodec: Codec[String, UnsupportedSourcePreview, CodecFormat.TextPlain] = {
BaseEndpointDefinitions.toTextPlainCodecSerializationOnly[UnsupportedSourcePreview](e =>
s"Source '${e.nodeId}' doesn't support records preview"
)
}

implicit val noDataGeneratedCodec: Codec[String, NoDataGenerated.type, CodecFormat.TextPlain] = {
BaseEndpointDefinitions.toTextPlainCodecSerializationOnly[NoDataGenerated.type](_ =>
"No test data was generated"
)
}

implicit val noSourcesWithTestDataGenerationCodec
: Codec[String, NoSourcesWithTestDataGeneration.type, CodecFormat.TextPlain] = {
BaseEndpointDefinitions.toTextPlainCodecSerializationOnly[NoSourcesWithTestDataGeneration.type](_ =>
"No sources available that support test data generation"
)
}

implicit val serializationCodec: Codec[String, Serialization, CodecFormat.TextPlain] = {
BaseEndpointDefinitions.toTextPlainCodecSerializationOnly[Serialization](e =>
s"Error during serialization: ${e.msg}"
)
}

implicit val invalidNodeTypeCodec: Codec[String, InvalidNodeType, CodecFormat.TextPlain] = {
BaseEndpointDefinitions.toTextPlainCodecSerializationOnly[InvalidNodeType](e =>
s"Expected ${e.expectedType} but got: ${e.actualType}"
)
}

implicit val noScenarioCodec: Codec[String, NoScenario, CodecFormat.TextPlain] = {
BaseEndpointDefinitions.toTextPlainCodecSerializationOnly[NoScenario](e =>
Expand All @@ -1497,6 +1677,12 @@ object NodesApiEndpoints {

}

@derive(schema, encoder, decoder)
final case class RecordsRequestDto(
processProperties: ProcessProperties,
nodeData: NodeData
)

}

}
Expand Down
Loading
Loading