Skip to content

Commit

Permalink
W-17464266: Add Kinesis consumer test
Browse files Browse the repository at this point in the history
  • Loading branch information
peterzxu-crm committed Jan 3, 2025
1 parent befdb02 commit 00cdd4e
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 4 deletions.
3 changes: 3 additions & 0 deletions carbonj.service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ dependencies {
}
implementation group:"org.springframework.boot", name:"spring-boot-starter-actuator", version: "${springbootVersion}"
testImplementation group:"org.springframework.boot", name:"spring-boot-starter-test", version: "${springbootVersion}"
testImplementation "org.testcontainers:junit-jupiter:${testContainers}"
testImplementation "org.testcontainers:localstack:${testContainers}"
testImplementation "software.amazon.kinesis:amazon-kinesis-client:${kinesisClient}"
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private void reconfigureConsumers(Set<String> newRules, Set<String> currentRules
kinesisApplicationName = kinesisApplicationNamePropValue;
}
} catch (FileNotFoundException e) {
log.warn(" config/" + consumerCfgFile + "not found in the classpath ");
log.warn(consumerCfgFile + " not found in the classpath ");
log.info(" Falling back to default values ");
} catch (Throwable e) {
log.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.demandware.carbonj.service.engine.kinesis.GzipDataPointCodec;
import com.demandware.carbonj.service.engine.kinesis.kcl.MemLeaseManager;
import com.demandware.carbonj.service.engine.recovery.*;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,16 +47,27 @@ public class KinesisConsumer extends Thread {

private Worker worker;

private PointProcessor recoveryPointProcessor;
private final PointProcessor recoveryPointProcessor;

private volatile boolean closed;

private String kinesisConsumerRegion;
private final String kinesisConsumerRegion;

private final String overrideKinesisEndpoint;

public KinesisConsumer(MetricRegistry metricRegistry, PointProcessor pointProcessor, PointProcessor recoveryPointProcessor,
String kinesisStreamName, String kinesisApplicationName,
KinesisConfig kinesisConfig, CheckPointMgr<Date> checkPointMgr,
Counter noOfRestarts, String kinesisConsumerRegion) {
this(metricRegistry, pointProcessor, recoveryPointProcessor, kinesisStreamName, kinesisApplicationName, kinesisConfig,
checkPointMgr, noOfRestarts, kinesisConsumerRegion, null);
}

public KinesisConsumer(MetricRegistry metricRegistry, PointProcessor pointProcessor, PointProcessor recoveryPointProcessor,
String kinesisStreamName, String kinesisApplicationName,
KinesisConfig kinesisConfig, CheckPointMgr<Date> checkPointMgr,
Counter noOfRestarts, String kinesisConsumerRegion,
String overrideKinesisEndpoint) {
this.metricRegistry = metricRegistry;
this.pointProcessor = Preconditions.checkNotNull(pointProcessor);
this.recoveryPointProcessor = recoveryPointProcessor;
Expand All @@ -64,6 +77,7 @@ public KinesisConsumer(MetricRegistry metricRegistry, PointProcessor pointProces
this.checkPointMgr = checkPointMgr;
this.noOfRestarts = noOfRestarts;
this.kinesisConsumerRegion = kinesisConsumerRegion;
this.overrideKinesisEndpoint = overrideKinesisEndpoint;
log.info("Kinesis consumer started");
this.start();
}
Expand Down Expand Up @@ -91,6 +105,12 @@ public void run () {
kinesisClientLibConfiguration.withMaxRecords(maxRecords);
}

// For testing only
if (!StringUtils.isEmpty(overrideKinesisEndpoint)) {
kinesisClientLibConfiguration.withKinesisEndpoint(overrideKinesisEndpoint);
kinesisClientLibConfiguration.withMetricsLevel(MetricsLevel.NONE);
}

log.info(" Kinesis Client Library started with application name " + kinesisApplicationName + " with stream "
+ kinesisStreamName + " and worker id is " + workerId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import java.util.List;

public class PointProcessorMock implements PointProcessor {
private int counter = 0;

@Override
public void process(List<DataPoint> points) {

counter += points.size();
}

@Override
Expand All @@ -38,6 +40,9 @@ public void flushAggregations(boolean force) {

@Override
public void dumpStats() {
}

public int getCounter() {
return counter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright (c) 2018, salesforce.com, inc.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
* For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/
package com.demandware.carbonj.service.engine;

import com.codahale.metrics.MetricRegistry;
import com.demandware.carbonj.service.ns.NamespaceCounter;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.regions.Region;

import java.io.File;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;

@Testcontainers
public class TestConsumers {

@BeforeAll
static void setUp() throws Exception {
File dir = new File("./config");
if (!dir.exists()) {
dir.mkdirs();
}
FileUtils.writeLines(new File(dir, "kinesis-test-stream-consumer.conf"), List.of("foo=bar"));
}

@Test
public void testConsumers() throws Exception {
MetricRegistry metricRegistry = new MetricRegistry();
File rulesFile = new File(Objects.requireNonNull(this.getClass().getClassLoader().getResource("consumer-rules.conf")).getFile());
KinesisConfig kinesisConfig = new KinesisConfig(true, false, 60000, 60000, 60000,
1, Path.of("/tmp/checkpoint"), 60, 60, "recoveryProvider", 1, 1, 1000);
FileCheckPointMgr checkPointMgr = new FileCheckPointMgr(Path.of("/tmp/checkpoint"), 5);
Consumers consumers = new Consumers(metricRegistry, new PointProcessorMock(), new PointProcessorMock(),
rulesFile, kinesisConfig, checkPointMgr, Region.US_EAST_1.id(), new NamespaceCounter(metricRegistry, 60), new File("/tmp/sync"));
new KinesisRecordProcessorFactory(metricRegistry, new PointProcessorMock(), kinesisConfig, "test-stream");
consumers.dumpStats();
consumers.syncNamespaces();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* Copyright (c) 2018, salesforce.com, inc.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
* For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
*/
package com.demandware.carbonj.service.engine;

import com.codahale.metrics.MetricRegistry;
import com.demandware.carbonj.service.engine.kinesis.DataPointCodec;
import com.demandware.carbonj.service.engine.kinesis.DataPoints;
import com.demandware.carbonj.service.engine.kinesis.GzipDataPointCodec;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

import java.lang.reflect.Field;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.CLOUDWATCH;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;

@Testcontainers
public class TestKinesisConsumer {

@Container
public static LocalStackContainer localstack = new LocalStackContainer(
DockerImageName.parse("localstack/localstack:1.4.0")).withServices(KINESIS, CLOUDWATCH);

private static final String STREAM_NAME = "test-stream";
private static KinesisClient kinesisClient;

@BeforeAll
static void setUp() throws Exception {
setEnvironmentVariable("AWS_ACCESS_KEY_ID", "accessKey");
setEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "secretKey");

kinesisClient = KinesisClient.builder()
.endpointOverride(localstack.getEndpointOverride(KINESIS))
.region(Region.US_EAST_1)
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create("accessKey", "secretKey")))
.build();

// Create the stream
kinesisClient.createStream(builder -> builder.streamName(STREAM_NAME).shardCount(1));
boolean isActive = false;
while (!isActive) {
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(STREAM_NAME)
.build();
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest);

StreamStatus status = describeStreamResponse.streamDescription().streamStatus();
if (status == StreamStatus.ACTIVE) {
isActive = true;
} else {
Thread.sleep(1000);
}
}
}

@Test
public void test() throws Exception {
ListStreamsResponse listStreamsResponse = kinesisClient.listStreams();
assertEquals(1, listStreamsResponse.streamNames().size());
assertEquals(STREAM_NAME, listStreamsResponse.streamNames().get(0));

MetricRegistry metricRegistry = new MetricRegistry();
KinesisConfig kinesisConfig = new KinesisConfig(true, false, 60000, 60000, 60000,
1, Path.of("/tmp/checkpoint"), 60, 60, "recoveryProvider", 1, 1, 1000);
FileCheckPointMgr checkPointMgr = new FileCheckPointMgr(Path.of("/tmp/checkpoint"), 5);
PointProcessorMock pointProcessor = new PointProcessorMock();
KinesisConsumer kinesisConsumer = new KinesisConsumer(metricRegistry, pointProcessor, pointProcessor,
STREAM_NAME, STREAM_NAME + "-app", kinesisConfig, checkPointMgr, metricRegistry.counter("kinesis-consumer-counter"),
Region.US_EAST_1.id(), localstack.getEndpointOverride(KINESIS).toString());
Thread.sleep(30000);
int current = (int) (System.currentTimeMillis() / 1000);
DataPoints dataPoints = new DataPoints(List.of(new DataPoint("foo.bar", 123.45, current)), current);
DataPointCodec dataPointCodec = new GzipDataPointCodec();
PutRecordRequest putRecordRequest = PutRecordRequest.builder()
.streamName(STREAM_NAME)
.data(SdkBytes.fromByteArray(dataPointCodec.encode(dataPoints)))
.partitionKey("1")
.build();
kinesisClient.putRecord(putRecordRequest);
int count = 0;
while (count < 30) {
if (pointProcessor.getCounter() == 1) break;
count++;
Thread.sleep(1000);
}
assertTrue(count < 30);
kinesisConsumer.dumpStats();
kinesisConsumer.closeQuietly();
}

private static void setEnvironmentVariable(String key, String value) throws Exception {
// Use reflection to modify the internal environment map
Map<String, String> env = System.getenv();
Field field = env.getClass().getDeclaredField("m");
field.setAccessible(true);

@SuppressWarnings("unchecked")
Map<String, String> writableEnv = (Map<String, String>) field.get(env);
writableEnv.put(key, value);
}
}
1 change: 1 addition & 0 deletions carbonj.service/src/test/resources/consumer-rules.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
kinesis:test-stream
2 changes: 2 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ mavenPublish=2.0.1
dependencyLicenseReport=2.9
hierynomusLicense=0.16.1
sonarqube=5.1.0.4882
testContainers=1.19.7
kinesisClient=2.6.0

0 comments on commit 00cdd4e

Please sign in to comment.