Skip to content

Commit

Permalink
fix typo in PreviousValueTransformer (#2032)
Browse files Browse the repository at this point in the history
  • Loading branch information
gadomsky authored Aug 11, 2021
1 parent 0daa89a commit c6254dc
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ object LastVariableFilterTransformer extends CustomStreamTransformer with Single

override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])(implicit nodeId: ProcessCompilationError.NodeId): NodeTransformationDefinition = {
case TransformationStep(Nil, _) => NextParameters(groupByParameter.parameter :: valueParameter.parameter ::Nil)
case TransformationStep((gropuByParameterName,_ ) :: (`valueParameterName`, DefinedLazyParameter(expr)) :: Nil, _) => NextParameters(conditionParameter(expr.returnType)::Nil)
case TransformationStep((_,_ ) :: (`valueParameterName`, DefinedLazyParameter(expr)) :: Nil, _) => NextParameters(conditionParameter(expr.returnType)::Nil)
//if we cannot determine value, we'll assume it's type is Unknown
case TransformationStep((gropuByParameterName, _) :: (`valueParameterName`, FailedToDefineParameter) :: Nil, _) => NextParameters(conditionParameter(Unknown)::Nil)
case TransformationStep((gropuByParameterName, _) :: (`valueParameterName`, _) :: (`conditionParameterName`, _) :: Nil, _) => FinalResults(context)
case TransformationStep((_, _) :: (`valueParameterName`, FailedToDefineParameter) :: Nil, _) => NextParameters(conditionParameter(Unknown)::Nil)
case TransformationStep((_, _) :: (`valueParameterName`, _) :: (`conditionParameterName`, _) :: Nil, _) => FinalResults(context)
}

override def initialParameters: List[Parameter] = List(groupByParameter.parameter, valueParameter.parameter, conditionParameter(Unknown))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ case object PreviousValueTransformer extends CustomStreamTransformer with Explic
type Value = AnyRef

@MethodToInvoke(returnType = classOf[Value])
def execute(@ParamName("gropuBy") groupBy: LazyParameter[CharSequence],
def execute(@ParamName("groupBy") groupBy: LazyParameter[CharSequence],
@ParamName("value") value: LazyParameter[Value])
= FlinkCustomStreamTransformation((start: DataStream[Context], ctx: FlinkCustomNodeContext) =>
setUidToNodeIdIfNeed(ctx,
Expand Down

0 comments on commit c6254dc

Please sign in to comment.