响应式编程
概览
使用响应式流和可观察对象构建响应式应用程序,用于处理异步数据流。
何时使用
- 复杂的异步数据流
- 实时数据更新
- 事件驱动架构
- UI状态管理
- WebSocket/SSE处理
- 合并多个数据源
实现示例
1. RxJS基础
import { Observable, Subject, BehaviorSubject, fromEvent, interval } from 'rxjs';
import { map, filter, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
// 从数组创建可观察对象
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
numbers$.subscribe({
next: value => console.log(value),
error: err => console.error(err),
complete: () => console.log('完成')
});
// 主题(多播)
const subject = new Subject<number>();
subject.subscribe(value => console.log('订阅1:', value));
subject.subscribe(value => console.log('订阅2:', value));
subject.next(1); // 所有订阅者接收
// BehaviorSubject(带有初始值)
const state$ = new BehaviorSubject({ count: 0 });
state$.subscribe(state => console.log('状态:', state));
state$.next({ count: 1 });
state$.next({ count: 2 });
// 操作符
const source$ = interval(1000);
source$.pipe(
map(n => n * 2),
filter(n => n > 5),
take(5)
).subscribe(value => console.log(value));
2. 带防抖的搜索
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, catchError } from 'rxjs/operators';
import { of } from 'rxjs';
const searchInput = document.querySelector('#search') as HTMLInputElement;
const search$ = fromEvent(searchInput, 'input').pipe(
map((event: Event) => (event.target as HTMLInputElement).value),
debounceTime(300), // 打字后等待300ms
distinctUntilChanged(), // 只有当值改变时
switchMap(query => {
if (!query) return of([]);
return fetch(`/api/search?q=${query}`)
.then(res => res.json())
.catch(() => of([]));
}),
catchError(error => {
console.error('搜索错误:', error);
return of([]);
})
);
search$.subscribe(results => {
console.log('搜索结果:', results);
显示结果(results);
});
function 显示结果(results: any[]) {
// 更新UI
}
3. 状态管理
import { BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';
interface AppState {
user: { id: string; name: string } | null;
cart: Array<{ id: string; quantity: number }>;
loading: boolean;
}
class StateManager {
private state$ = new BehaviorSubject<AppState>({
user: null,
cart: [],
loading: false
});
// 选择器
user$ = this.state$.pipe(
map(state => state.user),
distinctUntilChanged()
);
cart$ = this.state$.pipe(
map(state => state.cart),
distinctUntilChanged()
);
cartTotal$ = this.cart$.pipe(
map(cart => cart.reduce((sum, item) => sum + item.quantity, 0))
);
loading$ = this.state$.pipe(
map(state => state.loading)
);
// 动作
setUser(user: AppState['user']): void {
this.state$.next({
...this.state$.value,
user
});
}
addToCart(item: { id: string; quantity: number }): void {
const cart = [...this.state$.value.cart];
const existing = cart.find(i => i.id === item.id);
if (existing) {
existing.quantity += item.quantity;
} else {
cart.push(item);
}
this.state$.next({
...this.state$.value,
cart
});
}
setLoading(loading: boolean): void {
this.state$.next({
...this.state$.value,
loading
});
}
getState(): AppState {
return this.state$.value;
}
}
// 使用
const store = new StateManager();
store.user$.subscribe(user => {
console.log('用户:', user);
});
store.cartTotal$.subscribe(total => {
console.log('购物车商品:', total);
});
store.setUser({ id: '123', name: 'John' });
store.addToCart({ id: 'item1', quantity: 2 });
4. 带重连的WebSocket
import { Observable, timer } from 'rxjs';
import { retryWhen, tap, delayWhen } from 'rxjs/operators';
function createWebSocketObservable(url: string): Observable<any> {
return new Observable(subscriber => {
let ws: WebSocket;
const connect = () => {
ws = new WebSocket(url);
ws.onopen = () => {
console.log('WebSocket连接');
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
subscriber.next(data);
} catch (error) {
console.error('解析错误:', error);
}
};
ws.onerror = (error) => {
console.error('WebSocket错误:', error);
subscriber.error(error);
};
ws.onclose = () => {
console.log('WebSocket关闭');
subscriber.error(new Error('连接关闭'));
};
};
connect();
return () => {
if (ws) {
ws.close();
}
};
}).pipe(
retryWhen(errors =>
errors.pipe(
tap(err => console.log('重连中...', err)),
delayWhen((_, i) => timer(Math.min(1000 * Math.pow(2, i), 30000)))
)
)
);
}
// 使用
const ws$ = createWebSocketObservable('wss://api.example.com/ws');
ws$.subscribe({
next: data => console.log('接收到:', data),
error: err => console.error('错误:', err)
});
5. 合并多个流
import { combineLatest, merge, forkJoin, zip } from 'rxjs';
// combineLatest - 当任何输入发射时发射
const users$ = fetchUsers();
const settings$ = fetchSettings();
combineLatest([users$, settings$]).subscribe(([users, settings]) => {
console.log('用户:', users);
console.log('设置:', settings);
});
// merge - 合并多个可观察对象
const clicks$ = fromEvent(button1, 'click');
const hovers$ = fromEvent(button2, 'mouseover');
merge(clicks$, hovers$).subscribe(event => {
console.log('事件:', event.type);
});
// forkJoin - 等待全部完成(像Promise.all)
forkJoin({
users: fetchUsers(),
posts: fetchPosts(),
comments: fetchComments()
}).subscribe(({ users, posts, comments }) => {
console.log('所有数据加载:', { users, posts, comments });
});
// zip - 合并对应的值
const names$ = of('Alice', 'Bob', 'Charlie');
const ages$ = of(25, 30, 35);
zip(names$, ages$).subscribe(([name, age]) => {
console.log(`${name} 是 ${age} 岁`);
});
6. 背压处理
import { Subject } from 'rxjs';
import { bufferTime, throttleTime } from 'rxjs/operators';
// 缓冲事件
const events$ = new Subject<string>();
events$.pipe(
bufferTime(1000), // 1秒内收集事件
filter(buffer => buffer.length > 0)
).subscribe(events => {
console.log('批次:', events);
处理批次(events);
});
// 节流事件
const clicks$ = fromEvent(button, 'click');
clicks$.pipe(
throttleTime(1000) // 每秒只允许一个
).subscribe(() => {
console.log('点击处理');
});
function 处理批次(events: string[]) {
// 处理批次
}
7. 自定义操作符
import { Observable } from 'rxjs';
function tapLog<T>(message: string) {
return (source: Observable<T>) => {
return new Observable<T>(subscriber => {
return source.subscribe({
next: value => {
console.log(message, value);
subscriber.next(value);
},
error: err => subscriber.error(err),
complete: () => subscriber.complete()
});
});
};
}
// 使用
source$.pipe(
tapLog('映射前:'),
map(x => x * 2),
tapLog('映射后:')
).subscribe();
最佳实践
✅ 要做
- 取消订阅以防止内存泄漏
- 使用操作符转换数据
- 正确处理错误
- 对昂贵的操作使用shareReplay
- 需要时合并流
- 测试响应式代码
❌ 不要做
- 多次订阅同一个可观察对象
- 忘记取消订阅
- 使用嵌套订阅
- 忽略错误处理
- 使可观察对象有状态
常用操作符
| 操作符 | 目的 |
|---|---|
| map | 转换值 |
| filter | 过滤值 |
| debounceTime | 发射前等待 |
| distinctUntilChanged | 仅当改变时发射 |
| switchMap | 切换到新的可观察对象 |
| mergeMap | 合并多个可观察对象 |
| catchError | 处理错误 |
| tap | 副作用 |
| take | 取n个值 |
| takeUntil | 直到条件取值 |