- Reactive programming is programming with asynchronous data streams
- JavaScript is asynchronous by design
- HTTP requests
- Timeouts
- UI events (clicks, key presses, etc.)
- How to handle all this?
- Traditionally solved by registering callback functions that are executed when a task completes
- E.g. setTimeout, which calls a callback function once a given number of milliseconds has passed
window.setTimeout(() => {
// Executed after one second
}, 1000);
- Using callbacks quickly leads to messy code with multiple nested functions that is hard to follow and reason about
getData((x) => {
getMoreData(x, (y) => {
getMoreData(y, (z) => {
...
});
});
});
- For more information, google for "callback hell"
- A promise represents a value that will be provided later
- The Promise constructor takes a single argument: a function with two parameters
- resolve: function to be called when we want to indicate success
- reject: function to be called when we want to indicate failure
new Promise((resolve, reject) => {
if (...) resolve(x);
else if (...) resolve(y);
else reject();
});
- Both functions accept a value that is passed on to the promise's consumer
- Promises are consumed by calling .then() on them; it takes two arguments: a success handler and a failure handler
somethingReturningPromise().then(
(value) => { // Resolved
// Handle success case
},
(value) => { // Rejected
// Handle reject case, e.g. show an error note
});
- Promises can be "chained" by calling .then() multiple times in a row
- Each .then() returns a new promise that resolves to the value returned by its handler
- If the returned value is itself a promise, it is waited for before the chain continues
fetch('/users') // Make the HTTP request
.then(response => response.json()) // .json() will return a promise
.then(json => json.users) // Map the result to contain only the "users" field
.then(users => alert('Found ' + users.length + ' users')); // Show an alert with the number of users
- Promises don't work for streams; they deliver only a single value or error
- Promises can't be cancelled
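The nested getData/getMoreData callbacks shown earlier can be flattened with a promise chain. The following is a minimal sketch, not a definitive implementation: getData here is a hypothetical callback-style function that simply adds one to its input, and getDataAsPromise is a hand-written wrapper around it.

```javascript
// Hypothetical callback-style API (an assumption for illustration):
// it hands input + 1 to the callback asynchronously.
function getData(input, callback) {
  setTimeout(() => callback(input + 1), 0);
}

// Wrap the callback API in a promise so that calls can be chained.
function getDataAsPromise(input) {
  return new Promise((resolve, reject) => {
    if (typeof input !== 'number') {
      reject(new Error('input must be a number'));
      return;
    }
    getData(input, resolve);
  });
}

// Each .then() waits for the promise returned by the previous step,
// so the chain stays flat instead of nesting three levels deep.
const result = getDataAsPromise(0)
  .then((x) => getDataAsPromise(x))
  .then((y) => getDataAsPromise(y));

// A rejection skips the success handler and reaches the failure handler.
const failure = getDataAsPromise('oops').then(
  (value) => 'resolved: ' + value,
  (error) => 'rejected: ' + error.message
);
```

Both result and failure are themselves promises, so a consumer still reads their values through .then().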
- Observables are a generalization of promises for streams
- A way for representing asynchronous event streams
- e.g. mouse clicks, WebSocket streams
- Can also be used for single events e.g. HTTP requests
- ReactiveX is a library for representing asynchronous event streams with Observables and modifying them with various stream operations
- RxJS is a ReactiveX implementation for JavaScript
- Angular integrates with RxJS 6
"In ReactiveX an observer subscribes to an Observable."
- You subscribe to a stream of events so that your handler is invoked every time there is a new item
observable.subscribe(item => doSomething(item));
- Streams can be manipulated with familiar array transformation functions such as map and filter
observable
.filter(node => node.children.length > 2)
.map(node => node.name);
- You can merge, concat and do other operations on streams to produce new streams from the existing ones
const resultStream = stream1.merge(stream2);
- Observables can also be created from e.g. objects, maps and arrays
Rx.Observable.of(42);
Rx.Observable.from([1,2,3,4]);
Rx.Observable.range(1,10);
- The subscribe method takes three functions as arguments:
- onNext: called when a new item is emitted
- onError: called if observable sequence fails
- onComplete: called when sequence is complete
observable.subscribe(
  next => doSomething(next),   // onNext
  error => handleError(error), // onError
  () => done()                 // onComplete
);
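To make the three-callback contract concrete, here is a minimal hand-rolled sketch. It is not RxJS: fromArray is a hypothetical helper written only for this illustration. It suggests how onNext may fire many times, while onError or onComplete fires at most once and ends the sequence.

```javascript
// Hypothetical helper, not part of RxJS: emits each transformed array item,
// then completes. If transforming an item throws, the error is reported
// and the sequence ends.
function fromArray(items, transform) {
  return {
    subscribe(onNext, onError, onComplete) {
      for (const item of items) {
        let value;
        try {
          value = transform(item);
        } catch (error) {
          onError(error); // terminal: nothing is emitted after an error
          return;
        }
        onNext(value);
      }
      onComplete(); // terminal: signals that no more items will arrive
    },
  };
}

const log = [];
fromArray([1, 2, 3], (x) => x * 10).subscribe(
  (next) => log.push('next ' + next),
  (error) => log.push('error ' + error.message),
  () => log.push('complete')
);

const failingLog = [];
fromArray([1, 2, 3], (x) => {
  if (x === 2) throw new Error('bad item');
  return x;
}).subscribe(
  (next) => failingLog.push('next ' + next),
  (error) => failingLog.push('error ' + error.message),
  () => failingLog.push('complete')
);
```

In the failing run the third item is never delivered and onComplete is not called, because the error has already ended the sequence.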
- Observable sequence subscriptions can be unsubscribed
- E.g. observable that produces events that are saved into the memory
const eventSubscription = eventStream.subscribe( event => this.events.push(event) );
- Sequence will not stop until unsubscribed
eventSubscription.unsubscribe();
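The effect of unsubscribing can be sketched without RxJS. In the example below, createEventStream is a hypothetical stand-in for an event-producing observable; only the subscribe/unsubscribe shape mirrors the snippet above.

```javascript
// Hypothetical event source, not part of RxJS: keeps a set of handlers
// and calls each of them whenever emit() is invoked.
function createEventStream() {
  const handlers = new Set();
  return {
    emit(event) {
      handlers.forEach((handler) => handler(event));
    },
    subscribe(handler) {
      handlers.add(handler);
      // The returned subscription removes the handler again.
      return { unsubscribe: () => handlers.delete(handler) };
    },
  };
}

const eventStream = createEventStream();
const events = [];
const eventSubscription = eventStream.subscribe((event) => events.push(event));

eventStream.emit('first');       // stored in events
eventSubscription.unsubscribe();
eventStream.emit('second');      // ignored: nobody is subscribed any more
```

Without the unsubscribe call the handler would stay registered and events would keep growing for as long as the stream emits.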
- Observables die on errors
- To recover from an error, catch it and return a new observable sequence
observable.catch((error) => {
  console.log(error);
  return Rx.Observable.of([1, 2, 3]);
});
- Cold observables start running upon subscription
- E.g. HTTP request
- Hot observables are already producing values before the subscription is active
- E.g. mouse move events
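The difference between cold and hot can be illustrated with two hand-rolled objects. This is a sketch under simplifying assumptions, not RxJS code: cold and hot are hypothetical objects, and the values are emitted synchronously.

```javascript
// Cold: the producer runs inside subscribe(), so every subscriber
// triggers its own run and receives the full sequence.
let coldRuns = 0;
const cold = {
  subscribe(onNext) {
    coldRuns += 1; // comparable to one HTTP request per subscription
    [1, 2, 3].forEach((value) => onNext(value));
  },
};

// Hot: the producer lives outside subscribe() and emits regardless of
// whether anyone is listening; late subscribers miss earlier values.
const hotHandlers = [];
const hot = {
  emit(value) {
    hotHandlers.forEach((handler) => handler(value));
  },
  subscribe(onNext) {
    hotHandlers.push(onNext);
  },
};

const coldA = [];
const coldB = [];
cold.subscribe((x) => coldA.push(x));
cold.subscribe((x) => coldB.push(x));

const hotEarly = [];
const hotLate = [];
hot.emit('move 1');                      // nobody is listening yet
hot.subscribe((x) => hotEarly.push(x));
hot.emit('move 2');
hot.subscribe((x) => hotLate.push(x));
hot.emit('move 3');
```

Both cold subscribers receive 1, 2, 3 from separate runs, while the hot subscribers only see the values emitted after they subscribed.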
- In Angular, observables are used exclusively instead of promises
- E.g. HTTP requests only result in a single event (one response) but they are modeled as observables
this.httpClient.get('url/restapi/resource') // Returns observable
  .subscribe(
    data => { this.data = data; }, // Success
    err => console.error(err),     // Failure
    () => console.log('done')      // Done
  );
- Changes in route parameters are propagated through an observable sequence
constructor(route: ActivatedRoute) {
route.params.subscribe(params => this.index = +params['index']);
}
- RxJS 5.5.0 introduced a major change to RxJS called pipeable operators
- Importing is a mess with 5.5 but RxJS 6 fixes it
Pre RxJS 5.5
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/map';
const source$ = Observable.range(0, 10);
source$
.filter(x => x % 2 === 0)
.map(x => x + x)
.subscribe(x => console.log(x));
RxJS 5.5 ->
import { range } from 'rxjs/observable/range';
import { map } from 'rxjs/operators/map';
import { filter } from 'rxjs/operators/filter';
const source$ = range(0, 10);
source$.pipe(
filter(x => x % 2 === 0),
map(x => x + x)
).subscribe(x => console.log(x));
- Some operators were renamed because their old names are reserved words in JavaScript:
- do -> tap
- catch -> catchError
- switch -> switchAll
- finally -> finalize
- No more "prototype patching" -> Tree-shaking possible -> Smaller bundle sizes
- Custom operators are easier to make
- Better tooling support by linters and compilers
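Why pipeable operators make custom operators easier can be sketched with a tiny stand-in for an observable. This is not RxJS: createObservable, map, filter and range below are hypothetical, hand-written versions that only mimic the pipeable shape, where an operator is a plain function that takes a source and returns a new observable.

```javascript
// Minimal stand-in for an observable with a pipe() method (not RxJS).
function createObservable(subscribe) {
  return {
    subscribe,
    // pipe() feeds the observable through each operator in turn.
    pipe(...operators) {
      return operators.reduce((source, operator) => operator(source), this);
    },
  };
}

// An operator is a function that takes a source and returns a new observable.
const map = (project) => (source) =>
  createObservable((onNext) => source.subscribe((x) => onNext(project(x))));

const filter = (predicate) => (source) =>
  createObservable((onNext) =>
    source.subscribe((x) => {
      if (predicate(x)) onNext(x);
    })
  );

// A custom operator is just a composition of existing ones.
const doubleEvens = () => (source) =>
  source.pipe(
    filter((x) => x % 2 === 0),
    map((x) => x + x)
  );

// Emits count consecutive integers starting at start.
const range = (start, count) =>
  createObservable((onNext) => {
    for (let i = start; i < start + count; i++) onNext(i);
  });

const output = [];
range(0, 10).pipe(doubleEvens()).subscribe((x) => output.push(x));
```

Because operators are standalone functions rather than methods patched onto a prototype, a bundler can drop the ones that are never imported.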
- RxJS 6 was released 04/2018
- Major changes:
- Simpler imports (import { map, filter } from 'rxjs/operators')
- Errors thrown asynchronously
- Deprecations
- New operator (throwIfEmpty)
- Provides compatibility library (rxjs-compat) to support the migration from 5 to 6
- See Ben Lesh's (RxJS 5 and 6 author) presentation at ngConf 2018 for more details (slides, video)
- rxjs-dev