diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java index 28e49242..d3fc3452 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java @@ -178,6 +178,20 @@ private void initializeStateForPartitions(int partitionNum) { } _partitionNum.set(partitionNum); } + + private void updateStateForPartitions(int partitionNum) { + Map keyMapping = generateKeyMappings(partitionNum); + for (int partition = 0; partition < partitionNum; partition++) { + String key = keyMapping.get(partition); + /* This is what preserves sequence numbers across restarts */ + if (!_nextIndexPerPartition.containsKey(partition)) { + _nextIndexPerPartition.put(partition, new AtomicLong(0)); + _sensors.addPartitionSensors(partition); + _produceExecutor.scheduleWithFixedDelay(new ProduceRunnable(partition, key), _produceDelayMs, _produceDelayMs, TimeUnit.MILLISECONDS); + } + } + _partitionNum.set(partitionNum); + } private Map generateKeyMappings(int partitionNum) { HashMap keyMapping = new HashMap<>(); @@ -280,7 +294,7 @@ public void run() { } LOG.info("{}/ProduceService detected new partitions of topic {}", _name, _topic); //TODO: Should the ProduceService exit if we can't restart the producer runnables? - _produceExecutor.shutdown(); + /* _produceExecutor.shutdown(); try { _produceExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -295,6 +309,8 @@ public void run() { } _produceExecutor = Executors.newScheduledThreadPool(_threadsNum); initializeStateForPartitions(currentPartitionNum); + */ + updateStateForPartitions(currentPartitionNum); LOG.info("New partitions added to monitoring."); } catch (InterruptedException e) { LOG.error("InterruptedException occurred.", e);