-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathChallenge3RxJava2Example.java
More file actions
46 lines (36 loc) · 2.07 KB
/
Challenge3RxJava2Example.java
File metadata and controls
46 lines (36 loc) · 2.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package challenge3.rxjava2;
import externalLegacyCodeNotUnderOurControl.TemperatureValueSource;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.apache.commons.lang3.tuple.Pair;
import java.util.concurrent.TimeUnit;
import static externalLegacyCodeNotUnderOurControl.PrintlnWithThreadname.println;
// https://github.com/ReactiveMeetupLucerne/AsyncNonBlockingExamplesJVM/issues/9
/**
 * Demo: turns a listener-based {@link TemperatureValueSource} into an RxJava 2
 * {@link Observable} and prints the minimum and maximum temperature observed
 * in each fixed-length time window.
 *
 * <p>The pipeline is assembled and subscribed from {@code main}; the
 * per-window results are printed from whichever thread delivers them.
 */
public class Challenge3RxJava2Example {

    /** Length, in seconds, of each aggregation window. */
    private static final long WINDOW_SECONDS = 10;

    /**
     * Wires the temperature source to the windowed min/max pipeline, subscribes
     * to it and then keeps the main thread parked for one minute.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        TemperatureValueSource temperatureValueSource = new TemperatureValueSource();

        Observable<Pair<Integer, Integer>> minMaxValuesWithinWindow =
                minMaxPerWindow(temperatureValues(temperatureValueSource));

        minMaxValuesWithinWindow.subscribe(
                minMaxPair -> println("Within window: Min=" + minMaxPair.getLeft() + "°Celsius, "
                        + "Max=" + minMaxPair.getRight() + "°Celsius. "
                        + "Calculated async and non-blocking TM :-)"),
                // Without an error consumer RxJava 2 wraps any upstream failure in an
                // OnErrorNotImplementedException and hands it to the global plugin
                // error handler; report it explicitly instead.
                error -> println("Temperature stream failed: " + error)
        );

        println("I wasn't blocked");

        // Give the pipeline time to emit a few windows before main returns.
        // NOTE(review): presumably the emitting threads alone would not keep the
        // JVM alive — confirm against TemperatureValueSource.
        TimeUnit.MINUTES.sleep(1);
    }

    /**
     * Adapts the callback-style source to an {@link Observable}: a listener is
     * registered when an observer subscribes and removed again when that
     * subscription is cancelled.
     */
    private static Observable<Integer> temperatureValues(TemperatureValueSource source) {
        return Observable.create(emitter -> {
            TemperatureValueSource.TemperatureListener temperatureListener = emitter::onNext;
            source.addListener(temperatureListener);
            emitter.setCancellable(() -> source.removeListener(temperatureListener));
        });
    }

    /**
     * Logs every reading, groups the readings into consecutive
     * {@link #WINDOW_SECONDS}-second windows and emits one
     * {@code (min, max)} pair per window that contained at least one reading.
     */
    private static Observable<Pair<Integer, Integer>> minMaxPerWindow(Observable<Integer> values) {
        return values
                .doOnNext(tempValue -> println(tempValue + "°Celsius from temperature source"))
                .window(WINDOW_SECONDS, TimeUnit.SECONDS, Schedulers.computation())
                .flatMap(temperatureValuesWithinWindow ->
                        temperatureValuesWithinWindow
                                // Start from (t, t) so the seedless reduce can be used: it yields
                                // an empty Maybe for a window without readings, rather than
                                // leaking a (MAX_VALUE, MIN_VALUE) sentinel to the subscriber.
                                .map(tempValue -> Pair.of(tempValue, tempValue))
                                .reduce((left, right) ->
                                        Pair.of(Math.min(left.getLeft(), right.getLeft()),
                                                Math.max(left.getRight(), right.getRight())))
                                .toObservable()
                );
    }
}