RxJS响应式编程技能Skill rxjs

RxJS响应式编程技能是专门用于在Angular和JavaScript应用中处理异步数据流和事件流的专家级工具。它提供了一套完整的操作符库和模式,帮助开发者高效管理HTTP请求、WebSocket连接、状态管理和复杂数据管道。关键词:RxJS响应式编程、Angular异步处理、JavaScript数据流、Observable操作符、WebSocket实时通信、状态管理、错误处理、多播模式、订阅管理。

前端开发 0 次安装 2 次浏览 更新于 2/26/2026

name: rxjs description: RxJS响应式编程模式,包括操作符、错误处理、多播和Angular集成。 allowed-tools: Read, Write, Edit, Bash, Glob, Grep

RxJS 技能

为Angular和JavaScript应用程序中实现响应式编程提供专家级协助。

能力

  • 创建和组合可观察对象
  • 应用转换和过滤操作符
  • 处理错误和重试
  • 实现多播模式
  • 管理订阅和内存
  • 与Angular HTTP和表单集成

使用场景

在以下情况时调用此技能:

  • 处理异步数据流
  • 组合复杂的数据管道
  • 实现实时功能
  • 管理并发请求
  • 处理WebSocket流

输入参数

参数 类型 是否必需 描述
useCase string http、websocket、events、state
operators array 需要的特定操作符
errorHandling boolean 是否包含错误处理

配置示例

{
  "useCase": "http",
  "operators": ["switchMap", "debounceTime", "catchError"],
  "errorHandling": true
}

可观察对象模式

带操作符的HTTP请求

import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import {
  Observable,
  Subject,
  BehaviorSubject,
  catchError,
  debounceTime,
  distinctUntilChanged,
  switchMap,
  retry,
  shareReplay,
  tap,
  map,
  of,
} from 'rxjs';

@Injectable({ providedIn: 'root' })
export class SearchService {
  private http = inject(HttpClient);
  private searchTerms = new Subject<string>();

  // 带重播的共享缓存
  private users$ = this.http.get<User[]>('/api/users').pipe(
    retry(3),
    shareReplay(1),
    catchError(() => of([]))
  );

  search(term: string) {
    this.searchTerms.next(term);
  }

  results$ = this.searchTerms.pipe(
    debounceTime(300),
    distinctUntilChanged(),
    switchMap((term) =>
      term.length < 2
        ? of([])
        : this.http.get<User[]>(`/api/users?search=${term}`).pipe(
            catchError(() => of([]))
          )
    )
  );
}

状态管理模式

import { BehaviorSubject, Observable, distinctUntilChanged, map } from 'rxjs';

interface AppState {
  user: User | null;
  loading: boolean;
  error: string | null;
}

@Injectable({ providedIn: 'root' })
export class StateService {
  private state = new BehaviorSubject<AppState>({
    user: null,
    loading: false,
    error: null,
  });

  // 选择器
  user$ = this.select((state) => state.user);
  loading$ = this.select((state) => state.loading);
  error$ = this.select((state) => state.error);
  isAuthenticated$ = this.user$.pipe(map((user) => !!user));

  private select<T>(selector: (state: AppState) => T): Observable<T> {
    return this.state.pipe(map(selector), distinctUntilChanged());
  }

  setState(partial: Partial<AppState>) {
    this.state.next({ ...this.state.value, ...partial });
  }

  getState(): AppState {
    return this.state.value;
  }
}

WebSocket流

import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import {
  Observable,
  retry,
  share,
  filter,
  map,
  takeUntil,
  Subject,
} from 'rxjs';

interface Message {
  type: string;
  payload: unknown;
}

@Injectable({ providedIn: 'root' })
export class WebSocketService {
  private socket$: WebSocketSubject<Message> | null = null;
  private destroy$ = new Subject<void>();

  connect(url: string): Observable<Message> {
    if (!this.socket$) {
      this.socket$ = webSocket<Message>(url);
    }

    return this.socket$.pipe(
      retry({ delay: 3000, count: 5 }),
      share(),
      takeUntil(this.destroy$)
    );
  }

  on<T>(type: string): Observable<T> {
    return this.socket$!.pipe(
      filter((msg) => msg.type === type),
      map((msg) => msg.payload as T)
    );
  }

  send(type: string, payload: unknown) {
    this.socket$?.next({ type, payload });
  }

  disconnect() {
    this.destroy$.next();
    this.socket$?.complete();
    this.socket$ = null;
  }
}

组合流

import {
  combineLatest,
  forkJoin,
  merge,
  concat,
  race,
  zip,
  switchMap,
  map,
} from 'rxjs';

@Injectable({ providedIn: 'root' })
export class DashboardService {
  // 并行请求 - 等待所有完成
  loadDashboard() {
    return forkJoin({
      user: this.http.get<User>('/api/user'),
      stats: this.http.get<Stats>('/api/stats'),
      notifications: this.http.get<Notification[]>('/api/notifications'),
    });
  }

  // 来自多个流的最新值
  dashboardData$ = combineLatest({
    user: this.userService.user$,
    theme: this.settingsService.theme$,
  }).pipe(
    map(({ user, theme }) => ({
      greeting: `Hello, ${user?.name}`,
      isDark: theme === 'dark',
    }))
  );

  // 合并多个事件流
  allEvents$ = merge(
    this.userEvents$,
    this.systemEvents$,
    this.notificationEvents$
  );
}

错误处理模式

import {
  catchError,
  retry,
  retryWhen,
  delay,
  throwError,
  EMPTY,
  of,
} from 'rxjs';

// 简单重试
this.http.get('/api/data').pipe(
  retry(3),
  catchError((error) => {
    console.error('请求失败:', error);
    return of(fallbackData);
  })
);

// 带延迟的重试
this.http.get('/api/data').pipe(
  retry({
    count: 3,
    delay: (error, retryCount) => {
      console.log(`第${retryCount}次重试`);
      return of(null).pipe(delay(1000 * retryCount));
    },
  }),
  catchError((error) => throwError(() => new Error('超过最大重试次数')))
);

// 错误恢复
this.http.get('/api/primary').pipe(
  catchError(() => this.http.get('/api/fallback')),
  catchError(() => of(defaultValue))
);

订阅管理

import { Component, OnDestroy } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';

@Component({...})
export class MyComponent implements OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    this.dataService.data$
      .pipe(takeUntil(this.destroy$))
      .subscribe((data) => {
        this.data = data;
      });

    this.userService.user$
      .pipe(takeUntil(this.destroy$))
      .subscribe((user) => {
        this.user = user;
      });
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

操作符参考

类别 操作符
创建 of, from, interval, timer, fromEvent
转换 map, switchMap, mergeMap, concatMap, exhaustMap
过滤 filter, take, takeUntil, first, distinctUntilChanged
组合 combineLatest, forkJoin, merge, concat, zip
错误 catchError, retry, throwError
多播 share, shareReplay, publish

最佳实践

  • 始终取消订阅或使用takeUntil
  • 使用shareReplay进行缓存
  • HTTP请求优先使用switchMap
  • 在适当层级处理错误
  • 谨慎使用Subject

目标流程

  • angular-enterprise-development
  • real-time-features
  • state-management-setup
  • data-streaming