diff --git a/haystack_experimental/__init__.py b/haystack_experimental/__init__.py
index c1764a6e..807bee58 100644
--- a/haystack_experimental/__init__.py
+++ b/haystack_experimental/__init__.py
@@ -1,3 +1,5 @@
 # SPDX-FileCopyrightText: 2022-present deepset GmbH
 #
 # SPDX-License-Identifier: Apache-2.0
+
+from .core import Pipeline, AsyncPipeline, run_async_pipeline
\ No newline at end of file
diff --git a/haystack_experimental/core/__init__.py b/haystack_experimental/core/__init__.py
index efafd8c9..8395ba39 100644
--- a/haystack_experimental/core/__init__.py
+++ b/haystack_experimental/core/__init__.py
@@ -3,5 +3,6 @@
 # SPDX-License-Identifier: Apache-2.0
 
 from .pipeline import AsyncPipeline, run_async_pipeline
+from .pipeline import Pipeline
 
-_all_ = ["AsyncPipeline", "run_async_pipeline"]
+__all__ = ["AsyncPipeline", "run_async_pipeline", "Pipeline"]
diff --git a/haystack_experimental/core/pipeline/__init__.py b/haystack_experimental/core/pipeline/__init__.py
index c755a510..1ecc73bb 100644
--- a/haystack_experimental/core/pipeline/__init__.py
+++ b/haystack_experimental/core/pipeline/__init__.py
@@ -3,5 +3,6 @@
 # SPDX-License-Identifier: Apache-2.0
 
 from .async_pipeline import AsyncPipeline, run_async_pipeline
+from .pipeline import Pipeline
 
-__all__ = ["AsyncPipeline", "run_async_pipeline"]
+__all__ = ["AsyncPipeline", "run_async_pipeline", "Pipeline"]
diff --git a/haystack_experimental/core/pipeline/base.py b/haystack_experimental/core/pipeline/base.py
new file mode 100644
index 00000000..d5db7bde
--- /dev/null
+++ b/haystack_experimental/core/pipeline/base.py
@@ -0,0 +1,838 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import itertools
+from collections import defaultdict
+from copy import deepcopy
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, Iterator, List, Optional, TextIO, Tuple, Type, TypeVar, Union
+
+from haystack import logging
+from haystack.core.component import Component, InputSocket, OutputSocket, component
+from haystack.core.errors import (
+    DeserializationError,
+    PipelineConnectError,
+    PipelineDrawingError,
+    PipelineError,
+    PipelineUnmarshalError,
+    PipelineValidationError,
+)
+from haystack.core.pipeline.descriptions import find_pipeline_inputs, find_pipeline_outputs
+from haystack.core.pipeline.draw import _to_mermaid_image
+from haystack.core.pipeline.template import PipelineTemplate, PredefinedPipeline
+from haystack.core.pipeline.utils import parse_connect_string
+from haystack.core.serialization import DeserializationCallbacks, component_from_dict, component_to_dict
+from haystack.core.type_utils import _type_name, _types_are_compatible
+from haystack.marshal import Marshaller, YamlMarshaller
+from haystack.utils import is_in_jupyter, type_serialization
+from networkx import MultiDiGraph  # type:ignore
+
+DEFAULT_MARSHALLER = YamlMarshaller()
+
+# We use a generic type to annotate the return value of classmethods,
+# so that static analyzers won't be confused when derived classes
+# use those methods.
+T = TypeVar("T", bound="PipelineBase")
+
+logger = logging.getLogger(__name__)
+
+
+class PipelineBase:
+    """
+    Component orchestration engine.
+
+    Builds a graph of components and orchestrates their execution according to the execution graph.
+    """
+
+    def __init__(self, metadata: Optional[Dict[str, Any]] = None, max_runs_per_component: int = 100):
+        """
+        Creates the Pipeline.
+
+        :param metadata:
+            Arbitrary dictionary to store metadata about this `Pipeline`.
+            Make sure all the values contained in
+            this dictionary can be serialized and deserialized if you wish to save this `Pipeline` to file.
+        :param max_runs_per_component:
+            How many times the `Pipeline` can run the same Component.
+            If this limit is reached, a `PipelineMaxComponentRuns` exception is raised.
+            If not set, defaults to 100 runs per Component.
+        """
+        self._telemetry_runs = 0
+        self._last_telemetry_sent: Optional[datetime] = None
+        self.metadata = metadata or {}
+        self.graph = MultiDiGraph()
+        self._max_runs_per_component = max_runs_per_component
+
+    def __eq__(self, other) -> bool:
+        """
+        Pipeline equality is defined by the pipeline's type and the equality of its serialized form.
+
+        Equal pipelines share the same type and identical metadata, nodes, and edges, but they're not required to use
+        the same node instances: this allows a pipeline saved and then loaded back to be equal to itself.
+        """
+        if not isinstance(self, type(other)):
+            return False
+        return self.to_dict() == other.to_dict()
+
+    def __repr__(self) -> str:
+        """
+        Returns a text representation of the Pipeline.
+        """
+        res = f"{object.__repr__(self)}\n"
+        if self.metadata:
+            res += "🧱 Metadata\n"
+            for k, v in self.metadata.items():
+                res += f" - {k}: {v}\n"
+
+        res += "🚅 Components\n"
+        for name, instance in self.graph.nodes(data="instance"):  # type: ignore # type wrongly defined in networkx
+            res += f" - {name}: {instance.__class__.__name__}\n"
+
+        res += "🛤️ Connections\n"
+        for sender, receiver, edge_data in self.graph.edges(data=True):
+            sender_socket = edge_data["from_socket"].name
+            receiver_socket = edge_data["to_socket"].name
+            res += f" - {sender}.{sender_socket} -> {receiver}.{receiver_socket} ({edge_data['conn_type']})\n"
+
+        return res
+
+    def to_dict(self) -> Dict[str, Any]:
+        """
+        Serializes the pipeline to a dictionary.
+
+        This is meant to be an intermediate representation but it can also be used to save a pipeline to file.
+
+        :returns:
+            Dictionary with serialized data.
+        """
+        components = {}
+        for name, instance in self.graph.nodes(data="instance"):  # type:ignore
+            components[name] = component_to_dict(instance, name)
+
+        connections = []
+        for sender, receiver, edge_data in self.graph.edges.data():
+            sender_socket = edge_data["from_socket"].name
+            receiver_socket = edge_data["to_socket"].name
+            connections.append({"sender": f"{sender}.{sender_socket}", "receiver": f"{receiver}.{receiver_socket}"})
+        return {
+            "metadata": self.metadata,
+            "max_runs_per_component": self._max_runs_per_component,
+            "components": components,
+            "connections": connections,
+        }
+
+    @classmethod
+    def from_dict(  # noqa: PLR0912
+        cls: Type[T], data: Dict[str, Any], callbacks: Optional[DeserializationCallbacks] = None, **kwargs
+    ) -> T:
+        """
+        Deserializes the pipeline from a dictionary.
+
+        :param data:
+            Dictionary to deserialize from.
+        :param callbacks:
+            Callbacks to invoke during deserialization.
+        :param kwargs:
+            `components`: a dictionary of {name: instance} to reuse instances of components instead of creating new
+            ones.
+        :returns:
+            Deserialized component.
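+
+        Usage example (a minimal sketch: `my_module.MyComponent` is a hypothetical component
+        type, and its module must be importable for deserialization to succeed):
+
+        ```python
+        pipe = Pipeline.from_dict({
+            "metadata": {},
+            "max_runs_per_component": 100,
+            "components": {"greeter": {"type": "my_module.MyComponent", "init_parameters": {}}},
+            "connections": [],
+        })
+        ```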
+        """
+        data_copy = deepcopy(data)  # to prevent modification of original data
+        metadata = data_copy.get("metadata", {})
+        max_runs_per_component = data_copy.get("max_runs_per_component", 100)
+        pipe = cls(metadata=metadata, max_runs_per_component=max_runs_per_component)
+        components_to_reuse = kwargs.get("components", {})
+        for name, component_data in data_copy.get("components", {}).items():
+            if name in components_to_reuse:
+                # Reuse an instance
+                instance = components_to_reuse[name]
+            else:
+                if "type" not in component_data:
+                    raise PipelineError(f"Missing 'type' in component '{name}'")
+
+                if component_data["type"] not in component.registry:
+                    try:
+                        # Import the module first...
+                        module, _ = component_data["type"].rsplit(".", 1)
+                        logger.debug("Trying to import module {module_name}", module_name=module)
+                        type_serialization.thread_safe_import(module)
+                        # ...then try again
+                        if component_data["type"] not in component.registry:
+                            raise PipelineError(
+                                f"Successfully imported module {module} but can't find it in the component registry. "
+                                "This is unexpected and most likely a bug."
+                            )
+                    except (ImportError, PipelineError) as e:
+                        raise PipelineError(f"Component '{component_data['type']}' not imported.") from e
+
+                # Create a new one
+                component_class = component.registry[component_data["type"]]
+
+                try:
+                    instance = component_from_dict(component_class, component_data, name, callbacks)
+                except Exception as e:
+                    msg = (
+                        f"Couldn't deserialize component '{name}' of class '{component_class.__name__}' "
+                        f"with the following data: {str(component_data)}. Possible reasons include "
+                        "malformed serialized data, mismatch between the serialized component and the "
+                        "loaded one (due to a breaking change, see "
+                        "https://github.com/deepset-ai/haystack/releases), etc."
+                    )
+                    raise DeserializationError(msg) from e
+            pipe.add_component(name=name, instance=instance)
+
+        for connection in data.get("connections", []):
+            if "sender" not in connection:
+                raise PipelineError(f"Missing sender in connection: {connection}")
+            if "receiver" not in connection:
+                raise PipelineError(f"Missing receiver in connection: {connection}")
+            pipe.connect(sender=connection["sender"], receiver=connection["receiver"])
+
+        return pipe
+
+    def dumps(self, marshaller: Marshaller = DEFAULT_MARSHALLER) -> str:
+        """
+        Returns the string representation of this pipeline according to the format dictated by the `Marshaller` in use.
+
+        :param marshaller:
+            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
+        :returns:
+            A string representing the pipeline.
+        """
+        return marshaller.marshal(self.to_dict())
+
+    def dump(self, fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER):
+        """
+        Writes the string representation of this pipeline to the file-like object passed in the `fp` argument.
+
+        :param fp:
+            A file-like object ready to be written to.
+        :param marshaller:
+            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
+        """
+        fp.write(marshaller.marshal(self.to_dict()))
+
+    @classmethod
+    def loads(
+        cls: Type[T],
+        data: Union[str, bytes, bytearray],
+        marshaller: Marshaller = DEFAULT_MARSHALLER,
+        callbacks: Optional[DeserializationCallbacks] = None,
+    ) -> T:
+        """
+        Creates a `Pipeline` object from the string representation passed in the `data` argument.
+
+        :param data:
+            The string representation of the pipeline, can be `str`, `bytes` or `bytearray`.
+        :param marshaller:
+            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
+        :param callbacks:
+            Callbacks to invoke during deserialization.
+        :raises DeserializationError:
+            If an error occurs during deserialization.
+        :returns:
+            A `Pipeline` object.
+        """
+        try:
+            deserialized_data = marshaller.unmarshal(data)
+        except Exception as e:
+            raise DeserializationError(
+                "Error while unmarshalling serialized pipeline data. This is usually "
+                "caused by malformed or invalid syntax in the serialized representation."
+            ) from e
+
+        return cls.from_dict(deserialized_data, callbacks)
+
+    @classmethod
+    def load(
+        cls: Type[T],
+        fp: TextIO,
+        marshaller: Marshaller = DEFAULT_MARSHALLER,
+        callbacks: Optional[DeserializationCallbacks] = None,
+    ) -> T:
+        """
+        Creates a `Pipeline` object from a string representation.
+
+        The string representation is read from the file-like object passed in the `fp` argument.
+
+        :param fp:
+            A file-like object ready to be read from.
+        :param marshaller:
+            The Marshaller used to create the string representation. Defaults to `YamlMarshaller`.
+        :param callbacks:
+            Callbacks to invoke during deserialization.
+        :raises DeserializationError:
+            If an error occurs during deserialization.
+        :returns:
+            A `Pipeline` object.
+        """
+        return cls.loads(fp.read(), marshaller, callbacks)
+
+    def add_component(self, name: str, instance: Component) -> None:
+        """
+        Add the given component to the pipeline.
+
+        Components are not connected to anything by default: use `Pipeline.connect()` to connect components together.
+        Component names must be unique, but component instances can be reused if needed.
+
+        :param name:
+            The name of the component to add.
+        :param instance:
+            The component instance to add.
+
+        :raises ValueError:
+            If a component with the same name already exists.
+        :raises PipelineValidationError:
+            If the given instance is not a Canals component.
+        """
+        # Component names are unique
+        if name in self.graph.nodes:
+            raise ValueError(f"A component named '{name}' already exists in this pipeline: choose another name.")
+
+        # Components can't be named `_debug`
+        if name == "_debug":
+            raise ValueError("'_debug' is a reserved name for debug output. Choose another name.")
+
+        # Component instances must be components
+        if not isinstance(instance, Component):
+            raise PipelineValidationError(
+                f"'{type(instance)}' doesn't seem to be a component. Is this class decorated with @component?"
+            )
+
+        if getattr(instance, "__haystack_added_to_pipeline__", None):
+            msg = (
+                "Component has already been added in another Pipeline. Components can't be shared between Pipelines. "
+                "Create a new instance instead."
+            )
+            raise PipelineError(msg)
+
+        setattr(instance, "__haystack_added_to_pipeline__", self)
+
+        # Add component to the graph, disconnected
+        logger.debug("Adding component '{component_name}' ({component})", component_name=name, component=instance)
+        # We're completely sure the fields exist so we ignore the type error
+        self.graph.add_node(
+            name,
+            instance=instance,
+            input_sockets=instance.__haystack_input__._sockets_dict,  # type: ignore[attr-defined]
+            output_sockets=instance.__haystack_output__._sockets_dict,  # type: ignore[attr-defined]
+            visits=0,
+        )
+
+    def remove_component(self, name: str) -> Component:
+        """
+        Removes and returns a component from the pipeline.
+
+        Remove an existing component from the pipeline by providing its name.
+        All edges that connect to the component will also be deleted.
+
+        :param name:
+            The name of the component to remove.
+        :returns:
+            The removed Component instance.
+
+        :raises ValueError:
+            If there is no component with that name already in the Pipeline.
+        """
+
+        # Check that a component with that name is in the Pipeline
+        try:
+            instance = self.get_component(name)
+        except ValueError as exc:
+            raise ValueError(
+                f"There is no component named '{name}' in the pipeline. The valid component names are: "
+                + ", ".join(n for n in self.graph.nodes)
+            ) from exc
+
+        # Delete component from the graph, deleting all its connections
+        self.graph.remove_node(name)
+
+        # Reset the Component sockets' senders and receivers
+        input_sockets = instance.__haystack_input__._sockets_dict  # type: ignore[attr-defined]
+        for socket in input_sockets.values():
+            socket.senders = []
+
+        output_sockets = instance.__haystack_output__._sockets_dict  # type: ignore[attr-defined]
+        for socket in output_sockets.values():
+            socket.receivers = []
+
+        # Reset the Component's pipeline reference
+        setattr(instance, "__haystack_added_to_pipeline__", None)
+
+        return instance
+
+    def connect(self, sender: str, receiver: str) -> "PipelineBase":  # noqa: PLR0915 PLR0912
+        """
+        Connects two components together.
+
+        All components to connect must exist in the pipeline.
+        If connecting to a component that has several output connections, specify the inputs and output names as
+        'component_name.connection_name'.
+
+        :param sender:
+            The component that delivers the value. This can be either just a component name or can be
+            in the format `component_name.connection_name` if the component has multiple outputs.
+        :param receiver:
+            The component that receives the value. This can be either just a component name or can be
+            in the format `component_name.connection_name` if the component has multiple inputs.
+        :returns:
+            The Pipeline instance.
+
+        :raises PipelineConnectError:
+            If the two components cannot be connected (for example if one of the components is
+            not present in the pipeline, or the connections don't match by type, and so on).
+        """
+        # Edges may be named explicitly by passing 'node_name.edge_name' to connect().
+        sender_component_name, sender_socket_name = parse_connect_string(sender)
+        receiver_component_name, receiver_socket_name = parse_connect_string(receiver)
+
+        if sender_component_name == receiver_component_name:
+            raise PipelineConnectError("Connecting a Component to itself is not supported.")
+
+        # Get the nodes data.
+        try:
+            from_sockets = self.graph.nodes[sender_component_name]["output_sockets"]
+        except KeyError as exc:
+            raise ValueError(f"Component named {sender_component_name} not found in the pipeline.") from exc
+        try:
+            to_sockets = self.graph.nodes[receiver_component_name]["input_sockets"]
+        except KeyError as exc:
+            raise ValueError(f"Component named {receiver_component_name} not found in the pipeline.") from exc
+
+        # If the name of either socket is given, get the socket
+        sender_socket: Optional[OutputSocket] = None
+        if sender_socket_name:
+            sender_socket = from_sockets.get(sender_socket_name)
+            if not sender_socket:
+                raise PipelineConnectError(
+                    f"'{sender}' does not exist. "
+                    f"Output connections of {sender_component_name} are: "
+                    + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in from_sockets.items()])
+                )
+
+        receiver_socket: Optional[InputSocket] = None
+        if receiver_socket_name:
+            receiver_socket = to_sockets.get(receiver_socket_name)
+            if not receiver_socket:
+                raise PipelineConnectError(
+                    f"'{receiver}' does not exist. "
+                    f"Input connections of {receiver_component_name} are: "
+                    + ", ".join([f"{name} (type {_type_name(socket.type)})" for name, socket in to_sockets.items()])
+                )
+
+        # Look for a matching connection among the possible ones.
+        # Note that if there is more than one possible connection but two sockets match by name, they're paired.
+        sender_socket_candidates: List[OutputSocket] = [sender_socket] if sender_socket else list(from_sockets.values())
+        receiver_socket_candidates: List[InputSocket] = (
+            [receiver_socket] if receiver_socket else list(to_sockets.values())
+        )
+
+        # Find all possible connections between these two components
+        possible_connections = [
+            (sender_sock, receiver_sock)
+            for sender_sock, receiver_sock in itertools.product(sender_socket_candidates, receiver_socket_candidates)
+            if _types_are_compatible(sender_sock.type, receiver_sock.type)
+        ]
+
+        # We need this status for error messages; since we might need it in multiple places, we calculate it here
+        status = _connections_status(
+            sender_node=sender_component_name,
+            sender_sockets=sender_socket_candidates,
+            receiver_node=receiver_component_name,
+            receiver_sockets=receiver_socket_candidates,
+        )
+
+        if not possible_connections:
+            # There's no possible connection between these two components
+            if len(sender_socket_candidates) == len(receiver_socket_candidates) == 1:
+                msg = (
+                    f"Cannot connect '{sender_component_name}.{sender_socket_candidates[0].name}' with "
+                    f"'{receiver_component_name}.{receiver_socket_candidates[0].name}': "
+                    f"their declared input and output types do not match.\n{status}"
+                )
+            else:
+                msg = (
+                    f"Cannot connect '{sender_component_name}' with '{receiver_component_name}': "
+                    f"no matching connections available.\n{status}"
+                )
+            raise PipelineConnectError(msg)
+
+        if len(possible_connections) == 1:
+            # There's only one possible connection, use it
+            sender_socket = possible_connections[0][0]
+            receiver_socket = possible_connections[0][1]
+
+        if len(possible_connections) > 1:
+            # There are multiple possible connections, let's try to match them by name
+            name_matches = [
+                (out_sock, in_sock) for out_sock, in_sock in possible_connections if in_sock.name == out_sock.name
+            ]
+            if len(name_matches) != 1:
+                # There are either no matches or more than one, so we can't pick one reliably
+                msg = (
+                    f"Cannot connect '{sender_component_name}' with "
+                    f"'{receiver_component_name}': more than one connection is possible "
+                    "between these components. Please specify the connection name, like: "
+                    f"pipeline.connect('{sender_component_name}.{possible_connections[0][0].name}', "
+                    f"'{receiver_component_name}.{possible_connections[0][1].name}').\n{status}"
+                )
+                raise PipelineConnectError(msg)
+
+            # Get the only possible match
+            sender_socket = name_matches[0][0]
+            receiver_socket = name_matches[0][1]
+
+        # Connection must be valid on both sender/receiver sides
+        if not sender_socket or not receiver_socket or not sender_component_name or not receiver_component_name:
+            if sender_component_name and sender_socket:
+                sender_repr = f"{sender_component_name}.{sender_socket.name} ({_type_name(sender_socket.type)})"
+            else:
+                sender_repr = "input needed"
+
+            if receiver_component_name and receiver_socket:
+                receiver_repr = f"({_type_name(receiver_socket.type)}) {receiver_component_name}.{receiver_socket.name}"
+            else:
+                receiver_repr = "output"
+            msg = f"Connection must have both sender and receiver: {sender_repr} -> {receiver_repr}"
+            raise PipelineConnectError(msg)
+
+        logger.debug(
+            "Connecting '{sender_component}.{sender_socket_name}' to '{receiver_component}.{receiver_socket_name}'",
+            sender_component=sender_component_name,
+            sender_socket_name=sender_socket.name,
+            receiver_component=receiver_component_name,
+            receiver_socket_name=receiver_socket.name,
+        )
+
+        if receiver_component_name in sender_socket.receivers and sender_component_name in receiver_socket.senders:
+            # This is already connected, nothing to do
+            return self
+
+        if receiver_socket.senders and not receiver_socket.is_variadic:
+            # Only variadic input sockets can receive from multiple senders
+            msg = (
+                f"Cannot connect '{sender_component_name}.{sender_socket.name}' with "
+                f"'{receiver_component_name}.{receiver_socket.name}': "
+                f"{receiver_component_name}.{receiver_socket.name} is already connected to {receiver_socket.senders}.\n"
+            )
+            raise PipelineConnectError(msg)
+
+        # Update the sockets with the new connection
+        sender_socket.receivers.append(receiver_component_name)
+        receiver_socket.senders.append(sender_component_name)
+
+        # Create the new connection
+        self.graph.add_edge(
+            sender_component_name,
+            receiver_component_name,
+            key=f"{sender_socket.name}/{receiver_socket.name}",
+            conn_type=_type_name(sender_socket.type),
+            from_socket=sender_socket,
+            to_socket=receiver_socket,
+            mandatory=receiver_socket.is_mandatory,
+        )
+        return self
+
+    def get_component(self, name: str) -> Component:
+        """
+        Get the component with the specified name from the pipeline.
+
+        :param name:
+            The name of the component.
+        :returns:
+            The instance of that component.
+
+        :raises ValueError:
+            If a component with that name is not present in the pipeline.
+        """
+        try:
+            return self.graph.nodes[name]["instance"]
+        except KeyError as exc:
+            raise ValueError(f"Component named {name} not found in the pipeline.") from exc
+
+    def get_component_name(self, instance: Component) -> str:
+        """
+        Returns the name of the Component instance if it has been added to this Pipeline or an empty string otherwise.
+
+        :param instance:
+            The Component instance to look for.
+        :returns:
+            The name of the Component instance.
+        """
+        for name, inst in self.graph.nodes(data="instance"):  # type: ignore # type wrongly defined in networkx
+            if inst == instance:
+                return name
+        return ""
+
+    def inputs(self, include_components_with_connected_inputs: bool = False) -> Dict[str, Dict[str, Any]]:
+        """
+        Returns a dictionary containing the inputs of a pipeline.
+
+        Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
+        the input sockets of that component, including their types and whether they are optional.
+
+        :param include_components_with_connected_inputs:
+            If `False`, only components that have disconnected input edges are
+            included in the output.
+        :returns:
+            A dictionary where each key is a pipeline component name and each value is a dictionary of
+            input sockets of that component.
+        """
+        inputs: Dict[str, Dict[str, Any]] = {}
+        for component_name, data in find_pipeline_inputs(self.graph, include_components_with_connected_inputs).items():
+            sockets_description = {}
+            for socket in data:
+                sockets_description[socket.name] = {"type": socket.type, "is_mandatory": socket.is_mandatory}
+                if not socket.is_mandatory:
+                    sockets_description[socket.name]["default_value"] = socket.default_value
+
+            if sockets_description:
+                inputs[component_name] = sockets_description
+        return inputs
+
+    def outputs(self, include_components_with_connected_outputs: bool = False) -> Dict[str, Dict[str, Any]]:
+        """
+        Returns a dictionary containing the outputs of a pipeline.
+
+        Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes
+        the output sockets of that component.
+
+        :param include_components_with_connected_outputs:
+            If `False`, only components that have disconnected output edges are
+            included in the output.
+        :returns:
+            A dictionary where each key is a pipeline component name and each value is a dictionary of
+            output sockets of that component.
+        """
+        outputs = {
+            comp: {socket.name: {"type": socket.type} for socket in data}
+            for comp, data in find_pipeline_outputs(self.graph, include_components_with_connected_outputs).items()
+            if data
+        }
+        return outputs
+
+    def show(self) -> None:
+        """
+        If running in a Jupyter notebook, display an image representing this `Pipeline`.
+        """
+        if is_in_jupyter():
+            from IPython.display import Image, display  # type: ignore
+
+            image_data = _to_mermaid_image(self.graph)
+
+            display(Image(image_data))
+        else:
+            msg = "This method is only supported in Jupyter notebooks. Use Pipeline.draw() to save an image locally."
+            raise PipelineDrawingError(msg)
+
+    def draw(self, path: Path) -> None:
+        """
+        Save an image representing this `Pipeline` to `path`.
+
+        :param path:
+            The path to save the image to.
+        """
+        image_data = _to_mermaid_image(self.graph)
+        Path(path).write_bytes(image_data)
+
+    def walk(self) -> Iterator[Tuple[str, Component]]:
+        """
+        Visits each component in the pipeline exactly once and yields its name and instance.
+
+        No guarantees are provided on the visiting order.
+
+        :returns:
+            An iterator of tuples of component name and component instance.
+        """
+        for component_name, instance in self.graph.nodes(data="instance"):  # type: ignore # type is wrong in networkx
+            yield component_name, instance
+
+    def warm_up(self):
+        """
+        Make sure all nodes are warm.
+
+        It's the node's responsibility to make sure this method can be called at every `Pipeline.run()`
+        without re-initializing everything.
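+
+        A sketch of what an idempotent `warm_up` can look like (illustrative only; `load_model`
+        is a hypothetical helper):
+
+        ```python
+        def warm_up(self):
+            if self._model is None:  # guard so that repeated calls don't re-initialize
+                self._model = load_model()
+        ```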
+        """
+        for node in self.graph.nodes:
+            if hasattr(self.graph.nodes[node]["instance"], "warm_up"):
+                logger.info("Warming up component {node}...", node=node)
+                self.graph.nodes[node]["instance"].warm_up()
+
+    def _validate_input(self, data: Dict[str, Any]):
+        """
+        Validates pipeline input data.
+
+        Validates that data:
+        * Each Component name actually exists in the Pipeline
+        * Each Component is not missing any input
+        * Each Component has only one input per input socket, if not variadic
+        * Each Component doesn't receive inputs that are already sent by another Component
+
+        :param data:
+            A dictionary of inputs for the pipeline's components. Each key is a component name.
+
+        :raises ValueError:
+            If inputs are invalid according to the above.
+        """
+        for component_name, component_inputs in data.items():
+            if component_name not in self.graph.nodes:
+                raise ValueError(f"Component named {component_name} not found in the pipeline.")
+            instance = self.graph.nodes[component_name]["instance"]
+            for socket_name, socket in instance.__haystack_input__._sockets_dict.items():
+                if socket.senders == [] and socket.is_mandatory and socket_name not in component_inputs:
+                    raise ValueError(f"Missing input for component {component_name}: {socket_name}")
+            for input_name in component_inputs.keys():
+                if input_name not in instance.__haystack_input__._sockets_dict:
+                    raise ValueError(f"Input {input_name} not found in component {component_name}.")
+
+        for component_name in self.graph.nodes:
+            instance = self.graph.nodes[component_name]["instance"]
+            for socket_name, socket in instance.__haystack_input__._sockets_dict.items():
+                component_inputs = data.get(component_name, {})
+                if socket.senders == [] and socket.is_mandatory and socket_name not in component_inputs:
+                    raise ValueError(f"Missing input for component {component_name}: {socket_name}")
+                if socket.senders and socket_name in component_inputs and not socket.is_variadic:
+                    raise ValueError(
+                        f"Input {socket_name} for component {component_name} is already sent by {socket.senders}."
+                    )
+
+    def _prepare_component_input_data(self, data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
+        """
+        Prepares input data for pipeline components.
+
+        Organizes input data for pipeline components and identifies any inputs that are not matched to any
+        component's input slots. Deep-copies data items to avoid sharing mutables across multiple components.
+
+        This method processes a flat dictionary of input data, where each key-value pair represents an input name
+        and its corresponding value. It distributes these inputs to the appropriate pipeline components based on
+        their input requirements. Inputs that don't match any component's input slots are classified as unresolved.
+
+        :param data:
+            A dictionary potentially having input names as keys and input values as values.
+
+        :returns:
+            A dictionary mapping component names to their respective matched inputs.
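+
+        Example (assuming only a `retriever` component exposes a `query` input):
+
+        ```python
+        pipeline._prepare_component_input_data({"query": "Who lives in Paris?"})
+        # -> {"retriever": {"query": "Who lives in Paris?"}}
+        ```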
+        """
+        # check whether the data is a nested dictionary of component inputs where each key is a component name
+        # and each value is a dictionary of input parameters for that component
+        is_nested_component_input = all(isinstance(value, dict) for value in data.values())
+        if not is_nested_component_input:
+            # flat input, a dict where keys are input names and values are the corresponding values
+            # we need to convert it to a nested dictionary of component inputs and then run the pipeline
+            # just like in the previous case
+            pipeline_input_data: Dict[str, Dict[str, Any]] = defaultdict(dict)
+            unresolved_kwargs = {}
+
+            # Retrieve the input slots for each component in the pipeline
+            available_inputs: Dict[str, Dict[str, Any]] = self.inputs()
+
+            # Go through all provided inputs to distribute them to the appropriate component inputs
+            for input_name, input_value in data.items():
+                resolved_at_least_once = False
+
+                # Check each component to see if it has a slot for the current kwarg
+                for component_name, component_inputs in available_inputs.items():
+                    if input_name in component_inputs:
+                        # If a match is found, add the kwarg to the component's input data
+                        pipeline_input_data[component_name][input_name] = input_value
+                        resolved_at_least_once = True
+
+                if not resolved_at_least_once:
+                    unresolved_kwargs[input_name] = input_value
+
+            if unresolved_kwargs:
+                logger.warning(
+                    "Inputs {input_keys} were not matched to any component inputs, please check your run parameters.",
+                    input_keys=list(unresolved_kwargs.keys()),
+                )
+
+            data = dict(pipeline_input_data)
+
+        # deepcopying the inputs prevents the Pipeline run logic from being altered unexpectedly
+        # when the same input reference is passed to multiple components.
+        for component_name, component_inputs in data.items():
+            data[component_name] = {k: deepcopy(v) for k, v in component_inputs.items()}
+
+        return data
+
+    @classmethod
+    def from_template(
+        cls, predefined_pipeline: PredefinedPipeline, template_params: Optional[Dict[str, Any]] = None
+    ) -> "PipelineBase":
+        """
+        Create a Pipeline from a predefined template. See `PredefinedPipeline` for available options.
+
+        :param predefined_pipeline:
+            The predefined pipeline to use.
+        :param template_params:
+            An optional dictionary of parameters to use when rendering the pipeline template.
+        :returns:
+            An instance of `Pipeline`.
+        """
+        tpl = PipelineTemplate.from_predefined(predefined_pipeline)
+        # If tpl.render() fails, we let the original error bubble up
+        rendered = tpl.render(template_params)
+
+        # If there was a problem with the rendered version of the
+        # template, we add it to the error stack for debugging
+        try:
+            return cls.loads(rendered)
+        except Exception as e:
+            msg = f"Error unmarshalling pipeline: {e}\n"
+            msg += f"Source:\n{rendered}"
+            raise PipelineUnmarshalError(msg)
+
+    def _init_graph(self):
+        """Resets the visits count for each component"""
+        for node in self.graph.nodes:
+            self.graph.nodes[node]["visits"] = 0
+
+    def _find_receivers_from(self, component_name: str) -> List[Tuple[str, OutputSocket, InputSocket]]:
+        """
+        Utility function to find all Components that receive input from `component_name`.
+
+        :param component_name:
+            Name of the sender Component
+
+        :returns:
+            List of tuples containing name of the receiver Component and sender OutputSocket
+            and receiver InputSocket instances
+        """
+        res = []
+        for _, receiver_name, connection in self.graph.edges(nbunch=component_name, data=True):
+            sender_socket: OutputSocket = connection["from_socket"]
+            receiver_socket: InputSocket = connection["to_socket"]
+            res.append((receiver_name, sender_socket, receiver_socket))
+        return res
+
+
+def _connections_status(
+    sender_node: str, receiver_node: str, sender_sockets: List[OutputSocket], receiver_sockets: List[InputSocket]
+):
+    """
+    Lists the status of the sockets, for error messages.
+    """
+    sender_sockets_entries = []
+    for sender_socket in sender_sockets:
+        sender_sockets_entries.append(f" - {sender_socket.name}: {_type_name(sender_socket.type)}")
+    sender_sockets_list = "\n".join(sender_sockets_entries)
+
+    receiver_sockets_entries = []
+    for receiver_socket in receiver_sockets:
+        if receiver_socket.senders:
+            sender_status = f"sent by {','.join(receiver_socket.senders)}"
+        else:
+            sender_status = "available"
+        receiver_sockets_entries.append(
+            f" - {receiver_socket.name}: {_type_name(receiver_socket.type)} ({sender_status})"
+        )
+    receiver_sockets_list = "\n".join(receiver_sockets_entries)
+
+    return f"'{sender_node}':\n{sender_sockets_list}\n'{receiver_node}':\n{receiver_sockets_list}"
diff --git a/haystack_experimental/core/pipeline/component_checks.py b/haystack_experimental/core/pipeline/component_checks.py
new file mode 100644
index 00000000..7986b08a
--- /dev/null
+++ b/haystack_experimental/core/pipeline/component_checks.py
@@ -0,0 +1,249 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Any, Dict, List
+
+from haystack.core.component.types import InputSocket, _empty
+
+_NO_OUTPUT_PRODUCED = _empty
+
+
+def can_component_run(component: Dict, inputs: Dict) -> bool:
+    """
+    Checks if the component can run, given the current state of its inputs.
+
+    A component needs to pass two gates so that it is ready to run:
+    1. It has received all mandatory inputs.
+    2. It has received a trigger.
+
+    :param component: Component metadata and the component instance.
+    :param inputs: Inputs for the component.
+    """
+    received_all_mandatory_inputs = are_all_sockets_ready(component, inputs, only_check_mandatory=True)
+    received_trigger = has_any_trigger(component, inputs)
+
+    return received_all_mandatory_inputs and received_trigger
+
+
+def has_any_trigger(component: Dict, inputs: Dict) -> bool:
+    """
+    Checks if a component was triggered to execute.
+
+    There are 3 triggers:
+    1. A predecessor provided input to the component.
+    2. Input to the component was provided from outside the pipeline (e.g. user input).
+    3. The component does not receive input from any other components in the pipeline and `Pipeline.run` was called.
+
+    A trigger can only cause a component to execute ONCE because:
+    1. Components consume inputs from predecessors before execution (they are deleted).
+    2. Inputs from outside the pipeline can only trigger a component when it is executed for the first time.
+    3. `Pipeline.run` can only trigger a component when it is executed for the first time.
+
+    :param component: Component metadata and the component instance.
+    :param inputs: Inputs for the component.
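+
+    Example of the expected `inputs` shape, where a `sender` of `None` marks input provided
+    from outside the pipeline:
+
+    ```python
+    {"query": [{"sender": None, "value": "Who lives in Paris?"}]}
+    ```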
+    """
+    trigger_from_predecessor = any_predecessors_provided_input(component, inputs)
+    trigger_from_user = has_user_input(inputs) and component["visits"] == 0
+    trigger_without_inputs = can_not_receive_inputs_from_pipeline(component) and component["visits"] == 0
+
+    return trigger_from_predecessor or trigger_from_user or trigger_without_inputs
+
+
+def are_all_sockets_ready(component: Dict, inputs: Dict, only_check_mandatory: bool = False) -> bool:
+    """
+    Checks if all sockets of a component have enough inputs for the component to execute.
+
+    :param component: Component metadata and the component instance.
+    :param inputs: Inputs for the component.
+    :param only_check_mandatory: If only mandatory sockets should be checked.
+    """
+    filled_sockets = set()
+    expected_sockets = set()
+    if only_check_mandatory:
+        sockets_to_check = {
+            socket_name: socket for socket_name, socket in component["input_sockets"].items() if socket.is_mandatory
+        }
+    else:
+        sockets_to_check = {
+            socket_name: socket
+            for socket_name, socket in component["input_sockets"].items()
+            if socket.is_mandatory or len(socket.senders)
+        }
+
+    for socket_name, socket in sockets_to_check.items():
+        socket_inputs = inputs.get(socket_name, [])
+        expected_sockets.add(socket_name)
+
+        # Check if socket has all required inputs or is a lazy variadic socket with any input
+        if has_socket_received_all_inputs(socket, socket_inputs) or (
+            is_socket_lazy_variadic(socket) and any_socket_input_received(socket_inputs)
+        ):
+            filled_sockets.add(socket_name)
+
+    return filled_sockets == expected_sockets
+
+
+def any_predecessors_provided_input(component: Dict, inputs: Dict) -> bool:
+    """
+    Checks if a component received inputs from any predecessors.
+
+    :param component: Component metadata and the component instance.
+    :param inputs: Inputs for the component.
+    """
+    return any(
+        any_socket_value_from_predecessor_received(inputs.get(socket_name, []))
+        for socket_name in component["input_sockets"].keys()
+    )
+
+
+def any_socket_value_from_predecessor_received(socket_inputs: List[Dict[str, Any]]) -> bool:
+    """
+    Checks if a component socket received input from any predecessors.
+
+    :param socket_inputs: Inputs for the component's socket.
+    """
+    # When sender is None, the input was provided from outside the pipeline.
+    return any(inp["value"] != _NO_OUTPUT_PRODUCED and inp["sender"] is not None for inp in socket_inputs)
+
+
+def has_user_input(inputs: Dict) -> bool:
+    """
+    Checks if a component has received input from outside the pipeline (e.g. user input).
+
+    :param inputs: Inputs for the component.
+    """
+    return any(inp for socket in inputs.values() for inp in socket if inp["sender"] is None)
+
+
+def can_not_receive_inputs_from_pipeline(component: Dict) -> bool:
+    """
+    Checks if a component can not receive inputs from any other components in the pipeline.
+
+    :param component: Component metadata and the component instance.
+    """
+    return all(len(sock.senders) == 0 for sock in component["input_sockets"].values())
+
+
+def all_socket_predecessors_executed(socket: InputSocket, socket_inputs: List[Dict]) -> bool:
+    """
+    Checks if all components connecting to an InputSocket have executed.
+
+    :param socket: The InputSocket of a component.
+    :param socket_inputs: Inputs for the socket.
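+
+    For example (illustrative values), for a socket whose `senders` are `["comp_a", "comp_b"]`:
+
+    ```python
+    all_socket_predecessors_executed(socket, [{"sender": "comp_a", "value": 42}])
+    # False: "comp_b" has not executed yet
+    ```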
+    """
+    expected_senders = set(socket.senders)
+    executed_senders = {inp["sender"] for inp in socket_inputs if inp["sender"] is not None}
+
+    return expected_senders == executed_senders
+
+
+def any_socket_input_received(socket_inputs: List[Dict]) -> bool:
+    """
+    Checks if a socket has received any input from any other components in the pipeline or from outside the pipeline.
+
+    :param socket_inputs: Inputs for the socket.
+    """
+    return any(inp["value"] != _NO_OUTPUT_PRODUCED for inp in socket_inputs)
+
+
+def has_lazy_variadic_socket_received_all_inputs(socket: InputSocket, socket_inputs: List[Dict]) -> bool:
+    """
+    Checks if a lazy variadic socket has received all expected inputs from other components in the pipeline.
+
+    :param socket: The InputSocket of a component.
+    :param socket_inputs: Inputs for the socket.
+    """
+    expected_senders = set(socket.senders)
+    actual_senders = {
+        sock["sender"] for sock in socket_inputs if sock["value"] != _NO_OUTPUT_PRODUCED and sock["sender"] is not None
+    }
+
+    return expected_senders == actual_senders
+
+
+def is_socket_lazy_variadic(socket: InputSocket) -> bool:
+    """
+    Checks if an InputSocket is a lazy variadic socket.
+
+    :param socket: The InputSocket of a component.
+    """
+    return socket.is_variadic and not socket.is_greedy
+
+
+def has_socket_received_all_inputs(socket: InputSocket, socket_inputs: List[Dict]) -> bool:
+    """
+    Checks if a socket has received all expected inputs.
+
+    :param socket: The InputSocket of a component.
+    :param socket_inputs: Inputs for the socket.
+    """
+    # No inputs received for the socket, it is not filled.
+    if len(socket_inputs) == 0:
+        return False
+
+    # The socket is greedy variadic and at least one input was produced, it is complete.
+    if (
+        socket.is_variadic
+        and socket.is_greedy
+        and any(sock["value"] != _NO_OUTPUT_PRODUCED for sock in socket_inputs)
+    ):
+        return True
+
+    # The socket is lazy variadic and all expected inputs were produced.
+    if is_socket_lazy_variadic(socket) and has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs):
+        return True
+
+    # The socket is not variadic and the only expected input is complete.
+    return not socket.is_variadic and socket_inputs[0]["value"] != _NO_OUTPUT_PRODUCED
+
+
+def all_predecessors_executed(component: Dict, inputs: Dict) -> bool:
+    """
+    Checks if all predecessors of a component have executed.
+
+    :param component: Component metadata and the component instance.
+    :param inputs: Inputs for the component.
+    """
+    return all(
+        all_socket_predecessors_executed(socket, inputs.get(socket_name, []))
+        for socket_name, socket in component["input_sockets"].items()
+    )
+
+
+def are_all_lazy_variadic_sockets_resolved(component: Dict, inputs: Dict) -> bool:
+    """
+    Checks if the final state for all lazy variadic sockets of a component is resolved.
+
+    :param component: Component metadata and the component instance.
+    :param inputs: Inputs for the component.
+    """
+    for socket_name, socket in component["input_sockets"].items():
+        if is_socket_lazy_variadic(socket):
+            socket_inputs = inputs.get(socket_name, [])
+
+            # Checks if a lazy variadic socket is ready to run.
+            # A socket is ready if either:
+            # - it has received all expected inputs, or
+            # - all its predecessors have executed
+            # If none of the conditions are met, the socket is not ready to run and we defer the component.
+            if not (
+                has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs)
+                or all_socket_predecessors_executed(socket, socket_inputs)
+            ):
+                return False
+
+    return True
+
+
+def is_any_greedy_socket_ready(component: Dict, inputs: Dict) -> bool:
+    """
+    Checks if the component has any greedy socket that is ready to run.
+
+    :param component: Component metadata and the component instance.
+    :param inputs: Inputs for the component.
+    """
+    for socket_name, socket in component["input_sockets"].items():
+        if socket.is_greedy and has_socket_received_all_inputs(socket, inputs.get(socket_name, [])):
+            return True
+
+    return False
diff --git a/haystack_experimental/core/pipeline/pipeline.py b/haystack_experimental/core/pipeline/pipeline.py
new file mode 100644
index 00000000..0883cd0a
--- /dev/null
+++ b/haystack_experimental/core/pipeline/pipeline.py
@@ -0,0 +1,518 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import warnings
+from copy import deepcopy
+from enum import IntEnum
+from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Union, cast
+
+from haystack import logging, tracing
+from haystack.core.component import Component, InputSocket
+from haystack.core.errors import PipelineMaxComponentRuns, PipelineRuntimeError
+from haystack.telemetry import pipeline_running
+
+from haystack_experimental.core.pipeline.base import PipelineBase
+from haystack_experimental.core.pipeline.component_checks import (
+    _NO_OUTPUT_PRODUCED,
+    all_predecessors_executed,
+    are_all_lazy_variadic_sockets_resolved,
+    are_all_sockets_ready,
+    can_component_run,
+    is_any_greedy_socket_ready,
+    is_socket_lazy_variadic,
+)
+from haystack_experimental.core.pipeline.utils import FIFOPriorityQueue
+
+logger = logging.getLogger(__name__)
+
+
+class ComponentPriority(IntEnum):
+    HIGHEST = 1
+    READY = 2
+    DEFER = 3
+    DEFER_LAST = 4
+    BLOCKED = 5
+
+
+class Pipeline(PipelineBase):
+    """
+    Synchronous version of the orchestration engine.
+
+    Orchestrates component execution according to the execution graph, one after the other.
+    """
+
+    @staticmethod
+    def _add_missing_input_defaults(component_inputs: Dict[str, Any], component_input_sockets: Dict[str, InputSocket]):
+        """
+        Updates the inputs with the default values for the inputs that are missing.
+
+        :param component_inputs: Inputs for the component.
+        :param component_input_sockets: Input sockets of the component.
+        """
+        for name, socket in component_input_sockets.items():
+            if not socket.is_mandatory and name not in component_inputs:
+                if socket.is_variadic:
+                    component_inputs[name] = [socket.default_value]
+                else:
+                    component_inputs[name] = socket.default_value
+
+        return component_inputs
+
+    def _run_component(
+        self, component: Dict[str, Any], inputs: Dict[str, Any], parent_span: Optional[tracing.Span] = None
+    ) -> Tuple[Dict, Dict]:
+        """
+        Runs a Component with the given inputs.
+
+        :param component: Component with component metadata.
+        :param inputs: Inputs for the Component.
+        :param parent_span: The parent span to use for the newly created span.
+            This is to allow tracing to be correctly linked to the pipeline run.
+        :raises PipelineRuntimeError: If Component doesn't return a dictionary.
+        :return: The output of the Component and the new state of inputs.
+        """
+        instance: Component = component["instance"]
+        component_name = self.get_component_name(instance)
+        component_inputs, inputs = self._consume_component_inputs(
+            component_name=component_name, component=component, inputs=inputs
+        )
+
+        # We need to add missing defaults using default values from input sockets because the run signature
+        # might not provide these defaults for components with inputs defined dynamically upon component initialization
+        component_inputs = self._add_missing_input_defaults(component_inputs, component["input_sockets"])
+
+        with tracing.tracer.trace(
+            "haystack.component.run",
+            tags={
+                "haystack.component.name": component_name,
+                "haystack.component.type": instance.__class__.__name__,
+                "haystack.component.input_types": {k: type(v).__name__ for k, v in component_inputs.items()},
+                "haystack.component.input_spec": {
+                    key: {
+                        "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
+                        "senders": value.senders,
+                    }
+                    for key, value in instance.__haystack_input__._sockets_dict.items()  # type: ignore
+                },
+                "haystack.component.output_spec": {
+                    key: {
+                        "type": (value.type.__name__ if isinstance(value.type, type) else str(value.type)),
+                        "receivers": value.receivers,
+                    }
+                    for key, value in instance.__haystack_output__._sockets_dict.items()  # type: ignore
+                },
+            },
+            parent_span=parent_span,
+        ) as span:
+            # We deepcopy the inputs otherwise we might lose that information
+            # when we delete them in case they're sent to other Components
+            span.set_content_tag("haystack.component.input", deepcopy(component_inputs))
+            logger.info("Running component {component_name}", component_name=component_name)
+            component_output = instance.run(**component_inputs)
+            component["visits"] += 1
+
+            if not isinstance(component_output, Mapping):
+                raise PipelineRuntimeError(
+                    f"Component '{component_name}' didn't return a dictionary. "
+                    "Components must always return dictionaries: check the documentation."
+                )
+
+            span.set_tag("haystack.component.visits", component["visits"])
+            span.set_content_tag("haystack.component.output", component_output)
+
+            return cast(Dict[Any, Any], component_output), inputs
+
+    @staticmethod
+    def _consume_component_inputs(component_name: str, component: Dict, inputs: Dict) -> Tuple[Dict, Dict]:
+        """
+        Extracts the inputs needed by the component to run and removes them from the global inputs state.
+
+        :param component_name: The name of the component.
+        :param component: Component with component metadata.
+        :param inputs: Global inputs state.
+        :returns: The inputs for the component and the new state of global inputs.
+        """
+        component_inputs = inputs.get(component_name, {})
+        consumed_inputs = {}
+        greedy_inputs_to_remove = set()
+        for socket_name, socket in component["input_sockets"].items():
+            socket_inputs = component_inputs.get(socket_name, [])
+            socket_inputs = [sock["value"] for sock in socket_inputs if sock["value"] != _NO_OUTPUT_PRODUCED]
+            if socket_inputs:
+                if not socket.is_variadic:
+                    # We only care about the first input provided to the socket.
+                    consumed_inputs[socket_name] = socket_inputs[0]
+                elif socket.is_greedy:
+                    # We need to keep track of greedy inputs because we always remove them, even if they come from
+                    # outside the pipeline. Otherwise, a greedy input from the user would trigger a pipeline to run
+                    # indefinitely.
+                    greedy_inputs_to_remove.add(socket_name)
+                    consumed_inputs[socket_name] = [socket_inputs[0]]
+                elif is_socket_lazy_variadic(socket):
+                    # We use all inputs provided to the socket on a lazy variadic socket.
+                    consumed_inputs[socket_name] = socket_inputs
+
+        # We prune all inputs except for those that were provided from outside the pipeline (e.g. user inputs).
+        pruned_inputs = {
+            socket_name: [
+                sock for sock in socket if sock["sender"] is None and socket_name not in greedy_inputs_to_remove
+            ]
+            for socket_name, socket in component_inputs.items()
+        }
+        pruned_inputs = {socket_name: socket for socket_name, socket in pruned_inputs.items() if len(socket) > 0}
+
+        inputs[component_name] = pruned_inputs
+
+        return consumed_inputs, inputs
+
+    @staticmethod
+    def _convert_from_legacy_format(pipeline_inputs: Dict[str, Any]) -> Dict[str, Dict[str, List]]:
+        """
+        Converts the inputs to the pipeline to the format that is needed for the internal `Pipeline.run` logic.
+
+        Example Input:
+            {'prompt_builder': {'question': 'Who lives in Paris?'}, 'retriever': {'query': 'Who lives in Paris?'}}
+        Example Output:
+            {'prompt_builder': {'question': [{'sender': None, 'value': 'Who lives in Paris?'}]},
+             'retriever': {'query': [{'sender': None, 'value': 'Who lives in Paris?'}]}}
+
+        :param pipeline_inputs: Inputs to the pipeline.
+        :returns: Converted inputs that can be used by the internal `Pipeline.run` logic.
+        """
+        inputs: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
+        for component_name, socket_dict in pipeline_inputs.items():
+            inputs[component_name] = {}
+            for socket_name, value in socket_dict.items():
+                inputs[component_name][socket_name] = [{"sender": None, "value": value}]
+
+        return inputs
+
+    def _fill_queue(self, component_names: List[str], inputs: Dict[str, Any]) -> FIFOPriorityQueue:
+        """
+        Calculates the execution priority for each component and inserts it into the priority queue.
+
+        :param component_names: Names of the components to put into the queue.
+        :param inputs: Inputs to the components.
+        :returns: A prioritized queue of component names.
+        """
+        priority_queue = FIFOPriorityQueue()
+        for component_name in component_names:
+            component = self._get_component_with_graph_metadata(component_name)
+            priority = self._calculate_priority(component, inputs.get(component_name, {}))
+            priority_queue.push(component_name, priority)
+
+        return priority_queue
+
+    @staticmethod
+    def _calculate_priority(component: Dict, inputs: Dict) -> ComponentPriority:
+        """
+        Calculates the execution priority for a component depending on the component's inputs.
+
+        :param component: Component metadata and component instance.
+        :param inputs: Inputs to the component.
+        :returns: Priority value for the component.
+        """
+        if not can_component_run(component, inputs):
+            return ComponentPriority.BLOCKED
+        elif is_any_greedy_socket_ready(component, inputs) and are_all_sockets_ready(component, inputs):
+            return ComponentPriority.HIGHEST
+        elif all_predecessors_executed(component, inputs):
+            return ComponentPriority.READY
+        elif are_all_lazy_variadic_sockets_resolved(component, inputs):
+            return ComponentPriority.DEFER
+        else:
+            return ComponentPriority.DEFER_LAST
+
+    def _get_component_with_graph_metadata(self, component_name: str) -> Dict[str, Any]:
+        return self.graph.nodes[component_name]
+
+    def _get_next_runnable_component(
+        self, priority_queue: FIFOPriorityQueue
+    ) -> Union[Tuple[ComponentPriority, str, Dict[str, Any]], None]:
+        """
+        Returns the next runnable component alongside its metadata from the priority queue.
+
+        :param priority_queue: Priority queue of component names.
+        :returns: A tuple of the priority, the component name, and the component metadata,
+            or None if no component in the queue can run.
+        :raises PipelineMaxComponentRuns: If the next runnable component has exceeded the maximum number of runs.
+        """
+        priority_and_component_name: Union[Tuple[ComponentPriority, str], None] = (
+            None if (item := priority_queue.get()) is None else (ComponentPriority(item[0]), str(item[1]))
+        )
+
+        if priority_and_component_name is not None and priority_and_component_name[0] != ComponentPriority.BLOCKED:
+            priority, component_name = priority_and_component_name
+            component = self._get_component_with_graph_metadata(component_name)
+            if component["visits"] > self._max_runs_per_component:
+                msg = f"Maximum run count {self._max_runs_per_component} reached for component '{component_name}'"
+                raise PipelineMaxComponentRuns(msg)
+
+            return priority, component_name, component
+
+        return None
+
+    @staticmethod
+    def _write_component_outputs(
+        component_name, component_outputs, inputs, receivers, include_outputs_from
+    ) -> Tuple[Dict, Dict]:
+        """
+        Distributes the outputs of a component to the input sockets that it is connected to.
+
+        :param component_name: The name of the component.
+        :param component_outputs: The outputs of the component.
+        :param inputs: The current global input state.
+        :param receivers: List of receiver_name, sender_socket, receiver_socket for connected components.
+        :param include_outputs_from: List of component names that should always return an output from the pipeline.
+        :returns: The component outputs that should be included in the pipeline's output and the new global input state.
+        """
+        for receiver_name, sender_socket, receiver_socket in receivers:
+            # We either get the value that was produced by the sender or we use the _NO_OUTPUT_PRODUCED class to
+            # indicate that the sender did not produce an output for this socket.
+            # This allows us to track if a predecessor already ran but did not produce an output.
+            value = component_outputs.get(sender_socket.name, _NO_OUTPUT_PRODUCED)
+            if receiver_name not in inputs:
+                inputs[receiver_name] = {}
+
+            # If we have a non-variadic or a greedy variadic receiver socket, we can just overwrite any inputs
+            # that might already exist (to be reconsidered but mirrors current behavior).
+            if not is_socket_lazy_variadic(receiver_socket):
+                inputs[receiver_name][receiver_socket.name] = [{"sender": component_name, "value": value}]
+
+            # If the receiver socket is lazy variadic, and it already has an input, we need to append the new input.
+            # Lazy variadic sockets can collect multiple inputs.
+            else:
+                if not inputs[receiver_name].get(receiver_socket.name):
+                    inputs[receiver_name][receiver_socket.name] = []
+
+                inputs[receiver_name][receiver_socket.name].append({"sender": component_name, "value": value})
+
+        # If we want to include all outputs from this component in the final outputs, we don't need to prune any
+        # consumed outputs
+        if component_name in include_outputs_from:
+            return component_outputs, inputs
+
+        # We prune outputs that were consumed by any receiving sockets.
+        # All remaining outputs will be added to the final outputs of the pipeline.
+        consumed_outputs = {sender_socket.name for _, sender_socket, __ in receivers}
+        pruned_outputs = {key: value for key, value in component_outputs.items() if key not in consumed_outputs}
+
+        return pruned_outputs, inputs
+
+    @staticmethod
+    def _merge_component_and_pipeline_outputs(
+        component_name: str, component_outputs: Dict, pipeline_outputs: Dict
+    ) -> Dict:
+        """
+        Merges the outputs of a component with the current pipeline outputs.
+
+        :param component_name: The name of the component.
+        :param component_outputs: The outputs of the component.
+        :param pipeline_outputs: The pipeline outputs.
+        :returns: New pipeline outputs.
+        """
+        if not component_outputs:
+            return pipeline_outputs
+        elif component_name not in pipeline_outputs:
+            pipeline_outputs[component_name] = component_outputs
+        else:
+            for key, value in component_outputs.items():
+                if key not in pipeline_outputs[component_name]:
+                    pipeline_outputs[component_name][key] = value
+
+        return pipeline_outputs
+
+    @staticmethod
+    def _is_queue_stale(priority_queue: FIFOPriorityQueue) -> bool:
+        """
+        Checks if the priority queue needs to be recomputed because the priorities might have changed.
+
+        :param priority_queue: Priority queue of component names.
+        """
+        return len(priority_queue) == 0 or priority_queue.peek()[0] > ComponentPriority.READY
+
+    def validate_pipeline(self, priority_queue: FIFOPriorityQueue):
+        """
+        Validate the pipeline to check if it is blocked or has no valid entry point.
+
+        :param priority_queue: Priority queue of component names.
+        :raises PipelineRuntimeError: If all components in the pipeline are blocked.
+        """
+        candidate = priority_queue.peek()
+        if candidate is not None and candidate[0] == ComponentPriority.BLOCKED:
+            raise PipelineRuntimeError(
+                "Cannot run pipeline - all components are blocked. "
+                "This typically happens when:\n"
+                "1. There is no valid entry point for the pipeline\n"
+                "2. There is a circular dependency preventing the pipeline from running\n"
+                "Check the connections between these components and ensure all required inputs are provided."
+            )
+
+    def run(  # noqa: PLR0915, PLR0912
+        self, data: Dict[str, Any], include_outputs_from: Optional[Set[str]] = None
+    ) -> Dict[str, Any]:
+        """
+        Runs the Pipeline with given input data.
+
+        Usage:
+        ```python
+        from haystack import Pipeline, Document
+        from haystack.utils import Secret
+        from haystack.document_stores.in_memory import InMemoryDocumentStore
+        from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
+        from haystack.components.generators import OpenAIGenerator
+        from haystack.components.builders.answer_builder import AnswerBuilder
+        from haystack.components.builders.prompt_builder import PromptBuilder
+
+        # Write documents to InMemoryDocumentStore
+        document_store = InMemoryDocumentStore()
+        document_store.write_documents([
+            Document(content="My name is Jean and I live in Paris."),
+            Document(content="My name is Mark and I live in Berlin."),
+            Document(content="My name is Giorgio and I live in Rome.")
+        ])
+
+        prompt_template = \"\"\"
+        Given these documents, answer the question.
+        Documents:
+        {% for doc in documents %}
+            {{ doc.content }}
+        {% endfor %}
+        Question: {{question}}
+        Answer:
+        \"\"\"
+
+        retriever = InMemoryBM25Retriever(document_store=document_store)
+        prompt_builder = PromptBuilder(template=prompt_template)
+        llm = OpenAIGenerator(api_key=Secret.from_token(api_key))
+
+        rag_pipeline = Pipeline()
+        rag_pipeline.add_component("retriever", retriever)
+        rag_pipeline.add_component("prompt_builder", prompt_builder)
+        rag_pipeline.add_component("llm", llm)
+        rag_pipeline.connect("retriever", "prompt_builder.documents")
+        rag_pipeline.connect("prompt_builder", "llm")
+
+        # Ask a question
+        question = "Who lives in Paris?"
+        results = rag_pipeline.run(
+            {
+                "retriever": {"query": question},
+                "prompt_builder": {"question": question},
+            }
+        )
+
+        print(results["llm"]["replies"])
+        # Jean lives in Paris
+        ```
+
+        :param data:
+            A dictionary of inputs for the pipeline's components. Each key is a component name
+ and its value is a dictionary of that component's input parameters:
+ ```
+ data = {
+ "comp1": {"input1": 1, "input2": 2},
+ }
+ ```
+ For convenience, this format is also supported when input names are unique:
+ ```
+ data = {
+ "input1": 1, "input2": 2,
+ }
+ ```
+ :param include_outputs_from:
+ Set of component names whose individual outputs are to be
+ included in the pipeline's output. For components that are
+ invoked multiple times (in a loop), only the last-produced
+ output is included.
+ :returns:
+ A dictionary where each entry corresponds to a component name
+ and its output. If `include_outputs_from` is `None`, this dictionary
+ will only contain the outputs of leaf components, i.e., components
+ without outgoing connections.
+
+ :raises ValueError:
+ If invalid inputs are provided to the pipeline.
+ :raises PipelineRuntimeError:
+ If the Pipeline contains cycles with unsupported connections that would cause
+ it to get stuck and fail running, or if a component fails or returns
+ output of an unsupported type.
+ :raises PipelineMaxComponentRuns:
+ If a Component reaches the maximum number of times it can be run in this Pipeline.
+ """
+ # Telemetry event for this pipeline run
+ pipeline_running(self)
+
+ # Reset the visits count for each component
+ self._init_graph()
+
+ # TODO: Remove this warmup once we can check reliably whether a component has been warmed up or not.
+ # As of now it's here to make sure we don't have failing tests that assume warm_up() is called in run().
+ self.warm_up()
+
+ # Normalize `data`
+ data = self._prepare_component_input_data(data)
+
+ # Raise a ValueError if the input is malformed in some way
+ self._validate_input(data)
+
+ if include_outputs_from is None:
+ include_outputs_from = set()
+
+ # We create a list of components in the pipeline sorted by name, so that the algorithm runs deterministically
+ # and independently of the insertion order into the pipeline.
+ ordered_component_names = sorted(self.graph.nodes.keys())
+
+ # We need to access a component's receivers multiple times during a pipeline run.
+ # We store them here for easy access.
+ cached_receivers = {name: self._find_receivers_from(name) for name in ordered_component_names}
+
+ pipeline_outputs: Dict[str, Any] = {}
+ with tracing.tracer.trace(
+ "haystack.pipeline.run",
+ tags={
+ "haystack.pipeline.input_data": data,
+ "haystack.pipeline.output_data": pipeline_outputs,
+ "haystack.pipeline.metadata": self.metadata,
+ "haystack.pipeline.max_runs_per_component": self._max_runs_per_component,
+ },
+ ) as span:
+ inputs = self._convert_from_legacy_format(pipeline_inputs=data)
+ priority_queue = self._fill_queue(ordered_component_names, inputs)
+
+ # Check if the pipeline is blocked before execution
+ self.validate_pipeline(priority_queue)
+
+ while True:
+ candidate = self._get_next_runnable_component(priority_queue)
+ if candidate is None:
+ break
+
+ priority, component_name, component = candidate
+ if len(priority_queue) > 0:
+ next_priority, next_name = priority_queue.peek()
+
+ if (
+ priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST]
+ and next_priority == priority
+ ):
+ msg = (
+ f"Components '{component_name}' and '{next_name}' are waiting for "
+ f"optional inputs at the same time. The pipeline will execute '{component_name}' "
+ f"first based on lexicographical ordering."
+ )
+ warnings.warn(msg)
+
+ component_outputs, inputs = self._run_component(component, inputs, parent_span=span)
+ component_pipeline_outputs, inputs = self._write_component_outputs(
+ component_name=component_name,
+ component_outputs=component_outputs,
+ inputs=inputs,
+ receivers=cached_receivers[component_name],
+ include_outputs_from=include_outputs_from,
+ )
+ # TODO: Check the original Pipeline logic. It looks like we don't want to override existing outputs
+ # (e.g. for cycles), but the tests check that intermediate outputs from components in cycles are
+ # overwritten.
+ if component_pipeline_outputs:
+ pipeline_outputs[component_name] = component_pipeline_outputs
+ if self._is_queue_stale(priority_queue):
+ priority_queue = self._fill_queue(ordered_component_names, inputs)
+
+ return pipeline_outputs
diff --git a/haystack_experimental/core/pipeline/utils.py b/haystack_experimental/core/pipeline/utils.py
new file mode 100644
index 00000000..a9006cf1
--- /dev/null
+++ b/haystack_experimental/core/pipeline/utils.py
@@ -0,0 +1,122 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import heapq
+from itertools import count
+from typing import Any, List, Optional, Tuple
+
+
+def parse_connect_string(connection: str) -> Tuple[str, Optional[str]]:
+ """
+ Returns the component name and socket name from a connect_to/from string.
+
+ :param connection:
+ The connection string.
+ :returns:
+ A tuple containing the component name and the socket name, or None if no socket is specified.
+ """
+ if "." in connection:
+ split_str = connection.split(".", maxsplit=1)
+ return (split_str[0], split_str[1])
+ return connection, None
+
+
+class FIFOPriorityQueue:
+ """
+ A priority queue that maintains FIFO order for items of equal priority.
+
+ Items with the same priority are processed in the order they were added.
+ This queue ensures that when multiple items share the same priority level,
+ they are dequeued in the same order they were enqueued (First-In-First-Out).
+ """
+
+ def __init__(self) -> None:
+ """
+ Initialize a new FIFO priority queue.
+ """
+ # List of tuples (priority, insertion_order, item) where insertion_order ensures FIFO order
+ self._queue: List[Tuple[int, int, Any]] = []
+ # Counter to maintain insertion order for equal priorities
+ self._counter = count()
+
+ def push(self, item: Any, priority: int) -> None:
+ """
+ Push an item into the queue with a given priority.
+
+ Items with equal priority maintain FIFO ordering based on insertion time.
+ Lower priority numbers are dequeued first.
+
+ :param item:
+ The item to insert into the queue.
+ :param priority:
+ Priority level for the item. Lower numbers indicate higher priority.
+ """
+ # The counter value breaks ties between equal priorities.
+ # Named insertion_order to avoid shadowing itertools.count.
+ insertion_order = next(self._counter)
+ entry = (priority, insertion_order, item)
+ heapq.heappush(self._queue, entry)
+
+ def pop(self) -> Tuple[int, Any]:
+ """
+ Remove and return the highest priority item from the queue.
+
+ For items with equal priority, returns the one that was inserted first.
+
+ :returns:
+ A tuple containing (priority, item) with the lowest priority number.
+ :raises IndexError:
+ If the queue is empty.
+ """
+ if not self._queue:
+ raise IndexError("pop from empty queue")
+ priority, _, item = heapq.heappop(self._queue)
+ return priority, item
+
+ def peek(self) -> Tuple[int, Any]:
+ """
+ Return, but don't remove, the highest priority item from the queue.
+
+ For items with equal priority, returns the one that was inserted first.
+
+ :returns:
+ A tuple containing (priority, item) with the lowest priority number.
+ :raises IndexError: + If the queue is empty. + """ + if not self._queue: + raise IndexError("peek at empty queue") + priority, _, item = self._queue[0] + return priority, item + + def get(self) -> Optional[Tuple[int, Any]]: + """ + Remove and return the highest priority item from the queue. + + For items with equal priority, returns the one that was inserted first. + Unlike pop(), returns None if the queue is empty instead of raising an exception. + + :returns: + A tuple containing (priority, item), or None if the queue is empty. + """ + if not self._queue: + return None + priority, _, item = heapq.heappop(self._queue) + return priority, item + + def __len__(self) -> int: + """ + Return the number of items in the queue. + + :returns: + The number of items currently in the queue. + """ + return len(self._queue) + + def __bool__(self) -> bool: + """ + Return True if the queue has items, False if empty. + + :returns: + True if the queue contains items, False otherwise. + """ + return bool(self._queue) diff --git a/pyproject.toml b/pyproject.toml index a6776e5b..53c66c59 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,8 @@ dependencies = [ # Linting "pylint", "ruff", + # PipelineBase.show + "ipython" ] [tool.hatch.envs.test]
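
A minimal sketch of the `include_outputs_from` parameter documented in `run()` above. It reuses the `rag_pipeline`, `question`, and component names from that docstring example; that `InMemoryBM25Retriever` exposes a `documents` output socket is an assumption based on current Haystack behavior, not something this diff defines:

```python
# Sketch only: assumes the rag_pipeline, question, and component names from the
# run() docstring example, and that InMemoryBM25Retriever exposes a
# "documents" output socket.
results = rag_pipeline.run(
    {"retriever": {"query": question}, "prompt_builder": {"question": question}},
    include_outputs_from={"retriever"},
)

# The leaf component ("llm") is always present in the output; "retriever"
# appears as well because it was explicitly requested, even though its output
# was consumed by prompt_builder.
print(results["llm"]["replies"])
print(len(results["retriever"]["documents"]))
```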
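The scheduling loop in `run()` is driven by the new `FIFOPriorityQueue`. A self-contained usage sketch, using only the API as defined in `utils.py` above:

```python
from haystack_experimental.core.pipeline.utils import FIFOPriorityQueue

queue = FIFOPriorityQueue()
# Lower numbers mean higher priority; equal priorities are dequeued FIFO.
queue.push("retriever", priority=1)
queue.push("prompt_builder", priority=0)
queue.push("llm", priority=1)

assert queue.peek() == (0, "prompt_builder")  # peek() does not remove the item
assert queue.pop() == (0, "prompt_builder")
assert queue.pop() == (1, "retriever")        # pushed before "llm" at the same priority
assert queue.pop() == (1, "llm")
assert queue.get() is None                    # get() returns None instead of raising
assert not queue                              # __bool__ reports the queue is empty
```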
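Likewise, `parse_connect_string` splits an explicit `"component.socket"` string and returns `None` for the socket when only a component name is given; a quick illustration of the behavior defined above:

```python
from haystack_experimental.core.pipeline.utils import parse_connect_string

assert parse_connect_string("retriever.documents") == ("retriever", "documents")
assert parse_connect_string("llm") == ("llm", None)
# maxsplit=1 keeps any dots inside the socket name intact
assert parse_connect_string("a.b.c") == ("a", "b.c")
```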