响应式编程 reactive-programming

使用RxJS、流、可观察对象和背压处理实现响应式编程模式。适用于构建事件驱动的UI、处理异步数据流或管理复杂的数据流。

前端开发 0 次安装 0 次浏览 更新于 3/4/2026

响应式编程

概览

使用响应式流和可观察对象构建响应式应用程序,用于处理异步数据流。

何时使用

  • 复杂的异步数据流
  • 实时数据更新
  • 事件驱动架构
  • UI状态管理
  • WebSocket/SSE处理
  • 合并多个数据源

实现示例

1. RxJS基础

import { Observable, Subject, BehaviorSubject, fromEvent, interval } from 'rxjs';
import { map, filter, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

// 从数组创建可观察对象
const numbers$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

numbers$.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('完成')
});

// 主题(多播)
const subject = new Subject<number>();

subject.subscribe(value => console.log('订阅1:', value));
subject.subscribe(value => console.log('订阅2:', value));

subject.next(1); // 所有订阅者接收

// BehaviorSubject(带有初始值)
const state$ = new BehaviorSubject({ count: 0 });

state$.subscribe(state => console.log('状态:', state));

state$.next({ count: 1 });
state$.next({ count: 2 });

// 操作符
const source$ = interval(1000);

source$.pipe(
  map(n => n * 2),
  filter(n => n > 5),
  take(5)
).subscribe(value => console.log(value));

2. 带防抖的搜索

import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, catchError } from 'rxjs/operators';
import { of } from 'rxjs';

const searchInput = document.querySelector('#search') as HTMLInputElement;

const search$ = fromEvent(searchInput, 'input').pipe(
  map((event: Event) => (event.target as HTMLInputElement).value),
  debounceTime(300), // 打字后等待300ms
  distinctUntilChanged(), // 只有当值改变时
  switchMap(query => {
    if (!query) return of([]);

    return fetch(`/api/search?q=${query}`)
      .then(res => res.json())
      .catch(() => of([]));
  }),
  catchError(error => {
    console.error('搜索错误:', error);
    return of([]);
  })
);

search$.subscribe(results => {
  console.log('搜索结果:', results);
  显示结果(results);
});

function 显示结果(results: any[]) {
  // 更新UI
}

3. 状态管理

import { BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';

interface AppState {
  user: { id: string; name: string } | null;
  cart: Array<{ id: string; quantity: number }>;
  loading: boolean;
}

class StateManager {
  private state$ = new BehaviorSubject<AppState>({
    user: null,
    cart: [],
    loading: false
  });

  // 选择器
  user$ = this.state$.pipe(
    map(state => state.user),
    distinctUntilChanged()
  );

  cart$ = this.state$.pipe(
    map(state => state.cart),
    distinctUntilChanged()
  );

  cartTotal$ = this.cart$.pipe(
    map(cart => cart.reduce((sum, item) => sum + item.quantity, 0))
  );

  loading$ = this.state$.pipe(
    map(state => state.loading)
  );

  // 动作
  setUser(user: AppState['user']): void {
    this.state$.next({
      ...this.state$.value,
      user
    });
  }

  addToCart(item: { id: string; quantity: number }): void {
    const cart = [...this.state$.value.cart];
    const existing = cart.find(i => i.id === item.id);

    if (existing) {
      existing.quantity += item.quantity;
    } else {
      cart.push(item);
    }

    this.state$.next({
      ...this.state$.value,
      cart
    });
  }

  setLoading(loading: boolean): void {
    this.state$.next({
      ...this.state$.value,
      loading
    });
  }

  getState(): AppState {
    return this.state$.value;
  }
}

// 使用
const store = new StateManager();

store.user$.subscribe(user => {
  console.log('用户:', user);
});

store.cartTotal$.subscribe(total => {
  console.log('购物车商品:', total);
});

store.setUser({ id: '123', name: 'John' });
store.addToCart({ id: 'item1', quantity: 2 });

4. 带重连的WebSocket

import { Observable, timer } from 'rxjs';
import { retryWhen, tap, delayWhen } from 'rxjs/operators';

function createWebSocketObservable(url: string): Observable<any> {
  return new Observable(subscriber => {
    let ws: WebSocket;

    const connect = () => {
      ws = new WebSocket(url);

      ws.onopen = () => {
        console.log('WebSocket连接');
      };

      ws.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          subscriber.next(data);
        } catch (error) {
          console.error('解析错误:', error);
        }
      };

      ws.onerror = (error) => {
        console.error('WebSocket错误:', error);
        subscriber.error(error);
      };

      ws.onclose = () => {
        console.log('WebSocket关闭');
        subscriber.error(new Error('连接关闭'));
      };
    };

    connect();

    return () => {
      if (ws) {
        ws.close();
      }
    };
  }).pipe(
    retryWhen(errors =>
      errors.pipe(
        tap(err => console.log('重连中...', err)),
        delayWhen((_, i) => timer(Math.min(1000 * Math.pow(2, i), 30000)))
      )
    )
  );
}

