Skip to content

Commit

Permalink
Merge pull request #1 from iturcino/mm2-updates
Browse files Browse the repository at this point in the history
Mm2 updates (Copy of OneCricketeer#26)
  • Loading branch information
eddyv authored May 28, 2021
2 parents 6d387cf + 2080a90 commit ae561bb
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 89 deletions.
55 changes: 31 additions & 24 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>cricket.jmoore</groupId>
<artifactId>schema-registry-transfer-smt</artifactId>
<version>0.2.1-SNAPSHOT</version>
<version>0.2.1</version>
<name>schema-registry-transfer-smt</name>
<description>
A Kafka Connect Transform for copying Confluent Schema Registry schemas between different registries.
Expand Down Expand Up @@ -66,11 +66,10 @@
<maven.compiler.target>1.8</maven.compiler.target>

<slf4j.version>1.7.25</slf4j.version>
<kafka.version>2.1.0</kafka.version>
<confluent.version>5.1.0</confluent.version>
<confluent.patch.version>-cp1</confluent.patch.version>
<jackson.version>2.9.7</jackson.version>
<jackson.asl.version>1.9.13</jackson.asl.version>
<confluent.version>5.5.0</confluent.version>
<jackson.version>2.10.2</jackson.version>
<avro.version>1.9.2</avro.version>
<jersey.bean.validation.version>2.30</jersey.bean.validation.version>

<spotless.version>1.19.0</spotless.version>

Expand All @@ -90,14 +89,14 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}${confluent.patch.version}</version>
<version>${confluent.version}-ccs</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-transforms</artifactId>
<version>${kafka.version}${confluent.patch.version}</version>
<version>${confluent.version}-ccs</version>
<scope>provided</scope>
</dependency>

Expand All @@ -122,13 +121,6 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand All @@ -137,16 +129,16 @@
</dependency>

<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>${jackson.asl.version}</version>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${jackson.asl.version}</version>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

Expand Down Expand Up @@ -178,11 +170,18 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.ext</groupId>
<artifactId>jersey-bean-validation</artifactId>
<version>${jersey.bean.validation.version}</version>
<scope>provided</scope>
</dependency>

<!-- Runtime dependencies -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
<version>${avro.version}</version>
</dependency>

<dependency>
Expand All @@ -201,8 +200,16 @@
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>io.swagger</groupId>
<artifactId>swagger-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.jersey.ext</groupId>
<artifactId>jersey-bean-validation</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;

Expand All @@ -44,17 +45,17 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans

public static final String SRC_PREAMBLE = "For source consumer's schema registry, ";
public static final String SRC_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy from. The consumer's Schema Registry.";
public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;
public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
public static final String SRC_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
public static final String SRC_USER_INFO_CONFIG_DOC = SRC_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
public static final String SRC_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;

public static final String DEST_PREAMBLE = "For target producer's schema registry, ";
public static final String DEST_SCHEMA_REGISTRY_CONFIG_DOC = "A list of addresses for the Schema Registry to copy to. The producer's Schema Registry.";
public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;
public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DOC;
public static final String DEST_BASIC_AUTH_CREDENTIALS_SOURCE_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT;
public static final String DEST_USER_INFO_CONFIG_DOC = DEST_PREAMBLE + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DOC;
public static final String DEST_USER_INFO_CONFIG_DEFAULT = AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_USER_INFO_DEFAULT;

public static final String TRANSFER_KEYS_CONFIG_DOC = "Whether or not to copy message key schemas between registries.";
public static final Boolean TRANSFER_KEYS_CONFIG_DEFAULT = true;
Expand All @@ -63,7 +64,7 @@ public class SchemaRegistryTransfer<R extends ConnectRecord<R>> implements Trans

private CachedSchemaRegistryClient sourceSchemaRegistryClient;
private CachedSchemaRegistryClient destSchemaRegistryClient;
private SubjectNameStrategy<org.apache.avro.Schema> subjectNameStrategy;
private SubjectNameStrategy subjectNameStrategy;
private boolean transferKeys, includeHeaders;

// caches from the source registry to the destination registry
Expand Down Expand Up @@ -98,17 +99,17 @@ public void configure(Map<String, ?> props) {

List<String> sourceUrls = config.getList(ConfigName.SRC_SCHEMA_REGISTRY_URL);
final Map<String, String> sourceProps = new HashMap<>();
sourceProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
sourceProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
"SRC_" + config.getString(ConfigName.SRC_BASIC_AUTH_CREDENTIALS_SOURCE));
sourceProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
sourceProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG,
config.getPassword(ConfigName.SRC_USER_INFO)
.value());

List<String> destUrls = config.getList(ConfigName.DEST_SCHEMA_REGISTRY_URL);
final Map<String, String> destProps = new HashMap<>();
destProps.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
destProps.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,
"DEST_" + config.getString(ConfigName.DEST_BASIC_AUTH_CREDENTIALS_SOURCE));
destProps.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,
destProps.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG,
config.getPassword(ConfigName.DEST_USER_INFO)
.value());

