名称: android-kotlin-coroutines 描述: 在Android应用中实现异步操作时使用Kotlin协程、Flow、StateFlow或管理并发。 允许工具:
- 读取
- 写入
- 编辑
- Bash
- Grep
- Glob
Android - Kotlin协程
在Android中使用Kotlin协程和Flow进行异步编程模式。
关键概念
协程基础
// 启动协程
class UserViewModel : ViewModel() {
fun loadUser(id: String) {
// viewModelScope在ViewModel清除时自动取消
viewModelScope.launch {
try {
val user = userRepository.getUser(id)
_uiState.value = UiState.Success(user)
} catch (e: Exception) {
_uiState.value = UiState.Error(e.message)
}
}
}
// 用于返回值的操作
fun fetchUserAsync(id: String): Deferred<User> {
return viewModelScope.async {
userRepository.getUser(id)
}
}
}
// 挂起函数
suspend fun fetchUserFromNetwork(id: String): User {
return withContext(Dispatchers.IO) {
api.getUser(id)
}
}
调度器
// Main - UI操作
withContext(Dispatchers.Main) {
textView.text = "已更新"
}
// IO - 网络、数据库、文件操作
withContext(Dispatchers.IO) {
val data = api.fetchData()
database.save(data)
}
// Default - CPU密集型工作
withContext(Dispatchers.Default) {
val result = expensiveComputation(data)
}
// 自定义调度器用于限制并行性
val limitedDispatcher = Dispatchers.IO.limitedParallelism(4)
Flow基础
// 创建Flow
fun getUsers(): Flow<List<User>> = flow {
while (true) {
val users = api.getUsers()
emit(users)
delay(30_000) // 每30秒轮询一次
}
}
// 来自Room的Flow
@Dao
interface UserDao {
@Query("SELECT * FROM users")
fun getAllUsers(): Flow<List<UserEntity>>
}
// 收集Flow
viewModelScope.launch {
userRepository.getUsers()
.catch { e -> _uiState.value = UiState.Error(e) }
.collect { users ->
_uiState.value = UiState.Success(users)
}
}
StateFlow和SharedFlow
class SearchViewModel : ViewModel() {
// StateFlow - 始终有当前值
private val _searchQuery = MutableStateFlow("")
val searchQuery: StateFlow<String> = _searchQuery.asStateFlow()
// SharedFlow - 用于没有初始值的事件
private val _events = MutableSharedFlow<UiEvent>()
val events: SharedFlow<UiEvent> = _events.asSharedFlow()
// 从Flow派生的状态
val searchResults: StateFlow<List<Item>> = _searchQuery
.debounce(300)
.filter { it.length >= 2 }
.flatMapLatest { query ->
searchRepository.search(query)
}
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList()
)
fun updateQuery(query: String) {
_searchQuery.value = query
}
fun sendEvent(event: UiEvent) {
viewModelScope.launch {
_events.emit(event)
}
}
}
最佳实践
结构化并发
// 好: 使用coroutineScope进行并行操作
suspend fun loadDashboard(): Dashboard = coroutineScope {
val userDeferred = async { userRepository.getUser() }
val ordersDeferred = async { orderRepository.getOrders() }
val notificationsDeferred = async { notificationRepository.getNotifications() }
// 所有完成或全部失败
Dashboard(
user = userDeferred.await(),
orders = ordersDeferred.await(),
notifications = notificationsDeferred.await()
)
}
// 带超时
suspend fun loadWithTimeout(): Data {
return withTimeout(5000) {
api.fetchData()
}
}
// 或超时返回空结果
suspend fun loadWithTimeoutOrNull(): Data? {
return withTimeoutOrNull(5000) {
api.fetchData()
}
}
异常处理
// 使用runCatching
suspend fun safeApiCall(): Result<User> = runCatching {
api.getUser()
}
// 在ViewModel中处理
fun loadUser() {
viewModelScope.launch {
safeApiCall()
.onSuccess { user ->
_uiState.value = UiState.Success(user)
}
.onFailure { error ->
_uiState.value = UiState.Error(error.message)
}
}
}
// SupervisorJob用于独立子任务失败
class MyViewModel : ViewModel() {
private val supervisorJob = SupervisorJob()
private val scope = CoroutineScope(Dispatchers.Main + supervisorJob)
fun loadMultiple() {
scope.launch {
// 此失败不会取消其他子任务
userRepository.getUser()
}
scope.launch {
// 即使上面失败,此任务继续
orderRepository.getOrders()
}
}
}
Flow操作符
// 转换操作符
userRepository.getUsers()
.map { users -> users.filter { it.isActive } }
.distinctUntilChanged()
.collect { activeUsers -> updateUI(activeUsers) }
// 组合Flow
val combined: Flow<Pair<User, Settings>> = combine(
userRepository.getUser(),
settingsRepository.getSettings()
) { user, settings ->
Pair(user, settings)
}
// FlatMapLatest用于搜索
searchQuery
.debounce(300)
.flatMapLatest { query ->
if (query.isEmpty()) flowOf(emptyList())
else searchRepository.search(query)
}
.collect { results -> updateResults(results) }
// 带指数退避重试
api.fetchData()
.retry(3) { cause ->
if (cause is IOException) {
delay(1000 * (2.0.pow(retryCount)).toLong())
true
} else false
}
生命周期感知收集
// 在Compose中 - collectAsStateWithLifecycle
@Composable
fun UserScreen(viewModel: UserViewModel = hiltViewModel()) {
val uiState by viewModel.uiState.collectAsStateWithLifecycle()
UserContent(uiState)
}
// 在Activity/Fragment中 - repeatOnLifecycle
class UserFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { state ->
updateUI(state)
}
}
}
}
}
// 多个Flow
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
launch {
viewModel.users.collect { updateUserList(it) }
}
launch {
viewModel.events.collect { handleEvent(it) }
}
}
}
常见模式
使用Flow的仓库模式
class UserRepository(
private val api: UserApi,
private val dao: UserDao,
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
fun getUser(id: String): Flow<User> = flow {
// 先发射缓存数据
dao.getUser(id)?.let { emit(it.toDomain()) }
// 从网络获取
val networkUser = api.getUser(id)
dao.insertUser(networkUser.toEntity())
emit(networkUser.toDomain())
}
.flowOn(dispatcher)
.catch { e ->
// 记录错误,如果缓存可用则发射
dao.getUser(id)?.let { emit(it.toDomain()) }
?: throw e
}
suspend fun refreshUsers() {
withContext(dispatcher) {
val users = api.getUsers()
dao.deleteAll()
dao.insertAll(users.map { it.toEntity() })
}
}
}
取消处理
suspend fun downloadFile(url: String): ByteArray {
return withContext(Dispatchers.IO) {
val connection = URL(url).openConnection()
connection.inputStream.use { input ->
val buffer = ByteArrayOutputStream()
val data = ByteArray(4096)
while (true) {
// 检查取消
ensureActive()
val count = input.read(data)
if (count == -1) break
buffer.write(data, 0, count)
}
buffer.toByteArray()
}
}
}
// 可取消Flow
fun pollData(): Flow<Data> = flow {
while (currentCoroutineContext().isActive) {
emit(api.fetchData())
delay(5000)
}
}
防抖和节流
// 防抖 - 等待发射暂停
@Composable
fun SearchField(onSearch: (String) -> Unit) {
var query by remember { mutableStateOf("") }
LaunchedEffect(query) {
delay(300) // 防抖
if (query.isNotEmpty()) {
onSearch(query)
}
}
TextField(value = query, onValueChange = { query = it })
}
// 在ViewModel中
private val _searchQuery = MutableStateFlow("")
val searchResults = _searchQuery
.debounce(300)
.distinctUntilChanged()
.flatMapLatest { query ->
searchRepository.search(query)
}
.stateIn(viewModelScope, SharingStarted.Lazily, emptyList())
反模式
GlobalScope使用
坏:
GlobalScope.launch { // 永不取消,内存泄漏
fetchData()
}
好:
viewModelScope.launch { // 正确作用域
fetchData()
}
在主线程上阻塞调用
坏:
fun loadData() {
runBlocking { // 阻塞主线程!
api.fetchData()
}
}
好:
fun loadData() {
viewModelScope.launch {
withContext(Dispatchers.IO) {
api.fetchData()
}
}
}
无生命周期Flow收集
坏:
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
lifecycleScope.launch {
viewModel.uiState.collect { // 即使在后台也收集
updateUI(it)
}
}
}
好:
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { updateUI(it) }
}
}
}
每次调用创建新Flow
坏:
// 每次创建新Flow
fun getUsers(): Flow<List<User>> = userDao.getAllUsers()
// 多次调用,多个数据库订阅
好:
// 共享Flow,单个订阅
val users: StateFlow<List<User>> = userDao.getAllUsers()
.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())
相关技能
- android-jetpack-compose: 与协程集成的UI
- android-architecture: 使用协程的架构模式