Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pulsar Binder to Spring Cloud Stream #1322

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public void customize(Build build) {
.add("cloud-stream-binder-kafka", "org.springframework.cloud", "spring-cloud-stream-binder-kafka",
DependencyScope.COMPILE);
}
if (isSpringBoot3xWithPulsarSupport() && hasDependency("pulsar", build)) {
build.dependencies()
.add("cloud-stream-binder-pulsar", "org.springframework.cloud", "spring-cloud-stream-binder-pulsar",
DependencyScope.COMPILE);
}
}
// Spring Cloud Stream specific
if (hasDependency("cloud-stream", build)) {
Expand Down Expand Up @@ -92,4 +97,9 @@ protected boolean isSpringBoot3x() {
return platformVersion.compareTo(Version.parse("3.0.0-M1")) > 0;
}

protected boolean isSpringBoot3xWithPulsarSupport() {
Version platformVersion = this.description.getPlatformVersion();
return platformVersion.compareTo(Version.parse("3.2.0-M3")) >= 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
*
* @author Stephane Nicoll
* @author Brian Clozel
* @author Chris Bono
*/
class SpringCloudStreamBuildCustomizerTests extends AbstractExtensionTests {

Expand All @@ -43,6 +44,9 @@ class SpringCloudStreamBuildCustomizerTests extends AbstractExtensionTests {
private static final Dependency KAFKA_STREAMS_BINDER = Dependency.withId("cloud-stream-binder-kafka-streams",
"org.springframework.cloud", "spring-cloud-stream-binder-kafka-streams");

private static final Dependency PULSAR_BINDER = Dependency.withId("cloud-stream-binder-pulsar",
"org.springframework.cloud", "spring-cloud-stream-binder-pulsar");

private static final Dependency RABBIT_BINDER = Dependency.withId("cloud-stream-binder-rabbit",
"org.springframework.cloud", "spring-cloud-stream-binder-rabbit");

Expand Down Expand Up @@ -85,6 +89,19 @@ void springCloudStreamWithKafkaStreams(Version springBootVersion, Dependency tes
.hasDependenciesSize(5);
}

@ParameterizedTest
@MethodSource("springCloudStreamWithPulsarArguments")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not super excited of screaming "...WithPulsar" here but the variations we are now trying to test are:

  • Boot 2.7.x (the test binder deps differ across 2.x and 3.x)
  • Boot 3.x (test test binder deps differ across 2.x and 3.x)
  • Boot 3.2.0-M3++ (new binder was added here)

Because the Pulsar binder does not exist <=3.2.0-M3 we can simply add more args to the test params for boot32x as the behavior is additive (iow new binder dep).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a big fan of those either. I've polished to make it short for now. We can revisit those tests once we remove support for 2.x which is soon.

void springCloudStreamWithPulsar(Version springBootVersion, Dependency testDependency) {
ProjectRequest request = createProjectRequest("cloud-stream", "pulsar");
request.setBootVersion(springBootVersion.toString());
assertThat(mavenPom(request)).hasDependency(getDependency("cloud-stream"))
.hasDependency(getDependency("pulsar"))
.hasDependency(PULSAR_BINDER)
.hasDependency(Dependency.createSpringBootStarter("test", Dependency.SCOPE_TEST))
.hasDependency(testDependency)
.hasDependenciesSize(5);
}

@ParameterizedTest
@MethodSource("springCloudStreamArguments")
void springCloudStreamWithAllBinders(Version springBootVersion, Dependency testDependency) {
Expand All @@ -103,20 +120,27 @@ void springCloudStreamWithAllBinders(Version springBootVersion, Dependency testD
}

@ParameterizedTest
@MethodSource("springCloudStreamArguments")
void springCloudBusWithRabbit(Version springBootVersion, Dependency testDependency) {
ProjectRequest request = createProjectRequest("cloud-bus", "amqp");
@MethodSource("springCloudStreamWithPulsarArguments")
void springCloudStreamWithAllBindersInBoot32x(Version springBootVersion, Dependency testDependency) {
ProjectRequest request = createProjectRequest("cloud-stream", "amqp", "kafka", "kafka-streams", "pulsar");
request.setBootVersion(springBootVersion.toString());
assertThat(mavenPom(request)).hasDependency(getDependency("cloud-bus"))
assertThat(mavenPom(request)).hasDependency(getDependency("cloud-stream"))
.hasDependency(getDependency("amqp"))
.hasDependency(getDependency("kafka"))
.hasDependency(getDependency("kafka-streams"))
.hasDependency(getDependency("pulsar"))
.hasDependency(RABBIT_BINDER)
.hasDependency(KAFKA_BINDER)
.hasDependency(KAFKA_STREAMS_BINDER)
.hasDependency(PULSAR_BINDER)
.hasDependency(Dependency.createSpringBootStarter("test", Dependency.SCOPE_TEST))
.hasDependenciesSize(5);
.hasDependency(testDependency)
.hasDependenciesSize(13);
}

@ParameterizedTest
@MethodSource("springCloudStreamArguments")
void springCloudBusWithKafka(Version springBootVersion, Dependency testDependency) {
void springCloudBusWithRabbit(Version springBootVersion, Dependency ignoredTestDependency) {
ProjectRequest request = createProjectRequest("cloud-bus", "amqp");
request.setBootVersion(springBootVersion.toString());
assertThat(mavenPom(request)).hasDependency(getDependency("cloud-bus"))
Expand All @@ -128,17 +152,43 @@ void springCloudBusWithKafka(Version springBootVersion, Dependency testDependenc

@ParameterizedTest
@MethodSource("springCloudStreamArguments")
void springCloudBusWithAllBinders(Version springBootVersion, Dependency testDependency) {
ProjectRequest request = createProjectRequest("cloud-bus", "amqp", "kafka", "kafka-streams");
void springCloudBusWithKafka(Version springBootVersion, Dependency ignoredTestDependency) {
ProjectRequest request = createProjectRequest("cloud-bus", "kafka");
request.setBootVersion(springBootVersion.toString());
assertThat(mavenPom(request)).hasDependency(getDependency("cloud-bus"))
.hasDependency(getDependency("kafka"))
.hasDependency(KAFKA_BINDER)
.hasDependency(Dependency.createSpringBootStarter("test", Dependency.SCOPE_TEST))
.hasDependenciesSize(5);
}

@ParameterizedTest
@MethodSource("springCloudStreamWithPulsarArguments")
void springCloudBusWithPulsar(Version springBootVersion, Dependency ignoredTestDependency) {
ProjectRequest request = createProjectRequest("cloud-bus", "pulsar");
request.setBootVersion(springBootVersion.toString());
assertThat(mavenPom(request)).hasDependency(getDependency("cloud-bus"))
.hasDependency(getDependency("pulsar"))
.hasDependency(PULSAR_BINDER)
.hasDependency(Dependency.createSpringBootStarter("test", Dependency.SCOPE_TEST))
.hasDependenciesSize(4);
}

@ParameterizedTest
@MethodSource("springCloudStreamWithPulsarArguments")
void springCloudBusWithAllBindersInBoot32x(Version springBootVersion, Dependency ignoredTestDependency) {
ProjectRequest request = createProjectRequest("cloud-bus", "amqp", "kafka", "kafka-streams", "pulsar");
request.setBootVersion(springBootVersion.toString());
assertThat(mavenPom(request)).hasDependency(getDependency("cloud-bus"))
.hasDependency(getDependency("amqp"))
.hasDependency(getDependency("kafka"))
.hasDependency(getDependency("kafka-streams"))
.hasDependency(getDependency("pulsar"))
.hasDependency(RABBIT_BINDER)
.hasDependency(KAFKA_BINDER)
.hasDependency(PULSAR_BINDER)
.hasDependency(Dependency.createSpringBootStarter("test", Dependency.SCOPE_TEST))
.hasDependenciesSize(9);
.hasDependenciesSize(11);
}

@Test
Expand All @@ -159,4 +209,11 @@ private static Stream<Arguments> springCloudStreamArguments() {
Arguments.of(Version.parse("3.0.0"), testBinder));
}

private static Stream<Arguments> springCloudStreamWithPulsarArguments() {
Dependency testBinder = Dependency.withId("cloud-stream-test", "org.springframework.cloud",
"spring-cloud-stream-test-binder", null, Dependency.SCOPE_TEST);
return Stream.of(Arguments.of(Version.parse("3.2.0-M3"), testBinder),
Arguments.of(Version.parse("3.2.0-RC1"), testBinder));
}

}