From 7bc99c542c0f96fe67b8ef4f40cf5ba1e06c6778 Mon Sep 17 00:00:00 2001 From: lijun Date: Fri, 23 Mar 2018 18:35:05 +0800 Subject: [PATCH 1/3] Practice1 ~ Practice3 --- java/util/concurrent/atomic/annotations.xml | 5 +++ .../rxjava/share/practices/Practice1.java | 7 +++- .../rxjava/share/practices/Practice2.java | 41 ++++++++++++++++++- .../rxjava/share/practices/Practice3.java | 35 +++++++++++++++- 4 files changed, 83 insertions(+), 5 deletions(-) create mode 100644 java/util/concurrent/atomic/annotations.xml diff --git a/java/util/concurrent/atomic/annotations.xml b/java/util/concurrent/atomic/annotations.xml new file mode 100644 index 0000000..271a48e --- /dev/null +++ b/java/util/concurrent/atomic/annotations.xml @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..9c268de 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -16,6 +16,8 @@ package cn.nextop.rxjava.share.practices; +import java.util.concurrent.atomic.AtomicInteger; + import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; @@ -30,6 +32,9 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); +// // Wrong +// return observable.map( e -> { return new Tuple2<>(e.charAt(0) - "a".charAt(0) + 1, e); } ); + AtomicInteger a = new AtomicInteger(0); + return observable.map( e -> { int index = a.addAndGet(1); return new Tuple2<>(index, e); } ); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..871f174 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -19,9 +19,22 @@ import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.Scheduler; import io.reactivex.Single; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; +import javafx.collections.ObservableList; +import java.util.Dictionary; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Iterator; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.security.auth.Subject; /** * @author Baoyi Chen @@ -34,7 +47,28 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); +// // Passed, but not good +// Hashtable info = new Hashtable(); +// words.map( e -> { Integer a = info.get(e); a = (a == null ? 1 : a+1); info.put(e, a); return e; } ).subscribe(); +// System.out.println(info.toString()); +// Observable< Tuple2 > ob = Observable.< Tuple2 >create( e -> { +// Enumeration en = info.keys(); +// while (en.hasMoreElements()) { +// String k = en.nextElement(); Integer v = info.get(k); +// e.onNext(new Tuple2(k, v)); +// } +// e.onComplete(); +// }); +// +// return ob; + + //AtomicInteger a = new AtomicInteger(0); + Observable< Tuple2 > ob = Observable.< Tuple2 >create( emitter -> { + words.groupBy( e -> e ).subscribe( e ->{ e.count().subscribe( count -> { emitter.onNext(new Tuple2(e.getKey(), count.intValue())); } ); }); + emitter.onComplete(); + }); + + return ob; } /* @@ -43,7 +77,10 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + HashMap info = new HashMap(); + words.groupBy( e -> e ).subscribe( e ->{ e.count().subscribe( count -> { info.put(e.getKey(), count.intValue()); } ); }); + + return Single.just(info); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..39f2fd2 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -28,7 +28,23 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); +// // Trying codes +// return Maybe.create( emitter -> { +// iterate(observable).scan((x, y) -> { +// return x + y; +// }).subscribe( e -> { +// System.out.println(e.toString()); +// } ); +// observable.last().subscribe( e -> emitter.onSuccess(e) ); +// +// emitter.onComplete(); +// }); +// iterate(observable).scan((x, y) -> { +// return x + y; +// }).subscribe(e -> { +// System.out.println(e.toString()); +// }); + return iterate(observable).scan((x, y) -> x + y).lastElement(); } /* @@ -42,7 +58,22 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + Observable ob = Observable.create( emitter -> { + observable.map( e -> { + if (e.left != null) { + //left = iterate(Observable.just(e.left)).elementAt(0, Observable.just(0)).subscribe() + iterate(Observable.just(e.left)).subscribe(l -> emitter.onNext(l)); + } + if (e.right != null) { + iterate(Observable.just(e.right)).subscribe(r -> emitter.onNext(r)); + } + emitter.onNext(e.value); + return e.value; + }).subscribe(); + emitter.onComplete(); + }); + + return ob; } public static class Node { From 2bf7f57f139c921adac7356146654a2c2f5e78f3 Mon Sep 17 00:00:00 2001 From: lijun Date: Mon, 26 Mar 2018 20:20:54 +0800 Subject: [PATCH 2/3] complete all practices --- .../rxjava/share/practices/Practice1.java | 8 +++-- .../rxjava/share/practices/Practice4.java | 9 ++++- .../rxjava/share/practices/Practice5.java | 35 ++++++++++++++----- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index 9c268de..34ec7f1 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -34,7 +34,11 @@ public class Practice1 { public Observable> indexable(Observable observable) { // // Wrong // return observable.map( e -> { return new Tuple2<>(e.charAt(0) - "a".charAt(0) + 1, e); } ); - AtomicInteger a = new AtomicInteger(0); - return observable.map( e -> { int index = a.addAndGet(1); return new Tuple2<>(index, e); } ); + +// // Bad, has side effect +// AtomicInteger a = new AtomicInteger(0); +// return observable.map( e -> { int index = a.addAndGet(1); return new Tuple2<>(index, e); } ); + + return observable.map(e -> new Tuple2(1, e)).scan( (pre, cur) -> new Tuple2(pre.getV1() + 1, cur.getV2())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..0f65e62 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,6 +18,8 @@ import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.schedulers.Schedulers; /** @@ -44,7 +46,12 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + Observable ob = Observable.create(emitter -> { + observable.subscribe(e -> { + Observable.just(e).subscribeOn(Schedulers.io()).subscribe(a -> emitter.onNext(a)); + }); + }); + return ob; } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..8b09c4b 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,6 +16,9 @@ package cn.nextop.rxjava.share.practices; +import com.sun.xml.internal.xsom.impl.scd.Iterators; + +import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Maybe; import io.reactivex.Observable; import io.reactivex.Single; @@ -35,7 +38,17 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); +// Single single = Single.create(emitter -> { +// Observable> ob = Observable.create( emt -> { +// source.map(e -> { emt.onNext(new Tuple2(1L, e)); return e; }); +// }); +// ob.scan((last, current) -> { +// return new Tuple2(last.getV1() + current.getV1(), current.getV2()); +// }); +// ob.lastElement().subscribe(e -> emitter.onSuccess(e.getV1())); +// }); +// return single; + return source.map( e -> new Tuple2(1L, e)).reduce(new Tuple2(0L, ""), (x, y) -> new Tuple2(x.getV1() + 1L, y.getV2())).map(e -> e.getV1()); } /* @@ -44,7 +57,10 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.map(e -> Observable.create(emitter -> { + e.forEach(item -> emitter.onNext(item)); + emitter.onComplete(); + })).concatMap(e -> e); } /* @@ -53,7 +69,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.groupBy(e -> e).map(e -> e.getKey()); } /* @@ -62,7 +78,7 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.map(e -> { if (conditon.test(e)) { return Observable.just(e); } else { return Observable.empty(); } }).concatMap(e -> e); } /* @@ -71,7 +87,8 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + return source.map(e -> new Tuple2(0, e)).scan((pre, cur) -> new Tuple2(pre.getV1() + 1, cur.getV2())).reduce((pre, cur) -> cur.getV1() == (Integer)index ? cur : + pre).map(e -> e.getV2()); } /* @@ -80,7 +97,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return Observable.range(1, count).concatMap(e -> source); } /* @@ -89,7 +106,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).concatMap(e -> e); } /* @@ -98,7 +115,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).flatMap(e -> e); } /* @@ -107,7 +124,7 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.map(e -> Observable.just(e).delay(1, TimeUnit.SECONDS)).concatMap(e -> e); } } From 16710eec39398dfcbae1108eb5fada8ff0ab7a9e Mon Sep 17 00:00:00 2001 From: lijun Date: Mon, 26 Mar 2018 20:29:30 +0800 Subject: [PATCH 3/3] fix compiling errors --- src/main/java/cn/nextop/rxjava/share/practices/Practice5.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 8b09c4b..b40c4e0 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,8 +16,6 @@ package cn.nextop.rxjava.share.practices; -import com.sun.xml.internal.xsom.impl.scd.Iterators; - import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Maybe; import io.reactivex.Observable;