diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/LastOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/LastOp.groovy new file mode 100644 index 0000000000..4a3734b0f9 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/extension/LastOp.groovy @@ -0,0 +1,8 @@ +package nextflow.extension + +/** + * + * @author Paolo Di Tommaso + */ +class LastOp { +} diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy index 5eff3cd920..4a53489ab5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/OperatorImpl.groovy @@ -384,11 +384,7 @@ class OperatorImpl { * @return A {@code DataflowVariable} emitting the `last` item in the channel */ DataflowWriteChannel last( final DataflowReadChannel source ) { - - def target = new DataflowVariable() - def last = null - subscribeImpl( source, [onNext: { last = it }, onComplete: { target.bind(last) }] ) - return target + new LastOp().withSource(source).apply() } DataflowWriteChannel collect(final DataflowReadChannel source, Closure action=null) { @@ -399,7 +395,6 @@ class OperatorImpl { return new CollectOp(source,action,opts).apply() } - /** * Convert a {@code DataflowQueue} alias *channel* to a Java {@code List} * diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/TakeOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/TakeOp.groovy index f7bf259d0a..b7bd3a7c6e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/TakeOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/TakeOp.groovy @@ -23,15 +23,15 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import groovyx.gpars.dataflow.DataflowReadChannel import groovyx.gpars.dataflow.DataflowWriteChannel -import groovyx.gpars.dataflow.operator.ChainWithClosure -import groovyx.gpars.dataflow.operator.CopyChannelsClosure import groovyx.gpars.dataflow.operator.DataflowEventAdapter import groovyx.gpars.dataflow.operator.DataflowProcessor import nextflow.Channel import nextflow.Global import nextflow.Session +import nextflow.extension.op.Op /** + * Implement "take" operator * * @author Paolo Di Tommaso */ @@ -73,11 +73,16 @@ class TakeOp { } } - newOperator( - inputs: [source], - outputs: [target], - listeners: (length > 0 ? [listener] : []), - new ChainWithClosure(new CopyChannelsClosure())) + final params = new OpParams() + .withInput(source) + .withOutput(target) + if( length>0 ) + params.withListener(listener) + + newOperator(params) { + final proc = getDelegate() as DataflowProcessor + Op.bind(proc, target, it) + } return target } diff --git a/modules/nextflow/src/test/groovy/nextflow/prov/ProvTest.groovy b/modules/nextflow/src/test/groovy/nextflow/prov/ProvTest.groovy index 6a6ab327e9..c544aa2e3e 100644 --- a/modules/nextflow/src/test/groovy/nextflow/prov/ProvTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/prov/ProvTest.groovy @@ -368,7 +368,6 @@ class ProvTest extends Dsl2Spec { } def 'should track provenance with first operator'() { - when: dsl_eval(globalConfig(), ''' workflow { @@ -394,4 +393,64 @@ class ProvTest extends Dsl2Spec { upstream1.size() == 1 upstream1.first.name == 'p1 (1)' } + + def 'should track provenance with take operator'() { + when: + dsl_eval(globalConfig(), ''' + workflow { + channel.of(1,2,3,4,5) | p1 | take(2) | p2 + } + + process p1 { + input: val(x) + output: val(y) + exec: + y = x + } + + process p2 { + input: val(x) + exec: + println x + } + ''') + + then: + def upstream1 = upstreamTasksOf('p2 (1)') + upstream1.size() == 1 + upstream1.first.name == 'p1 (1)' + then: + def upstream2 = upstreamTasksOf('p2 (2)') + upstream2.size() == 1 + upstream2.first.name == 'p1 (2)' + + } + + def 'should track provenance with last operator'() { + when: + dsl_eval(globalConfig(), ''' + workflow { + channel.of(1,2,3,4,5) | p1 | last | p2 + } + + process p1 { + input: val(x) + output: val(y) + exec: + y = x + } + + process p2 { + input: val(x) + exec: + println x + } + ''') + + then: + def upstream1 = upstreamTasksOf('p2') + upstream1.size() == 1 + upstream1.first.name == 'p1 (5)' + + } }