AsyncRedux流与计时器管理Skill asyncredux-streams-timers

AsyncRedux 流与计时器管理技能用于在Flutter应用中高效管理异步数据流和定时任务,通过Redux状态管理框架实现,涉及动作创建、订阅存储、生命周期清理和SEO关键词如AsyncRedux、Flutter、状态管理、流、计时器。

移动开发 0 次安装 0 次浏览 更新于 3/19/2026

名称: asyncredux-streams-timers 描述: 使用AsyncRedux管理流和计时器。涵盖创建动作以启动/停止流、在存储属性中存储流订阅、从流回调调度动作以及使用disposeProps()进行正确清理。

AsyncRedux 流与计时器

核心原则

在AsyncRedux中处理流和计时器的两个基本规则:

  1. 不要将流或计时器传递到小部件中。 不要在小部件中声明、订阅或取消订阅它们。

  2. 不要将流或计时器放入Redux存储状态中。 它们会产生状态变化,但本身不是状态。

相反,将流和计时器存储在存储的属性中——一个可以容纳任何对象类型的键值容器。

存储属性 API

AsyncRedux 提供了在 StoreReduxAction 中管理属性的方法:

setProp(key, value)

将对象(计时器、流订阅等)存储在存储的属性中:

setProp('myTimer', Timer.periodic(Duration(seconds: 1), callback));
setProp('priceStream', priceStream.listen(onData));

prop<T>(key)

从存储中检索属性:

var timer = prop<Timer>('myTimer');
var subscription = prop<StreamSubscription>('priceStream');

disposeProp(key)

通过键处置单个属性。自动取消/关闭计时器、未来和流订阅:

disposeProp('myTimer'); // 取消计时器并从属性中移除

disposeProps([predicate])

处置多个属性。如果没有谓词,处置所有与Timer、Future和Stream相关的属性:

// 处置所有计时器、未来、流订阅
disposeProps();

// 仅处置计时器
disposeProps(({Object? key, Object? value}) => value is Timer);

// 处置具有特定键的属性
disposeProps(({Object? key, Object? value}) => key.toString().startsWith('temp_'));

计时器模式

启动计时器

创建一个动作来设置 Timer.periodic 并将其存储在属性中:

class StartPollingAction extends ReduxAction<AppState> {
  @override
  AppState? reduce() {
    // 将计时器存储在属性中
    setProp('pollingTimer', Timer.periodic(
      Duration(seconds: 5),
      (timer) => dispatch(FetchDataAction()),
    ));
    return null; // 此动作无状态变化
  }
}

停止计时器

创建一个动作来处置计时器:

class StopPollingAction extends ReduxAction<AppState> {
  @override
  AppState? reduce() {
    disposeProp('pollingTimer');
    return null;
  }
}

带计数的计时器

在回调中访问计时器的计数:

class StartTimerAction extends ReduxAction<AppState> {
  @override
  AppState? reduce() {
    setProp('myTimer', Timer.periodic(
      Duration(seconds: 1),
      (timer) => dispatch(UpdateTickAction(timer.tick)),
    ));
    return null;
  }
}

class UpdateTickAction extends ReduxAction<AppState> {
  final int tick;
  UpdateTickAction(this.tick);

  @override
  AppState? reduce() => state.copy(tickCount: tick);
}

流模式

订阅流

创建一个动作来订阅流并存储订阅:

class StartListeningAction extends ReduxAction<AppState> {
  @override
  AppState? reduce() {
    final subscription = myDataStream.listen(
      (data) => dispatch(DataReceivedAction(data)),
      onError: (error) => dispatch(StreamErrorAction(error)),
    );
    setProp('dataSubscription', subscription);
    return null;
  }
}

取消订阅流

class StopListeningAction extends ReduxAction<AppState> {
  @override
  AppState? reduce() {
    disposeProp('dataSubscription');
    return null;
  }
}

处理流数据

流回调调度一个带有数据的动作,更新状态:

class DataReceivedAction extends ReduxAction<AppState> {
  final MyData data;
  DataReceivedAction(this.data);

  @override
  AppState? reduce() => state.copy(latestData: data);
}

生命周期管理

屏幕特定流/计时器

使用 StoreConnectoronInitonDispose 回调:

