Add Timer Batch Limit in DoFnOp #139

Open: wants to merge 4 commits into base: li_trunk


@FuRyanf FuRyanf commented Jan 16, 2025

Description

This PR introduces a configurable timer batch limit (TIMER_BATCH_LIMIT) in DoFnOp to address memory issues observed in high-scale scenarios where multiple timers fire simultaneously. In the current implementation, all ready timers are fired in a single loop without batching, leading to excessive memory usage and out-of-memory (OOM) errors. The key changes in this PR are:

  1. Batch Timer Processing:

    • Added a configurable TIMER_BATCH_LIMIT parameter (beam.samza.dofnop.timerBatchLimit).
    • Updated the doProcessWatermark method to process timers in chunks, limiting the number of timers fired in a single loop to reduce memory spikes.
    • This ensures incremental processing of timers, reducing memory usage and improving system stability.
  2. Implementation Details:

    • A default limit of Integer.MAX_VALUE is used if no configuration is provided, maintaining current behavior for backward compatibility.
    • Timer processing is divided into batches, where each batch processes up to TIMER_BATCH_LIMIT timers. After each batch, outputs are emitted and memory is cleared.
  3. Code Changes:

    • Introduced TIMER_BATCH_LIMIT_CONFIG for configurable timer batch limits.
    • Modified doProcessWatermark to iterate over timers in chunks, respecting the batch limit.
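
The batching loop described above can be sketched roughly as follows. This is a minimal illustration in plain Java, not the actual DoFnOp code: the class `TimerBatchSketch`, the method `fireInBatches`, and the `fireTimer` and `emitter` parameters are hypothetical names chosen for the example, and timers are modeled as generic values rather than `KeyedTimerData`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the batching idea in this PR; not the real DoFnOp.
final class TimerBatchSketch {
  private TimerBatchSketch() {}

  /**
   * Fires ready timers in batches of at most batchLimit timers. After each
   * batch, the outputs collected for that batch are handed to the emitter,
   * and a fresh buffer is used for the next batch.
   *
   * @return the number of batches that were processed
   */
  static <T, O> int fireInBatches(
      Iterable<T> readyTimers,
      int batchLimit,
      Function<T, List<O>> fireTimer, // assumed: firing one timer yields outputs
      Consumer<List<O>> emitter) {    // assumed: receives one batch of outputs
    if (batchLimit <= 0) {
      throw new IllegalArgumentException("batchLimit must be positive");
    }
    int batches = 0;
    Iterator<T> timerIterator = readyTimers.iterator();
    while (timerIterator.hasNext()) {
      List<O> batchOutputs = new ArrayList<>();
      int fired = 0;
      // Fire at most batchLimit timers before emitting.
      while (timerIterator.hasNext() && fired < batchLimit) {
        batchOutputs.addAll(fireTimer.apply(timerIterator.next()));
        fired++;
      }
      emitter.accept(batchOutputs);
      batches++;
    }
    return batches;
  }
}
```

With a limit of Integer.MAX_VALUE the inner loop drains every ready timer in one pass, which corresponds to the default, backward-compatible behavior described above. Whether each batch's outputs really leave memory depends on what the emitter does with them.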

Addresses

This PR addresses memory issues caused by the unbounded accumulation of timer outputs during high-scale workloads in DoFnOp.

Testing

  • build

@github-actions github-actions bot added the build label Jan 16, 2025
    if (!readyTimers.isEmpty()) {
      // Process timers in chunks
      Iterator<? extends KeyedTimerData<?>> timerIterator = readyTimers.iterator();
      while (timerIterator.hasNext()) {


I think this might not work as expected. Even if the timers are processed in chunks, the output will still be collected across all the chunks, since they are processed in a single Samza flatmap function. It seems to me we have to break this DoFnOp into two steps: the first emits the timers in batches, and the second processes the timers in batches.
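
To illustrate the concern, here is a simplified model in plain Java. It does not use the Samza API; `FlatMapAccumulationSketch` and `applyChunked` are hypothetical names, and the only assumption is that a flatmap-style function hands back all of its outputs together when it returns.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a flatmap-style call; not Samza or Beam code.
final class FlatMapAccumulationSketch {
  private FlatMapAccumulationSketch() {}

  /** Processes timers in chunks, but returns every output in one collection. */
  static List<String> applyChunked(List<Integer> readyTimers, int batchLimit) {
    if (batchLimit <= 0) {
      throw new IllegalArgumentException("batchLimit must be positive");
    }
    List<String> collected = new ArrayList<>();
    int start = 0;
    while (start < readyTimers.size()) {
      // Subtract instead of adding so a very large batchLimit cannot overflow.
      int end = start + Math.min(batchLimit, readyTimers.size() - start);
      for (Integer timer : readyTimers.subList(start, end)) {
        collected.add("output-for-timer-" + timer);
      }
      // Nothing leaves the function at a chunk boundary: the outputs of
      // earlier chunks stay in `collected` until the single return below.
      start = end;
    }
    return collected;
  }
}
```

In this model the returned collection has the same size for any batch limit, so chunking inside the function alone does not bound the memory held for outputs.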
