名称: Kotlin协程 用户可调用: false 描述: 用于结构化并发编程的Kotlin协程,包括挂起函数、协程构建器、Flow流、通道,以及构建高效异步代码的模式,支持取消和异常处理。 允许工具: []
Kotlin协程
简介
Kotlin协程提供了一个强大的异步编程框架,它轻量级、表达性强,并基于结构化并发原则构建。 协程使得编写看起来和行为像顺序代码的异步代码成为可能,消除了回调地狱并提高了可读性。
与线程不同,协程极其轻量级——可以在有限资源上运行数百万个。协程框架包括用于非阻塞操作的挂起函数、用于启动工作的构建器、用于响应式流的Flow流,以及全面的取消和异常处理机制。
本技能涵盖协程基础、构建器、上下文、Flow流、通道,以及Android开发和服务器端Kotlin的生产模式。
挂起函数
挂起函数是协程的构建块,支持无需阻塞线程即可暂停和恢复的非阻塞操作。
// 基本挂起函数
suspend fun fetchUser(id: Int): User {
delay(1000) // 挂起而不阻塞
return User(id, "Alice")
}
data class User(val id: Int, val name: String)
// 调用挂起函数
suspend fun loadUserProfile(id: Int): UserProfile {
val user = fetchUser(id)
val posts = fetchPosts(user.id)
return UserProfile(user, posts)
}
suspend fun fetchPosts(userId: Int): List<Post> {
delay(500)
return emptyList()
}
data class Post(val id: Int, val title: String)
data class UserProfile(val user: User, val posts: List<Post>)
// 顺序与并发执行
suspend fun loadDataSequential(): Pair<User, List<Post>> {
val user = fetchUser(1)
val posts = fetchPosts(1)
return user to posts
}
suspend fun loadDataConcurrent(): Pair<User, List<Post>> = coroutineScope {
val userDeferred = async { fetchUser(1) }
val postsDeferred = async { fetchPosts(1) }
userDeferred.await() to postsDeferred.await()
}
// 带回调的挂起函数
suspend fun fetchData(url: String): String = suspendCoroutine { continuation ->
fetchDataWithCallback(url) { result, error ->
if (error != null) {
continuation.resumeWithException(error)
} else {
continuation.resume(result)
}
}
}
fun fetchDataWithCallback(
url: String,
callback: (String, Exception?) -> Unit
) {
// 模拟基于回调的API
callback("data", null)
}
// 可取消的挂起函数
suspend fun downloadFile(url: String): ByteArray =
suspendCancellableCoroutine { continuation ->
val request = startDownload(url) { data, error ->
if (error != null) {
continuation.resumeWithException(error)
} else {
continuation.resume(data)
}
}
continuation.invokeOnCancellation {
request.cancel()
}
}
class DownloadRequest {
fun cancel() {}
}
fun startDownload(url: String, callback: (ByteArray, Exception?) -> Unit):
DownloadRequest {
return DownloadRequest()
}
// 使用withContext切换调度器
suspend fun saveToDatabase(user: User) {
withContext(Dispatchers.IO) {
// 在IO调度器上进行数据库操作
println("保存用户: ${user.name}")
}
}
suspend fun updateUI(user: User) {
withContext(Dispatchers.Main) {
// 在主线程更新UI
println("更新UI为: ${user.name}")
}
}
挂起函数用suspend修饰符标记,只能从其他挂起函数或协程调用,确保正确的上下文。
协程构建器
协程构建器以不同的生命周期和结果处理语义启动协程,支持结构化和非结构化并发。
// launch: 发射后不管的协程
fun launchExample() {
GlobalScope.launch {
val user = fetchUser(1)
println("用户: ${user.name}")
}
}
// async: 带结果的协程
fun asyncExample() {
GlobalScope.launch {
val deferredUser = async { fetchUser(1) }
val deferredPosts = async { fetchPosts(1) }
val user = deferredUser.await()
val posts = deferredPosts.await()
println("为${user.name}加载了${posts.size}篇帖子")
}
}
// runBlocking: 桥接阻塞和挂起世界
fun runBlockingExample() = runBlocking {
val user = fetchUser(1)
println("用户已加载: ${user.name}")
}
// coroutineScope: 结构化并发
suspend fun loadMultipleUsers(ids: List<Int>): List<User> = coroutineScope {
ids.map { id ->
async { fetchUser(id) }
}.awaitAll()
}
// supervisorScope: 独立的子任务失败
suspend fun loadDataWithSupervisor(): List<User> = supervisorScope {
val user1 = async { fetchUser(1) }
val user2 = async {
delay(100)
throw Exception("失败")
}
// 即使user2失败,user1也成功
listOfNotNull(
try { user1.await() } catch (e: Exception) { null }
)
}
// withTimeout: 限时协程
suspend fun fetchWithTimeout(id: Int): User? {
return try {
withTimeout(2000) {
fetchUser(id)
}
} catch (e: TimeoutCancellationException) {
null
}
}
// 带生命周期的结构化并发
class ViewModel : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext: CoroutineContext
get() = Dispatchers.Main + job
fun loadData() {
launch {
val user = fetchUser(1)
// 更新UI
}
}
fun onCleared() {
job.cancel()
}
}
使用coroutineScope的结构化并发确保子协程在作用域退出前完成,防止泄漏并确保适当清理。
协程上下文和调度器
协程上下文定义执行环境,包括调度器、作业、异常处理器和用于调试的协程名称。
// 用于线程池的调度器
suspend fun dispatcherExamples() {
// Main: UI线程(Android/JavaFX)
withContext(Dispatchers.Main) {
println("在主线程: ${Thread.currentThread().name}")
}
// IO: 用于阻塞I/O操作
withContext(Dispatchers.IO) {
println("在IO线程: ${Thread.currentThread().name}")
}
// Default: CPU密集型工作
withContext(Dispatchers.Default) {
println("在默认线程: ${Thread.currentThread().name}")
}
// Unconfined: 在调用者线程启动,在挂起处恢复
withContext(Dispatchers.Unconfined) {
println("无限制: ${Thread.currentThread().name}")
}
}
// 协程上下文元素
fun contextExample() {
val scope = CoroutineScope(
Dispatchers.Main +
SupervisorJob() +
CoroutineName("MyCoroutine") +
CoroutineExceptionHandler { _, throwable ->
println("捕获: $throwable")
}
)
scope.launch {
println("上下文: $coroutineContext")
}
}
// 继承上下文
fun inheritContextExample() {
CoroutineScope(Dispatchers.Main).launch {
println("父级: ${Thread.currentThread().name}")
launch {
// 继承Dispatchers.Main
println("子级: ${Thread.currentThread().name}")
}
launch(Dispatchers.IO) {
// 用IO调度器覆盖
println("覆盖: ${Thread.currentThread().name}")
}
}
}
// ThreadLocal上下文元素
val threadLocalValue = ThreadLocal<String>()
suspend fun threadLocalExample() {
threadLocalValue.set("初始值")
withContext(threadLocalValue.asContextElement("新值")) {
println("在上下文中: ${threadLocalValue.get()}")
}
println("上下文后: ${threadLocalValue.get()}")
}
// 自定义上下文元素
data class UserId(val id: Int) : AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<UserId>
}
suspend fun customContextExample() {
withContext(UserId(42)) {
val userId = coroutineContext[UserId]
println("用户ID: ${userId?.id}")
}
}
调度器决定哪个线程池执行协程。上下文元素被子协程继承并可以覆盖。
Flow用于响应式流
Flow表示按需计算的冷流值,提供带背压和转换操作符的响应式编程能力。
// 基本Flow
fun numberFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
suspend fun collectFlow() {
numberFlow().collect { value ->
println("接收: $value")
}
}
// Flow构建器
fun flowBuilders() {
// flowOf: 发射固定值
val fixedFlow = flowOf(1, 2, 3, 4, 5)
// asFlow: 转换集合
val listFlow = listOf(1, 2, 3).asFlow()
// flow: 自定义发射逻辑
val customFlow = flow {
repeat(3) {
emit(it)
delay(100)
}
}
}
// Flow转换
suspend fun flowTransformations() {
numberFlow()
.map { it * 2 }
.filter { it > 5 }
.take(3)
.collect { println(it) }
}
// Flow组合
suspend fun combineFlows() {
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf("A", "B", "C")
// zip: 配对元素
flow1.zip(flow2) { num, letter ->
"$num$letter"
}.collect { println(it) }
// combine: 每个的最新值
flow1.combine(flow2) { num, letter ->
"$num$letter"
}.collect { println(it) }
}
// Flow异常处理
suspend fun flowExceptionHandling() {
flow {
emit(1)
emit(2)
throw Exception("错误!")
}.catch { e ->
println("捕获: ${e.message}")
emit(-1)
}.collect { println(it) }
}
// StateFlow和SharedFlow
class DataRepository {
private val _users = MutableStateFlow<List<User>>(emptyList())
val users: StateFlow<List<User>> = _users
private val _events = MutableSharedFlow<Event>()
val events: SharedFlow<Event> = _events
suspend fun loadUsers() {
val loaded = fetchUsers()
_users.value = loaded
}
suspend fun emitEvent(event: Event) {
_events.emit(event)
}
private suspend fun fetchUsers(): List<User> {
delay(100)
return listOf(User(1, "Alice"))
}
}
data class Event(val type: String)
// Flow在不同调度器上
suspend fun flowWithContext() {
flow {
emit(1)
emit(2)
}
.flowOn(Dispatchers.IO)
.collect { value ->
// 在调用者上下文收集
println("值: $value")
}
}
// Channel Flow用于热流
fun channelFlowExample() = channelFlow {
launch {
repeat(3) {
send(it)
delay(100)
}
}
launch {
repeat(3) {
send(it * 10)
delay(150)
}
}
}
Flow是冷的——直到收集时才执行。StateFlow持有状态,SharedFlow广播事件,channelFlow支持并发发射。
通道用于通信
通道提供协程之间发送和接收值的通信原语,类似于BlockingQueue但是挂起的。
// 基本通道使用
suspend fun channelExample() {
val channel = Channel<Int>()
launch {
for (x in 1..5) {
channel.send(x)
}
channel.close()
}
for (y in channel) {
println("接收: $y")
}
}
// 缓冲通道
fun bufferedChannelExample() {
val channel = Channel<Int>(capacity = 4)
GlobalScope.launch {
for (x in 1..10) {
println("发送 $x")
channel.send(x)
}
channel.close()
}
GlobalScope.launch {
delay(1000)
for (y in channel) {
println("接收: $y")
delay(200)
}
}
}
// 通道类型
fun channelTypes() {
// Rendezvous: 无缓冲,发送者挂起直到接收者
val rendezvous = Channel<Int>()
// Buffered: 指定容量
val buffered = Channel<Int>(10)
// Unlimited: 无限缓冲
val unlimited = Channel<Int>(Channel.UNLIMITED)
// Conflated: 只保留最新值
val conflated = Channel<Int>(Channel.CONFLATED)
}
// Produce构建器
fun produceNumbers(): ReceiveChannel<Int> = GlobalScope.produce {
for (x in 1..5) {
send(x * x)
delay(100)
}
}
suspend fun consumeNumbers() {
val channel = produceNumbers()
channel.consumeEach { println(it) }
}
// 多消费者
suspend fun multipleConsumers() {
val channel = Channel<Int>()
repeat(3) { id ->
launch {
for (value in channel) {
println("消费者 $id 接收: $value")
}
}
}
repeat(10) {
channel.send(it)
delay(100)
}
channel.close()
}
// Select表达式用于多通道
suspend fun selectExample() {
val channel1 = produce { send("A") }
val channel2 = produce { send("B") }
select<Unit> {
channel1.onReceive { value ->
println("从channel1: $value")
}
channel2.onReceive { value ->
println("从channel2: $value")
}
}
}
fun CoroutineScope.produce(block: suspend () -> Unit): ReceiveChannel<String> {
return produce {
block()
}
}
通道支持扇出(多消费者)、扇入(多生产者)和管道模式用于并发数据处理。
取消和异常处理
协程支持协作式取消和结构化异常处理,以确保适当的资源清理和错误传播。
// 基本取消
suspend fun cancellationExample() {
val job = GlobalScope.launch {
repeat(1000) { i ->
println("工作中: $i")
delay(500)
}
}
delay(2000)
println("取消中...")
job.cancel()
job.join()
println("已取消")
}
// 检查取消
suspend fun checkCancellation() {
GlobalScope.launch {
var i = 0
while (isActive) {
println("计算: ${i++}")
}
}
}
// 不可取消工作
suspend fun nonCancellableCleanup() {
val job = GlobalScope.launch {
try {
repeat(1000) {
delay(500)
}
} finally {
withContext(NonCancellable) {
println("在不可取消上下文中清理")
delay(1000)
println("清理完成")
}
}
}
delay(1000)
job.cancelAndJoin()
}
// 超时处理
suspend fun timeoutExample() {
try {
withTimeout(1000) {
repeat(100) {
delay(100)
println("工作中...")
}
}
} catch (e: TimeoutCancellationException) {
println("超时")
}
}
// 协程中的异常处理
suspend fun exceptionHandlingExample() {
val handler = CoroutineExceptionHandler { _, exception ->
println("捕获: $exception")
}
GlobalScope.launch(handler) {
throw Exception("协程失败")
}
delay(100)
}
// 结构化异常处理
suspend fun structuredExceptions() = coroutineScope {
val job1 = launch {
delay(100)
throw Exception("作业1失败")
}
val job2 = launch {
delay(200)
println("作业2完成")
}
// 当作业1失败时,作业2被取消
}
// SupervisorScope用于独立失败
suspend fun supervisorExample() = supervisorScope {
val job1 = launch {
delay(100)
throw Exception("作业1失败")
}
val job2 = launch {
delay(200)
println("作业2完成")
}
// 即使作业1失败,作业2也继续
}
// 使用try-catch的错误处理
suspend fun errorHandling() {
coroutineScope {
launch {
try {
fetchUser(1)
} catch (e: Exception) {
println("错误: ${e.message}")
}
}
}
}
取消是协作式的——协程必须检查isActive或调用支持取消的挂起函数。异常处理尊重结构化并发边界。
最佳实践
-
使用带作用域的结构化并发以确保协程被适当管理并在不再需要时取消。
-
为工作类型选择适当的调度器:IO用于阻塞操作,Default用于CPU工作,Main用于UI更新。
-
协作式处理取消通过在循环中检查isActive并使用支持取消的挂起函数。
-
优先使用Flow而非回调用于响应式流,以获得背压、操作符和结构化生命周期管理。
-
对独立操作使用supervisorScope以防止一个失败取消不相关的协程。
-
避免使用GlobalScope除非是真正的应用范围工作以防止泄漏并维护结构化并发的好处。
-
应用withContext进行调度器切换而不是启动新协程,以保持结构并减少开销。
-
使用try-catch或CoroutineExceptionHandler显式处理异常以防止静默失败。
-
使用StateFlow处理状态和SharedFlow处理事件,以提供带适当生命周期感知的可观察流。
-
使用TestCoroutineDispatcher测试协程以控制时间并确保确定性测试执行。
常见陷阱
-
对作用域工作使用GlobalScope导致协程在相关上下文(如活动或视图模型)结束后继续存在,造成内存泄漏。
-
在协程内部阻塞使用Thread.sleep或阻塞I/O违背了目的并可能耗尽线程池。
-
未处理取消在长时间运行的循环中导致协程在取消后继续执行。
-
忘记在调用其他挂起函数的函数上添加suspend修饰符导致编译错误。
-
捕获CancellationException并不重新抛出防止了结构化并发中适当的取消传播。
-
使用delay(0)来让出不如显式调用yield()用于协作式多任务清晰。
-
不必要地创建过多协程可能降低性能;在可能时批量或限制操作。
-
不使用withContext进行调度器切换而是启动不必要的子协程,增加了复杂性。
-
假设launch后立即执行;协程可能直到调度器有容量时才启动。
-
不正确混合回调和协程没有适当桥接,导致竞态条件和泄漏。
何时使用此技能
在构建Android应用时使用Kotlin协程进行异步操作,如网络调用、数据库查询或任何不应阻塞主线程的I/O绑定工作。
在服务器端Kotlin应用中使用协程,如Ktor或Spring Boot,以高效处理并发请求而无线程每请求的开销。
在MVVM架构中使用Flow处理响应式流,替代LiveData或RxJava进行状态管理和事件传播,具有生命周期感知。
利用结构化并发协调多个异步操作,确保在导航离开屏幕或关闭连接时适当取消。
使用通道处理生产者-消费者模式、管道或任何需要在并发协程之间显式通信的场景。