Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add regional topics to integration tests #343

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.google.pubsublite.kafka.source;

import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.ProjectPath;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
Expand All @@ -35,7 +35,8 @@ public Poller newPoller(Map<String, String> params) {
.setProject(
ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG).value())
.project())
.setLocation(CloudZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString()))
.setLocation(
CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString()))
.setName(
SubscriptionName.of(
config.get(ConfigDefs.SUBSCRIPTION_NAME_FLAG).value().toString()))
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/it/Base.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ protected static void createInstanceTemplate(
.setKey("cps_source_connector_properties_name")
.setValue(cpsSourceConnectorPropertiesGCSName)
.build())
.addItems(Items.newBuilder().setKey("psl_zone").setValue(location).build())
.addItems(Items.newBuilder().setKey("psl_location").setValue(region).build())
.addItems(
Items.newBuilder()
.setKey("psl_sink_connector_properties_name")
Expand Down
133 changes: 106 additions & 27 deletions src/test/java/it/StandaloneIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,37 +126,63 @@ public class StandaloneIT extends Base {
.setName(ReservationName.of((pslReservationId)))
.build();

private static final String kafkaPslSinkTestTopic = "psl-sink-test-topic";
private static final String kafkaPslSinkZonalTestTopic = "psl-sink-test-topic-zonal";
private static final String kafkaPslSinkRegionalTestTopic = "psl-sink-test-topic-regional";
private static final String pslSinkTopicId = "psl-sink-topic-" + runId;
private static final TopicPath pslSinkTopicPath =
private static final TopicPath pslSinkZonalTopicPath =
TopicPath.newBuilder()
.setProject(ProjectId.of(projectId))
.setLocation(CloudZone.of(CloudRegion.of(region), zone))
.setName(com.google.cloud.pubsublite.TopicName.of(pslSinkTopicId))
.build();
private static final TopicPath pslSinkRegionalTopicPath =
TopicPath.newBuilder()
.setProject(ProjectId.of(projectId))
.setLocation(CloudRegion.of(region))
.setName(com.google.cloud.pubsublite.TopicName.of(pslSinkTopicId))
.build();
private static final String pslSinkSubscriptionId = "psl-sink-subscription-" + runId;
private static final SubscriptionPath pslSinkSubscriptionPath =
private static final SubscriptionPath pslSinkZonalSubscriptionPath =
SubscriptionPath.newBuilder()
.setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSinkSubscriptionId))
.setProject(ProjectId.of(projectId))
.setLocation(CloudZone.of(CloudRegion.of(region), zone))
.build();
private static final SubscriptionPath pslSinkRegionalSubscriptionPath =
SubscriptionPath.newBuilder()
.setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSinkSubscriptionId))
.setProject(ProjectId.of(projectId))
.setLocation(CloudRegion.of(region))
.build();

private static final String kafkaPslSourceTestTopic = "psl-source-test-topic";
private static final String kafkaPslSourceZonalTestTopic = "psl-source-test-topic-zonal";
private static final String kafkaPslSourceRegionalTestTopic = "psl-source-test-topic-regional";
private static final String pslSourceTopicId = "psl-source-topic-" + runId;
private static final TopicPath pslSourceTopicPath =
private static final TopicPath pslSourceZonalTopicPath =
TopicPath.newBuilder()
.setProject(ProjectId.of(projectId))
.setLocation(CloudZone.of(CloudRegion.of(region), zone))
.setName(com.google.cloud.pubsublite.TopicName.of(pslSourceTopicId))
.build();
private static final TopicPath pslSourceRegionalTopicPath =
TopicPath.newBuilder()
.setProject(ProjectId.of(projectId))
.setLocation(CloudRegion.of(region))
.setName(com.google.cloud.pubsublite.TopicName.of(pslSourceTopicId))
.build();
private static final String pslSourceSubscriptionId = "psl-source-subscription-" + runId;
private static final SubscriptionPath pslSourceSubscriptionPath =
private static final SubscriptionPath pslSourceZonalSubscriptionPath =
SubscriptionPath.newBuilder()
.setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSourceSubscriptionId))
.setProject(ProjectId.of(projectId))
.setLocation(CloudZone.of(CloudRegion.of(region), zone))
.build();
private static final SubscriptionPath pslSourceRegionalSubscriptionPath =
SubscriptionPath.newBuilder()
.setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSourceSubscriptionId))
.setProject(ProjectId.of(projectId))
.setLocation(CloudRegion.of(region))
.build();

private static final String instanceName = "kafka-it-" + runId;
private static final String instanceTemplateName = "kafka-it-template-" + runId;
Expand Down Expand Up @@ -186,7 +212,8 @@ public static void setUp() throws Exception {
log.atInfo().log("Packaged connector jar.");
uploadGCSResources();
setupCpsResources();
setupPslResources();
setupPslZonalResources();
setupPslRegionalResources();
setupGceInstance();
}

Expand Down Expand Up @@ -245,7 +272,28 @@ protected static void setupCpsResources() throws IOException {
}
}

protected static void setupPslResources() throws Exception {
protected static void setupPslZonalResources() throws Exception {
setupPslResources(
pslSinkZonalTopicPath,
pslSourceZonalTopicPath,
pslSinkZonalSubscriptionPath,
pslSourceZonalSubscriptionPath);
}

protected static void setupPslRegionalResources() throws Exception {
setupPslResources(
pslSinkRegionalTopicPath,
pslSourceRegionalTopicPath,
pslSinkRegionalSubscriptionPath,
pslSourceRegionalSubscriptionPath);
}

protected static void setupPslResources(
TopicPath pslSinkTopicPath,
TopicPath pslSourceTopicPath,
SubscriptionPath pslSinkSubscriptionPath,
SubscriptionPath pslSourceSubscriptionPath)
throws Exception {
try (AdminClient pslAdminClient =
AdminClient.create(
AdminClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build())) {
Expand Down Expand Up @@ -384,25 +432,33 @@ public Void apply(Runnable runnable) {
try (AdminClient pslAdminClient =
AdminClient.create(
AdminClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build())) {
notFoundIgnoredClosureRunner.apply(
() -> {
pslAdminClient.deleteSubscription(pslSinkSubscriptionPath);
});
notFoundIgnoredClosureRunner.apply(
() -> {
pslAdminClient.deleteSubscription(pslSourceSubscriptionPath);
});
notFoundIgnoredClosureRunner.apply(
() -> {
pslAdminClient.deleteTopic(pslSinkTopicPath);
});
notFoundIgnoredClosureRunner.apply(
() -> {
pslAdminClient.deleteTopic(pslSourceTopicPath);
});
final SubscriptionPath[] subscriptionPaths = {
pslSinkZonalSubscriptionPath,
pslSinkRegionalSubscriptionPath,
pslSourceZonalSubscriptionPath,
pslSourceRegionalSubscriptionPath
};
for (SubscriptionPath subscriptionPath : subscriptionPaths) {
notFoundIgnoredClosureRunner.apply(
() -> {
pslAdminClient.deleteSubscription(subscriptionPath);
});
}
final TopicPath[] topicPaths = {
pslSinkZonalTopicPath,
pslSinkRegionalTopicPath,
pslSourceZonalTopicPath,
pslSourceRegionalTopicPath
};
for (TopicPath topicPath : topicPaths) {
notFoundIgnoredClosureRunner.apply(
() -> {
pslAdminClient.deleteTopic(topicPath);
});
}

log.atInfo().log("Deleted PSL topics and subscriptions.");
}

try (InstancesClient instancesClient = InstancesClient.create()) {
instancesClient.deleteAsync(projectId, location, instanceName).get(3, MINUTES);
}
Expand Down Expand Up @@ -555,7 +611,20 @@ public void onPartitionsAssigned(Collection<TopicPartition> collection) {
}

@Test
public void testPslSinkConnector() throws Exception {
public void testPslZonalSinkConnector() throws Exception {
testPslSinkConnector(
pslSinkZonalTopicPath, pslSinkZonalSubscriptionPath, kafkaPslSinkZonalTestTopic);
}

// @Test
// public void testPslRegionalSinkConnector() throws Exception {
// testPslSinkConnector(pslSinkRegionalTopicPath, pslSinkRegionalSubscriptionPath,
// kafkaPslSinkRegionalTestTopic);
// }

public void testPslSinkConnector(
TopicPath topicPath, SubscriptionPath pslSinkSubscriptionPath, String kafkaPslSinkTestTopic)
throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaInstanceIpAddress + ":" + KAFKA_PORT);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Expand Down Expand Up @@ -614,7 +683,17 @@ public void testPslSinkConnector() throws Exception {
}

@Test(timeout = 5 * 60 * 1000L)
public void testPslSourceConnector() throws Exception {
public void testPslZonalSourceConnector() throws Exception {
testPslSourceConnector(pslSinkZonalTopicPath, kafkaPslSourceZonalTestTopic);
}

@Test(timeout = 5 * 60 * 1000L)
public void testPslRegionalSourceConnector() throws Exception {
testPslSourceConnector(pslSinkRegionalTopicPath, kafkaPslSourceRegionalTestTopic);
}

public void testPslSourceConnector(TopicPath pslSourceTopicPath, String kafkaPslSourceTestTopic)
throws Exception {
// Publish to CPS topic
PublisherSettings publisherSettings =
PublisherSettings.newBuilder()
Expand Down
4 changes: 2 additions & 2 deletions src/test/resources/kafka_vm_startup_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ ls -l $GCS_DIR/
# Prepare properties files for this run
RUN_ID=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/run_id -H "Metadata-Flavor: Google")
PROJECT_NAME=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/project_id -H "Metadata-Flavor: Google")
PSL_ZONE=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/psl_zone -H "Metadata-Flavor: Google")
PSL_LOCATION=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/psl_location -H "Metadata-Flavor: Google")

sed -i "s/<runId>/$RUN_ID/g" $GCS_DIR/*.properties
sed -i "s/<projectName>/$PROJECT_NAME/g" $GCS_DIR/*.properties
sed -i "s/<pslZone>/$PSL_ZONE/g" $GCS_DIR/*.properties
sed -i "s/<pslLocation>/$PSL_LOCATION/g" $GCS_DIR/*.properties

# Install and run Kafka brokers
KAFKA_VERSION=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/kafka_version -H "Metadata-Flavor: Google")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ connector.class=com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector
tasks.max=2
topics=psl-sink-test-topic
pubsublite.project=<projectName>
pubsublite.location=<pslZone>
pubsublite.location=<pslLocation>
pubsublite.topic=psl-sink-topic-<runId>
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ name=PubSubLiteSourceConnector
connector.class=com.google.pubsublite.kafka.source.PubSubLiteSourceConnector
tasks.max=2
pubsublite.project=<projectName>
pubsublite.location=<pslZone>
pubsublite.location=<pslLocation>
pubsublite.subscription=psl-source-subscription-<runId>
kafka.topic=psl-source-test-topic
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
Expand Down
Loading