RxJSPatternsforAngularSkill rxjs-patterns-for-angular

这项技能帮助开发者在Angular应用中使用RxJS实现响应式编程模式,包括处理异步数据流、错误处理、信号与Observable的转换等关键概念,以及最佳实践和高级模式。

前端开发 0 次安装 0 次浏览 更新于 2/28/2026

rxjs-patterns-for-angular 描述: “在Angular中实现响应式编程的RxJS模式。当处理Observables、操作符、订阅、异步数据流和错误处理时使用此技能。涵盖常见模式,如combineLatest、switchMap、debounceTime、catchError、retry逻辑,以及使用toSignal()和toObservable()与Angular信号集成。确保使用takeUntilDestroyed()进行适当的订阅清理。” 许可证: “MIT”

RxJS模式 for Angular技能

此技能有助于在Angular应用程序中使用RxJS实现响应式模式。

核心原则

现代Angular + RxJS

  • 信号优先: 使用信号处理状态,使用RxJS处理异步操作
  • 自动清理: 使用takeUntilDestroyed()进行订阅管理
  • 互操作性: 使用toSignal()toObservable()进行信号/可观察转换
  • AsyncPipe: 当不使用信号时,优先在模板中使用AsyncPipe

关键概念

  • 用于异步数据流的Observables
  • 用于数据转换的操作符
  • 订阅管理和清理
  • 错误处理和重试逻辑

信号 + RxJS集成

toSignal() - 从Observable到信号

import { Component, inject } from '@angular/core';
import { toSignal } from '@angular/core/rxjs-interop';
import { HttpClient } from '@angular/common/http';

@Component({
  selector: 'app-task-list',
  template: `
    @if (tasks(); as taskList) {
      @for (task of taskList; track task.id) {
        <div>{{ task.title }}</div>
      }
    }
  `
})
export class TaskListComponent {
  private http = inject(HttpClient);
  
  // 将Observable转换为信号
  tasks = toSignal(
    this.http.get<Task[]>('/api/tasks'),
    { initialValue: [] }
  );
}

toObservable() - 从信号到Observable

import { Component, signal } from '@angular/core';
import { toObservable } from '@angular/core/rxjs-interop';
import { switchMap } from 'rxjs/operators';

@Component({
  selector: 'app-search',
  template: `
    <input 
      nz-input 
      [ngModel]="searchQuery()" 
      (ngModelChange)="searchQuery.set($event)" 
    />
    
    @if (results(); as resultList) {
      @for (result of resultList; track result.id) {
        <div>{{ result.name }}</div>
      }
    }
  `
})
export class SearchComponent {
  searchQuery = signal('');
  
  // 将信号转换为Observable并转换
  private searchQuery$ = toObservable(this.searchQuery);
  
  results = toSignal(
    this.searchQuery$.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      switchMap(query => this.searchService.search(query))
    ),
    { initialValue: [] }
  );
}

订阅管理

takeUntilDestroyed() - 自动清理

import { Component, inject, signal, DestroyRef } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';

@Component({
  selector: 'app-timer',
  template: `<div>Time: {{ time() }}</div>`
})
export class TimerComponent {
  private destroyRef = inject(DestroyRef);
  time = signal(0);
  
  constructor() {
    // 订阅在组件销毁时自动清理
    interval(1000)
      .pipe(takeUntilDestroyed(this.destroyRef))
      .subscribe(value => this.time.set(value));
  }
}

手动清理(遗留模式 - 避免)

// ❌ 不要: 手动订阅管理(旧模式)
export class LegacyComponent implements OnDestroy {
  private subscription = new Subscription();
  
  ngOnInit() {
    this.subscription.add(
      this.dataService.getData().subscribe(data => {
        // 处理数据
      })
    );
  }
  
  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

// ✅ 做: 使用takeUntilDestroyed()
export class ModernComponent {
  private destroyRef = inject(DestroyRef);
  data = signal<any>(null);
  
  constructor() {
    this.dataService.getData()
      .pipe(takeUntilDestroyed(this.destroyRef))
      .subscribe(data => this.data.set(data));
  }
}

常用操作符

switchMap - 切换到新的Observable

// 每次查询更改时切换到新的搜索
searchResults$ = this.searchQuery$.pipe(
  debounceTime(300),
  switchMap(query => this.http.get(`/api/search?q=${query}`))
);

mergeMap - 合并多个Observables

// 并行处理所有任务
processTasks$ = this.tasks$.pipe(
  mergeMap(tasks => 
    from(tasks).pipe(
      mergeMap(task => this.processTask(task))
    )
  )
);

concatMap - 顺序处理

// 按顺序一个接一个地处理任务
processTasks$ = this.tasks$.pipe(
  concatMap(tasks =>
    from(tasks).pipe(
      concatMap(task => this.processTask(task))
    )
  )
);

debounceTime - 防抖输入

// 用户停止输入后等待300ms
search$ = this.searchInput$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => this.searchService.search(query))
);

distinctUntilChanged - 跳过重复项

// 仅在值实际更改时发出
status$ = this.statusSubject$.pipe(
  distinctUntilChanged()
);

filter - 过滤值

// 仅发出非空字符串
nonEmptySearch$ = this.searchQuery$.pipe(
  filter(query => query.trim().length > 0),
  switchMap(query => this.search(query))
);

map - 转换值

// 将任务转换为显示格式
taskDisplay$ = this.task$.pipe(
  map(task => ({
    title: task.title,
    status: task.status.toUpperCase(),
    dueDate: formatDate(task.dueDate)
  }))
);

tap - 副作用

// 不转换地记录
tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
  tap(tasks => console.log('Loaded tasks:', tasks.length)),
  tap(tasks => this.analyticsService.track('tasks_loaded'))
);

组合Observables

combineLatest - 等待全部

import { combineLatest } from 'rxjs';

// 组合多个observables
viewModel$ = combineLatest([
  this.tasks$,
  this.users$,
  this.settings$
]).pipe(
  map(([tasks, users, settings]) => ({
    tasks,
    users,
    settings
  }))
);

// 转换为信号
viewModel = toSignal(this.viewModel$);

forkJoin - 等待全部完成

import { forkJoin } from 'rxjs';

// 并行加载多个资源
loadAll$ = forkJoin({
  tasks: this.taskService.getTasks(),
  users: this.userService.getUsers(),
  projects: this.projectService.getProjects()
}).pipe(
  map(({ tasks, users, projects }) => ({
    tasks,
    users,
    projects
  }))
);

merge - 合并多个流

import { merge } from 'rxjs';

// 组合多个事件流
allEvents$ = merge(
  this.createEvent$,
  this.updateEvent$,
  this.deleteEvent$
).pipe(
  tap(event => this.handleEvent(event))
);

zip - 配对值

import { zip } from 'rxjs';

// 从两个流中配对匹配的值
paired$ = zip(
  this.stream1$,
  this.stream2$
).pipe(
  map(([value1, value2]) => ({ value1, value2 }))
);

错误处理

catchError - 处理错误

tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
  catchError(error => {
    console.error('Failed to load tasks:', error);
    this.notificationService.error('Failed to load tasks');
    return of([]); // 返回空数组作为回退
  })
);

retry - 失败时重试

tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
  retry(3), // 最多重试3次
  catchError(error => {
    console.error('Failed after 3 retries:', error);
    return of([]);
  })
);

retryWhen - 带退避策略的条件重试

import { retryWhen, delay, scan, throwError } from 'rxjs';

tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
  retryWhen(errors =>
    errors.pipe(
      scan((retryCount, error) => {
        if (retryCount >= 3) {
          throw error; // 达到最大重试次数
        }
        console.log(`Retry ${retryCount + 1}/3`);
        return retryCount + 1;
      }, 0),
      delay(1000) // 重试之间等待1秒
    )
  ),
  catchError(error => {
    console.error('Failed after retries:', error);
    return of([]);
  })
);

实时数据

interval - 定期更新

import { interval, switchMap } from 'rxjs';

// 每30秒轮询一次
liveData$ = interval(30000).pipe(
  startWith(0), // 立即发出
  switchMap(() => this.http.get('/api/live-data')),
  takeUntilDestroyed(this.destroyRef)
);

liveData = toSignal(this.liveData$);

WebSocket模式

import { webSocket } from 'rxjs/webSocket';

export class RealtimeService {
  private socket$ = webSocket('wss://api.example.com/ws');
  
  messages$ = this.socket$.pipe(
    catchError(error => {
      console.error('WebSocket error:', error);
      return EMPTY;
    }),
    retry({ delay: 5000 }) // 5秒后重新连接
  );
  
  sendMessage(msg: any): void {
    this.socket$.next(msg);
  }
}

加载状态

共享加载状态

import { shareReplay } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class TaskService {
  private http = inject(HttpClient);
  
  // 缓存并共享结果
  tasks$ = this.http.get<Task[]>('/api/tasks').pipe(
    shareReplay({ bufferSize: 1, refCount: true })
  );
}

加载指示器模式

@Component({
  selector: 'app-task-list',
  template: `
    @if (loading()) {
      <nz-spin />
    } @else if (error()) {
      <nz-alert nzType="error" [nzMessage]="error()!" />
    } @else {
      @for (task of tasks(); track task.id) {
        <div>{{ task.title }}</div>
      }
    }
  `
})
export class TaskListComponent {
  private taskService = inject(TaskService);
  private destroyRef = inject(DestroyRef);
  
  loading = signal(false);
  error = signal<string | null>(null);
  tasks = signal<Task[]>([]);
  
  constructor() {
    this.loadTasks();
  }
  
  loadTasks(): void {
    this.loading.set(true);
    this.error.set(null);
    
    this.taskService.tasks$
      .pipe(takeUntilDestroyed(this.destroyRef))
      .subscribe({
        next: (tasks) => {
          this.tasks.set(tasks);
          this.loading.set(false);
        },
        error: (err) => {
          this.error.set(err.message || 'Failed to load tasks');
          this.loading.set(false);
        }
      });
  }
}

高级模式

节流与防抖

import { throttleTime, debounceTime } from 'rxjs';

// 节流: 首次发出,然后忽略一段时间
throttled$ = this.clicks$.pipe(
  throttleTime(1000) // 每秒最多一次
);

// 防抖: 等待安静期
debounced$ = this.input$.pipe(
  debounceTime(300) // 停止输入后等待300ms
);

扫描 - 累积值

// 运行总计
total$ = this.amounts$.pipe(
  scan((acc, value) => acc + value, 0)
);

// 历史累积
history$ = this.events$.pipe(
  scan((history, event) => [...history, event], [] as Event[])
);

startWith - 初始值

// 以加载状态开始
status$ = this.dataLoad$.pipe(
  map(() => 'loaded'),
  startWith('loading')
);

pairwise - 前一个 + 当前

// 与前一个值比较
changes$ = this.value$.pipe(
  pairwise(),
  map(([prev, curr]) => ({
    previous: prev,
    current: curr,
    diff: curr - prev
  }))
);

最佳实践

✅ 做

// 使用toSignal()将Observables转换为模板中的响应式数据
data = toSignal(this.data$, { initialValue: [] });

// 使用takeUntilDestroyed()进行清理
this.data$.pipe(takeUntilDestroyed(this.destroyRef)).subscribe();

// 使用switchMap处理用户触发的请求
search$ = this.query$.pipe(switchMap(q => this.search(q)));

// 明确处理错误
data$ = this.http.get('/api/data').pipe(
  catchError(err => of(null))
);

❌ 不要

// 不要忘记取消订阅
this.data$.subscribe(); // 内存泄漏!

// 不要使用嵌套订阅
this.data$.subscribe(data => {
  this.process(data).subscribe(); // 反模式!
});

// 不要与信号一起使用异步管道
@if (data$ | async) { } // 使用信号代替

检查清单

使用RxJS时:

  • [ ] 使用toSignal()将Observables转换为信号
  • [ ] 使用takeUntilDestroyed()进行订阅清理
  • [ ] 使用catchError()处理错误
  • [ ] 防抖用户输入(300ms)
  • [ ] 使用switchMap进行可取消的请求
  • [ ] 使用shareReplay()共享昂贵的Observables
  • [ ] 使用startWith()提供初始值
  • [ ] 过滤掉空/ null值
  • [ ] 测试异步操作
  • [ ] 记录复杂的操作符链

参考资料