-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathRxICacheImpl.java
More file actions
63 lines (49 loc) · 1.84 KB
/
RxICacheImpl.java
File metadata and controls
63 lines (49 loc) · 1.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.hazelcast.rxjava.impl;
import com.hazelcast.cache.ICache;
import com.hazelcast.rxjava.RxICache;
import rx.Observable;
import javax.cache.expiry.ExpiryPolicy;
import java.util.concurrent.Executor;
/**
* TODO
*
* @author Viktor Gamov on 7/29/16.
* Twitter: @gamussa
* @since 0.0.1
*/
public class RxICacheImpl<K, V> implements RxICache<K, V> {
private final ICache<K, V> cache;
private final Executor executor;
public RxICacheImpl(ICache<K, V> cache) {
this.cache = cache;
this.executor = null;
}
public RxICacheImpl(ICache<K, V> cache, Executor executor) {
this.cache = cache;
this.executor = executor;
}
@Override public Observable<V> get(K key) {
return RxIObservable.from(cache.getAsync(key), executor);
}
@Override public Observable<V> getAndPut(K key, V newValue) {
return RxIObservable.from(cache.getAndPutAsync(key, newValue), executor);
}
@Override public Observable<V> getAndPut(K key, V newValue, ExpiryPolicy expiryPolicy) {
return RxIObservable.from(cache.getAndPutAsync(key, newValue, expiryPolicy), executor);
}
@Override public ICache<K, V> getDelegate() {
return this.cache;
}
@Override public Observable<Void> put(K key, V value) {
return RxIObservable.from(cache.putAsync(key, value), executor);
}
@Override public Observable<Void> put(K key, V value, ExpiryPolicy expiryPolicy) {
return RxIObservable.from(cache.putAsync(key, value, expiryPolicy), executor);
}
public static RxICache<String, String> from(ICache<String, String> cache) {
return new RxICacheImpl<String, String>(cache);
}
public static RxICache<String, String> from(ICache<String, String> cache, Executor executor) {
return new RxICacheImpl<String, String>(cache, executor);
}
}