Skip to content

Commit

Permalink
Add take and last operator
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Jan 9, 2025
1 parent 0014db9 commit 3e21298
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package nextflow.extension

/**
*
* @author Paolo Di Tommaso <[email protected]>
*/
class LastOp {
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,7 @@ class OperatorImpl {
* @return A {@code DataflowVariable} emitting the `last` item in the channel
*/
DataflowWriteChannel last( final DataflowReadChannel source ) {

def target = new DataflowVariable()
def last = null
subscribeImpl( source, [onNext: { last = it }, onComplete: { target.bind(last) }] )
return target
new LastOp().withSource(source).apply()
}

DataflowWriteChannel collect(final DataflowReadChannel source, Closure action=null) {
Expand All @@ -399,7 +395,6 @@ class OperatorImpl {
return new CollectOp(source,action,opts).apply()
}


/**
* Convert a {@code DataflowQueue} alias *channel* to a Java {@code List}
*
Expand Down
19 changes: 12 additions & 7 deletions modules/nextflow/src/main/groovy/nextflow/extension/TakeOp.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.dataflow.operator.ChainWithClosure
import groovyx.gpars.dataflow.operator.CopyChannelsClosure
import groovyx.gpars.dataflow.operator.DataflowEventAdapter
import groovyx.gpars.dataflow.operator.DataflowProcessor
import nextflow.Channel
import nextflow.Global
import nextflow.Session
import nextflow.extension.op.Op

/**
* Implement "take" operator
*
* @author Paolo Di Tommaso <[email protected]>
*/
Expand Down Expand Up @@ -73,11 +73,16 @@ class TakeOp {
}
}

newOperator(
inputs: [source],
outputs: [target],
listeners: (length > 0 ? [listener] : []),
new ChainWithClosure(new CopyChannelsClosure()))
final params = new OpParams()
.withInput(source)
.withOutput(target)
if( length>0 )
params.withListener(listener)

newOperator(params) {
final proc = getDelegate() as DataflowProcessor
Op.bind(proc, target, it)
}

return target
}
Expand Down
61 changes: 60 additions & 1 deletion modules/nextflow/src/test/groovy/nextflow/prov/ProvTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,6 @@ class ProvTest extends Dsl2Spec {
}

def 'should track provenance with first operator'() {

when:
dsl_eval(globalConfig(), '''
workflow {
Expand All @@ -394,4 +393,64 @@ class ProvTest extends Dsl2Spec {
upstream1.size() == 1
upstream1.first.name == 'p1 (1)'
}

def 'should track provenance with take operator'() {
when:
dsl_eval(globalConfig(), '''
workflow {
channel.of(1,2,3,4,5) | p1 | take(2) | p2
}
process p1 {
input: val(x)
output: val(y)
exec:
y = x
}
process p2 {
input: val(x)
exec:
println x
}
''')

then:
def upstream1 = upstreamTasksOf('p2 (1)')
upstream1.size() == 1
upstream1.first.name == 'p1 (1)'
then:
def upstream2 = upstreamTasksOf('p2 (2)')
upstream2.size() == 1
upstream2.first.name == 'p1 (2)'

}

def 'should track provenance with last operator'() {
when:
dsl_eval(globalConfig(), '''
workflow {
channel.of(1,2,3,4,5) | p1 | last | p2
}
process p1 {
input: val(x)
output: val(y)
exec:
y = x
}
process p2 {
input: val(x)
exec:
println x
}
''')

then:
def upstream1 = upstreamTasksOf('p2')
upstream1.size() == 1
upstream1.first.name == 'p1 (5)'

}
}

0 comments on commit 3e21298

Please sign in to comment.