class PriceScreen extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return StoreConnector<AppState, _Vm>(
      vm: () => _Factory(),
      onInit: _onInit,
      onDispose: _onDispose,
      builder: (context, vm) => PriceWidget(price: vm.price),
    );
  }

  void _onInit(Store<AppState> store) {
    store.dispatch(StartPriceStreamAction());
  }

  void _onDispose(Store<AppState> store) {
    store.dispatch(StopPriceStreamAction());
  }
}

应用范围流/计时器

在存储创建后启动,应用关闭时停止:

void main() {
  final store = Store<AppState>(initialState: AppState.initialState());

  // 启动应用范围流/计时器
  store.dispatch(StartGlobalPollingAction());

  runApp(StoreProvider<AppState>(
    store: store,
    child: MyApp(),
  ));
}

// 在应用的处置逻辑中
store.dispatch(StopGlobalPollingAction());
store.disposeProps(); // 清理所有剩余属性
store.shutdown();

单个切换动作

在一个动作中组合启动/停止:

class TogglePollingAction extends ReduxAction<AppState> {
  final bool start;
  TogglePollingAction(this.start);

  @override
  AppState? reduce() {
    if (start) {
      setProp('polling', Timer.periodic(
        Duration(seconds: 5),
        (_) => dispatch(RefreshDataAction()),
      ));
    } else {
      disposeProp('polling');
    }
    return null;
  }
}

完整示例:实时价格更新

// 状态
class AppState {
  final double price;
  final bool isStreaming;

  AppState({required this.price, required this.isStreaming});

  static AppState initialState() => AppState(price: 0.0, isStreaming: false);

  AppState copy({double? price, bool? isStreaming}) => AppState(
    price: price ?? this.price,
    isStreaming: isStreaming ?? this.isStreaming,
  );
}

// 启动价格流
class StartPriceStreamAction extends ReduxAction<AppState> {
  @override
  AppState? reduce() {
    // 如果已经在流式传输,则不要启动
    if (state.isStreaming) return null;

    final subscription = priceService.priceStream.listen(
      (price) => dispatch(UpdatePriceAction(price)),
      onError: (e) => dispatch(PriceStreamErrorAction(e)),
    );

    setProp('priceSubscription', subscription);
    return state.copy(isStreaming: true);
  }
}

// 停止价格流
class StopPriceStreamAction extends ReduxAction<AppState> {
  @override
  AppState? reduce() {
    if (!state.isStreaming) return null;

    disposeProp('priceSubscription');
    return state.copy(isStreaming: false);
  }
}

// 处理价格更新
class UpdatePriceAction extends ReduxAction<AppState> {
  final double price;
  UpdatePriceAction(this.price);

  @override
  AppState? reduce() => state.copy(price: price);
}

// 处理流错误
class PriceStreamErrorAction extends ReduxAction<AppState> {
  final Object error;
  PriceStreamErrorAction(this.error);

  @override
  AppState? reduce() {
    // 发生错误时停止流式传输
    disposeProp('priceSubscription');
    return state.copy(isStreaming: false);
  }
}

测试 onInit/onDispose

使用 ConnectorTester 测试生命周期回调,无需完整小部件测试:

test('在屏幕生命周期中启动和停止轮询', () async {
  var store = Store<AppState>(initialState: AppState.initialState());
  var connectorTester = store.getConnectorTester(PriceScreen());

  // 模拟屏幕进入视图
  connectorTester.runOnInit();
  var startAction = await store.waitAnyActionTypeFinishes([StartPriceStreamAction]);
  expect(store.state.isStreaming, true);

  // 模拟屏幕离开视图
  connectorTester.runOnDispose();
  var stopAction = await store.waitAnyActionTypeFinishes([StopPriceStreamAction]);
  expect(store.state.isStreaming, false);
});

在存储关闭时清理

在关闭存储之前调用 disposeProps() 以清理所有剩余的计时器和流订阅:

// 清理所有Timer、Future和Stream相关属性
store.disposeProps();

// 关闭存储
store.shutdown();

disposeProps() 方法自动:

  • 取消 Timer 对象
  • 取消 StreamSubscription 对象
  • 关闭 StreamControllerStreamSink 对象
  • 忽略 Future 对象(以防止未处理的错误)

常规(非可处置)属性将保留,除非您提供匹配它们的谓词。

参考文献

文档中的URL: