名称: asyncredux-streams-timers 描述: 使用AsyncRedux管理流和计时器。涵盖创建动作以启动/停止流、在存储属性中存储流订阅、从流回调调度动作以及使用disposeProps()进行正确清理。
AsyncRedux 流与计时器
核心原则
在AsyncRedux中处理流和计时器的两个基本规则:
-
不要将流或计时器传递到小部件中。 不要在小部件中声明、订阅或取消订阅它们。
-
不要将流或计时器放入Redux存储状态中。 它们会产生状态变化,但本身不是状态。
相反,将流和计时器存储在存储的属性中——一个可以容纳任何对象类型的键值容器。
存储属性 API
AsyncRedux 提供了在 Store 和 ReduxAction 中管理属性的方法:
setProp(key, value)
将对象(计时器、流订阅等)存储在存储的属性中:
setProp('myTimer', Timer.periodic(Duration(seconds: 1), callback));
setProp('priceStream', priceStream.listen(onData));
prop<T>(key)
从存储中检索属性:
var timer = prop<Timer>('myTimer');
var subscription = prop<StreamSubscription>('priceStream');
disposeProp(key)
通过键处置单个属性。自动取消/关闭计时器、未来和流订阅:
disposeProp('myTimer'); // 取消计时器并从属性中移除
disposeProps([predicate])
处置多个属性。如果没有谓词,处置所有与Timer、Future和Stream相关的属性:
// 处置所有计时器、未来、流订阅
disposeProps();
// 仅处置计时器
disposeProps(({Object? key, Object? value}) => value is Timer);
// 处置具有特定键的属性
disposeProps(({Object? key, Object? value}) => key.toString().startsWith('temp_'));
计时器模式
启动计时器
创建一个动作来设置 Timer.periodic 并将其存储在属性中:
class StartPollingAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
// 将计时器存储在属性中
setProp('pollingTimer', Timer.periodic(
Duration(seconds: 5),
(timer) => dispatch(FetchDataAction()),
));
return null; // 此动作无状态变化
}
}
停止计时器
创建一个动作来处置计时器:
class StopPollingAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
disposeProp('pollingTimer');
return null;
}
}
带计数的计时器
在回调中访问计时器的计数:
class StartTimerAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
setProp('myTimer', Timer.periodic(
Duration(seconds: 1),
(timer) => dispatch(UpdateTickAction(timer.tick)),
));
return null;
}
}
class UpdateTickAction extends ReduxAction<AppState> {
final int tick;
UpdateTickAction(this.tick);
@override
AppState? reduce() => state.copy(tickCount: tick);
}
流模式
订阅流
创建一个动作来订阅流并存储订阅:
class StartListeningAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
final subscription = myDataStream.listen(
(data) => dispatch(DataReceivedAction(data)),
onError: (error) => dispatch(StreamErrorAction(error)),
);
setProp('dataSubscription', subscription);
return null;
}
}
取消订阅流
class StopListeningAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
disposeProp('dataSubscription');
return null;
}
}
处理流数据
流回调调度一个带有数据的动作,更新状态:
class DataReceivedAction extends ReduxAction<AppState> {
final MyData data;
DataReceivedAction(this.data);
@override
AppState? reduce() => state.copy(latestData: data);
}
生命周期管理
屏幕特定流/计时器
使用 StoreConnector 的 onInit 和 onDispose 回调:
class PriceScreen extends StatelessWidget {
@override
Widget build(BuildContext context) {
return StoreConnector<AppState, _Vm>(
vm: () => _Factory(),
onInit: _onInit,
onDispose: _onDispose,
builder: (context, vm) => PriceWidget(price: vm.price),
);
}
void _onInit(Store<AppState> store) {
store.dispatch(StartPriceStreamAction());
}
void _onDispose(Store<AppState> store) {
store.dispatch(StopPriceStreamAction());
}
}
应用范围流/计时器
在存储创建后启动,应用关闭时停止:
void main() {
final store = Store<AppState>(initialState: AppState.initialState());
// 启动应用范围流/计时器
store.dispatch(StartGlobalPollingAction());
runApp(StoreProvider<AppState>(
store: store,
child: MyApp(),
));
}
// 在应用的处置逻辑中
store.dispatch(StopGlobalPollingAction());
store.disposeProps(); // 清理所有剩余属性
store.shutdown();
单个切换动作
在一个动作中组合启动/停止:
class TogglePollingAction extends ReduxAction<AppState> {
final bool start;
TogglePollingAction(this.start);
@override
AppState? reduce() {
if (start) {
setProp('polling', Timer.periodic(
Duration(seconds: 5),
(_) => dispatch(RefreshDataAction()),
));
} else {
disposeProp('polling');
}
return null;
}
}
完整示例:实时价格更新
// 状态
class AppState {
final double price;
final bool isStreaming;
AppState({required this.price, required this.isStreaming});
static AppState initialState() => AppState(price: 0.0, isStreaming: false);
AppState copy({double? price, bool? isStreaming}) => AppState(
price: price ?? this.price,
isStreaming: isStreaming ?? this.isStreaming,
);
}
// 启动价格流
class StartPriceStreamAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
// 如果已经在流式传输,则不要启动
if (state.isStreaming) return null;
final subscription = priceService.priceStream.listen(
(price) => dispatch(UpdatePriceAction(price)),
onError: (e) => dispatch(PriceStreamErrorAction(e)),
);
setProp('priceSubscription', subscription);
return state.copy(isStreaming: true);
}
}
// 停止价格流
class StopPriceStreamAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
if (!state.isStreaming) return null;
disposeProp('priceSubscription');
return state.copy(isStreaming: false);
}
}
// 处理价格更新
class UpdatePriceAction extends ReduxAction<AppState> {
final double price;
UpdatePriceAction(this.price);
@override
AppState? reduce() => state.copy(price: price);
}
// 处理流错误
class PriceStreamErrorAction extends ReduxAction<AppState> {
final Object error;
PriceStreamErrorAction(this.error);
@override
AppState? reduce() {
// 发生错误时停止流式传输
disposeProp('priceSubscription');
return state.copy(isStreaming: false);
}
}
测试 onInit/onDispose
使用 ConnectorTester 测试生命周期回调,无需完整小部件测试:
test('在屏幕生命周期中启动和停止轮询', () async {
var store = Store<AppState>(initialState: AppState.initialState());
var connectorTester = store.getConnectorTester(PriceScreen());
// 模拟屏幕进入视图
connectorTester.runOnInit();
var startAction = await store.waitAnyActionTypeFinishes([StartPriceStreamAction]);
expect(store.state.isStreaming, true);
// 模拟屏幕离开视图
connectorTester.runOnDispose();
var stopAction = await store.waitAnyActionTypeFinishes([StopPriceStreamAction]);
expect(store.state.isStreaming, false);
});
在存储关闭时清理
在关闭存储之前调用 disposeProps() 以清理所有剩余的计时器和流订阅:
// 清理所有Timer、Future和Stream相关属性
store.disposeProps();
// 关闭存储
store.shutdown();
disposeProps() 方法自动:
- 取消
Timer对象 - 取消
StreamSubscription对象 - 关闭
StreamController和StreamSink对象 - 忽略
Future对象(以防止未处理的错误)
常规(非可处置)属性将保留,除非您提供匹配它们的谓词。
参考文献
文档中的URL: