RxJS (Reactive Extensions for JavaScript) 是 Angular 中处理异步编程的核心库。 它通过使用Observable(可观察对象)序列来编写异步和基于回调的代码。
一、 核心概念
在 RxJS 中,一切基于数据流。
- Observable (被观察者):数据的源头,发出数据。
- Observer (观察者):数据的消费者,接收数据。
- Subscription (订阅):连接 Observable 和 Observer 的桥梁。注意:必须取消订阅,否则会内存泄漏。
- Operators (操作符):纯函数,用来处理、转换数据流(如 map, filter)。
- Subject (主题):既是 Observable 又是 Observer,可以多播数据(常用于组件通信)。
二、 基础写法
1. 创建 Observable 和 订阅
import { Observable } from 'rxjs'; // 1. 创建 Observable const observable$ = new Observable(subscriber => { subscriber.next(1); // 发出数据 subscriber.next(2); subscriber.next(3); subscriber.complete(); // 结束 // subscriber.error('出错了'); // 抛出异常 }); // 2. 订阅 const subscription = observable$.subscribe({ next: (x) => console.log('收到数据:', x), error: (err) => console.error('错误:', err), complete: () => console.log('流结束') }); // 3. 取消订阅 (非常重要) subscription.unsubscribe();2. 简写订阅 (只关心 next)
observable$.subscribe(data => console.log(data));三、 常用创建操作符
用于生成数据流。
import { of, from, interval, fromEvent, throwError } from 'rxjs'; // 1. of: 依次发出参数 of(1, 2, 3).subscribe(console.log); // 输出: 1, 2, 3 // 2. from: 将数组/Promise 转为 Observable from([10, 20, 30]).subscribe(console.log); // 输出: 10, 20, 30 // 3. interval: 周期性发出数字 (每1秒发一个) interval(1000).subscribe(n => console.log(n)); // 0, 1, 2... // 4. fromEvent: 监听 DOM 事件 fromEvent(document.querySelector('button')!, 'click') .subscribe(() => console.log('按钮被点击')); // 5. throwError: 创建一个只报错的流 // throwError(() => new Error('哎呀出错了')).subscribe();四、 常用转换操作符
这是 RxJS 最强大的部分,管道语法是 Angular 18+ 的标准写法。
import { map, filter, pluck } from 'rxjs/operators'; of(1, 2, 3, 4, 5).pipe( // 1. map: 转换数据 (类似数组的 map) map(x => x * 10), // 2. filter: 过滤数据 (只有 true 才会通过) filter(x => x > 20) ).subscribe(console.log); // 输出: 30, 40, 50 // 3. pluck: 提取对象属性 (已废弃,推荐用 map) // 旧写法: source$.pipe(pluck('user', 'name')) // 新写法: interface User { name: string; age: number; } const user$: Observable<User> = of({ name: 'Tom', age: 18 }); user$.pipe(map(user => user.name)).subscribe(console.log);五、 工具操作符 (面试高频)
用于处理流的逻辑,如限流、防抖、错误处理。
import { delay, tap, catchError, takeUntil, debounceTime } from 'rxjs/operators'; import { of, Subject, throwError } from 'rxjs'; // 1. tap: 副作用操作 (不修改数据,通常用于打印日志、存 LocalStorage) of('Hello').pipe( tap(val => console.log('处理前:', val)), delay(1000) // 延迟1秒发射 ).subscribe(val => console.log('处理后:', val)); // 2. catchError: 错误捕获 (让流不中断) throwError(() => new Error('网络错误')).pipe( catchError(err => { console.error(err); // 捕获错误后,返回一个新的 Observable 给下游,防止程序崩溃 return of('默认数据'); }) ).subscribe(console.log); // 输出: 默认数据 // 3. debounceTime: 防抖 (用户停止输入 300ms 后才发送请求) fromEvent(document.querySelector('input')!, 'input').pipe( debounceTime(300) ).subscribe((event: any) => console.log(event.target.value)); // 4. takeUntil: 立即取消订阅 (在 Angular 组件销毁时最常用) const destroy$ = new Subject<void>(); interval(1000).pipe( takeUntil(destroy$) // 当 destroy$ 发出值时,上面的流自动停止 ).subscribe(console.log); // 模拟组件销毁 setTimeout(() => { destroy$.next(); // 停止上面的 interval destroy$.complete(); }, 5000);六、 高阶操作符 (处理嵌套流)
当一个 Observable 发出的数据还是一个 Observable 时使用。
import { mergeMap, switchMap, concatMap, exhaustMap } from 'rxjs/operators'; // 场景:点击按钮 -> 发送 HTTP 请求 // 假设 click$ 是点击事件流, getData(id) 返回 Observable // 1. mergeMap (并行): 点击一次发一次请求,不管上一个有没有完成。 // 适用:并发上传,互不干扰。 click$.pipe( mergeMap(() => this.http.get('/api/data')) ).subscribe(); // 2. switchMap (切换): **面试必考**。如果有新请求,取消旧请求。 // 适用:搜索框输入。 searchInput$.pipe( switchMap(keyword => this.http.search(keyword)) ).subscribe(); // 3. concatMap (串行): 等前一个请求完成,再发下一个。 // 适用:必须按顺序执行的任务。 // 4. exhaustMap (排他): 如果有请求正在进行,忽略新的点击。 // 适用:防止重复提交表单。 submitBtn$.pipe( exhaustMap(() => this.http.submit()) ).subscribe();七、 Subject (多播)
普通的 Observable 是单播的;Subject 可以让多个订阅者共享同一个数据源。
import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs'; // 1. Subject: 只有订阅后发出的数据才会收到。 const subject = new Subject<number>(); subject.subscribe(n => console.log('A:', n)); subject.next(1); // A 收到 1 subject.subscribe(n => console.log('B:', n)); subject.next(2); // A 收到 2, B 收到 2 (B 错过了 1) // 2. BehaviorSubject: 必须有初始值,新订阅者会立即收到**最新**的值。 const bs = new BehaviorSubject<number>(0); // 初始值 0 bs.subscribe(n => console.log('C:', n)); // C 立即收到 0 bs.next(100); // 3. ReplaySubject: 可以缓存最近的 N 个值,新订阅者会收到缓存的历史记录。 const rs = new ReplaySubject(2); // 缓存最近 2 个 rs.next(1); rs.next(2); rs.next(3); rs.subscribe(n => console.log('D:', n)); // D 收到 2 和 3八、 Angular 实战:AsyncPipe (语法糖)
在 Angular 中,你甚至不需要手动调用.subscribe()。
// 组件 TS export class MyComponent { // 自动处理订阅、取消订阅、变化检测 data$ = of([{ name: 'Tom' }, { name: 'Jerry' }]); } // 组件 HTML <div *ngFor="let item of data$ | async"> {{ item.name }} </div>注意:如果你需要拿到数据后在 TS 逻辑里做复杂处理,还是需要手动 subscribe 并配合takeUntil使用。
总结速查表
| 类别 | 操作符 | 作用 |
|---|---|---|
| 创建 | of, from, interval | 造数据 |
| 转换 | map, filter | 改数据 |
| 工具 | tap, delay, debounceTime | 辅助/拦截 |
| 组合 | switchMap, mergeMap | 处理嵌套流 (HTTP) |
| 生命周期 | takeUntil, first, take | 管理订阅 |
| 错误 | catchError, retry | 异常处理 |
| 多播 | Subject, BehaviorSubject | 跨组件通信 |
原文: https://juejin.cn/post/75969919