Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full execution provenance resolution #5639

Draft
wants to merge 54 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
b5d423b
wip
pditommaso Jan 3, 2025
ca9fe88
Task provenance - poc #1
pditommaso Jan 4, 2025
494bbb0
Task provenance - poc #2
pditommaso Jan 5, 2025
ddeee05
Add toList and toSortedList operators
pditommaso Jan 6, 2025
e4e505a
Refactor redice operator as its own class
pditommaso Jan 7, 2025
e613123
Fix tests
pditommaso Jan 7, 2025
1f49d92
Fix tests
pditommaso Jan 7, 2025
2ab3a6b
Fix tests
pditommaso Jan 7, 2025
1826cfd
Fix tests
pditommaso Jan 7, 2025
e7c8722
Fix more tests
pditommaso Jan 7, 2025
88ae57e
Add chain operator
pditommaso Jan 8, 2025
c96d573
Fix tests
pditommaso Jan 8, 2025
f727005
Fix and improvements
pditommaso Jan 9, 2025
f06256f
Fix OperatorRun allocation
pditommaso Jan 9, 2025
7848791
Add distinc + unique operators
pditommaso Jan 9, 2025
0014db9
Add first operator
pditommaso Jan 9, 2025
2b4c5b3
Add take and last operator
pditommaso Jan 9, 2025
17fced9
Add count operator
pditommaso Jan 9, 2025
e449538
Add sum and mean ops
pditommaso Jan 10, 2025
ba69632
Improve run operator allocation
pditommaso Jan 11, 2025
fee34c7
Bind many
pditommaso Jan 11, 2025
bf06bf6
Add support for buffer operator
pditommaso Jan 11, 2025
6d13fbc
Add mix operator
pditommaso Jan 11, 2025
980cf32
Add join operator
pditommaso Jan 12, 2025
940b3fb
Add combine op
pditommaso Jan 12, 2025
d85702b
Cleanup and fix tests
pditommaso Jan 12, 2025
7183131
Refactor core operator
pditommaso Jan 12, 2025
b26c023
Refactor opertors
pditommaso Jan 12, 2025
8639e4d
Add concat operator
pditommaso Jan 12, 2025
c2a3e95
Add mix operator
pditommaso Jan 12, 2025
4d0379b
Remove chain operator + refactor
pditommaso Jan 13, 2025
a056f8a
Dump threads only with trace log level [ci fast]
pditommaso Jan 14, 2025
e1d3346
Fix tests
pditommaso Jan 14, 2025
f87036a
Fix tests
pditommaso Jan 14, 2025
dbd42ae
Fix tests
pditommaso Jan 14, 2025
4e154bb
Refactor flatMap op [ci fast]
pditommaso Jan 14, 2025
2f643f9
Refactor [ci fast]
pditommaso Jan 14, 2025
84f7806
Add flatten operator [ci fast]
pditommaso Jan 14, 2025
c139bec
Refactor
pditommaso Jan 14, 2025
8f8ff02
Minor change
pditommaso Jan 14, 2025
702705f
Add sum and mean operators
pditommaso Jan 15, 2025
5750f71
Add multimap operator [ci fast]
pditommaso Jan 15, 2025
14484a5
Fix tests [ci fast]
pditommaso Jan 15, 2025
ff74318
Minor [ci fast]
pditommaso Jan 15, 2025
9e5bc10
Add collate operator
pditommaso Jan 15, 2025
6b926f9
Update docs [ci skip]
pditommaso Jan 18, 2025
8637aee
Use a set object in place of list for operator inputs [ci fast]
pditommaso Jan 18, 2025
787349f
Add groupTuple operator
pditommaso Jan 18, 2025
08ac37f
Add compile static to GroupTupleOp class [ci fast]
pditommaso Jan 18, 2025
b8763c1
Merge branch 'master' into task-tracker-v2
pditommaso Jan 19, 2025
61c153c
Add bare minimal docs [ci skip]
pditommaso Jan 19, 2025
c2efcdc
Merge branch 'master' into task-tracker-v2
pditommaso Jan 20, 2025
fadab20
Add until operator [ci fast]
pditommaso Jan 20, 2025
0e30a8f
Add ifEmpty operator [ci fast]
pditommaso Jan 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/nextflow/src/main/groovy/nextflow/Channel.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ class Channel {
def groupChannel = isFlat ? new DataflowQueue<>() : CH.create()

new GroupTupleOp(groupOpts, mapChannel)
.setTarget(groupChannel)
.withTarget(groupChannel)
.apply()

// -- flat the group resulting tuples
Expand Down
4 changes: 3 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package nextflow


import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
Expand Down Expand Up @@ -53,6 +52,7 @@ import nextflow.processor.ErrorStrategy
import nextflow.processor.TaskFault
import nextflow.processor.TaskHandler
import nextflow.processor.TaskProcessor
import nextflow.prov.Tracker
import nextflow.script.BaseScript
import nextflow.script.ProcessConfig
import nextflow.script.ProcessFactory
Expand Down Expand Up @@ -857,6 +857,8 @@ class Session implements ISession {

DAG getDag() { this.dag }

/**
 * @return the {@link Tracker} object referenced by this session's {@code provenance} member
 */
Tracker getProvenance() { return this.provenance }

ExecutorService getExecService() { execService }

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,8 @@ class BashWrapperBuilder {
log.warn "Invalid value for `NXF_DEBUG` variable: $str -- See http://www.nextflow.io/docs/latest/config.html#environment-variables"
}
BASH = Collections.unmodifiableList( level > 0 ? ['/bin/bash','-uex'] : ['/bin/bash','-ue'] )

}


@Delegate
ScriptFileCopyStrategy copyStrategy

Expand Down Expand Up @@ -480,6 +478,7 @@ class BashWrapperBuilder {
protected String getTaskMetadata() {
final lines = new StringBuilder()
lines << '### ---\n'
lines << "### id: '${bean.taskId}'\n"
lines << "### name: '${bean.name}'\n"
if( bean.arrayIndexName ) {
lines << '### array:\n'
Expand All @@ -493,12 +492,18 @@ class BashWrapperBuilder {
if( containerConfig?.isEnabled() )
lines << "### container: '${bean.containerImage}'\n"

if( outputFiles.size() > 0 ) {
if( outputFiles ) {
lines << '### outputs:\n'
for( final output : bean.outputFiles )
lines << "### - '${output}'\n"
}

if( bean.upstreamTasks ) {
lines << '### upstream-tasks:\n'
for( final it : bean.upstreamTasks )
lines << "### - '${it}'\n"
}

lines << '### ...\n'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import groovy.transform.CompileStatic
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.dataflow.expression.DataflowExpression
import groovyx.gpars.dataflow.operator.DataflowProcessor
import nextflow.Channel
import nextflow.extension.op.Op
import nextflow.script.ChannelOut
import nextflow.script.TokenBranchChoice
import nextflow.script.TokenBranchDef
Expand Down Expand Up @@ -50,20 +52,21 @@ class BranchOp {

ChannelOut getOutput() { this.output }

protected void doNext(it) {
protected void doNext(DataflowProcessor dp, Object it) {
TokenBranchChoice ret = switchDef.closure.call(it)
if( ret ) {
targets[ret.choice].bind(ret.value)
Op.bind(dp, targets[ret.choice], ret.value)
}
}

protected void doComplete(nope) {
protected void doComplete(DataflowProcessor dp) {
for( DataflowWriteChannel ch : targets.values() ) {
if( ch instanceof DataflowExpression ) {
if( !ch.isBound()) ch.bind(Channel.STOP)
if( !ch.isBound() )
Op.bind(dp, ch, Channel.STOP)
}
else {
ch.bind(Channel.STOP)
Op.bind(dp, ch, Channel.STOP)
}
}
}
Expand All @@ -72,7 +75,10 @@ class BranchOp {
def events = new HashMap<String,Closure>(2)
events.put('onNext', this.&doNext)
events.put('onComplete', this.&doComplete)
DataflowHelper.subscribeImpl(source, events)
new SubscribeOp()
.withSource(source)
.withEvents(events)
.apply()
return this
}

Expand Down
42 changes: 27 additions & 15 deletions modules/nextflow/src/main/groovy/nextflow/extension/BufferOp.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package nextflow.extension

import static nextflow.extension.DataflowHelper.*
import static nextflow.util.CheckHelper.*

import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand All @@ -28,10 +31,11 @@ import groovyx.gpars.dataflow.operator.PoisonPill
import nextflow.Channel
import nextflow.Global
import nextflow.Session
import nextflow.extension.op.ContextGrouping
import nextflow.extension.op.Op
import org.codehaus.groovy.runtime.callsite.BooleanReturningMethodInvoker
import static nextflow.extension.DataflowHelper.newOperator
import static nextflow.util.CheckHelper.checkParams
/**
* Implements the "buffer" operator
*
* @author Paolo Di Tommaso <[email protected]>
*/
Expand Down Expand Up @@ -160,20 +164,16 @@ class BufferOp {
final listener = new DataflowEventAdapter() {

@Override
Object controlMessageArrived(final DataflowProcessor processor, final DataflowReadChannel<Object> channel, final int index, final Object message) {
Object controlMessageArrived(final DataflowProcessor dp, final DataflowReadChannel<Object> channel, final int index, final Object message) {
if( message instanceof PoisonPill && remainder && buffer.size() ) {
target.bind(buffer)
Op.bind(dp, target, buffer)
}
return message
}

@Override
void afterRun(DataflowProcessor processor, List<Object> messages) {
if( !stopOnFirst )
return
if( remainder && buffer)
target.bind(buffer)
target.bind(Channel.STOP)
void afterStop(DataflowProcessor dp) {
Op.bind(dp, target, Channel.STOP)
}

@Override
Expand All @@ -187,24 +187,36 @@ class BufferOp {
// -- open frame flag
boolean isOpen = startingCriteria == null

// -- the operator collecting the elements
newOperator( source, target, listener ) {
// -- op code
final code = {
if( isOpen ) {
buffer << it
}
else if( startingCriteria.call(it) ) {
isOpen = true
buffer << it
}

final dp = getDelegate() as DataflowProcessor
if( closeCriteria.call(it) ) {
((DataflowProcessor) getDelegate()).bindOutput(buffer);
Op.bind(dp, target, buffer)
buffer = []
// when a *startingCriteria* is defined, close the open frame flag
isOpen = (startingCriteria == null)
}

if( stopOnFirst ) {
if( remainder && buffer )
Op.bind(dp, target, buffer)
dp.terminate()
}
}

// -- the operator collecting the elements
new Op()
.withInput(source)
.withListener(listener)
.withContext(new ContextGrouping())
.withCode(code)
.apply()
}

}
86 changes: 86 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/extension/ChainOp.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.extension

import groovy.transform.CompileStatic
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import groovyx.gpars.dataflow.operator.ChainWithClosure
import groovyx.gpars.dataflow.operator.DataflowEventListener
import nextflow.extension.op.Op
/**
 * Implements the chain operator.
 *
 * Fluent builder: configure the source channel, the target channel, the optional
 * event listeners and the action closure through the {@code with*} methods, then
 * invoke {@link #apply()} to create and start the underlying {@link Op}.
 *
 * @author Paolo Di Tommaso <[email protected]>
 */
@CompileStatic
class ChainOp {

    /** Channel the operator reads from; must be set before {@link #apply()} */
    private DataflowReadChannel source

    /** Channel the operator writes to; must be set before {@link #apply()} */
    private DataflowWriteChannel target

    /** Event listeners passed to the underlying operator; empty by default */
    private List<DataflowEventListener> listeners = List.of()

    /** Closure wrapped into a {@link ChainWithClosure} and used as the operator code */
    private Closure action

    /**
     * Factory method.
     *
     * @return a new, unconfigured {@link ChainOp} instance
     */
    static ChainOp create() {
        new ChainOp()
    }

    /**
     * Set the channel the operator reads from.
     *
     * @param source the input channel; the assertion rejects a null (Groovy-false) value
     * @return this builder, to allow method chaining
     */
    ChainOp withSource(DataflowReadChannel source) {
        assert source
        this.source = source
        return this
    }

    /**
     * Set the channel the operator writes to.
     *
     * @param target the output channel; the assertion rejects a null (Groovy-false) value
     * @return this builder, to allow method chaining
     */
    ChainOp withTarget(DataflowWriteChannel target) {
        assert target
        this.target = target
        return this
    }

    /**
     * Set a single event listener, replacing any listener previously configured.
     *
     * @param listener the listener to register; must not be null
     * @return this builder, to allow method chaining
     */
    ChainOp withListener(DataflowEventListener listener) {
        assert listener != null
        this.listeners = List.of(listener)
        return this
    }

    /**
     * Set the list of event listeners, replacing any listener previously configured.
     * The given list is stored as-is (no copy is made).
     *
     * @param listeners the listeners to register; must not be null (may be empty)
     * @return this builder, to allow method chaining
     */
    ChainOp withListeners(List<DataflowEventListener> listeners) {
        assert listeners != null
        this.listeners = listeners
        return this
    }

    /**
     * Set the closure implementing the operator logic.
     *
     * @param action the closure to run; must not be null
     * @return this builder, to allow method chaining
     */
    ChainOp withAction(Closure action) {
        // fail fast here, like the other setters, instead of deferring
        // the failure to the `assert action` check in `apply()`
        assert action != null
        this.action = action
        return this
    }

    /**
     * Create the underlying {@link Op} using the configured source as input, the
     * configured target as output, the configured listeners, and the action wrapped
     * in a {@link ChainWithClosure}, then apply it.
     *
     * Source, target and action must have been configured beforehand, otherwise
     * the corresponding assertion fails.
     *
     * @return the target channel
     */
    DataflowWriteChannel apply() {
        assert source
        assert target
        assert action

        new Op()
            .withInput(source)
            .withOutput(target)
            .withListeners(listeners)
            .withCode(new ChainWithClosure(action))
            .apply()

        return target
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@

package nextflow.extension

import static nextflow.util.LoggerHelper.*

import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.agent.Agent
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.NF
import nextflow.dag.NodeMarker
import nextflow.exception.ScriptRuntimeException
import nextflow.prov.Tracker
import nextflow.script.ChainableDef
import nextflow.script.ChannelOut
import nextflow.script.ComponentDef
import nextflow.script.CompositeDef
import nextflow.script.ExecutionStack
import org.codehaus.groovy.runtime.InvokerHelper
import static nextflow.util.LoggerHelper.fmtType

/**
* Implements dataflow channel extension methods
*
Expand Down Expand Up @@ -207,4 +207,11 @@ class ChannelEx {
left.add(right)
}

/**
 * Read a value from the channel via {@code getVal()} and, when that value is a
 * {@link Tracker.Msg} envelope, return the {@code value} it carries instead of
 * the envelope itself.
 *
 * @param self the channel to read the value from
 * @return the {@code value} property of the message when the channel yields a
 *      {@link Tracker.Msg}, otherwise the object read from the channel unchanged
 */
static Object unwrap(DataflowReadChannel self) {
    final item = self.getVal()
    // anything that is not a tracker message is handed back untouched
    if( !(item instanceof Tracker.Msg) )
        return item
    return ((Tracker.Msg) item).value
}

}
Loading
Loading