Expand Down Expand Up @@ -206,14 +207,14 @@ protected Optional<Integer> copySchema(ByteBuffer buffer, String topic, boolean

schemaAndDestId = schemaCache.get(sourceSchemaId);
if (schemaAndDestId != null) {
log.trace("Schema id {} has been seen before. Not registering with destination registry again.");
log.trace("Schema id {} has been seen before. Not registering with destination registry again.", sourceSchemaId);
} else { // cache miss
log.trace("Schema id {} has not been seen before", sourceSchemaId);
schemaAndDestId = new SchemaAndId();
try {
log.trace("Looking up schema id {} in source registry", sourceSchemaId);
// Can't do getBySubjectAndId because that requires a Schema object for the strategy
schemaAndDestId.schema = sourceSchemaRegistryClient.getById(sourceSchemaId);
schemaAndDestId.schema = sourceSchemaRegistryClient.getSchemaById(sourceSchemaId);
} catch (IOException | RestClientException e) {
log.error(String.format("Unable to fetch source schema for id %d.", sourceSchemaId), e);
throw new ConnectException(e);
Expand Down Expand Up @@ -244,25 +245,25 @@ public void close() {
}

interface ConfigName {
String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
String SRC_USER_INFO = "src." + AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG;
String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
String DEST_USER_INFO = "dest." + AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG;
String SRC_SCHEMA_REGISTRY_URL = "src." + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
String SRC_BASIC_AUTH_CREDENTIALS_SOURCE = "src." + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
String SRC_USER_INFO = "src." + AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
String DEST_SCHEMA_REGISTRY_URL = "dest." + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
String DEST_BASIC_AUTH_CREDENTIALS_SOURCE = "dest." + AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
String DEST_USER_INFO = "dest." + AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
String SCHEMA_CAPACITY = "schema.capacity";
String TRANSFER_KEYS = "transfer.message.keys";
String INCLUDE_HEADERS = "include.message.headers";
}

private static class SchemaAndId {
private Integer id;
private org.apache.avro.Schema schema;
private ParsedSchema schema;

SchemaAndId() {
}

SchemaAndId(int id, org.apache.avro.Schema schema) {
SchemaAndId(int id, ParsedSchema schema) {
this.id = id;
this.schema = schema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
Expand Down Expand Up @@ -152,22 +154,22 @@ public void beforeEach(final ExtensionContext context) {
.willReturn(WireMock.aResponse().withTransformers(this.getVersionHandler.getName())));
this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(CONFIG_PATTERN))
.willReturn(WireMock.aResponse().withTransformers(this.getConfigHandler.getName())));
this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+"))
this.stubFor.apply(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+/(?:fetchMaxId=false)"))
.willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND)));
}

public int registerSchema(final String topic, boolean isKey, final Schema schema) {
public int registerSchema(final String topic, boolean isKey, final ParsedSchema schema) {
return this.registerSchema(topic, isKey, schema, new TopicNameStrategy());
}

public int registerSchema(final String topic, boolean isKey, final Schema schema, SubjectNameStrategy<Schema> strategy) {
public int registerSchema(final String topic, boolean isKey, final ParsedSchema schema, SubjectNameStrategy strategy) {
return this.register(strategy.subjectName(topic, isKey, schema), schema);
}

private int register(final String subject, final Schema schema) {
private int register(final String subject, final ParsedSchema schema) {
try {
final int id = this.schemaRegistryClient.register(subject, schema);
this.stubFor.apply(WireMock.get(WireMock.urlEqualTo(SCHEMA_BY_ID_PATTERN + id))
this.stubFor.apply(WireMock.get(WireMock.urlEqualTo(SCHEMA_BY_ID_PATTERN + id + "?fetchMaxId=false"))
.willReturn(ResponseDefinitionBuilder.okForJson(new SchemaString(schema.toString()))));
log.debug("Registered schema {}", id);
return id;
Expand Down Expand Up @@ -242,8 +244,8 @@ public ResponseDefinition transform(final Request request, final ResponseDefinit
final FileSource files, final Parameters parameters) {
try {
final int id = SchemaRegistryMock.this.register(getSubject(request),
new Schema.Parser()
.parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()));
new AvroSchema(new Schema.Parser()
.parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema())));
final RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse();
registerSchemaResponse.setId(id);
return ResponseDefinitionBuilder.jsonResponse(registerSchemaResponse);
Expand Down Expand Up @@ -279,7 +281,8 @@ private class GetVersionHandler extends SubjectsVersioHandler {
@Override
public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition,
final FileSource files, final Parameters parameters) {
String versionStr = Iterables.get(this.urlSplitter.split(request.getUrl()), 3);
String versionStrFull = Iterables.get(this.urlSplitter.split(request.getUrl()), 3);
String versionStr = versionStrFull.substring(0, versionStrFull.indexOf("?"));
SchemaMetadata metadata;
if (versionStr.equals("latest")) {
metadata = SchemaRegistryMock.this.getSubjectVersion(getSubject(request), versionStr);
Expand Down
Loading

0 comments on commit ae561bb

Please sign in to comment.