Skip to content

Commit

Permalink
build repartition commands
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Feb 14, 2025
1 parent 77d6d48 commit 509fb40
Show file tree
Hide file tree
Showing 10 changed files with 348 additions and 129 deletions.
1 change: 1 addition & 0 deletions schemas/internal/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ message AggregateTaskMetrics {

message AggregateMetrics {
repeated RepartitionWindowedMetric windowed_metrics = 1;
TenantId tenant_id = 2;
}

message WfMetricUpdate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@

import com.google.protobuf.Message;
import io.littlehorse.common.LHSerializable;
import io.littlehorse.common.model.getable.objectId.TenantIdModel;
import io.littlehorse.common.model.repartitioncommand.RepartitionSubCommand;
import io.littlehorse.common.proto.AggregateMetrics;
import io.littlehorse.common.proto.RepartitionWindowedMetric;
import io.littlehorse.sdk.common.exception.LHSerdeError;
import io.littlehorse.server.streams.stores.TenantScopedStore;
import io.littlehorse.server.streams.topology.core.ExecutionContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.Getter;
import org.apache.kafka.streams.processor.api.ProcessorContext;

public class AggregateMetricsModel extends LHSerializable<AggregateMetrics> implements RepartitionSubCommand {

@Getter
private TenantIdModel tenantId;
private List<RepartitionWindowedMetricModel> windowedMetrics = new ArrayList<>();

public AggregateMetricsModel() {}

public AggregateMetricsModel(List<RepartitionWindowedMetricModel> windowedMetrics) {
public AggregateMetricsModel(TenantIdModel tenantId, List<RepartitionWindowedMetricModel> windowedMetrics) {
this.windowedMetrics = windowedMetrics;
this.tenantId = tenantId;
}

@Override
Expand All @@ -30,7 +32,9 @@ public AggregateMetrics.Builder toProto() {
.map(RepartitionWindowedMetricModel::toProto)
.map(RepartitionWindowedMetric.Builder::build)
.toList();
return AggregateMetrics.newBuilder().addAllWindowedMetrics(windowedMetricsPb);
return AggregateMetrics.newBuilder()
.addAllWindowedMetrics(windowedMetricsPb)
.setTenantId(tenantId.toProto());
}

@Override
Expand All @@ -39,6 +43,15 @@ public void initFrom(Message proto, ExecutionContext context) throws LHSerdeErro
this.windowedMetrics = p.getWindowedMetricsList().stream()
.map(pb -> LHSerializable.fromProto(pb, RepartitionWindowedMetricModel.class, context))
.toList();
this.tenantId = LHSerializable.fromProto(p.getTenantId(), TenantIdModel.class, context);
}

public List<RepartitionWindowedMetricModel> getWindowedMetrics() {
return Collections.unmodifiableList(windowedMetrics);
}

public void addWindowedMetric(List<RepartitionWindowedMetricModel> windowedMetrics) {
this.windowedMetrics.addAll(windowedMetrics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.protobuf.Message;
import io.littlehorse.common.LHSerializable;
import io.littlehorse.common.model.AbstractGetable;
import io.littlehorse.common.model.AggregateMetricsModel;
import io.littlehorse.common.model.CoreGetable;
import io.littlehorse.common.model.RepartitionWindowedMetricModel;
import io.littlehorse.common.model.getable.objectId.MetricIdModel;
Expand Down Expand Up @@ -118,14 +117,14 @@ Set<PartitionWindowedMetricModel> getActiveWindowedMetrics() {
return activeWindowedMetrics;
}

public Optional<AggregateMetricsModel> buildRepartitionCommand(LocalDateTime currentTime) {
public List<RepartitionWindowedMetricModel> buildRepartitionCommand(LocalDateTime currentTime) {
if (activeWindowedMetrics.isEmpty()) {
return Optional.empty();
return List.of();
}
List<RepartitionWindowedMetricModel> windowedMetrics =
activeWindowedMetrics.stream().map(this::toRepartitionMetric).toList();
activeWindowedMetrics.removeIf(windowedMetric -> windowedMetric.windowClosed(windowLength, currentTime));
return Optional.of(new AggregateMetricsModel(windowedMetrics));
return windowedMetrics;
}

private RepartitionWindowedMetricModel toRepartitionMetric(PartitionWindowedMetricModel windowedMetric) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,8 @@ public int compareTo(PartitionWindowedMetricModel o) {

boolean windowClosed(Duration windowLength, LocalDateTime currentTime) {
long elapsed = Duration.between(windowStart, currentTime).toMillis();
boolean closed = elapsed > windowLength.toMillis();
if (closed) {
log.info("Window closed, creating a new one. Elapsed time: {} ms", elapsed);
}
return closed;

return elapsed > windowLength.toMillis();
}

public void increment() {
Expand Down
181 changes: 181 additions & 0 deletions server/src/main/java/io/littlehorse/common/proto/AggregateMetrics.java

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 509fb40

Please sign in to comment.