diff --git a/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java b/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java index b927f2178..4dc6214f6 100644 --- a/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java +++ b/server/src/main/java/io/littlehorse/server/streams/topology/core/processors/CommandProcessor.java @@ -175,7 +175,7 @@ private void forwardMetricsUpdates(long timestamp) { PartitionMetricInventoryModel metricInventory = clusterStore.get( PartitionMetricInventoryModel.METRIC_INVENTORY_STORE_KEY, PartitionMetricInventoryModel.class); if (metricInventory != null) { - Map commandsPerTenant = new HashMap<>(); + Map> commandsPerTenant = new HashMap<>(); for (PartitionMetricIdModel partitionMetricId : metricInventory.getMetrics()) { TenantScopedStore tenantStore = TenantScopedStore.newInstance( nativeStore, partitionMetricId.getTenantId(), new BackgroundContext()); @@ -187,17 +187,20 @@ private void forwardMetricsUpdates(long timestamp) { List windowedMetrics = partitionMetric.buildRepartitionCommand(LocalDateTime.now()); tenantStore.put(new StoredGetable<>(partitionMetric)); - AggregateMetricsModel current = commandsPerTenant.getOrDefault( + List metricsPerTenant = + commandsPerTenant.getOrDefault(partitionMetricId.getTenantId(), new ArrayList<>()); + AggregateMetricsModel current = new AggregateMetricsModel( partitionMetricId.getTenantId(), - new AggregateMetricsModel( - partitionMetricId.getTenantId(), - partitionMetricId.getMetricId(), - new ArrayList<>(), - ctx.taskId().partition())); + partitionMetricId.getMetricId(), + new ArrayList<>(), + ctx.taskId().partition()); current.addWindowedMetric(windowedMetrics); - commandsPerTenant.putIfAbsent(partitionMetricId.getTenantId(), current); + metricsPerTenant.add(current); + commandsPerTenant.putIfAbsent(partitionMetricId.getTenantId(), metricsPerTenant); } - forwardRepartitionCommands(commandsPerTenant.values()); + forwardRepartitionCommands(commandsPerTenant.values().stream() + .flatMap(Collection::stream) + .toList()); } }