Skip to content

Commit

Permalink
fix unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zeyu10 committed Jan 13, 2025
1 parent b5c8ebf commit cb2f9fe
Show file tree
Hide file tree
Showing 19 changed files with 41 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ public class OlympiceneFacade {
public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage,
Callback<DAGCallbackInfo> callback, DAGDispatcher dagDispatcher,
DAGStorageProcedure dagStorageProcedure, TimeChecker timeChecker,
SwitcherManager switcherManager) {
SwitcherManager switcherManager, Tracer tracer) {
ExecutorService executor = SameThreadExecutorService.INSTANCE;
return build(dagInfoStorage, dagContextStorage, dagStorageProcedure, callback, null,
dagDispatcher, timeChecker, executor, switcherManager, null);
dagDispatcher, timeChecker, executor, switcherManager, tracer);
}

public static Olympicene build(DAGInfoStorage dagInfoStorage, DAGContextStorage dagContextStorage, DAGStorageProcedure dagStorageProcedure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class ChoiceWithForeachTaskTraversalTest extends Specification {
Expand All @@ -29,7 +30,7 @@ class ChoiceWithForeachTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

/**
* A -> B -> C
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class ConstMappingsTest extends Specification {
Expand All @@ -32,7 +33,7 @@ class ConstMappingsTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

def "test const mappings should work well"() {
given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.serialize.DAGTraversalSerializer
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class ForeachTaskTraversalTest extends Specification {
Expand All @@ -33,7 +34,7 @@ class ForeachTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

/**
* A -> B -> C
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class FunctionRetryTest extends Specification {
Expand All @@ -30,7 +31,7 @@ class FunctionRetryTest extends Specification {
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
DefaultTimeChecker timeChecker = Mock(DefaultTimeChecker.class)
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager, Mock(Tracer))
DAG dag

def setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class FunctionTaskToleranceTraversalTest extends Specification {
Expand All @@ -30,7 +31,7 @@ class FunctionTaskToleranceTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

def "test one functionTask skip dag should work well"() {
given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class FunctionTaskTraversalFailedTest extends Specification {
Expand All @@ -31,7 +32,7 @@ class FunctionTaskTraversalFailedTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

def "test one functionTask failed dag should work well"() {
given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class FunctionTaskTraversalTest extends Specification {
Expand All @@ -31,7 +32,7 @@ class FunctionTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

def "test one functionTask dag should work well"() {
given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class InvokeMsgTest extends Specification {
Expand All @@ -30,7 +31,7 @@ class InvokeMsgTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

def setup() {
switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import io.opentelemetry.api.trace.Tracer
import org.apache.commons.collections.CollectionUtils
import spock.lang.Specification

Expand All @@ -31,7 +32,7 @@ class InvokeTimeTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
String executionId = 'executionId'

def "dagInfo and taskInfo should save invoke time"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import groovy.util.logging.Slf4j
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

@Slf4j
Expand All @@ -31,7 +32,7 @@ class MultiDAGTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
DAG dag

def setup() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
package com.weibo.rill.flow.olympicene.traversal

import com.weibo.rill.flow.interfaces.model.task.TaskStatus
import com.weibo.rill.flow.olympicene.core.event.Callback
import com.weibo.rill.flow.olympicene.core.event.Event
import com.weibo.rill.flow.olympicene.core.model.DAGSettings
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo
import com.weibo.rill.flow.olympicene.core.model.dag.DAG
import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus
import com.weibo.rill.flow.interfaces.model.task.TaskStatus
import com.weibo.rill.flow.olympicene.core.runtime.DAGParser
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager
import com.weibo.rill.flow.olympicene.ddl.parser.DAGStringParser
import com.weibo.rill.flow.olympicene.ddl.serialize.YAMLSerializer
import com.weibo.rill.flow.olympicene.ddl.validation.dag.impl.FlowDAGValidator
import com.weibo.rill.flow.olympicene.ddl.validation.task.impl.FunctionTaskValidator
import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient
import com.weibo.rill.flow.olympicene.storage.save.impl.DAGLocalStorage
import com.weibo.rill.flow.olympicene.storage.save.impl.LocalStorageProcedure
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class PassTaskTraversalTest extends Specification {
Expand All @@ -31,7 +31,7 @@ class PassTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

def "test one passTask dag should work well"() {
given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import groovy.util.logging.Slf4j
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification


Expand All @@ -31,7 +32,7 @@ class RedoTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))
String executionId = "xxx1"
DAG dag

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification


Expand All @@ -28,7 +29,7 @@ class ReturnTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

def "if return task status is success then next tasks status should be skip"() {
given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class SuspenseTaskTraversalTest extends Specification {
Expand All @@ -32,7 +33,7 @@ class SuspenseTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

def "test suspense task"() {
given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import com.weibo.rill.flow.olympicene.traversal.checker.DefaultTimeChecker
import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification

class SwitchTaskTraversalTest extends Specification {
Expand All @@ -27,7 +28,7 @@ class SwitchTaskTraversalTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

def "test basic switch"() {
given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.weibo.rill.flow.olympicene.traversal.config.OlympiceneFacade
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent
import io.opentelemetry.api.trace.Tracer
import spock.lang.Specification


Expand All @@ -29,7 +30,7 @@ class TaskDegradeTest extends Specification {
DAGDispatcher dispatcher = Mock(DAGDispatcher.class)
DAGStorageProcedure dagStorageProcedure = new LocalStorageProcedure()
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager, Mock(Tracer))

def "degrade current task only test"() {
given:
Expand Down
Loading

0 comments on commit cb2f9fe

Please sign in to comment.