-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRxJavaCodeExamples.java
More file actions
62 lines (52 loc) · 2.55 KB
/
RxJavaCodeExamples.java
File metadata and controls
62 lines (52 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package a_intro;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
 * Small runnable RxJava demonstrations: concurrent lookups with {@code flatMapSingle},
 * backpressure handling with a dropping strategy, and combining results with {@code zip}.
 *
 * <p>The work in each example is scheduled on {@code Schedulers.io()}, so {@link #main}
 * sleeps after starting an example to give its output time to appear.
 */
public class RxJavaCodeExamples {

    /**
     * Runs the three examples one after another, pausing after each one.
     *
     * @param args ignored
     * @throws Exception if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        //1. doing things in parallel, using multiple threads for IO
        printAveragePrice();
        TimeUnit.SECONDS.sleep(5);

        //2. backpressure handling
        consumeWithDroppingBackpressure();
        TimeUnit.SECONDS.sleep(5);

        //3. zipping
        printTotalTripPrice();
        // NOTE(review): every getPrice delay is below 1000 ms, so the total should be printed
        // within about a second; the one-minute wait looks longer than needed - confirm
        // before shortening it.
        TimeUnit.MINUTES.sleep(1);
    }

    /** Looks up three prices via {@link #getPrice}, printing each price and then their average. */
    private static void printAveragePrice() {
        Flowable.just("IBM", "Intel", "Apple")
                .flatMapSingle(RxJavaCodeExamples::getPrice)
                .doOnNext(price -> System.out.println(Thread.currentThread() + ": got price " + price))
                .toList()
                // The list always holds the three prices requested above, so the average is present.
                .map(prices -> prices.stream().mapToDouble(Double::doubleValue).average().getAsDouble())
                .subscribe(averagePrice ->
                        System.out.println(Thread.currentThread() + ": Average price is " + averagePrice));
    }

    /**
     * Pairs a producer that emits 1..100 as fast as it can with a consumer that needs 100 ms
     * per item, using {@link BackpressureStrategy#DROP} and a buffer of 10 between them.
     */
    private static void consumeWithDroppingBackpressure() {
        Observable<Integer> fastProducer = Observable.range(1, 100)
                .doOnNext(integer -> System.out.println(Thread.currentThread() + ": Produced " + integer))
                .subscribeOn(Schedulers.io());

        Flowable<Integer> droppingProducer = fastProducer.toFlowable(BackpressureStrategy.DROP);

        droppingProducer
                .observeOn(Schedulers.io(), false, 10)
                .subscribe(
                        integer -> {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                // Restore the interrupt flag so code further up the stack can
                                // still see that this thread was interrupted.
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                            System.out.println(Thread.currentThread() + ": Consumed " + integer);
                        },
                        Throwable::printStackTrace,
                        () -> System.out.println("Done")
                );
    }

    /** Requests three prices via {@link #getPrice} and prints their sum once all have arrived. */
    private static void printTotalTripPrice() {
        Single.zip(
                getPrice("flight"),
                getPrice("hotel"),
                getPrice("car"),
                (price1, price2, price3) -> price1 + price2 + price3
        ).subscribe(totalPrice ->
                System.out.println("Total price for flight, hotel and car is: " + totalPrice));
    }

    /**
     * Simulates a slow price lookup.
     *
     * @param symbol the item to price; currently unused, every call yields a random price
     * @return a {@code Single} that, after a random delay below 1000 ms timed on the IO
     *     scheduler, emits a random price in the range [200, 800)
     */
    private static Single<Double> getPrice(String symbol) {
        long delayMillis = ThreadLocalRandom.current().nextLong(1000);
        return Single.timer(delayMillis, TimeUnit.MILLISECONDS, Schedulers.io())
                .map(tick -> ThreadLocalRandom.current().nextDouble(200, 800));
    }
}