Skip to content

Commit

Permalink
Merge branch 'release/1.0.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
thinker0 committed May 2, 2020
2 parents 02a5c12 + bfba9a2 commit 3f7b13f
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 6 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

pom.xml.releaseBackup
release.properties

target/
.idea/
*.iml
2 changes: 1 addition & 1 deletion heron-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>heron-3party</artifactId>
<groupId>com.github.thinker0.heron</groupId>
<version>1.0.1-SNAPSHOT</version>
<version>1.0.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion heron-kafka-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.github.thinker0.heron</groupId>
<artifactId>heron-3party</artifactId>
<version>1.0.1-SNAPSHOT</version>
<version>1.0.1</version>
<relativePath>..</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

package org.apache.heron.kafka.spout;

import static java.lang.String.format;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -40,6 +43,7 @@
import org.apache.heron.kafka.spout.internal.ConsumerFactoryDefault;
import org.apache.heron.kafka.spout.internal.OffsetManager;
import org.apache.heron.kafka.spout.internal.Timer;
import org.apache.heron.kafka.spout.metrics.KafkaMetricDecorator;
import org.apache.heron.kafka.spout.metrics.KafkaOffsetMetric;
import org.apache.heron.kafka.spout.subscription.TopicAssigner;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -48,6 +52,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
Expand All @@ -64,6 +69,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
//Initial delay for the commit and assignment refresh timers
public static final long TIMER_DELAY_MS = 500;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
private int metricsIntervalInSecs = 60;
private long previousKafkaMetricsUpdatedTimestamp = 0;

// Storm
protected SpoutOutputCollector collector;
Expand All @@ -73,6 +80,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private final ConsumerFactory<K, V> kafkaConsumerFactory;
private final TopicAssigner topicAssigner;
private transient Consumer<K, V> consumer;
private transient Set<MetricName> reportedMetrics;

// Bookkeeping
// Strategy to determine the fetch offset of the first realized by the spout upon activation
Expand Down Expand Up @@ -130,12 +138,13 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
}
refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
metricsIntervalInSecs = kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs();

offsetManagers = new ConcurrentHashMap<>();
emitted = new HashSet<>();
waitingToEmit = new ConcurrentHashMap<>();
commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());

reportedMetrics = new LinkedHashSet<>();
rebalanceListener = new KafkaSpoutConsumerRebalanceListener();

consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
Expand Down Expand Up @@ -164,6 +173,32 @@ private boolean canRegisterMetrics() {
return true;
}

/**
 * Registers every metric currently exposed by the Kafka consumer with the
 * topology context, exactly once per {@code MetricName}. Called periodically
 * because the consumer lazily creates new metrics (e.g. per-partition lag)
 * after assignment changes.
 */
private void registerConsumerMetrics() {
    consumer.metrics().forEach((metricName, metric) -> {
        // Set.add returns false when the name is already present, so this is a
        // single check-and-insert instead of the redundant contains()+add() pair.
        if (reportedMetrics.add(metricName)) {
            String exposedName = extractKafkaMetricName(metricName);
            LOG.info("register Kafka Consumer metric {}", exposedName);
            context.registerMetric(exposedName, new KafkaMetricDecorator<>(metric),
                metricsIntervalInSecs);
        }
    });
}

/**
 * Maps a Kafka {@code MetricName} to the flat name exposed to Heron:
 * {@code kafkaConsumer-<name>/<group>/<tagKey-tagValue>/...}.
 */
private String extractKafkaMetricName(MetricName metricName) {
    // Render each tag entry as "key-value", preserving the map's iteration order.
    List<String> tagParts = new ArrayList<>();
    metricName.tags().forEach((key, value) -> tagParts.add(format("%s-%s", key, value)));
    String exposedName = "kafkaConsumer-" + metricName.name() + '/'
        + metricName.group() + '/'
        + String.join("/", tagParts);
    LOG.debug("register Kafka Consumer metric {}", exposedName);
    return exposedName;
}

