Skip to content

Commit

Permalink
Record NO_REPLACEMENT as step output for unspecified value
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Dec 18, 2024
1 parent 20d904e commit c65498f
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 41 deletions.
2 changes: 1 addition & 1 deletion lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8108,7 +8108,7 @@ class WorkflowStep(Base, RepresentById, UsesCreateAndUpdateTime):
tool_errors: Mapped[Optional[bytes]] = mapped_column(JSONType)
position: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
config: Mapped[Optional[bytes]] = mapped_column(JSONType)
order_index: Mapped[Optional[int]]
order_index: Mapped[int]
when_expression: Mapped[Optional[bytes]] = mapped_column(JSONType)
uuid: Mapped[Optional[Union[UUID, str]]] = mapped_column(UUIDType)
label: Mapped[Optional[str]] = mapped_column(Unicode(255))
Expand Down
19 changes: 17 additions & 2 deletions lib/galaxy/tools/parameters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from .workflow_utils import (
is_runtime_value,
NO_REPLACEMENT,
runtime_to_json,
)
from .wrapped import flat_to_nested_state
Expand Down Expand Up @@ -180,8 +181,22 @@ def callback_helper(input, input_values, name_prefix, label_prefix, parent_prefi
replace = new_value != no_replacement_value
if replace:
input_values[input.name] = new_value
elif replace_optional_connections and is_runtime_value(value) and hasattr(input, "value"):
input_values[input.name] = input.value
elif replace_optional_connections:
# Only used in workflow context
has_default = hasattr(input, "value")
if new_value is value is NO_REPLACEMENT:
# NO_REPLACEMENT means value was connected but left unspecified
if has_default:
# Use default if we have one
input_values[input.name] = input.value
else:
# Should fail if input is not optional and does not have default value
# Effectively however depends on parameter implementation.
# We might want to raise an exception here, instead of depending on a tool parameter value error.
input_values[input.name] = None

elif is_runtime_value(value) and has_default:
input_values[input.name] = input.value

def get_current_case(input, input_values):
test_parameter = input.test_param
Expand Down
14 changes: 11 additions & 3 deletions lib/galaxy/tools/parameters/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@
ParameterParseException,
text_input_is_optional,
)
from galaxy.tools.parameters.workflow_utils import workflow_building_modes
from galaxy.tools.parameters.workflow_utils import (
NO_REPLACEMENT,
workflow_building_modes,
)
from galaxy.util import (
sanitize_param,
string_as_bool,
Expand Down Expand Up @@ -247,6 +250,8 @@ def to_python(self, value, app):
def value_to_basic(self, value, app, use_security=False):
if is_runtime_value(value):
return runtime_to_json(value)
elif value == NO_REPLACEMENT:
return {"__class__": "NoReplacement"}
return self.to_json(value, app, use_security)

def value_from_basic(self, value, app, ignore_errors=False):
Expand All @@ -255,8 +260,11 @@ def value_from_basic(self, value, app, ignore_errors=False):
if isinstance(self, HiddenToolParameter):
raise ParameterValueError(message_suffix="Runtime Parameter not valid", parameter_name=self.name)
return runtime_to_object(value)
elif isinstance(value, MutableMapping) and value.get("__class__") == "UnvalidatedValue":
return value["value"]
elif isinstance(value, MutableMapping):
if value.get("__class__") == "UnvalidatedValue":
return value["value"]
elif value.get("__class__") == "NoReplacement":
return NO_REPLACEMENT
# Delegate to the 'to_python' method
if ignore_errors:
try:
Expand Down
9 changes: 9 additions & 0 deletions lib/galaxy/tools/parameters/workflow_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
from collections.abc import MutableMapping


class NoReplacement:

def __str__(self):
return "NO_REPLACEMENT singleton"


NO_REPLACEMENT = NoReplacement()


class workflow_building_modes:
DISABLED = False
ENABLED = True
Expand Down
24 changes: 11 additions & 13 deletions lib/galaxy/workflow/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
from galaxy.tools.parameters.workflow_utils import (
ConnectedValue,
is_runtime_value,
NO_REPLACEMENT,
NoReplacement,
runtime_to_json,
workflow_building_modes,
)
Expand Down Expand Up @@ -129,14 +131,6 @@
POSSIBLE_PARAMETER_TYPES: Tuple[INPUT_PARAMETER_TYPES] = get_args(INPUT_PARAMETER_TYPES)


class NoReplacement:
def __str__(self):
return "NO_REPLACEMENT singleton"


NO_REPLACEMENT = NoReplacement()


class ConditionalStepWhen(BooleanToolParameter):
pass

Expand Down Expand Up @@ -954,7 +948,7 @@ class InputModule(WorkflowModule):

def get_runtime_state(self):
state = DefaultToolState()
state.inputs = dict(input=None)
state.inputs = dict(input=NO_REPLACEMENT)
return state

def get_all_inputs(self, data_only=False, connectable_only=False):
Expand All @@ -966,7 +960,7 @@ def execute(
invocation = invocation_step.workflow_invocation
step = invocation_step.workflow_step
input_value = step.state.inputs["input"]
if input_value is None:
if input_value is NO_REPLACEMENT:
default_value = step.get_input_default_value(NO_REPLACEMENT)
if default_value is not NO_REPLACEMENT:
input_value = raw_to_galaxy(trans.app, trans.history, default_value)
Expand Down Expand Up @@ -1582,7 +1576,7 @@ def _parameter_def_list_to_options(parameter_value):

def get_runtime_state(self):
state = DefaultToolState()
state.inputs = dict(input=None)
state.inputs = dict(input=NO_REPLACEMENT)
return state

def get_all_outputs(self, data_only=False):
Expand All @@ -1609,7 +1603,7 @@ def execute(
input_value = progress.inputs_by_step_id[step.id]
else:
input_value = step.state.inputs["input"]
if input_value is None:
if input_value is NO_REPLACEMENT:
default_value = step.get_input_default_value(NO_REPLACEMENT)
# TODO: look at parameter type and infer if value should be a dictionary
# instead. Guessing only field parameter types in CWL branch would have
Expand Down Expand Up @@ -2266,7 +2260,11 @@ def decode_runtime_state(self, step, runtime_state):
)

def execute(
self, trans, progress: "WorkflowProgress", invocation_step, use_cached_job: bool = False
self,
trans,
progress: "WorkflowProgress",
invocation_step: "WorkflowInvocationStep",
use_cached_job: bool = False,
) -> Optional[bool]:
invocation = invocation_step.workflow_invocation
step = invocation_step.workflow_step
Expand Down
6 changes: 2 additions & 4 deletions lib/galaxy/workflow/refactor/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from galaxy.tools.parameters.basic import contains_workflow_parameter
from galaxy.tools.parameters.workflow_utils import (
ConnectedValue,
NO_REPLACEMENT,
runtime_to_json,
)
from .schema import (
Expand Down Expand Up @@ -41,10 +42,7 @@
UpgradeSubworkflowAction,
UpgradeToolAction,
)
from ..modules import (
InputParameterModule,
NO_REPLACEMENT,
)
from ..modules import InputParameterModule

log = logging.getLogger(__name__)

Expand Down
30 changes: 14 additions & 16 deletions lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import uuid
from collections.abc import MutableMapping
from typing import (
Any,
Dict,
Expand Down Expand Up @@ -37,6 +38,10 @@
WarningReason,
)
from galaxy.tools.parameters.basic import raw_to_galaxy
from galaxy.tools.parameters.workflow_utils import (
NO_REPLACEMENT,
NoReplacement,
)
from galaxy.tools.parameters.wrapped import nested_key_to_path
from galaxy.util import ExecutionTimer
from galaxy.workflow import modules
Expand Down Expand Up @@ -432,11 +437,11 @@ def remaining_steps(

def replacement_for_input(self, trans, step: "WorkflowStep", input_dict: Dict[str, Any]):
replacement: Union[
modules.NoReplacement,
NoReplacement,
model.DatasetCollectionInstance,
List[model.DatasetCollectionInstance],
HistoryItem,
] = modules.NO_REPLACEMENT
] = NO_REPLACEMENT
prefixed_name = input_dict["name"]
multiple = input_dict["multiple"]
is_data = input_dict["input_type"] in ["dataset", "dataset_collection"]
Expand Down Expand Up @@ -494,6 +499,8 @@ def replacement_for_connection(self, connection: "WorkflowStepConnection", is_da
dependent_workflow_step_id=output_step_id,
)
)
if isinstance(replacement, MutableMapping) and replacement.get("__class__") == "NoReplacement":
return NO_REPLACEMENT
if isinstance(replacement, model.HistoryDatasetCollectionAssociation):
if not replacement.collection.populated:
if not replacement.waiting_for_elements:
Expand Down Expand Up @@ -574,19 +581,8 @@ def set_outputs_for_input(
if self.inputs_by_step_id:
step_id = step.id
if step_id not in self.inputs_by_step_id and "output" not in outputs:
default_value = step.get_input_default_value(modules.NO_REPLACEMENT)
if default_value is not modules.NO_REPLACEMENT:
outputs["output"] = default_value
else:
log.error(f"{step.log_str()} not found in inputs_step_id {self.inputs_by_step_id}")
raise modules.FailWorkflowEvaluation(
why=InvocationFailureOutputNotFound(
reason=FailureReason.output_not_found,
workflow_step_id=invocation_step.workflow_step_id,
output_name="output",
dependent_workflow_step_id=invocation_step.workflow_step_id,
)
)
default_value = step.get_input_default_value(NO_REPLACEMENT)
outputs["output"] = default_value
elif step_id in self.inputs_by_step_id:
if self.inputs_by_step_id[step_id] is not None or "output" not in outputs:
outputs["output"] = self.inputs_by_step_id[step_id]
Expand Down Expand Up @@ -620,7 +616,7 @@ def set_step_outputs(
# Add this non-data, non workflow-output output to the workflow outputs.
# This is required for recovering the output in the next scheduling iteration,
# and should be replaced with a WorkflowInvocationStepOutputValue ASAP.
if not workflow_outputs_by_name.get(output_name) and not output_object == modules.NO_REPLACEMENT:
if not workflow_outputs_by_name.get(output_name):
workflow_output = model.WorkflowOutput(step, output_name=output_name)
step.workflow_outputs.append(workflow_output)
for workflow_output in step.workflow_outputs:
Expand All @@ -645,6 +641,8 @@ def set_step_outputs(
)

def _record_workflow_output(self, step: "WorkflowStep", workflow_output: "WorkflowOutput", output: Any) -> None:
if output is NO_REPLACEMENT:
output = {"__class__": "NoReplacement"}
self.workflow_invocation.add_output(workflow_output, step, output)

def mark_step_outputs_delayed(self, step: "WorkflowStep", why: Optional[str] = None) -> None:
Expand Down
3 changes: 2 additions & 1 deletion lib/galaxy_test/workflow/default_values.gxwf-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
- that: has_text
text: "1"
- doc: |
Test that null is replaced with default value (follows https://www.commonwl.org/v1.2/Workflow.html#WorkflowInputParameter)
Test that explicit null is not replaced and fails
expect_failure: true
job:
required_int_with_default:
type: raw
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
- that: has_text
text: "1"
- doc: |
Test that null is replaced with default value (follows https://www.commonwl.org/v1.2/Workflow.html#WorkflowInputParameter)
Test that explicit null is not replaced and fails
expect_failure: true
job:
optional_int_with_default:
type: raw
Expand Down

0 comments on commit c65498f

Please sign in to comment.