diff --git a/build.sbt b/build.sbt index 1d3849e..2778835 100644 --- a/build.sbt +++ b/build.sbt @@ -12,8 +12,9 @@ scalaVersion := "2.11.4" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream-experimental" % "0.10-M1", - "io.reactivex" % "rxjava-reactive-streams" % "0.3.0", - "io.ratpack" % "ratpack-rx" % "0.9.10-SNAPSHOT", - "io.ratpack" % "ratpack-test" % "0.9.10-SNAPSHOT", - "org.projectreactor" % "reactor-core" % "2.0.0.M1" + "io.reactivex" % "rxjava-reactive-streams" % "0.5.0", + "io.ratpack" % "ratpack-rx" % "0.9.16-SNAPSHOT", + "io.ratpack" % "ratpack-test" % "0.9.16-SNAPSHOT", + "org.projectreactor" % "reactor-core" % "2.0.0.M1", + "org.slf4j" % "slf4j-simple" % "1.7.12" ) diff --git a/src/main/java/com/rolandkuhn/rsinterop/JavaMain.java b/src/main/java/com/rolandkuhn/rsinterop/JavaMain.java index 8f185c0..cd85d5a 100644 --- a/src/main/java/com/rolandkuhn/rsinterop/JavaMain.java +++ b/src/main/java/com/rolandkuhn/rsinterop/JavaMain.java @@ -1,49 +1,43 @@ package com.rolandkuhn.rsinterop; -import org.reactivestreams.Publisher; - - - import akka.actor.ActorSystem; import akka.stream.FlowMaterializer; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; +import org.reactivestreams.Publisher; import ratpack.http.ResponseChunks; import ratpack.rx.RxRatpack; -import ratpack.test.embed.EmbeddedApp;import reactor.rx.Stream; +import ratpack.test.embed.EmbeddedApp; +import reactor.rx.Stream; import reactor.rx.Streams; import rx.Observable; import rx.RxReactiveStreams; - public class JavaMain { - - public static void main(String[] args) { - final ActorSystem system = ActorSystem.create("InteropTest"); - final FlowMaterializer mat = FlowMaterializer.create(system); - RxRatpack.initialize(); - - EmbeddedApp.fromHandler(ctx -> { - final Integer[] ints = new Integer[10]; - for (int i = 0; i < ints.length; ++i) { - ints[i] = i; - } - // RxJava Observable - final Observable intObs = Observable.from(ints); - // Reactive Streams Publisher - final Publisher intPub = RxReactiveStreams.toPublisher(intObs); - // Akka Streams Source - final Source stringSource = Source.from(intPub).map(Object::toString); - // Reactive Streams Publisher - final Publisher stringPub = stringSource.runWith(Sink.fanoutPublisher(1, 1), mat); - // Reactor Stream - final Stream linesStream = Streams.create(stringPub).map(i -> i + "\n"); - // and now render the HTTP response - ctx.render(ResponseChunks.stringChunks(linesStream)); - }).test(client -> { - final String text = client.getText(); - System.out.println(text); - system.shutdown(); - });; - } + public static void main(String... args) throws Exception { + ActorSystem system = ActorSystem.create("InteropTest"); + FlowMaterializer mat = FlowMaterializer.create(system); + RxRatpack.initialize(); + + EmbeddedApp.fromHandler(ctx -> { + final Integer[] ints = new Integer[10]; + for (int i = 0; i < ints.length; ++i) { + ints[i] = i; + } + // RxJava Observable + final Observable intObs = Observable.from(ints); + // Reactive Streams Publisher + final Publisher intPub = RxReactiveStreams.toPublisher(intObs); + // Akka Streams Source + final Source stringSource = Source.from(intPub).map(Object::toString); + // Reactive Streams Publisher + final Publisher stringPub = stringSource.runWith(Sink.fanoutPublisher(1, 1), mat); + // Reactor Stream + final Stream linesStream = Streams.create(stringPub).map(i -> i + "\n"); + // and now render the HTTP response + ctx.render(ResponseChunks.stringChunks(linesStream)); + }).test(client -> + System.out.println(client.getText()) + ); + } } diff --git a/src/main/scala/com/rolandkuhn/rsinterop/ScalaMain.scala b/src/main/scala/com/rolandkuhn/rsinterop/ScalaMain.scala index 6f2aa6d..8f8a2e2 100644 --- a/src/main/scala/com/rolandkuhn/rsinterop/ScalaMain.scala +++ b/src/main/scala/com/rolandkuhn/rsinterop/ScalaMain.scala @@ -1,28 +1,25 @@ package com.rolandkuhn.rsinterop -import ratpack.rx.RxRatpack -import ratpack.test.embed.EmbeddedApp -import ratpack.handling.Handler -import ratpack.handling.Context -import rx.Observable -import scala.collection.JavaConverters._ -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.Source -import rx.RxReactiveStreams -import akka.stream.scaladsl.Sink import akka.actor.ActorSystem import akka.stream.FlowMaterializer +import akka.stream.scaladsl.{Sink, Source} +import ratpack.func.Action +import ratpack.handling.{Context, Handler} import ratpack.http.ResponseChunks -import java.util.function.Consumer +import ratpack.rx.RxRatpack +import ratpack.test.embed.EmbeddedApp import ratpack.test.http.TestHttpClient import reactor.rx.Streams +import rx.{Observable, RxReactiveStreams} + +import scala.collection.JavaConverters._ object ScalaMain extends App { val system = ActorSystem("InteropTest") implicit val mat = FlowMaterializer()(system) - + RxRatpack.initialize() - + EmbeddedApp.fromHandler(new Handler { override def handle(ctx: Context): Unit = { // RxJava Observable @@ -40,11 +37,9 @@ object ScalaMain extends App { // and now render the HTTP response ctx.render(ResponseChunks.stringChunks(linesStream)) } - }).test(new Consumer[TestHttpClient] { - override def accept(client: TestHttpClient): Unit = { - val text = client.getText() - println(text) - system.shutdown() + }).test(new Action[TestHttpClient] { + override def execute(client: TestHttpClient): Unit = { + println(client.getText()) } }) } \ No newline at end of file