
Let's start from the basics and gradually progress toward more advanced RxJS concepts in Angular.

Observables and Observers:

Observables are the foundation of RxJS, representing a stream of data that can be observed over time. Observers are the consumers of these streams, listening for emitted data and reacting accordingly.

Observables and Observers are similar to the concepts of events and event handlers in C#.

Observables:

In Angular, you can create an Observable using the Observable class from the rxjs library. An Observable can emit data asynchronously over time, and Observers subscribe to it to receive the emitted data.

Example in Angular:

import { Observable } from 'rxjs';

// Create an Observable that emits a stream of numbers
const numbers$ = new Observable(observer => {
  let count = 1;
  const intervalId = setInterval(() => {
    observer.next(count++);
  }, 1000);

  // Cleanup logic when the Observable is unsubscribed
  return () => {
    clearInterval(intervalId);
  };
});

// Subscribe to the Observable to receive the emitted numbers
numbers$.subscribe(value => console.log(value));
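The cleanup function in the example above only runs once a subscriber unsubscribes. Here is a minimal sketch of that step, assuming the same timer-based stream; the names `ticks$` and `events` are illustrative and not part of the original example.

```typescript
import { Observable } from 'rxjs';

// Illustrative log so the teardown call is visible from outside
const events: string[] = [];

const ticks$ = new Observable<number>(subscriber => {
  let count = 1;
  const intervalId = setInterval(() => subscriber.next(count++), 1000);

  // Teardown: invoked when the subscription is closed
  return () => {
    clearInterval(intervalId);
    events.push('teardown');
  };
});

const subscription = ticks$.subscribe(value => events.push(`value ${value}`));

// In an Angular component this call typically lives in ngOnDestroy
subscription.unsubscribe();

console.log(events);              // ['teardown']
console.log(subscription.closed); // true
```

Because unsubscribe() is called before the first second has elapsed, no value is ever logged and the timer is released immediately.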

Observers:

In Angular, an Observer is an object that defines how to handle the emitted data from an Observable.

It has three optional methods:

next for handling the emitted data

error for handling any errors

complete for handling the completion of the Observable.

Example in Angular:

import { Observable, Observer } from 'rxjs';

// Create an Observable that emits a stream of numbers
const numbers$ = new Observable<number>(subscriber => {
  let count = 1;
  const intervalId = setInterval(() => {
    subscriber.next(count++);
  }, 1000);

  // Cleanup logic when the Observable is unsubscribed
  return () => {
    clearInterval(intervalId);
  };
});

// Define an Observer to handle the emitted numbers
const observer: Observer<number> = {
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('Observable completed')
};

// Subscribe the Observer to the Observable
numbers$.subscribe(observer);
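
The timer-based stream above never completes or fails, so only next is ever called. The following sketch, using two small hypothetical streams (`finite$` and `failing$`), shows when complete and error fire and that nothing is delivered after either of them.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// Emits two values, then completes
const finite$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(3); // ignored: nothing is delivered after complete()
});

finite$.subscribe({
  next: value => log.push(`next ${value}`),
  error: (err: Error) => log.push(`error ${err.message}`),
  complete: () => log.push('complete'),
});

// Emits one value, then fails
const failing$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.error(new Error('boom'));
});

failing$.subscribe({
  next: value => log.push(`next ${value}`),
  error: (err: Error) => log.push(`error ${err.message}`),
  complete: () => log.push('complete'), // not called when the stream errors
});

console.log(log);
// ['next 1', 'next 2', 'complete', 'next 1', 'error boom']
```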

Operators:

Operators are functions that allow you to transform, filter, and combine data streams emitted by Observables. They are similar to LINQ operators in C#.

Transformation Operators: Transformation operators allow you to transform the data emitted by an Observable into a different format or structure.

Example in Angular with map operator:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use map operator to square each number in the stream
const squaredNumbers$ = numbers$.pipe(
  map(num => num * num)
);

// Subscribe to the transformed data stream
squaredNumbers$.subscribe(value => console.log(value));

Filtering Operators: Filtering operators allow you to filter the data emitted by an Observable based on a given condition.

Example in Angular with filter operator:

import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use filter operator to get only even numbers from the stream
const evenNumbers$ = numbers$.pipe(
  filter(num => num % 2 === 0)
);

// Subscribe to the filtered even numbers stream
evenNumbers$.subscribe(value => console.log(value));
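
Operators can also be chained inside a single pipe() call, and separate streams can be combined. This is a small sketch of both ideas; concat() is just one of several combination functions and is picked here for illustration.

```typescript
import { concat, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);

// Chain operators: keep the even numbers, then square them
const squaredEvens: number[] = [];
numbers$
  .pipe(
    filter(num => num % 2 === 0),
    map(num => num * num)
  )
  .subscribe(value => squaredEvens.push(value));

// Combine streams: concat() subscribes to each source in order
const combined: number[] = [];
concat(of(1, 2), of(3, 4)).subscribe(value => combined.push(value));

console.log(squaredEvens); // [4, 16]
console.log(combined);     // [1, 2, 3, 4]
```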

Subjects:

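Subjects are both Observables and Observers: you can push values into a Subject with next() and subscribe to it like any other Observable. Each value is delivered to every subscriber that is subscribed at the time it is emitted.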

Subjects are similar to the concept of event emitters in C#.

Example in Angular with Subject:

import { Subject } from 'rxjs';

// Create a Subject to emit and subscribe to a stream of numbers
const numbersSubject$ = new Subject<number>();

// Subscribe to the Subject to receive emitted numbers
numbersSubject$.subscribe(value => console.log(value));

// Emit numbers to the Subject
numbersSubject$.next(1);
numbersSubject$.next(2);
numbersSubject$.next(3);
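
Two consequences of a Subject being both an Observable and an Observer are sketched here: a late subscriber misses earlier values, and the Subject itself can be passed to another Observable's subscribe(). The names `subject$` and `received` are illustrative.

```typescript
import { of, Subject } from 'rxjs';

const received: string[] = [];
const subject$ = new Subject<number>();

subject$.subscribe(value => received.push(`A: ${value}`));
subject$.next(1);

// B subscribes late and never sees the value 1
subject$.subscribe(value => received.push(`B: ${value}`));
subject$.next(2);

// A Subject is also an Observer, so it can subscribe to another Observable.
// Note that of(3) completes, which completes the Subject as well.
of(3).subscribe(subject$);

console.log(received);
// ['A: 1', 'A: 2', 'B: 2', 'A: 3', 'B: 3']
```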

Hot and Cold Observables:

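Observables can be categorized into two types:

  1. Hot
  2. Cold

A cold Observable creates its data producer when it is subscribed to, so it emits nothing until there is a subscriber, and each subscriber gets its own independent execution. A hot Observable wraps a producer that exists outside the subscription, so values are produced whether or not anyone is subscribed, and late subscribers miss earlier values.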

Example in Angular with Hot and Cold Observables:

import { interval, fromEvent } from 'rxjs';

// Cold Observable - interval() starts a new timer for each subscriber, and only once subscribed
const coldObservable$ = interval(1000);

// Hot Observable - the button fires click events whether or not anyone is subscribed
// (assumes a <button> element exists on the page)
const button = document.querySelector('button')!;
const hotObservable$ = fromEvent(button, 'click');

// Subscribe to both Observables
coldObservable$.subscribe(value => console.log(`Cold: ${value}`));
hotObservable$.subscribe(event => console.log(`Hot: ${event.type}`));
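
The difference can also be seen without a browser. This sketch uses a hand-written cold Observable and a Subject standing in for a hot source; the counter and array names are illustrative.

```typescript
import { Observable, Subject } from 'rxjs';

// Cold: the producer function runs once per subscription
let executions = 0;
const cold$ = new Observable<number>(subscriber => {
  executions++;
  subscriber.next(executions);
  subscriber.complete();
});

const coldValues: number[] = [];
cold$.subscribe(value => coldValues.push(value));
cold$.subscribe(value => coldValues.push(value));

// Hot: the producer (a Subject here) lives outside any subscription
const hot$ = new Subject<number>();
const hotValues: number[] = [];
hot$.next(1); // nobody is subscribed yet, so this value is lost
hot$.subscribe(value => hotValues.push(value));
hot$.next(2);

console.log(executions); // 2
console.log(coldValues); // [1, 2]
console.log(hotValues);  // [2]
```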

Error handling:

RxJS provides operators for handling errors in Observables, such as catchError and retry, which allow you to handle errors and retries in a stream of data.

Example in Angular with error handling:

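import { of } from 'rxjs';
import { catchError, map, retry } from 'rxjs/operators';

// Create an observable that throws an error when it reaches a non-numeric value
// (of() on its own never errors, so the check in map() is what raises it)
const numbers$ = of(1, 2, 3, 4, 5, 'six').pipe(
  map(value => {
    if (typeof value !== 'number') {
      throw new Error(`Not a number: ${value}`);
    }
    return value;
  })
);

// Use catchError operator to replace the error with a fallback value
const numbersWithErrorHandled$ = numbers$.pipe(
  catchError(err => of(`Error occurred: ${err.message}`))
);

// Use retry operator to resubscribe to the source in case of error
const numbersWithRetry$ = numbers$.pipe(
  retry(2) // Retry 2 times; if it still fails, the error reaches the subscriber
);

// Subscribe to the error handled and retried observables
numbersWithErrorHandled$.subscribe(value => console.log(value));
numbersWithRetry$.subscribe({
  next: value => console.log(value),
  error: err => console.error(err.message)
});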
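
retry works by resubscribing to the source, which only helps when a new subscription can produce a different result. Here is a minimal sketch with a hypothetical flaky source (`flaky$`) that fails on its first two subscriptions and then succeeds.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

let attempts = 0;

// Hypothetical flaky source: errors on the first two subscriptions, then succeeds
const flaky$ = new Observable<string>(subscriber => {
  attempts++;
  if (attempts < 3) {
    subscriber.error(new Error(`attempt ${attempts} failed`));
  } else {
    subscriber.next(`data after ${attempts} attempts`);
    subscriber.complete();
  }
});

const output: string[] = [];
flaky$
  .pipe(
    retry(2), // resubscribe up to 2 times after an error
    catchError((err: Error) => of(`fallback: ${err.message}`)) // only reached if every attempt fails
  )
  .subscribe(value => output.push(value));

console.log(output); // ['data after 3 attempts']
```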

Custom operators:

A custom operator is simply a function that takes a source Observable and returns a new one, so you can create your own by composing existing operators or by building the result Observable by hand. This gives you flexibility to create reusable and specialized operators for your specific use cases.

Example in Angular with a custom operator:

import { Observable, OperatorFunction, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Custom operator to multiply each emitted number by a given factor
function multiplyBy(factor: number): OperatorFunction<number, number> {
  return (source: Observable<number>) =>
    source.pipe(map(num => num * factor));
}

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use the custom multiplyBy operator to multiply each number by 10
const multipliedNumbers$ = numbers$.pipe(multiplyBy(10));

// Subscribe to the multiplied numbers
multipliedNumbers$.subscribe(value => console.log(value));
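
A composed operator can also be made generic so that it works on streams of any type. The `everyNth` operator in this sketch is a hypothetical example, not an RxJS built-in; it relies on the index that filter passes to its predicate as a second argument.

```typescript
import { MonoTypeOperatorFunction, of } from 'rxjs';
import { filter } from 'rxjs/operators';

// Hypothetical reusable operator: keeps every nth value of any stream
function everyNth<T>(n: number): MonoTypeOperatorFunction<T> {
  return source =>
    source.pipe(filter((_value, index) => (index + 1) % n === 0));
}

const picked: number[] = [];
of(10, 20, 30, 40, 50, 60)
  .pipe(everyNth<number>(2))
  .subscribe(value => picked.push(value));

console.log(picked); // [20, 40, 60]
```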

Schedulers:

RxJS allows you to control the execution context or scheduler of an Observable. A scheduler decides when a subscription starts and when notifications are delivered, for example synchronously, on the microtask queue, or in a timer callback.

Example in Angular with schedulers:

import { of, asapScheduler, asyncScheduler, queueScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use observeOn operator so that notifications are delivered via the asyncScheduler
const asyncNumbers$ = numbers$.pipe(observeOn(asyncScheduler));

// Subscribe to the numbers with asyncScheduler
asyncNumbers$.subscribe(value => console.log(value));

// Use observeOn operator with other schedulers like asapScheduler or queueScheduler
const asapNumbers$ = numbers$.pipe(observeOn(asapScheduler));
const queueNumbers$ = numbers$.pipe(observeOn(queueScheduler));
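
The effect of a scheduler is easiest to see in the order of delivery. In this sketch the `order` array and `snapshot` constant are illustrative; the snapshot is taken before any timer callback can run.

```typescript
import { asyncScheduler, of } from 'rxjs';
import { observeOn } from 'rxjs/operators';

const order: string[] = [];

// Without a scheduler, of() delivers its value synchronously during subscribe()
of('sync value').subscribe(value => order.push(value));

// With observeOn(asyncScheduler), delivery is deferred to a later timer callback
of('async value')
  .pipe(observeOn(asyncScheduler))
  .subscribe(value => order.push(value));

order.push('end of script');

// Snapshot taken before any timer callback has had a chance to run
const snapshot = [...order];
console.log(snapshot); // ['sync value', 'end of script']
```

The 'async value' entry is appended to `order` only after the current synchronous code has finished.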

Multicasting:

By default, Observables are unicast, meaning each subscription creates a separate execution of the Observable.

However, you can multicast Observables to share a single execution among multiple subscribers, which can improve performance and reduce duplicated work.

Example in Angular with multicasting:

import { interval } from 'rxjs';
import { share } from 'rxjs/operators';

// Create a shared Observable that emits numbers every second.
// share() multicasts through a Subject and connects to the source when the
// first subscriber arrives, so no manual connect() call is needed.
const numbers$ = interval(1000).pipe(share());

// Subscribe from multiple subscribers; both share a single underlying timer
numbers$.subscribe(value => console.log(`Subscriber 1: ${value}`));
numbers$.subscribe(value => console.log(`Subscriber 2: ${value}`));
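
To check that multicasting really shares one execution, you can count how often the producer function runs. This is a minimal sketch using the share() operator; the `executions` counter and the `emit` handle are hypothetical helpers introduced only for the demonstration.

```typescript
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

// Count how many times the producer function runs (illustrative)
let executions = 0;
// Hypothetical handle for pushing a value from outside the producer
let emit: (value: number) => void = () => {};

const source$ = new Observable<number>(subscriber => {
  executions++;
  emit = value => subscriber.next(value);
});

const shared$ = source$.pipe(share());

const received: string[] = [];
shared$.subscribe(value => received.push(`Subscriber 1: ${value}`));
shared$.subscribe(value => received.push(`Subscriber 2: ${value}`));

emit(42);

console.log(executions); // 1
console.log(received);   // ['Subscriber 1: 42', 'Subscriber 2: 42']
```

Subscribing twice to `source$` directly would run the producer twice instead.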

Customizing the Observable:

You can also customize the behavior of an Observable by extending the Observable class and implementing your own logic for emitting values, handling errors, and managing subscriptions.

Example in Angular with a custom Observable:

import { Observable } from 'rxjs';

// Custom Observable that emits the numbers 1..maxNumber, one per second
class MyNumbersObservable extends Observable<number> {
  constructor(maxNumber: number) {
    super(subscriber => {
      // Keep the counter local so that every subscription starts from 1
      let currentNumber = 1;
      const intervalId = setInterval(() => {
        if (currentNumber <= maxNumber) {
          subscriber.next(currentNumber++);
        } else {
          subscriber.complete();
        }
      }, 1000);

      // Cleanup logic: runs on completion and on unsubscribe
      return () => clearInterval(intervalId);
    });
  }
}

// Create an instance of the custom Observable
const myNumbers$ = new MyNumbersObservable(5);

// Subscribe to the custom Observable
myNumbers$.subscribe(value => console.log(value));
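
Subclassing is not the only option. A plain factory function that returns a new Observable is a common alternative, sketched here with a hypothetical synchronous `countTo` function. It checks subscriber.closed so that it stops producing once a downstream operator such as take() has unsubscribed.

```typescript
import { Observable } from 'rxjs';
import { take } from 'rxjs/operators';

// Records what the producer actually generated (illustrative)
const produced: number[] = [];

// Hypothetical factory: emits 1..max synchronously
function countTo(max: number): Observable<number> {
  return new Observable<number>(subscriber => {
    // Stop producing as soon as the subscriber has been closed
    for (let n = 1; n <= max && !subscriber.closed; n++) {
      produced.push(n);
      subscriber.next(n);
    }
    subscriber.complete();
  });
}

const taken: number[] = [];
countTo(5)
  .pipe(take(2))
  .subscribe(value => taken.push(value));

console.log(taken);    // [1, 2]
console.log(produced); // [1, 2]
```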

Backpressure:

Backpressure occurs when the rate of emission from an Observable is higher than the rate of consumption by subscribers. RxJS has no built-in signal that lets a slow consumer tell a producer to slow down, but it provides operators that control how data is buffered, dropped, or otherwise managed when dealing with high-rate data streams.

Example in Angular with backpressure:

import { interval } from 'rxjs';
import { bufferTime } from 'rxjs/operators';

// Create a fast-emitting Observable that emits numbers every 100ms
const fastNumbers$ = interval(100);

// Use bufferTime operator to buffer emitted numbers for every 1 second
const bufferedNumbers$ = fastNumbers$.pipe(bufferTime(1000));

// Subscribe to the buffered numbers
bufferedNumbers$.subscribe(values => console.log(values));
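
Buffering can also be driven by the number of values rather than by time. This sketch uses bufferCount on a synchronous source so the batching is easy to follow; it is a lossless strategy, since every value ends up in some batch.

```typescript
import { of } from 'rxjs';
import { bufferCount } from 'rxjs/operators';

const batches: number[][] = [];

// Group the incoming values into arrays of at most 3 items
of(1, 2, 3, 4, 5, 6, 7)
  .pipe(bufferCount(3))
  .subscribe(batch => batches.push(batch));

// The final, partially filled buffer is emitted when the source completes
console.log(batches); // [[1, 2, 3], [4, 5, 6], [7]]
```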

Error Handling:

RxJS provides operators for handling errors that may occur in the Observable stream.

You can catch and handle errors, retry failed Observables, and take other actions to gracefully handle errors in your application.

Example in Angular with error handling:

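import { of } from 'rxjs';
import { catchError, map, retry } from 'rxjs/operators';

// Create an Observable that throws an error when it reaches an invalid value
// (of() on its own never errors, so the check in map() is what raises it)
const numbers$ = of(1, 2, 3, 4, 5, 'invalid', 7, 8, 9).pipe(
  map(value => {
    if (typeof value !== 'number') {
      throw new Error(`Invalid value: ${value}`);
    }
    return value;
  })
);

// Use retry operator first, so a failure resubscribes to the source
const retryNumbers$ = numbers$.pipe(
  retry(2) // Retry failed Observables up to 2 times
);

// Then use catchError operator to handle the error that remains after the retries.
// The stream ends after the fallback value, so 7, 8 and 9 are never emitted.
const safeNumbers$ = retryNumbers$.pipe(
  catchError(error => {
    console.error(`Error: ${error.message}`);
    return of('Error occurred. Continuing with default value.');
  })
);

// Subscribe to the retrying, error-handled numbers
safeNumbers$.subscribe(value => console.log(value));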
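
The order of retry and catchError in the pipe matters. The sketch here uses a hypothetical source that always fails (`alwaysFails$`) and counts how often it is subscribed to in each arrangement.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

let subscriptions = 0;

// Hypothetical source that fails on every subscription
const alwaysFails$ = new Observable<string>(subscriber => {
  subscriptions++;
  subscriber.error(new Error('network down'));
});

// retry before catchError: the source is tried 3 times, then the fallback is used
const retryFirst: string[] = [];
alwaysFails$
  .pipe(retry(2), catchError(() => of('fallback')))
  .subscribe(value => retryFirst.push(value));
const countRetryFirst = subscriptions;

// catchError before retry: the error is already replaced, so retry never sees it
subscriptions = 0;
const catchFirst: string[] = [];
alwaysFails$
  .pipe(catchError(() => of('fallback')), retry(2))
  .subscribe(value => catchFirst.push(value));
const countCatchFirst = subscriptions;

console.log(retryFirst, countRetryFirst); // ['fallback'] 3
console.log(catchFirst, countCatchFirst); // ['fallback'] 1
```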

Custom Operators:

RxJS allows you to create custom operators by combining existing operators or by implementing your own logic for transforming or filtering values in the Observable stream. Custom operators can provide reusable and specialized functionality for your specific use cases.

Example in Angular with a custom operator:

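import { Observable, OperatorFunction, of } from 'rxjs';

// Custom operator that filters out odd numbers
function filterOutOddNumbers(): OperatorFunction<number, number> {
  return (source: Observable<number>) =>
    new Observable<number>(subscriber => {
      // Returning the inner subscription ties its teardown to the outer one
      return source.subscribe({
        next: value => {
          if (value % 2 === 0) {
            subscriber.next(value);
          }
        },
        // Forward error and completion so downstream subscribers are notified
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
    });
}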

// Create an Observable that emits a sequence of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use the custom filterOutOddNumbers operator
const filteredNumbers$ = numbers$.pipe(filterOutOddNumbers());

// Subscribe to the filtered numbers
filteredNumbers$.subscribe(value => console.log(value));

This is just a brief overview of some of the basic to advanced concepts and features of RxJS. RxJS is a powerful and flexible library that can greatly simplify and enhance your asynchronous programming in Angular or any other JavaScript environment. I recommend referring to the official RxJS documentation for more in-depth explanations and examples.

I hope this detailed blog post on Observables and Subjects in RxJS, with examples, has been helpful. Remember to apply these concepts wisely in your code and make use of the code examples provided to enhance your understanding.

Happy coding!

Please let me know your thoughts on this story by clapping or leaving a comment with suggestions for future topics.
