Reactive Programming With Angular


Background

  • Reactive programming is programming with asynchronous data streams
  • JavaScript is asynchronous by design
    • HTTP requests
    • Timeouts
    • UI events (clicks, key presses, etc.)
    • How to handle all this?

Callbacks

  • Traditionally solved by registering callback functions that are executed when a task completes
  • E.g. setTimeout, which calls the callback function once the given number of milliseconds has passed
window.setTimeout(() => {
	// Executed after one second
}, 1000);

Problem: Messy Code

  • Using callbacks quickly leads to messy code with multiple nested functions that is hard to follow and reason about
getData((x) => {
    getMoreData(x, (y) => {
        getMoreData(y, (z) => {
            ...
        });
    });
});
  • For more information, search for "callback hell"

Solution: Promises

  • A promise represents a value that will be provided later
  • The Promise constructor takes a single argument: a function with two parameters
    • resolve: function to call to indicate success
    • reject: function to call to indicate failure
new Promise((resolve, reject) => {
  if (...) resolve(x);
  else if (...) resolve(y);
  else reject();
});
  • Both functions accept an argument that is passed on to the promise's consumer
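As a minimal sketch of the constructor described above, the hypothetical helper `delay` wraps `setTimeout` in a promise. The helper name and the rule of rejecting negative durations are assumptions made for illustration only.

```typescript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
// Rejecting on a negative duration is an illustrative rule, not a standard API.
function delay<T>(ms: number, value: T): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    if (ms < 0) {
      reject(new Error('ms must not be negative')); // indicate failure
      return;
    }
    setTimeout(() => resolve(value), ms); // indicate success later
  });
}

delay(10, 'done').then((value) => console.log(value)); // logs "done"
delay(-1, 'never').catch((error: Error) => console.log(error.message));
```

The argument given to `resolve` or `reject` is what the consumer later receives in its handlers.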

Promises are Resolved or Rejected

  • Promises are consumed by calling then on them. then takes two arguments: a success handler and a failure handler
somethingReturningPromise().then(
  (value) => { // Resolved
    // Handle success case
  },
  (reason) => { // Rejected
    // Handle reject case, e.g. show an error note
  });

Promise Chaining

  • Promises can be "chained" by calling then multiple times in a row
  • Each .then() returns a new promise that resolves with the value returned by its handler
  • If the returned value is itself a promise, the chain waits for it before continuing
fetch('/users') // Make the HTTP request
  .then(response => response.json()) // .json() will return a promise
  .then(json => json.users) // Map the result to contain only the "users" field
  .then(users => alert('Found ' + users.length + ' users')); // Show an alert with the number of users
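The chaining rules can be tried without a network call. In the sketch below, `fakeFetchUsers` is a hypothetical stand-in for `fetch('/users')` followed by `.json()`, and the user data is made up.

```typescript
interface UsersResponse {
  users: string[];
}

// Hypothetical stand-in for an HTTP call; resolves asynchronously.
function fakeFetchUsers(): Promise<UsersResponse> {
  return new Promise<UsersResponse>((resolve) => {
    setTimeout(() => resolve({ users: ['ada', 'grace'] }), 10);
  });
}

const userCount: Promise<number> = fakeFetchUsers()
  // Returning a plain value: the next handler receives it directly.
  .then((response) => response.users)
  // Returning a promise: the chain waits for it, so the next handler
  // sees the resolved number rather than a nested promise.
  .then((users) => Promise.resolve(users.length));

userCount.then((count) => console.log('Found ' + count + ' users'));
```

Because every `.then()` produces a new promise, `userCount` can be stored, passed around and consumed later like any other promise.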

Problem: Stream Handling and Disposability

  • Promises don't work for streams: a promise settles once, so it can only represent a single event
  • Promises can't be cancelled

Solution: Observables

  • Generalization of promises for streams
  • A way of representing asynchronous event streams
    • e.g. mouse clicks, WebSocket streams
  • Can also be used for single events e.g. HTTP requests

RxJS

  • ReactiveX is an API, implemented for many languages, for representing asynchronous event streams as Observables and transforming them with stream operators
  • RxJS is a ReactiveX implementation for JavaScript
  • Angular integrates with RxJS 6

Idea:

"In ReactiveX an observer subscribes to an Observable."

  • You subscribe to a stream of events so that your handler is invoked every time there is a new item
observable.subscribe(item => doSomething(item));

Streams can be transformed with operators that mirror familiar array functions such as map and filter (shown here in the chained style used before RxJS 5.5)

  observable
    .filter(node => node.children.length > 2)
    .map(node => node.name);

You can merge, concat and do other operations on streams to produce new streams from the existing ones

const resultStream = stream1.merge(stream2);

Observables can also be created from e.g. objects, maps and arrays

Rx.Observable.of(42);
Rx.Observable.from([1,2,3,4]);
Rx.Observable.range(1,10);

Subscribing

  • The subscribe method takes up to three functions as arguments:

    • onNext: called when a new item is emitted
    • onError: called if the observable sequence fails
    • onComplete: called when the sequence completes
    observable.subscribe(
        next => doSomething(next), // onNext
        error => handleError(error), // onError
        () => done() // onComplete
    );
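To make the three-callback contract concrete, here is a minimal sketch of a cold observable written by hand. `SimpleObservable`, `SimpleObserver` and `fromArray` are hypothetical names and are not part of RxJS; the real library adds teardown, scheduling and much more.

```typescript
interface SimpleObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Hypothetical stand-in for an Observable: it stores a producer function
// and runs it once for every subscriber.
class SimpleObservable<T> {
  constructor(private producer: (observer: SimpleObserver<T>) => void) {}

  subscribe(
    next: (value: T) => void,
    error: (err: unknown) => void = () => {},
    complete: () => void = () => {}
  ): void {
    let stopped = false; // nothing is delivered after error or complete
    this.producer({
      next: (value) => { if (!stopped) next(value); },
      error: (err) => { if (!stopped) { stopped = true; error(err); } },
      complete: () => { if (!stopped) { stopped = true; complete(); } },
    });
  }
}

// Emits every array item, then completes.
function fromArray<T>(items: T[]): SimpleObservable<T> {
  return new SimpleObservable<T>((observer) => {
    items.forEach((item) => observer.next(item));
    observer.complete();
  });
}

const received: string[] = [];
fromArray([1, 2, 3]).subscribe(
  (value) => received.push('next ' + value), // onNext
  (err) => received.push('error ' + String(err)), // onError
  () => received.push('complete') // onComplete
);
console.log(received.join(', ')); // next 1, next 2, next 3, complete
```

A sequence ends with either an error or a completion, never both, which is why the sketch ignores everything after the first terminal notification.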

Unsubscribing (cancelling)

  • A subscription to an observable sequence can be cancelled by unsubscribing
    • E.g. an observable whose events are collected in memory
      const eventSubscription = eventStream.subscribe(
            event => this.events.push(event)
      );
    • The subscription stays active until it is unsubscribed (or the sequence completes or fails)
      eventSubscription.unsubscribe();

Demo


Catching Errors

  • An error terminates an observable sequence
  • To recover, catch the error and return a new observable sequence to continue with
    observable.catch((error) => {
        console.log(error);
        return Rx.Observable.of([1, 2, 3]);
    });

Hot vs. Cold Observables

  • Cold observables start running upon subscription
    • E.g. HTTP request
  • Hot observables are already producing values before the subscription is active
    • E.g. mouse move events
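The difference can be sketched with two hand-written sources. `coldNumbers` and `HotSource` are hypothetical names chosen for illustration and are not RxJS APIs; the sketch only shows when values are produced relative to subscription.

```typescript
type Listener<T> = (value: T) => void;

// Cold: every subscriber triggers its own, independent run of the producer.
function coldNumbers(listener: Listener<number>): void {
  [1, 2, 3].forEach((n) => listener(n));
}

// Hot: one shared source that emits whether or not anyone is listening.
class HotSource<T> {
  private listeners: Listener<T>[] = [];

  emit(value: T): void {
    // Iterate over a copy so listeners may unsubscribe while being notified.
    this.listeners.slice().forEach((listener) => listener(value));
  }

  // Returns a function that cancels the subscription.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }
}

const coldSeen: number[] = [];
coldNumbers((n) => coldSeen.push(n)); // receives the full sequence

const clicks = new HotSource<string>();
const hotSeen: string[] = [];
clicks.emit('click 1'); // nobody subscribed yet: missed
const unsubscribe = clicks.subscribe((c) => hotSeen.push(c));
clicks.emit('click 2'); // delivered
unsubscribe();
clicks.emit('click 3'); // already unsubscribed: missed

console.log(coldSeen.join(', ')); // 1, 2, 3
console.log(hotSeen.join(', ')); // click 2
```

A late subscriber to a hot source only sees what is emitted while its subscription is active, whereas a cold source replays its whole sequence for each subscriber.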

Observables in Angular

  • Angular uses observables instead of promises in most of its asynchronous APIs
    • E.g. an HTTP request results in only a single event (one response), but it is still modeled as an observable
        this.httpClient.get('url/restapi/resource') // Returns observable
            .subscribe(
                data => { this.data = data; }, // Success
                err => console.error(err), // Failure
                () => console.log('done') // Done
            );

Observables in Angular

  • Changes in route parameters are propagated through an observable sequence
  constructor(route: ActivatedRoute) {
      route.params.subscribe(params => this.index = +params['index']);
  }

Exercises

Open exercise instructions


RxJS 5.5

  • RxJS 5.5.0 introduced a major change to RxJS: pipeable operators
  • Importing is cumbersome in 5.5, but RxJS 6 simplifies it
  • Read more

Pipeable Operators Example

Pre RxJS 5.5

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/map';

const source$ = Observable.range(0, 10);
source$
  .filter(x => x % 2 === 0)
  .map(x => x + x)
  .subscribe(x => console.log(x))

RxJS 5.5 and later

import { range } from 'rxjs/observable/range';
import { map } from 'rxjs/operators/map';
import { filter } from 'rxjs/operators/filter';

const source$ = range(0, 10);
source$.pipe(
  filter(x => x % 2 === 0),
  map(x => x + x)
).subscribe(x => console.log(x))

RxJS 5.5 Renaming

Some operator names are reserved words in JavaScript and cannot be used as standalone function names, so they were renamed:

  • do -> tap
  • catch -> catchError
  • switch -> switchAll
  • finally -> finalize

RxJS 5.5 Pros

  • No more "prototype patching" -> Tree-shaking possible -> Smaller bundle sizes
  • Custom operators are easier to make
  • Better tooling support by linters and compilers
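Why custom operators become easier can be shown with a hand-written sketch: once an operator is a plain function from one stream to another, a custom operator is just function composition. The types and helpers below (`Producer`, `OperatorFn`, `rangeOf`, `filterBy`, `mapWith`, `pipeTwo`, `doubleEvens`) are hypothetical and only imitate the shape of the RxJS API.

```typescript
// A producer pushes values into a callback; it stands in for an Observable.
type Producer<T> = (emit: (value: T) => void) => void;
// A pipeable operator is a plain function from one producer to another.
type OperatorFn<A, B> = (source: Producer<A>) => Producer<B>;

function rangeOf(start: number, count: number): Producer<number> {
  return (emit) => {
    for (let i = start; i < start + count; i++) {
      emit(i);
    }
  };
}

function filterBy<T>(predicate: (value: T) => boolean): OperatorFn<T, T> {
  return (source) => (emit) =>
    source((value) => {
      if (predicate(value)) {
        emit(value);
      }
    });
}

function mapWith<A, B>(project: (value: A) => B): OperatorFn<A, B> {
  return (source) => (emit) => source((value) => emit(project(value)));
}

// Applies two operators in order, like source.pipe(first, second).
function pipeTwo<A, B, C>(
  source: Producer<A>,
  first: OperatorFn<A, B>,
  second: OperatorFn<B, C>
): Producer<C> {
  return second(first(source));
}

// A custom operator built purely by composing existing ones.
const doubleEvens: OperatorFn<number, number> = (source) =>
  pipeTwo(
    source,
    filterBy((x: number) => x % 2 === 0),
    mapWith((x: number) => x + x)
  );

const doubled: number[] = [];
doubleEvens(rangeOf(0, 10))((x) => doubled.push(x));
console.log(doubled.join(', ')); // 0, 4, 8, 12, 16
```

Nothing is attached to a shared prototype here, so a bundler can drop any operator function that is never imported, which is the tree-shaking benefit listed above.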

RxJS 6

  • Released in April 2018
  • Major changes:
    • Simpler imports (import { map, filter } from 'rxjs/operators')
    • Errors thrown asynchronously
    • Deprecations
    • New operator (throwIfEmpty)
  • Provides compatibility library (rxjs-compat) to support the migration from 5 to 6
  • See the ng-conf 2018 presentation by Ben Lesh (RxJS 5 and 6 author) for more details (slides, video)
  • rxjs-dev