Adding Kafka Streams native hints (#422)
* Adding Kafka Streams native hints

 - Add the initial set of Kafka Streams hints required
   for running a basic Kafka Streams application

 - Add a basic end-to-end test that uses both KStream and KTable
   types and verifies that it works as a native application

* Refining metadata for kafka-streams

* Add missing condition fields in reflect-config.json

* Add missing condition fields in jni-config and resource-config json files

* Add missing reachable packages for RocksDB
sobychacko authored Nov 30, 2023
1 parent ca92f1a commit 8d60c36
Showing 17 changed files with 438 additions and 0 deletions.
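For context: a GraalVM native image resolves reflection, JNI, and resource lookups at build time, so reflective code paths fail at run time unless the target class is registered in metadata such as the files added in this commit. A minimal, hypothetical probe (not part of the diff) illustrating the failure mode these hints prevent:

```java
public class NativeHintProbe {
    public static void main(String[] args) throws Exception {
        // Under native-image, this lookup succeeds only if the class is listed
        // in reflect-config.json (directly or via a matching typeReachable
        // condition); otherwise it fails with ClassNotFoundException.
        Class<?> handler = Class.forName(
                "org.apache.kafka.streams.errors.LogAndFailExceptionHandler");
        System.out.println(handler.getName());
    }
}
```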
4 changes: 4 additions & 0 deletions metadata/index.json
@@ -172,6 +172,10 @@
"directory" : "org.apache.kafka/kafka-clients",
"module" : "org.apache.kafka:kafka-clients"
}, {
"allowed-packages" : [ "org.apache.kafka", "org.rocksdb" ],
"directory" : "org.apache.kafka/kafka-streams",
"module" : "org.apache.kafka:kafka-streams"
},{
"allowed-packages" : [ "org.apache.tomcat", "org.apache.catalina" ],
"directory" : "org.apache.tomcat.embed/tomcat-embed-core",
"module" : "org.apache.tomcat.embed:tomcat-embed-core"
5 changes: 5 additions & 0 deletions metadata/org.apache.kafka/kafka-streams/3.5.1/index.json
@@ -0,0 +1,5 @@
[
  "reflect-config.json",
  "resource-config.json",
  "jni-config.json"
]
20 changes: 20 additions & 0 deletions metadata/org.apache.kafka/kafka-streams/3.5.1/jni-config.json
@@ -0,0 +1,20 @@
[
  {
    "name": "org.rocksdb.Status",
    "condition": {
      "typeReachable": "org.rocksdb.RocksDB"
    },
    "allDeclaredMethods": true,
    "allDeclaredConstructors": true,
    "allDeclaredFields": true
  },
  {
    "name": "org.rocksdb.RocksDBException",
    "condition": {
      "typeReachable": "org.rocksdb.RocksDB"
    },
    "allDeclaredMethods": true,
    "allDeclaredConstructors": true,
    "allDeclaredFields": true
  }
]
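The two JNI entries cover types that RocksDB's native code instantiates when reporting errors back to Java. A minimal sketch of the kind of call path that exercises them (the path `/tmp/rocksdb-demo` is illustrative, not from the diff):

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbJniDemo {
    public static void main(String[] args) {
        // Loading the JNI library is also what triggers the librocksdbjni-*
        // resource pattern registered in resource-config.json.
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-demo")) {
            db.put("key".getBytes(), "value".getBytes());
            System.out.println(new String(db.get("key".getBytes())));
        } catch (RocksDBException e) {
            // Native code constructs RocksDBException (wrapping org.rocksdb.Status)
            // through JNI, hence the jni-config.json entries above.
            e.printStackTrace();
        }
    }
}
```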
61 changes: 61 additions & 0 deletions metadata/org.apache.kafka/kafka-streams/3.5.1/reflect-config.json
@@ -0,0 +1,61 @@
[
  {
    "name": "org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor",
    "condition": {
      "typeReachable": "org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration"
    },
    "allDeclaredMethods": true,
    "allPublicConstructors": true
  },
  {
    "name": "org.apache.kafka.streams.errors.DefaultProductionExceptionHandler",
    "condition": {
      "typeReachable": "org.apache.kafka.streams.StreamsConfig"
    },
    "allDeclaredMethods": true,
    "allPublicConstructors": true
  },
  {
    "name": "org.apache.kafka.streams.errors.LogAndFailExceptionHandler",
    "condition": {
      "typeReachable": "org.apache.kafka.streams.StreamsConfig"
    },
    "allDeclaredMethods": true,
    "allPublicConstructors": true
  },
  {
    "name": "org.apache.kafka.streams.processor.FailOnInvalidTimestamp",
    "condition": {
      "typeReachable": "org.apache.kafka.streams.StreamsConfig"
    },
    "allDeclaredMethods": true,
    "allPublicConstructors": true
  },
  {
    "name": "org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier",
    "condition": {
      "typeReachable": "org.apache.kafka.streams.StreamsConfig"
    },
    "allDeclaredMethods": true,
    "methods": [{"name": "<init>", "parameterTypes": []}]
  },
  {
    "name": "org.apache.kafka.streams.processor.internals.StateDirectory$StateDirectoryProcessFile",
    "condition": {
      "typeReachable": "org.apache.kafka.streams.processor.internals.StateDirectory"
    },
    "allDeclaredFields": true,
    "queryAllDeclaredMethods": true,
    "queryAllDeclaredConstructors": true,
    "methods": [{"name": "<init>", "parameterTypes": []}]
  },
  {
    "name": "org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor",
    "condition": {
      "typeReachable": "org.apache.kafka.streams.StreamsConfig"
    },
    "allDeclaredMethods": true,
    "allPublicConstructors": true,
    "methods": [{"name": "<init>", "parameterTypes": []}]
  }
]
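These entries are conditional: each applies only once its `typeReachable` type is reachable in the image, and they exist because Kafka Streams instantiates handler, supplier, and assignor classes reflectively from configuration defaults. A minimal sketch of that config-driven instantiation (application id and broker address are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;

public class ReflectiveDefaultsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reflective-defaults-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        StreamsConfig config = new StreamsConfig(props);
        // Kafka Streams resolves the configured class name and instantiates it
        // reflectively; by default this yields LogAndFailExceptionHandler, one
        // of the classes registered above.
        DeserializationExceptionHandler handler = config.getConfiguredInstance(
                StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                DeserializationExceptionHandler.class);
        System.out.println(handler.getClass().getName());
    }
}
```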
16 changes: 16 additions & 0 deletions metadata/org.apache.kafka/kafka-streams/3.5.1/resource-config.json
@@ -0,0 +1,16 @@
{
  "resources": {
    "includes": [
      {
        "pattern": "\\Qkafka/kafka-streams-version.properties\\E",
        "condition": {
          "typeReachable": "org.apache.kafka.streams.internals.metrics.ClientMetrics"
        }
      },
      {
        "pattern": "\\Qlibrocksdbjni-\\E.*",
        "condition": {
          "typeReachable": "org.rocksdb.RocksDB"
        }
      }
    ]
  },
  "bundles": []
}
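Both patterns use `\Q...\E`, the regex syntax for quoting a literal: the first matches the exact version-properties file, while the second matches the platform-specific `librocksdbjni-*` library that RocksDB extracts from its jar. A minimal sketch of the resource lookup behind the first pattern (assuming the kafka-streams jar is on the classpath and exposes a `version` property, as its use by `ClientMetrics` suggests):

```java
import java.io.InputStream;
import java.util.Properties;

public class StreamsVersionResourceDemo {
    public static void main(String[] args) throws Exception {
        // ClientMetrics reads this classpath resource to report the Kafka Streams
        // version; in a native image the lookup succeeds only because the literal
        // path is registered in resource-config.json.
        try (InputStream in = StreamsVersionResourceDemo.class.getClassLoader()
                .getResourceAsStream("kafka/kafka-streams-version.properties")) {
            Properties props = new Properties();
            props.load(in);
            System.out.println(props.getProperty("version"));
        }
    }
}
```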
10 changes: 10 additions & 0 deletions metadata/org.apache.kafka/kafka-streams/index.json
@@ -0,0 +1,10 @@
[
  {
    "latest": true,
    "metadata-version": "3.5.1",
    "module": "org.apache.kafka:kafka-streams",
    "tested-versions": [
      "3.5.1"
    ]
  }
]
6 changes: 6 additions & 0 deletions tests/src/index.json
@@ -263,6 +263,12 @@
"versions" : [ "3.5.1" ]
} ]
}, {
"test-project-path" : "org.apache.kafka/kafka-streams/3.5.1",
"libraries" : [ {
"name" : "org.apache.kafka:kafka-streams",
"versions" : [ "3.5.1" ]
} ]
},{
"test-project-path" : "org.apache.tomcat.embed/tomcat-embed-core/10.0.20",
"libraries" : [ {
"name" : "org.apache.tomcat.embed:tomcat-embed-core",
4 changes: 4 additions & 0 deletions tests/src/org.apache.kafka/kafka-streams/3.5.1/.gitignore
@@ -0,0 +1,4 @@
gradlew.bat
gradlew
gradle/
build/
35 changes: 35 additions & 0 deletions tests/src/org.apache.kafka/kafka-streams/3.5.1/build.gradle
@@ -0,0 +1,35 @@
/*
* Copyright and related rights waived via CC0
*
* You should have received a copy of the CC0 legalcode along with this
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
*/

plugins {
    id "org.graalvm.internal.tck"
}

String libraryVersion = tck.testedLibraryVersion.get()

dependencies {
    testImplementation "org.apache.kafka:kafka-streams:$libraryVersion"
    testImplementation "org.apache.kafka:kafka-streams:$libraryVersion:test"
    testImplementation "org.apache.kafka:kafka-clients:$libraryVersion"
    testImplementation "org.apache.kafka:kafka-clients:$libraryVersion:test"
    testImplementation 'org.assertj:assertj-core:3.22.0'
    testImplementation 'ch.qos.logback:logback-classic:1.4.5'
    testImplementation 'org.apache.kafka:kafka_2.13:3.5.1'
    testImplementation 'org.apache.kafka:kafka_2.13:3.5.1:test'
}

graalvmNative {
    agent {
        defaultMode = "conditional"
        modes {
            conditional {
                userCodeFilterPath = "metadata-user-code-filter.json"
                extraFilterPath = "metadata-extra-filter.json"
            }
        }
    }
}
2 changes: 2 additions & 0 deletions tests/src/org.apache.kafka/kafka-streams/3.5.1/gradle.properties
@@ -0,0 +1,2 @@
library.version = 3.5.1
metadata.dir = org.apache.kafka/kafka-streams/3.5.1/
5 changes: 5 additions & 0 deletions tests/src/org.apache.kafka/kafka-streams/3.5.1/metadata-extra-filter.json
@@ -0,0 +1,5 @@
{
  "rules": [
    {"includeClasses": "org.apache.kafka.**"}
  ]
}
5 changes: 5 additions & 0 deletions tests/src/org.apache.kafka/kafka-streams/3.5.1/metadata-user-code-filter.json
@@ -0,0 +1,5 @@
{
  "rules": [
    {"includeClasses": "org.apache.kafka.**"}
  ]
}
13 changes: 13 additions & 0 deletions tests/src/org.apache.kafka/kafka-streams/3.5.1/settings.gradle
@@ -0,0 +1,13 @@
pluginManagement {
    def tckPath = Objects.requireNonNullElse(
            System.getenv("GVM_TCK_TCKDIR"),
            "../../../../tck-build-logic"
    )
    includeBuild(tckPath)
}

plugins {
    id "org.graalvm.internal.tck-settings" version "1.0.0-SNAPSHOT"
}

rootProject.name = 'org.apache.kafka.kafka-streams_tests'
178 changes: 178 additions & 0 deletions tests/src/org.apache.kafka/kafka-streams/3.5.1/src/test/java/org_apache_kafka/kafka_streams/KafkaStreamsTest.java
@@ -0,0 +1,178 @@
/*
* Copyright and related rights waived via CC0
*
* You should have received a copy of the CC0 legalcode along with this
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
*/
package org_apache_kafka.kafka_streams;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaStreamsTest {

    private static final Logger logger = LoggerFactory.getLogger("KafkaStreamsTest");

    private static final String KAFKA_SERVER = "localhost:9092";

    private EmbeddedZookeeper zookeeper;

    private KafkaServer kafkaServer;

    @BeforeAll
    void beforeAll() {
        zookeeper = new EmbeddedZookeeper();
        logger.info("Embedded zookeeper started");
        Properties brokerProperties = new Properties();
        brokerProperties.setProperty("zookeeper.connect", "localhost:" + zookeeper.port());
        brokerProperties.setProperty("log.dirs", TestUtils.tempDir().getPath());
        brokerProperties.setProperty("listeners", "PLAINTEXT://" + KAFKA_SERVER);
        brokerProperties.setProperty("offsets.topic.replication.factor", "1");
        kafkaServer = TestUtils.createServer(new KafkaConfig(brokerProperties), Time.SYSTEM);
        logger.info("Embedded kafka server started");
    }

    @AfterAll
    void afterAll() {
        if (kafkaServer != null) {
            kafkaServer.shutdown();
            logger.info("Embedded kafka server stopped");
        }
        if (zookeeper != null) {
            zookeeper.shutdown();
            logger.info("Embedded zookeeper stopped");
        }
    }

    @Test
    void testBasicKafkaStreamsProcessor() throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://" + KAFKA_SERVER);
        try (Admin admin = Admin.create(properties)) {
            NewTopic newTopic = new NewTopic("user-regions", 1, (short) 1);
            CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
            result.values().get("user-regions").get();

            NewTopic newTopic1 = new NewTopic("large-regions", 1, (short) 1);
            CreateTopicsResult result1 = admin.createTopics(Collections.singleton(newTopic1));
            result1.values().get("large-regions").get();
        }

        final Properties streamsConfiguration = new Properties();
        // Give the Streams application a unique name. The name must be unique in the Kafka cluster
        // against which the application is run.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-region-lambda-example");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "user-region-lambda-example-client");
        // Where to find Kafka broker(s).
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        // Specify default (de)serializers for record keys and for record values.
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Records should be flushed every second. This is less than the default
        // in order to keep this example interactive.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1 * 1000);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<String, String> userRegions = builder.table("user-regions");

        // Aggregate the user counts by region
        final KTable<String, Long> regionCounts = userRegions
                // Count by region
                .groupBy((userId, region) -> KeyValue.pair(region, region))
                .count();

        final KStream<String, Long> regionCountsForConsole = regionCounts
                .toStream()
                .filter((regionName, count) -> count != null);

        // Write to the result topic; we need to override the value serializer for type Long.
        regionCountsForConsole.to("large-regions", Produced.with(stringSerde, longSerde));

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

        streams.cleanUp();
        streams.start();

        Map<String, Object> producerProperties = new HashMap<>();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 50);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            producer.send(new ProducerRecord<>("user-regions", 0, "alice", "asia")).get();
            producer.send(new ProducerRecord<>("user-regions", 0, "bob", "europe")).get();
        }

        List<Map<String, Long>> receivedMessages = new ArrayList<>();

        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(List.of("large-regions"));
            long end = System.currentTimeMillis() + 30000L;
            while (receivedMessages.size() < 2 && System.currentTimeMillis() < end) {
                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, Long> record : records) {
                    receivedMessages.add(Map.of(record.key(), record.value()));
                }
            }
        }
        List<Map<String, Long>> expectedContent = List.of(Map.of("asia", 1L), Map.of("europe", 1L));
        assertThat(receivedMessages)
                .hasSize(2)
                .containsExactly(expectedContent.get(0), expectedContent.get(1));
    }
}
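One detail worth noting about the test topology: `count()` on a grouped KTable materializes a state store, which Kafka Streams backs with RocksDB by default — that default is what pulls `org.rocksdb` into the image and motivates the JNI and resource hints above. A minimal sketch making the implicit store explicit (the store name `region-counts` is illustrative, not from the diff):

```java
package org_apache_kafka.kafka_streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class NamedStoreTopologySketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> userRegions = builder.table("user-regions");
        // count() materializes a state store (RocksDB-backed by default);
        // naming it here only makes that implicit store visible.
        KTable<String, Long> regionCounts = userRegions
                .groupBy((userId, region) -> KeyValue.pair(region, region))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("region-counts")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));
        // Printing the topology shows the state store without needing a broker.
        System.out.println(builder.build().describe());
    }
}
```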