Skip to content

Commit

Permalink
[GOBBLIN-1978] Initialize dag action store monitor metrics (#3851)
Browse files Browse the repository at this point in the history
* Create dagAction handling metrics when monitor is initialized

* Add warning comment

* Initialize all metrics and context earlier

---------

Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
umustafi and Urmi Mustafi authored Dec 22, 2023
1 parent 18fba9e commit e392c97
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
protected Counter messagesRead;
@Getter
private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient;
private final ScheduledExecutorService consumerExecutor;
protected final ScheduledExecutorService consumerExecutor;
private final ExecutorService queueExecutor;
private final BlockingQueue[] queues;
private ContextAwareGauge[] queueSizeGauges;
Expand Down Expand Up @@ -253,7 +253,7 @@ protected void startUp() {
* Note: All records from a KafkaPartition are added to the same queue.
* A queue can contain records from multiple partitions if partitions > numThreads(queues)
*/
private void consume() {
protected void consume() {
try {
Iterator<KafkaConsumerRecord> itr = gobblinKafkaConsumerClient.consume();
// TODO: we may be committing too early and only want to commit after process messages
Expand All @@ -275,7 +275,7 @@ private void consume() {
* Assigns a queue to each thread of the {@link #queueExecutor}
* Note: Assumption here is that {@link #numThreads} is same a number of queues
*/
private void processQueues() {
protected void processQueues() {
for(BlockingQueue queue : queues) {
queueExecutor.execute(new QueueProcessor(queue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagManager dagMa
this.orchestrator = orchestrator;
this.dagActionStore = dagActionStore;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;

/*
Metrics need to be created before initializeMonitor() below is called (or more specifically handleDagAction() is
called on any dagAction)
*/
buildMetricsContextAndMetrics();
}

@Override
Expand All @@ -136,6 +142,23 @@ protected void initializeMonitor() {
}
}

/*
Override this method to do the same sequence as the parent class, except create metrics. Instead, we create metrics
earlier upon class initialization because they are used immediately as dag actions are loaded and processed from
the DagActionStore.
*/
@Override
protected void startUp() {
// Method that starts threads that processes queues
processQueues();
// Main thread that constantly polls messages from kafka
consumerExecutor.execute(() -> {
while (!shutdownRequested) {
consume();
}
});
}

@Override
/*
This class is multithreaded and this method will be called by multiple threads, however any given message will be
Expand Down

0 comments on commit e392c97

Please sign in to comment.