name: rxjs-patterns-for-angular description: “在Angular中实现响应式编程的RxJS模式。当处理Observables、操作符、订阅、异步数据流和错误处理时使用这项技能。涵盖常见模式如combineLatest、switchMap、debounceTime、catchError、retry逻辑以及与Angular信号的集成,使用toSignal()和toObservable()。确保使用takeUntilDestroyed()进行适当的订阅清理。” license: “MIT”
RxJS模式 for Angular 技能
这项技能有助于在Angular应用程序中使用RxJS实现响应式模式。
核心原则
现代Angular + RxJS
- 信号优先:使用信号处理状态,RxJS处理异步操作
- 自动清理:使用
takeUntilDestroyed()进行订阅管理 - 互操作性:使用
toSignal()和toObservable()进行信号/Observable转换 - AsyncPipe:当不使用信号时,在模板中优先使用AsyncPipe
关键概念
- Observables用于异步数据流
- 操作符用于数据转换
- 订阅管理和清理
- 错误处理和重试逻辑
信号 + RxJS集成
toSignal() - Observable到信号
import { Component, inject } from '@angular/core';
import { toSignal } from '@angular/core/rxjs-interop';
import { HttpClient } from '@angular/common/http';
@Component({
selector: 'app-task-list',
template: `
@if (tasks(); as taskList) {
@for (task of taskList; track task.id) {
<div>{{ task.title }}</div>
}
}
`
})
export class TaskListComponent {
private http = inject(HttpClient);
// 将Observable转换为信号
tasks = toSignal(
this.http.get<Task[]>('/api/tasks'),
{ initialValue: [] }
);
}
toObservable() - 信号到Observable
import { Component, signal } from '@angular/core';
import { toObservable } from '@angular/core/rxjs-interop';
import { switchMap } from 'rxjs/operators';
@Component({
selector: 'app-search',
template: `
<input
nz-input
[ngModel]="searchQuery()"
(ngModelChange)="searchQuery.set($event)"
/>
@if (results(); as resultList) {
@for (result of resultList; track result.id) {
<div>{{ result.name }}</div>
}
}
`
})
export class SearchComponent {
searchQuery = signal('');
// 将信号转换为Observable并转换
private searchQuery$ = toObservable(this.searchQuery);
results = toSignal(
this.searchQuery$.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => this.searchService.search(query))
),
{ initialValue: [] }
);
}
订阅管理
takeUntilDestroyed() - 自动清理
import { Component, inject, signal, DestroyRef } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';
@Component({
selector: 'app-timer',
template: `<div>Time: {{ time() }}</div>`
})
export class TimerComponent {
private destroyRef = inject(DestroyRef);
time = signal(0);
constructor() {
// 订阅在组件销毁时自动清理
interval(1000)
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(value => this.time.set(value));
}
}
手动清理(遗留模式 - 避免)
// ❌ 不要:手动订阅管理(旧模式)
export class LegacyComponent implements OnDestroy {
private subscription = new Subscription();
ngOnInit() {
this.subscription.add(
this.dataService.getData().subscribe(data => {
// 处理数据
})
);
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}
// ✅ 做:使用takeUntilDestroyed()
export class ModernComponent {
private destroyRef = inject(DestroyRef);
data = signal<any>(null);
constructor() {
this.dataService.getData()
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(data => this.data.set(data));
}
}
常用操作符
switchMap - 切换到新的Observable
// 在每个查询更改时切换到新的搜索
searchResults$ = this.searchQuery$.pipe(
debounceTime(300),
switchMap(query => this.http.get(`/api/search?q=${query}`))
);
mergeMap - 合并多个Observables
// 并行处理所有任务
processTasks$ = this.tasks$.pipe(
mergeMap(tasks =>
from(tasks).pipe(
mergeMap(task => this.processTask(task))
)
)
);
concatMap - 顺序处理
// 按顺序一个接一个处理任务
processTasks$ = this.tasks$.pipe(
concatMap(tasks =>
from(tasks).pipe(
concatMap(task => this.processTask(task))
)
)
);
debounceTime - 抑制输入
// 用户停止输入后等待300ms
search$ = this.searchInput$.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => this.searchService.search(query))
);
distinctUntilChanged - 跳过重复项
// 仅在值实际变化时发出
status$ = this.statusSubject$.pipe(
distinctUntilChanged()
);
filter - 过滤值
// 仅发出非空字符串
nonEmptySearch$ = this.searchQuery$.pipe(
filter(query => query.trim().length > 0),
switchMap(query => this.search(query))
);
map - 转换值
// 将任务转换为显示格式
taskDisplay$ = this.task$.pipe(
map(task => ({
title: task.title,
status: task.status.toUpperCase(),
dueDate: formatDate(task.dueDate)
}))
);
tap - 副作用
// 不转换地记录
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
tap(tasks => console.log('Loaded tasks:', tasks.length)),
tap(tasks => this.analyticsService.track('tasks_loaded'))
);
组合Observables
combineLatest - 等待全部
import { combineLatest } from 'rxjs';
// 组合多个observables
viewModel$ = combineLatest([
this.tasks$,
this.users$,
this.settings$
]).pipe(
map(([tasks, users, settings]) => ({
tasks,
users,
settings
}))
);
// 转换为信号
viewModel = toSignal(this.viewModel$);
forkJoin - 等待全部完成
import { forkJoin } from 'rxjs';
// 并行加载多个资源
loadAll$ = forkJoin({
tasks: this.taskService.getTasks(),
users: this.userService.getUsers(),
projects: this.projectService.getProjects()
}).pipe(
map(({ tasks, users, projects }) => ({
tasks,
users,
projects
}))
);
merge - 合并多个流
import { merge } from 'rxjs';
// 组合多个事件流
allEvents$ = merge(
this.createEvent$,
this.updateEvent$,
this.deleteEvent$
).pipe(
tap(event => this.handleEvent(event))
);
zip - 配对值
import { zip } from 'rxjs';
// 从两个流中配对匹配的值
paired$ = zip(
this.stream1$,
this.stream2$
).pipe(
map(([value1, value2]) => ({ value1, value2 }))
);
错误处理
catchError - 处理错误
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
catchError(error => {
console.error('Failed to load tasks:', error);
this.notificationService.error('Failed to load tasks');
return of([]); // 返回空数组作为回退
})
);
retry - 失败时重试
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
retry(3), // 最多重试3次
catchError(error => {
console.error('Failed after 3 retries:', error);
return of([]);
})
);
retryWhen - 有条件的重试和退避
import { retryWhen, delay, scan, throwError } from 'rxjs';
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
retryWhen(errors =>
errors.pipe(
scan((retryCount, error) => {
if (retryCount >= 3) {
throw error; // 达到最大重试次数
}
console.log(`Retry ${retryCount + 1}/3`);
return retryCount + 1;
}, 0),
delay(1000) // 重试间隔1秒
)
),
catchError(error => {
console.error('Failed after retries:', error);
return of([]);
})
);
实时数据
interval - 定期更新
import { interval, switchMap } from 'rxjs';
// 每30秒轮询一次
liveData$ = interval(30000).pipe(
startWith(0), // 立即发出
switchMap(() => this.http.get('/api/live-data')),
takeUntilDestroyed(this.destroyRef)
);
liveData = toSignal(this.liveData$);
WebSocket模式
import { webSocket } from 'rxjs/webSocket';
export class RealtimeService {
private socket$ = webSocket('wss://api.example.com/ws');
messages$ = this.socket$.pipe(
catchError(error => {
console.error('WebSocket error:', error);
return EMPTY;
}),
retry({ delay: 5000 }) // 5秒后重连
);
sendMessage(msg: any): void {
this.socket$.next(msg);
}
}
加载状态
共享加载状态
import { shareReplay } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class TaskService {
private http = inject(HttpClient);
// 缓存并共享结果
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
shareReplay({ bufferSize: 1, refCount: true })
);
}
加载指示器模式
@Component({
selector: 'app-task-list',
template: `
@if (loading()) {
<nz-spin />
} @else if (error()) {
<nz-alert nzType="error" [nzMessage]="error()!" />
} @else {
@for (task of tasks(); track task.id) {
<div>{{ task.title }}</div>
}
}
`
})
export class TaskListComponent {
private taskService = inject(TaskService);
private destroyRef = inject(DestroyRef);
loading = signal(false);
error = signal<string | null>(null);
tasks = signal<Task[]>([]);
constructor() {
this.loadTasks();
}
loadTasks(): void {
this.loading.set(true);
this.error.set(null);
this.taskService.tasks$
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe({
next: (tasks) => {
this.tasks.set(tasks);
this.loading.set(false);
},
error: (err) => {
this.error.set(err.message || 'Failed to load tasks');
this.loading.set(false);
}
});
}
}
高级模式
节流与去抖
import { throttleTime, debounceTime } from 'rxjs';
// 节流:首先发出,然后忽略一段时间
throttled$ = this.clicks$.pipe(
throttleTime(1000) // 最多每秒一次
);
// 去抖:等待安静期
debounced$ = this.input$.pipe(
debounceTime(300) // 停止输入后等待300ms
);
扫描 - 累积值
// 运行总计
total$ = this.amounts$.pipe(
scan((acc, value) => acc + value, 0)
);
// 历史累积
history$ = this.events$.pipe(
scan((history, event) => [...history, event], [] as Event[])
);
startWith - 初始值
// 以加载状态开始
status$ = this.dataLoad$.pipe(
map(() => 'loaded'),
startWith('loading')
);
pairwise - 前一个 + 当前
// 与前一个值比较
changes$ = this.value$.pipe(
pairwise(),
map(([prev, curr]) => ({
previous: prev,
current: curr,
diff: curr - prev
}))
);
最佳实践
✅ 做
// 使用toSignal()将Observables转换为模板中的响应式数据
data = toSignal(this.data$, { initialValue: [] });
// 使用takeUntilDestroyed()进行清理
this.data$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe();
// 使用switchMap处理用户触发的请求
search$ = this.query$.pipe(switchMap(q => this.search(q)));
// 明确处理错误
data$ = this.http.get('/api/data').pipe(
catchError(err => of(null))
);
❌ 不要
// 不要忘记取消订阅
this.data$.subscribe(); // 内存泄漏!
// 不要使用嵌套订阅
this.data$.subscribe(data => {
this.process(data).subscribe(); // 反模式!
});
// 不要与信号一起使用异步管道
@if (data$ | async) { } // 使用信号代替
清单
使用RxJS时:
- [ ] 使用toSignal()将Observables转换为信号
- [ ] 使用takeUntilDestroyed()进行订阅清理
- [ ] 使用catchError()处理错误
- [ ] 用户输入去抖(300ms)
- [ ] 使用switchMap进行可取消请求
- [ ] 使用shareReplay()共享昂贵的Observables
- [ ] 使用startWith()提供初始值
- [ ] 过滤掉空/无效值
- [ ] 测试异步操作
- [ ] 记录复杂的操作符链