
[NEW] OBSERVE command for enhanced observability in Valkey: Data Collection #1167

mwarzynski opened this issue Oct 14, 2024 · 18 comments
Labels: client-changes-needed (Client changes are required for this feature)

@mwarzynski commented Oct 14, 2024

The original proposal was divided into two sections: data collection and data analysis. This issue has been re-scoped to reflect the design of 'data collection'. The new proposal is here: #1167 (comment).

I am leaving the original proposal for full-blown observability pipelines unchanged. If you are not interested in the previous versions and all of the historical context, please go directly to the new proposal here: #1167 (comment).


TL;DR: I propose improving observability in Valkey with built-in capabilities such as RED (Rate, Errors, Duration) time-series metrics.

Overview

This proposal outlines a new OBSERVE command to improve Valkey’s observability capabilities. By enabling advanced time-series metrics, custom gathering pipelines, and in-server data aggregation, OBSERVE will equip Valkey users with first-class monitoring commands for granular insight into server behavior and performance.

Background

After discussions with Irfan Ahmad, an attendee at the '24 Valkey Summit, I developed this initial proposal to introduce native observability pipelines within Valkey. Currently, Valkey lacks comprehensive, customizable observability tools embedded directly within the server, and this proposal aims to fill that gap.

Note: This proposal is a work in progress. Feedback on the overall approach and any preliminary design concerns would be greatly appreciated.


Current Observability Limitations in Valkey

Currently, Valkey’s observability relies on commands like MONITOR, SLOWLOG, and INFO.

While useful, these commands have limitations:

  • MONITOR: Streams every command, generating high data volume that may overload production environments.
  • SLOWLOG: Logs only commands exceeding a set execution time, omitting quick operations and general command patterns.
  • INFO: Provides server statistics but lacks detailed command- and key-specific insights.

These commands lack the flexibility for in-depth, customizable observability exposed directly within the valkey-server instance, such as filtering specific events, sampling data, executing custom processing steps, or aggregating metrics over time windows.

Feature proposal

Problem statement and goals

The proposed OBSERVE command suite will bring observability as a core Valkey feature. Through user-defined “observability pipelines,” Valkey instances can produce detailed insights in a structured, efficient manner. These pipelines will be customizable to support diverse use cases, providing users with foundational building blocks for monitoring without overwhelming server resources. This new functionality could be integrated with tools like Prometheus and Grafana for visualization or alerting, though it is fully customizable and its primary purpose is in-server analysis.

Proposed solution -- Commands

The OBSERVE command set introduces the concept of observability pipelines — user-defined workflows for collecting, filtering, aggregating, and storing metrics.

Core Commands

  • OBSERVE CREATE <pipeline_name> <configuration>
    Creates an observability pipeline with a specified configuration. Configuration details, specified in the next section, define steps such as filtering, partitioning, sampling, and aggregation.
    The pipeline and its configuration are kept in runtime memory only (i.e., the user needs to re-create the pipeline after a server restart).

  • OBSERVE START <pipeline_name>
    Starts data collection for the specified pipeline.

  • OBSERVE STOP <pipeline_name>
    Stops data collection for the specified pipeline.

  • OBSERVE DELETE <pipeline_name>
    Deletes the pipeline and its configuration.

  • OBSERVE RETRIEVE <pipeline_name>
    Retrieves collected data. Alternatively, GET could potentially serve this function, but further design discussion is needed.

  • OBSERVE LOADSTEPF <step_name> <lua_code>
    Allows defining custom processing steps in Lua, for cases where the built-in steps do not meet the requirements.

Pipeline configuration

Pipelines are configured as chains of data processing stages, including filtering, aggregation, and output buffering. The format is similar to Unix piping.

Key stages in this pipeline model include:

  • filter(f): Filters events based on defined conditions (e.g., command type).
  • partition(f): Partitions events according to a function (e.g., by key prefix).
  • sample(f): Samples events at a specified rate.
  • map(f): Transforms each event with a specified function.
  • window(f): Aggregates data within defined time windows.
  • reduce(f): Reduces data over a window via an aggregation function.
  • output(f): Directs output to specified sinks.

Example configuration syntax:

OBSERVE CREATE get_errors_pipeline "
filter(filter_by_commands(['GET'])) |
filter(filter_for_errors) |
window(window_duration(1m)) |
reduce(count) |
output(output_timeseries_to_key('get_errors_count', max_length=1000))
"

Output

The goal is to capture time-series metrics within the defined pipeline outputs; e.g., for the pipeline above the output would be structured as follows:

[<timestamp1, errors_count1>, <timestamp2, errors_count2>, ...] // capped at 1000 items

It remains uncertain whether storing output data in a format compatible with direct retrieval via GET (or another existing command) will be feasible. Consequently, we might need to introduce an OBSERVE RETRIEVE <since_offset> command for clients to poll result data. This command would provide:

{
    current_offset: <latest_returned_offset as a number>,
    data: [ ... result items ],
    lag_detected: <true or false> // true if `since_offset` points to data that’s been removed, signaling potential data loss.
}

Here, offset represents the sequence number of items produced by the pipeline, including any items removed due to buffer constraints. This approach allows clients to poll for results while adjusting their polling frequency based on the lag_detected flag. If lag_detected is true, clients would be advised to increase polling frequency to reduce data loss.
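
For illustration, a minimal client-side polling sketch in C (the retrieve() transport stub and the pipeline name are hypothetical, not part of the proposal) showing how a consumer could track current_offset and react to lag_detected:

#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

/* Stand-in for the reply of OBSERVE RETRIEVE <pipeline_name> <since_offset>. */
typedef struct {
    long long current_offset;  /* latest offset produced by the pipeline */
    bool lag_detected;         /* true if since_offset pointed at evicted items */
} retrieve_reply;

/* Stub transport: a real client would issue the command over its connection. */
static retrieve_reply retrieve(const char *pipeline, long long since_offset) {
    (void)pipeline;
    retrieve_reply r = { .current_offset = since_offset, .lag_detected = false };
    return r;
}

int main(void) {
    long long since_offset = 0;
    unsigned int poll_interval_s = 10;

    for (int i = 0; i < 3; i++) {                        /* bounded loop for the example */
        retrieve_reply r = retrieve("get_errors_pipeline", since_offset);
        since_offset = r.current_offset;                 /* resume after the last item seen */
        if (r.lag_detected && poll_interval_s > 1)
            poll_interval_s /= 2;                        /* poll faster to reduce data loss */
        printf("offset=%lld, next poll in %us\n", since_offset, poll_interval_s);
        sleep(poll_interval_s);
    }
    return 0;
}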


Use-Case Examples

Below are examples of how the proposed OBSERVE command and pipeline configurations could be used to address various
observability needs.

  1. Counting Specific Commands Per Minute with Buffer Size

    Use Case: Count the number of GET commands executed per minute.

    Pipeline Creation:

    OBSERVE CREATE get_commands_per_minute "
    filter(filter_by_commands(['GET'])) |
    window(window_duration(1m)) |
    reduce(reduce_count) |
    output(output_timeseries_to_key('get_command_count', buffer_size=1440))
    "
    

    Explanation: This pipeline filters for GET commands, counts them per minute, and stores the counts in a time-series key get_command_count with a buffer size of 1440 (e.g., one day's worth of minute-level data).

  2. Hot Key Analysis

    Use Case: Identify and monitor the most frequently accessed keys within a certain time window, allowing for proactive load management and identification of potential bottlenecks.

    Pipeline Creation:

    OBSERVE CREATE hot_keys_analysis "
    filter(filter_by_commands(['GET'])) |
    sample(sample_percentage(0.005)) |
    partition(partition_by_key()) |
    window(window_duration(1m)) |
    reduce(reduce_count) |
    map(map_top_keys(10)) |
    output(output_timeseries_to_key('hot_keys', buffer_size=60))
    "
    

    Explanation: This pipeline filters for GET commands, samples 0.5% of them, partitions the sampled events by the accessed key, and aggregates their counts in one-minute intervals.
    The map_top_keys(10) step then selects the top 10 most frequently accessed keys in each interval along with the access counts.
    The result is stored as a time-series in hot_keys with a buffer size of 60, retaining one hour of hot key data.

  3. Average Latency Per Time Window with Buffer

    Use Case: Monitor average latency of SET commands per minute.

    Pipeline Creation:

    OBSERVE CREATE set_latency_monitor "
    filter(filter_by_commands('SET')) |
    sample(sample_percentage(0.005)) |
    window(window_duration(1m)) |
    map(map_get_latency) |
    reduce(average) |
    output(timeseries_to_key('set_average_latency', buffer_size=720))
    "
    

    Explanation: This pipeline filters for SET commands, samples 0.5% of them, extracts their latency, aggregates the average latency every minute, and stores it with a buffer size of 720 (e.g., 12 hours of minute-level data).

  4. Client Statistics

    Use Case: Gather command counts per client for GET and SET commands, sampled at 5%.

    Pipeline Creation:

    OBSERVE CREATE client_stats_per_minute "
    filter(filter_by_commands(['GET', 'SET'])) |
    sample(sample_percentage(0.05)) |
    map(map_client_info) |
    window(window_duration(1m)) |
    reduce(count_by_client) |
    output(timeseries_to_key('client_stats', buffer_size=1440))
    "

    Explanation: This pipeline filters for GET and SET commands, samples 5% of them, extracts client information, counts commands per client every minute, and stores the data under client_stats with a buffer size of 1440.

  5. Error Tracking

    Use Case: Monitor the number of errors occurring per minute.

    Pipeline Creation:

    OBSERVE CREATE error_tracking_pipeline "
    filter(filter_event_type('error')) | # likely filter for errors would have to be more advanced
    window(window_duration(1m)) |
    reduce(count) |
    output(timeseries_to_key('total_errors', buffer_size=1440))
    "

    Explanation: This pipeline filters events of type 'error', counts them every minute, and stores the totals in total_errors with a buffer size of 1440.

  6. TTL Analysis

    Use Case: Analyze the average TTL of keys set with SETEX command per minute.

    Pipeline Creation:

    OBSERVE CREATE ttl_analysis_pipeline "
    filter(filter_by_commands(['SETEX'])) |
    map(map_extract_ttl) |
    window(window_duration(1m)) |
    reduce(average) |
    output(timeseries_to_key('average_ttl', buffer_size=1440))
    "

    Explanation: This pipeline filters for SETEX commands, extracts the TTL values, calculates the average TTL every
    minute, and stores it in average_ttl with a buffer size of 1440.

  7. Distribution of Key and Value Sizes

    Use Case: Create a histogram of value sizes for SET commands.

    Pipeline Creation:

    OBSERVE CREATE value_size_distribution "
    filter(command('SET')) |
    map(extract_value_size) |
    window(window_duration(1m)) |
    reduce(histogram(buckets([0, 64, 256, 1024, 4096, 16384]))) |
    output(timeseries_to_key('value_size_distribution', buffer_size=1440))
    "

    Explanation: This pipeline filters for SET commands, extracts the size of the values, aggregates them into histogram buckets every minute, and stores the distributions with a buffer size of 1440.


Feedback Request

Feedback is requested on the following points:

  1. Feature Scope: Does the proposed OBSERVE command align with your vision for Valkey’s observability?
  2. Command Design: Are there any suggestions for the OBSERVE command syntax and structure?

Let's first reach a consensus on the 'Feature Scope'. If the answer is yes, we can discuss the design details.
I am ready to commit to building this feature as soon as the designs are accepted, even in draft form.


Thank you for your time and consideration. I look forward to discussing this proposal further.

@allenss-amazon
Member

I like the concepts and directionality here. I think it would be profitable to split this into two subsections. One section would get more specific about the events that would feed into the observability framework.

A second section would focus on the representation and processing of those events. I mention this because there's quite a bit of overlap in the functionality of the second section and any implementation of a timestream processing module. In other words, can we get both timestream processing of the observability event stream (part 1) and more generic timestream data processing capabilities in the same development effort or conversely split the development effort into two parts that cooperate?

@mwarzynski
Author

mwarzynski commented Nov 5, 2024

@allenss-amazon Thank you for the insightful feedback! We’re definitely on the same page about maintaining flexibility for future enhancements, particularly around timestream processing and event-based observability.

For now, our approach is to keep the implementation streamlined and tailored to the current state of Valkey's codebase. By initially focusing on passing command execution details directly as the input to our observability pipeline, we'll establish a foundation that can later be adapted for timestream events.

If and when we introduce events, we could replace or extend the input system for our pipeline to accommodate event-based data, allowing for similar processing while offering additional data input types. Keeping a direct dependency between command execution and the observability implementation will help us maintain a simple architecture and deliver a more focused solution in the short term.

I’ve also included a simplified diagram of this first-approach implementation to illustrate the flow we envision. I’d love to hear your thoughts on whether this approach makes sense. What do you think?

flowchart LR
    A[User: AnyCommand] --> B("observePostCommand()")
    subgraph valkey-server
        subgraph "Command Execution: call()"
            B
        end

        subgraph "observeUnitsProcess()"
            B --> C("Filter()")
            C --> D("Sample()")
            D --> E("Partition()")
            E --> F("Window()")
            F --> G("Reduce()")
            G --> H("Output()")
        end

        H --> I("Observe Results Store")
    end

Thanks again for the input—we really appreciate the foresight around extensibility here!

@allenss-amazon
Member

I think it's important to provide an architecture that's lightweight enough that it could be enabled nearly everywhere -- maybe even by default. That means that the data collection needs to be fast. I think it's important to get more specific about the kinds of data events you're going to rely on and how this data is generated.

@mwarzynski
Author

@allenss-amazon:

That means that the data collection needs to be fast. I think it's important to get more specific about the kinds of data events you're going to rely on and how this data is generated.

Absolutely. Performance is one of the primary goals.

My initial approach is to check whether any observe functionality needs to do something after a command executes.
This check is performed through observePostCommand at the end of the call() function.

Currently, this creates an observeUnit that captures basic information about each command execution:

typedef struct observeUnit {
    int command_id;

    robj **argv;
    size_t argv_len;

    size_t response_size_bytes;

    long long duration_microseconds;
} observeUnit;

If the observe functionality is disabled or unconfigured, there’s no impact on command execution performance—it’s just a simple check of a boolean flag. If it’s enabled, I construct this unit on the stack and attempt to process it through the pipeline. (Note that the pipeline processing part hasn’t been implemented yet.)
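
For illustration, a minimal sketch of what this check could look like inside call() (the config flag, field accesses, and helper names are assumptions from my prototype, not existing Valkey internals):

/* Called at the end of call(); a single branch when the feature is disabled. */
static inline void observePostCommand(client *c, long long duration_us) {
    if (!server.observe_enabled) return;       /* hypothetical config flag */

    observeUnit unit = {
        .command_id = c->cmd->id,              /* assumes a numeric per-command id */
        .argv = c->argv,
        .argv_len = (size_t)c->argc,
        .response_size_bytes = 0,              /* reply size is not captured yet */
        .duration_microseconds = duration_us,
    };
    observeUnitsProcess(&unit);                /* pipeline processing (not implemented yet) */
}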

I think it's important to provide an architecture that's lightweight enough that it could be enabled nearly everywhere -- maybe even by default.

At this early stage, the structs feel lightweight and execution should be fast, but I anticipate that 'data gathering' may need additional complexity. Currently, the observeUnit is constructed from the client and the command duration, but our pipelines may require more detailed information, especially if we want to allow the pipelines to do more advanced processing.
One example of required enhancements could be capturing command errors -- at the moment there is no straightforward, efficient method to access the OK/ERROR result.

@allenss-amazon What do you think about my approach? Do you have any suggestions for enhancing 'data gathering' for observability?

@allenss-amazon
Member

allenss-amazon commented Nov 7, 2024

I'm skeptical of the "one size fits all" interface style here. For example, using the end of call as your insertion point will miss all client commands that block. But that clearly can be fixed by tapping into the unblocking machinery also.

This also misses all non-command related activity -- cluster bus, evictions, etc. Which I think could also be quite interesting. I'd propose that we implement a mechanism to self-monitor the existing metrics in the core. For example, having a periodic task to execute an "info" command and collect the various values into samples to feed into the machinery could also be quite valuable.

The automatic info scheme has a low enough overhead (you adjust the frequency of collection to match your CPU wallet ;-)) that it could reasonably be left on in any production environment. It also creates an incentive to increase instrumentation in the core.
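
For illustration, a rough sketch of the shape this periodic self-sampling could take (all function names below are placeholders, not existing Valkey internals; the period is the knob that trades CPU for resolution):

static mstime_t observe_last_info_sample = 0;

/* Hypothetical cron-style task: capture an INFO-like snapshot and feed it into
 * the observability machinery as one sample. */
void observeInfoSampleCron(mstime_t now, mstime_t period_ms) {
    if (now - observe_last_info_sample < period_ms) return;
    observe_last_info_sample = now;

    sds snapshot = observeCollectInfoSnapshot();   /* placeholder for "run INFO internally" */
    observeFeedSample("info", snapshot, sdslen(snapshot));
    sdsfree(snapshot);
}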

@mwarzynski
Author

@allenss-amazon:

I'm skeptical of the "one size fits all" interface style here. For example, using the end of call as your insertion point will miss all client commands that block. But that clearly can be fixed by tapping into the unblocking machinery also.

The primary goal is to design observability pipelines with enough flexibility to support a variety of input sources. In my initial implementation, I focused on integrating the first input source as 'command executions.' However, with the right design, we should be able to expand this model to support additional sources or even rework the data push mechanism to accommodate a new system based on event streams.

Here’s a diagram illustrating this approach:

[Diagram: multiple input sources feeding into the Observe Units Processor, which runs the configured pipelines.]

The concept is that multiple sources can feed into the Observe Units Processor, which will process them through the pipelines. Additionally, the Observe Units Processor should allow us to easily implement new input sources. (I may need some guidance on how best to structure this in the code.)

This also misses all non-command related activity -- cluster bus, evictions, etc. Which I think could also be quite interesting. I'd propose that we implement a mechanism to self-monitor the existing metrics in the core. For example, having a periodic task to execute an "info" command and collect the various values into samples to feed into the machinery could also be quite valuable.

You’re right that the initial input source doesn’t cover several internal cluster activities (and blocking commands), but the design should allow us to extend the list of sources in future iterations. Structuring this properly may require some guidance, particularly to ensure compatibility with all potential Valkey input streams.

Output from the INFO command is already accessible to the Clients, so I wonder—if we limited ourselves to just these results, would it be valuable enough to build out the entire observability pipeline? It’s feasible for users to set up a custom client to periodically fetch INFO data and compute time-series metrics. To bring real value to the observability pipeline, I aimed to start with something less/not accessible in the current Valkey feature set, while aligning with Google's 4 Golden Signals. Hence, I opted to implement this at the command level (though we could debate if this approach is ideal).

What do you think? Are there any other arguments for limiting ourselves to the INFO results besides simplicity / incentives?

@virtualirfan

@allenss-amazon said:

having a periodic task to execute an "info" command and collect the various values into samples to feed into the machinery could also be quite valuable.

This is an excellent idea.

@mwarzynski said:

The concept is that multiple sources can feed into the Observe Units Processor, which will process them through the pipelines. Additionally, the Observe Units Processor should allow us to easily implement new input sources. (I may need some guidance on how best to structure this in the code.)

@mwarzynski I like your approach to Allen's concept … IIRC, you want to split (a) the data collection (which can include filters for efficiency) and (b) the trigger from (c) what you term "Observe Units Processing". Then the "data collection" can come from (i) command post-processing like you're prototyping, (ii) the execution of another command, including but not limited to the INFO command, or (iii) even timers for fully internal sampling.

This frees us up to focus on multiple different parts of the implementation. We can discuss the programmable pipeline implementation while keeping the "data collection" part open for significant extension as more use cases emerge.

@mwarzynski said:

Output from the INFO command is already accessible to the Clients, so I wonder—if we limited ourselves to just these results, would it be valuable enough to build out the entire observability pipeline? It’s feasible for users to set up a custom client to periodically fetch INFO data and compute time-series metrics.

I think @allenss-amazon's idea is not to limit us to INFO but rather to make INFO command execution be able to feed the processing pipeline as a data source in addition to what you have already been working on. I gather from Allen's comment that the flexibility to be able to feed data (e.g. INFO or other commands) would motivate developers to feed more data into the observability pipeline and potentially have it "always on".

Overall, I see the conceptual two stages "collection" and "programmable processing" as a powerful combination.

Also note that pulling detailed command results (e.g. from INFO) out to the client is very expensive which might limit use cases. However, as Allen hints, having verbose output instead go into a server-side pipeline, especially with filters, could make it feasible to run "always on" observability.

@allenss-amazon Please correct as appropriate … I don't want to go in a direction that you didn't intend.

Overall, I think we have immediate use cases for memory sizing that require data collection from observePostCommand so I'd appreciate if we can start with that and add support for (x) INFO (among other commands) as input, (y) time/event/notifications triggers as a second step. Mostly, I just want to lock down the initial scope so that @mwarzynski can start work on the programmable pipeline part which is where we will discover how tricky this business is.

@allenss-amazon
Member

Yes, multiple sources of data feeding the analysis engine. I believe a time-series processing module is in Valkey's future (likely with fidelity to the Redis time series module) and that this observability proposal should use that module as its analysis engine rather than something unique. Thus the discussion could bifurcate into two threads, one about time-series processing and this thread, which focuses on data collection mechanisms.

I proposed a source of data, which is a periodic self-sampling of the "INFO" metrics. An initial implementation of this would trivially be built by having a periodic timer recursively invoke the INFO command and parse the results. Long term, I envision this as driving a re-architecting of stat collection within the Valkey universe to avoid the serialize/deserialize overhead of this approach, gaining efficiency and therefore usability. This would also provide a degree of uniformity in format and semantics for info stats as well as a reflection mechanism (i.e., command getkeysandflags for info stats) that could drive more generic tools like a grafana connector.

@mwarzynski proposed a source of data, which is to tap into the command processing and invoke a LUA script with the command, its arguments, and execution time. This is simple, but expensive in that it's going to duplicate a lot of the work that the core already does for you. For example, rather than a single tap-in point for all commands, why not have a per-command tap-in point? I mean the ability to establish a separate LUA script to be invoked for each command. This would avoid needless LUA execution for commands that aren't of interest. Also, the per-command LUA scripts run faster because the command parsing is already completed. With that thought in mind, there are other potential tap-in points. For example, leveraging the ACL infrastructure would allow you to tap into core code that validates read and write access for keys independently of the commands, again something that reduces redundant parsing overhead. I'm sure there will be more points that would prove profitable.

If we're in a world of multiple data sources and LUA scripts, then we should think about how those different LUA environments interact. Is there a single global LUA environment for all of OBSERVE or is there a need for multiple environments?

@mwarzynski
Author

I believe a time-series processing module is in Valkey's future (likely with fidelity to the Redis time series module) and that this observability proposal should use that module as its analysis engine rather than something unique. Thus the discussion could bifurcate into two threads, one about time-series processing and this thread which focuses on data collection mechanisms.

@allenss-amazon If I understand your proposal correctly, you suggest dividing it into two distinct sections.

  1. Data collection: Encompasses the interface for ‘taps’ and the processing of data for observability. It establishes a foundation that enables us to push data to the observability component through ‘taps’, process it using Lua, and expose a way to retrieve the results. (Please note that this part does not include time series metrics or data aggregation. Essentially, it accepts the data, allows the user to execute a Lua script on it [map], and then allows the user to retrieve the results.)
  2. Data analysis: Involves extending the processing capabilities to support more complex analysis, which includes, among others, time series metrics and data aggregation.

Implementation of the 'data collection' would be much smaller in scope and would be a great first step towards supporting the full-blown observability pipelines functionality. If that's our plan, then I am fully on board with this idea. I can revise the proposal to the scope of the “data collection” thread in a manner that allows it to be expanded to support the more comprehensive approach that will then enable “data analysis.”

@allenss-amazon If you agree, I am going to post a new comment with a new design proposal, and link it at the top of the existing proposal (top of the GH Issue description). I wouldn't replace the current Issue description, because it's worth preserving it for other people as otherwise our discussion in the comments above wouldn't make sense.


You also started the discussion about Lua scripts -- the pros and cons of specific tap placements. I might need to understand more technical details about Valkey's codebase, and specifically Lua's impact on performance, to get your perspective. So far I thought we might implement efficient filtering in C for the basic fields (e.g., command type) and run Lua only in cases where users want something custom. However, I might have been mistaken, because Lua could be a much more common use case than I assumed, and thus the performance penalty would be too large. I probably also lack other important technical details. Thank you for the insight!

This is simple, but expensive in that it's going to duplicate a lot of the work that the core already does for you.

I’m also unsure which functionalities I’d be duplicating. Do we already have an easy way to hook into any command executions? I couldn’t find a generic way to do so that covers all command executions.

The good news is that our design of taps should be orthogonal to the issue of 'tap placement'.

@allenss-amazon
Member

Yes, let's focus initially on data collection.

w.r.t. LUA scripts, my issue isn't about the speed of LUA vs C, but rather about the duplication of work in general. In other words, the core already does substantial processing on commands and keys generically; we can harness that work by adding tap points into the generic command processing infrastructure so that our observability infrastructure need not repeat that work.

One thing to think about is that the existing LUA infrastructure in the core assumes it's only being executed -- between -- commands, i.e., as a command itself. Thus some of the LUA functionality may or may not be available to the tap-point infrastructure. For example, I'm skeptical that your LUA function would be able to "call" a Valkey command in order to insert a result into a key (this would invoke the tap recursively?). Also, the issues with replication are interesting to think about -- is this functionality only useful on a primary? Maybe the idea that the results are delivered back into the Valkey keyspace is itself bogus. So as far as data collection is concerned, I think there's substantial thinking required on how to deliver the output of the Tap function back into the Valkey world.

@mwarzynski
Author

mwarzynski commented Nov 23, 2024

Overview

This revised proposal focuses on the "data collection" aspect of the observability pipeline for Valkey. The aim is to establish a foundational system for collecting and processing observability data efficiently, laying the groundwork for future expansion into more advanced data analysis, such as time-series metrics and aggregation. Basic data collection capabilities are powerful enough to expose the useful functionality of polling data and implementing connectors with open-source observability tooling (e.g., Grafana).

Background

Following discussions with Irfan Ahmad at the '24 Valkey Summit and subsequent conversations with @allenss-amazon, we agreed to divide the observability proposal into two distinct phases to better manage its scope and focus on foundational development first:

  1. Data Collection: A foundational system enabling taps for data capture, custom processing via Lua scripts, and output delivery for observability purposes.
  2. Data Analysis: Advanced data processing capabilities such as time-series metrics and aggregation, potentially leveraging a dedicated time-series processing module. (Design proposal: <todo(mwarzynski): provide the URL Link>.)

This document presents the scope of the "Data Collection" phase, which is the groundwork for the observability pipelines feature.


Current Observability Limitations in Valkey

Valkey's existing observability tools, including MONITOR, SLOWLOG, and INFO, provide limited insights:

  • MONITOR: Streams all commands, generating high data volume that is impractical for production environments.
  • SLOWLOG: Captures only commands exceeding a defined execution time, omitting broader patterns.
  • INFO: Offers general server statistics without granular, customizable data collection capabilities.

These commands lack the flexibility for in-depth, customizable observability exposed directly within the valkey-server instance, such as filtering specific events, sampling data, executing custom processing steps, or aggregating metrics over time windows.


Proposal: Data Collection

The "Data Collection" phase introduces a lightweight mechanism for observing Valkey's internal events through taps. These taps enable capturing and processing data with Lua scripts and retrieving results without overburdening server resources.

Core Components

1. Taps

Taps serve as entry points for collecting specific data units within Valkey, where "data units" is a generic term encompassing various forms of observability data. For example, data units might represent details about command execution or periodically gathered general server state information. Each Tap implementation has a unique name and provides data units of a specific type, reflecting the varied nature of data collected by different Taps.

2. Tap Attachment

Users can start data collection from a particular Tap by creating a 'Tap Attachment'. A Tap Attachment tells the data collection machinery to start gathering data from a specific Tap. Optionally, data gathering can be extended with custom processing logic for a Tap Attachment using a Lua script. This enables:

  • Filtering and sampling data points.
  • Transforming event data (e.g., extracting latencies).

3. Output Mechanism

The solution for retrieving processed data from a Tap Attachment is yet to be defined, but it must meet these characteristics:

  • Storing results in temporary buffers for periodic polling.
  • Delivering results directly to clients via an OBSERVE RETRIEVE command or directly from key space via GET command.
  • Allowing collection of data from all instances added to the cluster.

The output mechanism is left "to be defined" because the chosen solution must satisfy all of the points outlined above.

Commands

OBSERVE TAP ATTACH <tap_name> <attachment_name> [<lua_script_name>]

Attaches a tap with a specified implementation name and an optional Lua script for custom processing. Attaching a tap immediately starts data collection.

Example:

OBSERVE TAP ATTACH tap_command_get my_attachment lua_sampling

OBSERVE TAP DETACH <tap_name> <attachment_name>

Stops data collection and detaches the specified tap attachment. It doesn't affect other tap attachments.

Example:

OBSERVE TAP DETACH tap_command_get my_attachment

OBSERVE TAP RETRIEVE <tap_name> <attachment_name>

Retrieves processed data from the specified tap attachment. Results include a sequence number and data items.

Example response:

{
  "current_offset": 123,
  "data": [
    {"latency": 5, "command": "GET", "key": "key1"},
    {"latency": 3, "command": "GET", "key": "key2"}
  ],
  "lag_detected": false
}

Design Considerations

Generic Tap Interface

The data collection component defines the interface for taps but does not couple directly to any specific part of the Valkey codebase. This design does not impose constraints on where taps can be added. It should be straightforward to implement new taps across Valkey’s core components. The ease of adding new taps should incentivize developers to integrate them into their components, enabling efficient observation of component behavior.
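
For illustration, the generic tap interface could be as small as the following sketch (all names and signatures are hypothetical, not part of this proposal's API yet):

#include <stddef.h>

/* A tap is registered once, under a unique name, by the component that owns
 * the tap point; it emits opaque data units of a declared type. */
typedef struct observeTap {
    const char *name;        /* e.g. "tap_command_get" */
    int data_unit_type;      /* identifies the schema of the emitted data units */
} observeTap;

/* Registration, done once at component initialization time. */
observeTap *observeRegisterTap(const char *name, int data_unit_type);

/* Emission, done at the tap point; intended to be close to a no-op when the
 * tap has no attachments, so leaving taps compiled in stays cheap. */
void observeTapEmit(observeTap *tap, const void *data_unit, size_t len);

Keeping the emission call this narrow is what makes it cheap to sprinkle taps across core components.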

Lua Execution Scope

Lua scripts must operate within constraints to prevent recursive command invocation or replication issues. They should focus on transforming and filtering data, avoiding actions that modify Valkey's state or impose significant computational overhead.


Example Use Cases

Monitoring Command Latencies

Capture latencies of GET commands.

OBSERVE TAP ATTACH tap_command_get latency_attachment lua_sampling

Error Tracking

Track errors per minute.

OBSERVE TAP ATTACH tap_event_error errors_attachment

Custom Metrics

Use Lua to compute custom metrics.

OBSERVE TAP ATTACH tap_command_set custom_metric_attachment lua_custom_metric_function

Expanding to Full Observability Pipelines

The "Data Collection" phase provides a foundational layer that enables efficient data capture and custom processing, forming a base that can later support advanced capabilities like time-series aggregation, complex pipelines, and seamless integration with external observability tools.
By leveraging the "Data Collection" phase, the system can evolve into a robust observability framework that meets diverse use cases while maintaining scalability. This phased approach ensures incremental development without overburdening the initial implementation.

The "Data Analysis" phase is discussed in a separate GitHub Issue: <todo(mwarzynski): create a design for the data analysis, create a GitHub issue, and paste the URL here>.


Thanks @allenss-amazon for all the valuable feedback and proposal ideas!

@mwarzynski changed the title from "[NEW] OBSERVE command for enhanced observability in Valkey" to "[NEW] OBSERVE command for enhanced observability in Valkey: Data Collection" on Nov 25, 2024
@zioproto

Hello Folks,
I read that the discussion is about creating new Valkey commands to get data, like OBSERVE.

For metrics to be usable it is of fundamental importance to have a standard interface to access those metrics.

What is the plan?
Implementing a Prometheus scraping endpoint?
Or instrumenting the code to emit OpenTelemetry traces?

I read the whole GitHub issue but the above is not clear to me.

I believe community users today rely on redis_exporter but this is kind of a workaround, and it would be necessary to have an officially supported solution.

Thanks

@allenss-amazon
Member

I think this is a good start. The kinds of things that I'd like to see addressed in the next level of refinement are:

  1. Let's be specific about the commands themselves. You have an attachment name specified; that suggests that you can have multiple attachments to a single tap in existence simultaneously -- correct? If so, what is the relationship between these? For example, are the attachment names tap-specific or is there a global namespace of attachment names? Is there some specific relationship between multiple attachments to the same tap -- for example, can attachment #1 pass information to attachment #2, and how would that work?
  2. For each tap point, we should get specific about exactly what information is passed into the LUA script.
  3. What happens at a tap if there's no Lua script, i.e., an attachment without a LUA script (since it appears to be optional)?

In view of @zioproto's comments, I suspect that retrieving the collected data through the mainthread, i.e., by executing a command, is not the right approach for two reasons. (1) Heisenbergian phenomenon. (2) Valkey native data formats are foreign to standard observability tools, meaning that additional conversion logic has to be provided, which just makes it harder to adopt. I believe that directly supporting more standard data formats would make this more useful.

When thinking about the implementation: if we adopt @zioproto's comments and target something like supporting a direct Prometheus-compatible HTTP(s) endpoint for data collection, I think we can use this to drive a better, more lightweight implementation. Specifically, processing a non-Valkey-format I/O interface is best done on a separate port using a more standard HTTP-ish interface -- which is best implemented with a background thread. What if we made the background thread entirely responsible for processing the data? In other words, the mainthread implementation of a tap is solely to capture some relevant information and put it into a queue for background consumption (some kind of lock-less message queue, like a ring buffer). All of the actual processing of the data (e.g., LUA script consumption) would be done in the background thread. Something like this would be lightweight enough that it could easily be left on in most production environments with negligible impact on normal mainthread processing.
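
For illustration, a minimal sketch of that hand-off using a single-producer / single-consumer ring buffer (record layout and names are purely illustrative, not a proposed implementation):

#include <stdatomic.h>
#include <stddef.h>

#define OBSERVE_RING_SIZE 4096              /* must be a power of two */

typedef struct { char payload[128]; } observeRecord;

typedef struct {
    observeRecord slots[OBSERVE_RING_SIZE];
    _Atomic size_t head;                    /* advanced only by the main thread */
    _Atomic size_t tail;                    /* advanced only by the background thread */
} observeRing;

/* Main thread: one record copy into the ring; drops the record if full, never blocks. */
int observeRingPush(observeRing *r, const observeRecord *rec) {
    size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (head - tail == OBSERVE_RING_SIZE) return 0;        /* full */
    r->slots[head & (OBSERVE_RING_SIZE - 1)] = *rec;
    atomic_store_explicit(&r->head, head + 1, memory_order_release);
    return 1;
}

/* Background thread: drain the ring and do all heavy work (LUA, output sinks). */
int observeRingPop(observeRing *r, observeRecord *out) {
    size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&r->head, memory_order_acquire);
    if (tail == head) return 0;                            /* empty */
    *out = r->slots[tail & (OBSERVE_RING_SIZE - 1)];
    atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
    return 1;
}

With this shape, the per-event mainthread cost is bounded by one small copy plus two atomic operations.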

What do you think about this?

@mwarzynski
Author

mwarzynski commented Nov 29, 2024

@allenss-amazon Thank you for your detailed feedback and thoughtful points! Below are responses to each aspect of your comments.


Specifics of Taps and Tap Attachments

Let's [be] specific about the commands themselves.

You’re correct that the design allows multiple attachments to a single tap. Let’s clarify and refine the specifics:

  • Tap Attachment Namespace will be global, requiring uniqueness across all taps and tap attachments. Tap Attachment is identified by a (string) name. The Tap Attachment name is unique across the entire cluster (all instances) and for all clients. This ensures predictable outcomes and avoids confusion when identifying the source of data solely based on the Tap Attachment name. In the future, we could consider changing the Attachment Namespace to be separate per Tap, as it wouldn't break any previously used Tap Attachment distributions (i.e., it would allow for more options).

  • Relationship between Taps and Tap Attachments:

    • Attachments currently operate independently by design. A Tap can have multiple, separate Tap Attachments. Each Tap Attachment is attached to a single Tap.
    • Tap Attachments cannot pass information to another Tap Attachment.

This approach focuses on simplicity with room for future extensibility. What are your thoughts?


Lua Script Argument Specification

For each tap point, we should get specific about exactly what information is passed into the LUA script.

Each tap emits a different type of data unit, which can then be processed by a Lua script that conforms to a defined function signature. Here’s how we could handle this:

  • Structured Schema: Each Tap outputs a structured schema tailored to its data. For example (likely not representative of the actual implementation):

    • tap_command_get: { "latency": 5, "key": "key1", "response_size": 1024 }
    • tap_event_errors: { "component": "housekeeping", "message": "Timeout while cleaning up internal hashmap occurred." }
  • Lua Script Interaction: The data unit would be passed into the Lua script as a Lua object. The Lua script would:

    • Indicate whether to filter out the data unit (e.g., for sampling purposes).
    • Optionally modify the data unit content (e.g., add computed fields or edit existing ones).

To validate this approach, I plan to implement a proof of concept. Since you have more experience with running Lua scripts in Valkey, I’d appreciate your input or suggestions for improvement. I’m also open to adjusting the proposal if needed.

It won't be easy, and I might have a better proposal after the prototype implementation. There will be many challenges I haven't thought about yet, e.g., validation of the Lua code provided by users.
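
As a starting point for that proof of concept, here is a rough sketch of delivering one data unit to an attachment's filter function through the Lua C API (the function name, the fields, and the drop-on-error policy are illustrative assumptions only):

#include <lua.h>
#include <lauxlib.h>

/* Returns 1 if the data unit should be kept, 0 if the script filters it out. */
int observeRunLuaFilter(lua_State *L, const char *fn_name,
                        long long latency_us, const char *key, size_t keylen) {
    lua_getglobal(L, fn_name);               /* e.g. the loaded "lua_sampling" function */

    lua_newtable(L);                         /* the data unit, passed as a Lua table */
    lua_pushinteger(L, latency_us);
    lua_setfield(L, -2, "latency");
    lua_pushlstring(L, key, keylen);
    lua_setfield(L, -2, "key");

    if (lua_pcall(L, 1, 1, 0) != 0) {        /* one argument, one result */
        lua_pop(L, 1);                       /* discard the error message */
        return 0;                            /* treat script errors as "drop" for now */
    }
    int keep = lua_toboolean(L, -1);         /* boolean verdict from the script */
    lua_pop(L, 1);
    return keep;
}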


Tap Attachment Behavior Without Lua Scripts

What happens at a tap if there's no lua script, i.e., an attachment w/o a LUA script (since it appears to be optional).

Attachments without Lua scripts are valid and will function as passthrough mechanisms. The data collected from the Tap will flow directly to the output mechanism without custom processing. This could be useful for users who want raw data or don’t need any filtering, custom processing or sampling. In other words, the default behavior for no Lua script will be to emit raw Tap data as-is.


Data Retrieval via Commands vs. Standard Observability Formats

I suspect that retrieving the collected data through the mainthread, i.e., by executing a command, is not the right approach.

You’ve raised excellent points regarding performance, the Heisenbergian phenomenon, usability, and integration with standard observability tools. Here’s my thinking:

  • Command-Based Retrieval: My current proposal uses Valkey commands (e.g., OBSERVE RETRIEVE) to retrieve data. This avoids requiring additional infrastructure configuration and simplifies deployment in cloud environments like AWS or Azure, where exposing additional ports or protocols can be restrictive. With this approach, using the OBSERVE functionality on AWS might actually be feasible.

  • Alternative Approaches:

    • Prometheus-Compatible HTTP Endpoint: Implement an HTTP(s) endpoint to expose data in Prometheus (OpenMetrics) format. This would align with open observability standards and simplify integration with tools like Grafana. I think it would be doable only after the second phase with data analysis capabilities. (See the 'Prometheus and Aggregation' section a few lines below.)
    • Dual Mechanism: Support both command-based retrieval for cloud-friendly use cases and an HTTP endpoint for standardised observability setups.
  • Current Proposal: For the initial scope, I propose focusing on command-based retrieval to deliver an MVP quickly. Afterward, we can explore adding Prometheus-compatible endpoints as another part after the second phase (data analysis).

Prometheus and Aggregation

Prometheus primarily works with aggregated data, while our current design outputs raw, non-aggregated data. To bridge this gap in the initial phase, users could deploy a separate collector service to:

  1. Set up Tap Attachments.
  2. Retrieve output data from Valkey.
  3. Aggregate the data.
  4. Expose results in OpenMetrics format via an HTTP endpoint.

Once the second phase (data analysis) is complete, we can revisit this and consider native OpenMetrics HTTP integration.

(cc: @zioproto)

@allenss-amazon What do you think? Is AWS going to allow us to use the OBSERVE functionality through the commands (e.g. is there some kind of a command whitelist)? Would AWS allow the Clients to have access to another external port in order to fetch the data?

Apologies in advance, because in this section I was focused more on the topic of 'is this feature going to be usable on major clouds' rather than the excellent angles you provided in your comment.


Background Thread for Data Processing

What if we made the background thread entirely responsible for processing of data?

This is a great suggestion, and I agree we should aim for negligible performance impact. I need more time to explore this aspect in detail, but in general I like your proposal -- yes, we could use a queue to offload data capture from the main thread to a background thread. The background thread would handle Lua script execution and data output.

I need to further explore the practicality, trade-offs, and implementation feasibility of these approaches. Your input or pointers to similar Valkey implementations would be invaluable for guiding this process.


@allenss-amazon Thank you again for your insights! Looking forward to your thoughts on these approaches. I am open to any suggestions you might have.

@allenss-amazon
Member

I'm skeptical that having a separate LUA instance for each tap attachment is the best approach. I can imagine situations where the desired metrics are synthesized by correlating events from different taps and then aggregated. With separate LUA instances you can't do that.

Seems to me that once you let something like LUA into the picture it really wants to absorb as much of your logic as possible, so that flexibility is maintained. In other words, the hard coded functionality in Valkey should be associated with how to deliver to LUA the right information (taps, tap attachments) but that LUA is in charge of consuming that data and maintaining whatever correlations, aggregations, etc. are desired. Then all that's required is some way to retrieve information from LUA.

In this model, a "tap attachment" is really a LUA workspace. When you attach the workspace to a tap, events from that tap are delivered to the workspace for processing. It's the workspace's job to do that processing (in the background), maintaining any global data structures it sees fit to maintain. Then all that's required is some way to retrieve results. No reason we can't have multiple retrieval mechanisms, i.e., a command to start with and maybe some background HTTP-ish thing later. A retrieval is simply an invocation of a LUA routine that delivers a set of results which are then output.

With this framework, what's required in the spec is to get specific about the details of delivering events to a LUA workspace, some description of how to limit the resources consumable by a LUA workspace (how to prevent consuming too much memory, how to get out of infinite loops, etc.), and the retrieval mechanism -- initially the retrieve subcommand you have.

@ranshid
Member

ranshid commented Jan 7, 2025

I am a bit late to this long discussion and tried to catch up with the long thread. Overall I do agree we should invest in better observability tooling support, but I am not sure the data ingestion should really be a core component.
I agree that "tapping" points are a requirement, but I think that is probably where the core's side of the observability should end.
Let's take histograms, for example. In order to manage a histogram over some metric, it should be defined and managed by the core (e.g., an HDR histogram) in order to allow sampling by a TAP observer.
I agree with @allenss-amazon that performance should be carefully considered, but I also think that this will be mainly dependent on the metric data types implementation.
The way I imagine it is that we could offer a lightweight module API to tap into different execution points in the core.
For example, we could offer an API for each command's execution start and end, or (in order to simplify the tracing of blocking commands) we could just offer a single module API tap for command execution end. We could also offer tracing for more core components, such as defrag cycle start and end.
I think modern tools already offer different data types to collect metric data, and these can be managed by external modules that feed them when the module API is called.

@ranshid
Member

ranshid commented Jan 7, 2025

Also, @mwarzynski, do we plan to continue the discussion on the proposed RFC? I think it would be better to do so.

@eifrah-aws
Contributor

eifrah-aws commented Jan 7, 2025

@mwarzynski this is a great initiative! I had similar discussions with @ranshid but you beat me to it 😄

A general comment:

I think that any data collection implemented within Valkey should be based on OpenTelemetry (especially using the "Collector" design approach, which decouples the backend from the data collection).

OpenTelemetry guidelines for Redis / Valkey can be found here (s/redis/valkey/g):
https://github.com/open-telemetry/semantic-conventions/blob/main/docs/database/redis.md
The advantage is that some Valkey users are already using OpenTelemetry, so data collected in the same format can enrich the data they already have (using the same attributes).

A note about the implementation:

Ideally, this should be outsourced to a module (preferably written in Rust, which has multiple crates for OpenTelemetry, so histograms, spans, counters, etc. are already supported, which leaves us to focus only on the data collection points).

Obviously, Valkey's module API should be extended to support your complete vision, but we already have a good starting point (For example, using RegisterCommandFilter API which uses the lightweight context ValkeyModuleCommandFilterCtx could be the infrastructure for implementing point 7 of your proposal: "Distribution of key and value sizes")

IMO, the advantages of implementing this as a core module outweigh the disadvantages. For example:

  • Any unexpected performance impact of the data collection can be immediately mitigated by unloading the module (this is better than just "turning it off")
  • Any crashes can be immediately mitigated by unloading the module
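
For illustration, a small command-filter module along those lines might look roughly like this (assuming the Valkey module API mirrors the Redis command-filter API, as the RegisterCommandFilter reference above suggests; the header, constants, and the export path are placeholders):

#include "valkeymodule.h"
#include <strings.h>

/* Histogram of SET value sizes: <64, <256, <1024, <4096, <16384, >=16384 bytes. */
static long long value_size_buckets[6];

static void ObserveSizeFilter(ValkeyModuleCommandFilterCtx *fctx) {
    if (ValkeyModule_CommandFilterArgsCount(fctx) < 3) return;

    size_t cmdlen, vlen;
    const char *cmd = ValkeyModule_StringPtrLen(
        ValkeyModule_CommandFilterArgGet(fctx, 0), &cmdlen);
    if (cmdlen != 3 || strncasecmp(cmd, "SET", 3) != 0) return;

    ValkeyModule_StringPtrLen(ValkeyModule_CommandFilterArgGet(fctx, 2), &vlen);
    static const size_t limits[] = {64, 256, 1024, 4096, 16384};
    int b = 0;
    while (b < 5 && vlen >= limits[b]) b++;
    value_size_buckets[b]++;   /* exported later, e.g. via a module command or an OTel exporter */
}

int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
    (void)argv; (void)argc;
    if (ValkeyModule_Init(ctx, "observe_sizes", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR)
        return VALKEYMODULE_ERR;
    if (ValkeyModule_RegisterCommandFilter(ctx, ObserveSizeFilter, 0) == NULL)
        return VALKEYMODULE_ERR;
    return VALKEYMODULE_OK;
}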

@asafpamzn added the client-changes-needed (Client changes are required for this feature) label on Jan 12, 2025