/** Whether the spout is configured for the AT_LEAST_ONCE processing guarantee. */
private boolean isAtLeastOnceProcessing() {
    KafkaSpoutConfig.ProcessingGuarantee guarantee = kafkaSpoutConfig.getProcessingGuarantee();
    return KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE == guarantee;
}
Expand Down Expand Up @@ -370,6 +405,12 @@ private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePar
consumer.commitSync(offsetsToCommit);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
// Register KafkaConsumer metrics
if (System.currentTimeMillis() - previousKafkaMetricsUpdatedTimestamp
> metricsIntervalInSecs * 1000) {
registerConsumerMetrics();
previousKafkaMetricsUpdatedTimestamp = System.currentTimeMillis();
}
return consumerRecords;
} finally {
consumer.resume(pausedPartitions);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.heron.kafka.spout.metrics;

import org.apache.heron.api.metric.IMetric;
import org.apache.kafka.common.Metric;

/**
 * A decorator that adapts a Kafka {@link Metric} to a Heron {@link IMetric} so that
 * Kafka client metrics can be exposed via the Heron Metrics Manager.
 *
 * <p>Instances are immutable and therefore safe to share once constructed.
 *
 * @param <M> the concrete Kafka Metric type being decorated
 */
public class KafkaMetricDecorator<M extends Metric> implements IMetric<Object> {
    // The wrapped Kafka metric; final so the decorator is immutable.
    private final M metric;

    /**
     * Creates a decorator around the given Kafka metric.
     *
     * @param metric the Kafka metric to expose via Heron; must not be null
     * @throws NullPointerException if {@code metric} is null — fail fast here
     *         rather than on the first metrics-collection tick
     */
    public KafkaMetricDecorator(M metric) {
        if (metric == null) {
            throw new NullPointerException("metric must not be null");
        }
        this.metric = metric;
    }

    /**
     * Returns the current value of the underlying Kafka metric. No state is
     * reset here: this implementation simply reads {@code metricValue()}.
     */
    @Override
    public Object getValueAndReset() {
        return metric.metricValue();
    }
}
29 changes: 26 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>com.github.thinker0.heron</groupId>
<artifactId>heron-3party</artifactId>
<version>1.0.1-SNAPSHOT</version>
<version>1.0.1</version>
<packaging>pom</packaging>

<modules>
Expand All @@ -25,12 +25,14 @@

<slf4j.version>1.7.26</slf4j.version>

<storm.kafka.client.version>2.2.2</storm.kafka.client.version>
<kafka.client.version>2.5.0</kafka.client.version>
<jackson.version>2.9.8</jackson.version>
<jackson.version.databind>${jackson.version}</jackson.version.databind>

<heron.version>0.20.1-incubating</heron.version>
<commons-lang.version>2.6</commons-lang.version>

<gitflow.version>1.14.0</gitflow.version>
</properties>

<licenses>
Expand Down Expand Up @@ -69,7 +71,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.client.version}</version>
<version>${kafka.client.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -194,6 +196,27 @@
</execution>
</executions>
</plugin>
<plugin>
<!-- https://github.com/aleksandr-m/gitflow-maven-plugin -->
<groupId>com.amashchenko.maven.plugin</groupId>
<artifactId>gitflow-maven-plugin</artifactId>
<version>${gitflow.version}</version>
<configuration>
<installProject>false</installProject>
<verbose>false</verbose>
<allowSnapshots>true</allowSnapshots>
<gitFlowConfig>
<productionBranch>master</productionBranch>
<developmentBranch>develop</developmentBranch>
<featureBranchPrefix>feature/</featureBranchPrefix>
<releaseBranchPrefix>release/</releaseBranchPrefix>
<hotfixBranchPrefix>hotfix/</hotfixBranchPrefix>
<supportBranchPrefix>support/</supportBranchPrefix>
<versionTagPrefix>v</versionTagPrefix>
<origin>origin</origin>
</gitFlowConfig>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
9 changes: 9 additions & 0 deletions scripts/release.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Cut a release using the gitflow-maven-plugin (release-start + release-finish).
# Fail fast on any error, unset variable, or pipeline failure.
set -euo pipefail

# On macOS, locate a JDK 8 via java_home; elsewhere rely on the caller's JAVA_HOME.
if [[ -x /usr/libexec/java_home ]]; then
  JAVA_HOME="$(/usr/libexec/java_home -v '1.8')"
  export JAVA_HOME
fi

# Without this guard an unset JAVA_HOME would expand to the confusing "/bin/java".
if [[ -z "${JAVA_HOME:-}" ]]; then
  echo "JAVA_HOME is not set and /usr/libexec/java_home is unavailable" >&2
  exit 1
fi

# Sanity-check the JDK before handing off to Maven (path quoted for spaces).
"${JAVA_HOME}/bin/java" -version

mvn -B gitflow:release-start gitflow:release-finish

0 comments on commit 3f7b13f

Please sign in to comment.