// 使用
const ws$ = createWebSocketObservable('wss://api.example.com/ws');

ws$.subscribe({
  next: data => console.log('接收到:', data),
  error: err => console.error('错误:', err)
});

5. 合并多个流

import { combineLatest, merge, forkJoin, zip } from 'rxjs';

// combineLatest - 当任何输入发射时发射
const users$ = fetchUsers();
const settings$ = fetchSettings();

combineLatest([users$, settings$]).subscribe(([users, settings]) => {
  console.log('用户:', users);
  console.log('设置:', settings);
});

// merge - 合并多个可观察对象
const clicks$ = fromEvent(button1, 'click');
const hovers$ = fromEvent(button2, 'mouseover');

merge(clicks$, hovers$).subscribe(event => {
  console.log('事件:', event.type);
});

// forkJoin - 等待全部完成(像Promise.all)
forkJoin({
  users: fetchUsers(),
  posts: fetchPosts(),
  comments: fetchComments()
}).subscribe(({ users, posts, comments }) => {
  console.log('所有数据加载:', { users, posts, comments });
});

// zip - 合并对应的值
const names$ = of('Alice', 'Bob', 'Charlie');
const ages$ = of(25, 30, 35);

zip(names$, ages$).subscribe(([name, age]) => {
  console.log(`${name} 是 ${age} 岁`);
});

6. 背压处理

import { Subject } from 'rxjs';
import { bufferTime, throttleTime } from 'rxjs/operators';

// 缓冲事件
const events$ = new Subject<string>();

events$.pipe(
  bufferTime(1000), // 1秒内收集事件
  filter(buffer => buffer.length > 0)
).subscribe(events => {
  console.log('批次:', events);
  处理批次(events);
});

// 节流事件
const clicks$ = fromEvent(button, 'click');

clicks$.pipe(
  throttleTime(1000) // 每秒只允许一个
).subscribe(() => {
  console.log('点击处理');
});

function 处理批次(events: string[]) {
  // 处理批次
}

7. 自定义操作符

import { Observable } from 'rxjs';

function tapLog<T>(message: string) {
  return (source: Observable<T>) => {
    return new Observable<T>(subscriber => {
      return source.subscribe({
        next: value => {
          console.log(message, value);
          subscriber.next(value);
        },
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
    });
  };
}

// 使用
source$.pipe(
  tapLog('映射前:'),
  map(x => x * 2),
  tapLog('映射后:')
).subscribe();

最佳实践

✅ 要做

  • 取消订阅以防止内存泄漏
  • 使用操作符转换数据
  • 正确处理错误
  • 对昂贵的操作使用shareReplay
  • 需要时合并流
  • 测试响应式代码

❌ 不要做

  • 多次订阅同一个可观察对象
  • 忘记取消订阅
  • 使用嵌套订阅
  • 忽略错误处理
  • 使可观察对象有状态

常用操作符

操作符 目的
map 转换值
filter 过滤值
debounceTime 发射前等待
distinctUntilChanged 仅当改变时发射
switchMap 切换到新的可观察对象
mergeMap 合并多个可观察对象
catchError 处理错误
tap 副作用
take 取n个值
takeUntil 直到条件取值

资源