name: Lua 协程 user-invocable: false description: 当使用 Lua 协程进行协作式多任务处理时应用,包括协程创建、挂起和恢复、值传递、生成器、迭代器、异步模式、状态机以及生产者-消费者实现。 allowed-tools: []
Lua 协程
简介
Lua 中的协程提供协作式多任务处理,使函数能够暂停和恢复执行。与线程不同,协程不并行运行,而是显式地让出控制权,这使得它们更易于理解,同时在不增加回调复杂性的情况下实现强大的异步模式。
协程在 Lua 中是第一类值,由函数创建并通过协程库管理。它们维护自己的栈、局部变量和指令指针,允许在任何点暂停并在稍后恢复。这使得生成器、迭代器和状态机的优雅实现成为可能。
本技能涵盖协程基础、带值的挂起和恢复、生成器和迭代器、生产者-消费者模式、异步 I/O 模拟、状态机、错误处理和实用协程模式。
协程基础
协程使函数能够暂停和恢复执行,提供协作式多任务处理而无需线程复杂性。
-- 创建协程
local function simple_task()
print("任务开始")
coroutine.yield()
print("任务恢复")
coroutine.yield()
print("任务完成")
end
local co = coroutine.create(simple_task)
-- 检查协程状态
print(coroutine.status(co)) -- "挂起"
-- 恢复协程
coroutine.resume(co) -- 打印 "任务开始"
print(coroutine.status(co)) -- "挂起"
coroutine.resume(co) -- 打印 "任务恢复"
coroutine.resume(co) -- 打印 "任务完成"
print(coroutine.status(co)) -- "死亡"
-- 带参数的协程
local function greet(name)
print("你好, " .. name)
local response = coroutine.yield("你多大了?")
print(name .. " 是 " .. response .. " 岁")
end
local co2 = coroutine.create(greet)
local success, question = coroutine.resume(co2, "Alice")
print(question) -- "你多大了?"
coroutine.resume(co2, 30) -- "Alice 是 30 岁"
-- 协程返回值
local function counter()
for i = 1, 5 do
coroutine.yield(i)
end
return "完成"
end
local co3 = coroutine.create(counter)
repeat
local success, value = coroutine.resume(co3)
print(value)
until coroutine.status(co3) == "死亡"
-- 协程包装(更简单的接口)
local function wrapped_task()
for i = 1, 3 do
coroutine.yield(i * 10)
end
end
local f = coroutine.wrap(wrapped_task)
print(f()) -- 10
print(f()) -- 20
print(f()) -- 30
-- 运行协程
local function self_aware()
if coroutine.running() then
print("在协程中运行")
else
print("在主线程中运行")
end
end
self_aware() -- "在主线程中运行"
coroutine.resume(coroutine.create(self_aware)) -- "在协程中运行"
-- 从嵌套调用中挂起
local function inner()
print("内部开始")
coroutine.yield("来自内部")
print("内部结束")
end
local function outer()
print("外部开始")
inner()
print("外部结束")
end
local co4 = coroutine.create(outer)
coroutine.resume(co4) -- 打印 "外部开始" 和 "内部开始"
coroutine.resume(co4) -- 打印 "内部结束" 和 "外部结束"
-- 双向通信
local function echo()
while true do
local value = coroutine.yield()
if value == nil then break end
print("回声: " .. value)
end
end
local co5 = coroutine.create(echo)
coroutine.resume(co5)
coroutine.resume(co5, "你好")
coroutine.resume(co5, "世界")
coroutine.resume(co5) -- nil 终止
-- 协程中的错误处理
local function faulty()
print("错误前")
error("出错了")
print("错误后") -- 从不执行
end
local co6 = coroutine.create(faulty)
local success, err = coroutine.resume(co6)
if not success then
print("捕获错误: " .. err)
end
协程实现协作式多任务处理,函数显式让出控制权而非被抢占。
生成器和迭代器
协程优雅地实现生成器和自定义迭代器,用于惰性求值和无限序列。
-- 简单生成器
local function range(from, to, step)
step = step or 1
return coroutine.wrap(function()
for i = from, to, step do
coroutine.yield(i)
end
end)
end
for n in range(1, 10, 2) do
print(n) -- 1, 3, 5, 7, 9
end
-- 无限生成器
local function naturals()
return coroutine.wrap(function()
local n = 1
while true do
coroutine.yield(n)
n = n + 1
end
end)
end
local gen = naturals()
print(gen()) -- 1
print(gen()) -- 2
print(gen()) -- 3
-- 斐波那契生成器
local function fibonacci()
return coroutine.wrap(function()
local a, b = 0, 1
while true do
coroutine.yield(a)
a, b = b, a + b
end
end)
end
local fib = fibonacci()
for i = 1, 10 do
print(fib())
end
-- 过滤生成器
local function filter(gen, predicate)
return coroutine.wrap(function()
for value in gen do
if predicate(value) then
coroutine.yield(value)
end
end
end)
end
local evens = filter(range(1, 20), function(n) return n % 2 == 0 end)
for n in evens do
print(n) -- 2, 4, 6, 8, 10, 12, 14, 16, 18, 20
end
-- 映射生成器
local function map(gen, transform)
return coroutine.wrap(function()
for value in gen do
coroutine.yield(transform(value))
end
end)
end
local squared = map(range(1, 5), function(n) return n * n end)
for n in squared do
print(n) -- 1, 4, 9, 16, 25
end
-- 取生成器(限制结果)
local function take(gen, n)
return coroutine.wrap(function()
local count = 0
for value in gen do
if count >= n then break end
coroutine.yield(value)
count = count + 1
end
end)
end
local first5 = take(naturals(), 5)
for n in first5 do
print(n) -- 1, 2, 3, 4, 5
end
-- 链式生成器
local function chain(...)
local generators = {...}
return coroutine.wrap(function()
for _, gen in ipairs(generators) do
for value in gen do
coroutine.yield(value)
end
end
end)
end
local combined = chain(range(1, 3), range(10, 12))
for n in combined do
print(n) -- 1, 2, 3, 10, 11, 12
end
-- 压缩生成器
local function zip(gen1, gen2)
return coroutine.wrap(function()
while true do
local v1 = gen1()
local v2 = gen2()
if v1 == nil or v2 == nil then break end
coroutine.yield(v1, v2)
end
end)
end
local letters = coroutine.wrap(function()
for c in string.gmatch("abc", ".") do
coroutine.yield(c)
end
end)
local zipped = zip(range(1, 3), letters)
for num, letter in zipped do
print(num, letter) -- 1 a, 2 b, 3 c
end
-- 排列生成器
local function permute(array)
return coroutine.wrap(function()
local function perm(arr, n)
n = n or #arr
if n == 1 then
coroutine.yield(arr)
else
for i = 1, n do
arr[n], arr[i] = arr[i], arr[n]
perm(arr, n - 1)
arr[n], arr[i] = arr[i], arr[n]
end
end
end
local copy = {}
for i, v in ipairs(array) do
copy[i] = v
end
perm(copy)
end)
end
for perm in permute({1, 2, 3}) do
print(table.concat(perm, ", "))
end
-- 文件行迭代器
local function lines(filename)
return coroutine.wrap(function()
local file = io.open(filename, "r")
if not file then return end
for line in file:lines() do
coroutine.yield(line)
end
file:close()
end)
end
生成器实现惰性求值和无限序列,语法清晰易读。
生产者-消费者模式
协程优雅地实现生产者-消费者模式,无需显式队列或回调。
-- 基本生产者-消费者
local function producer()
return coroutine.create(function()
for i = 1, 10 do
print("生产 " .. i)
coroutine.yield(i)
end
end)
end
local function consumer(prod)
while coroutine.status(prod) ~= "死亡" do
local success, value = coroutine.resume(prod)
if success and value then
print("消费 " .. value)
end
end
end
local prod = producer()
consumer(prod)
-- 过滤生产者-消费者
local function filtered_producer(filter_fn)
return coroutine.create(function()
for i = 1, 20 do
if filter_fn(i) then
coroutine.yield(i)
end
end
end)
end
local even_prod = filtered_producer(function(n) return n % 2 == 0 end)
consumer(even_prod)
-- 多消费者
local function multi_consumer(prod, num_consumers)
local consumers = {}
for i = 1, num_consumers do
consumers[i] = coroutine.create(function()
while true do
local success, value = coroutine.resume(prod)
if not success or value == nil then break end
print(string.format("消费者 %d 获得 %d", i, value))
coroutine.yield()
end
end)
end
-- 轮询调度
local active = true
while active do
active = false
for _, consumer in ipairs(consumers) do
if coroutine.status(consumer) ~= "死亡" then
coroutine.resume(consumer)
active = true
end
end
end
end
-- 管道模式
local function pipeline(...)
local stages = {...}
return function(input)
local current = input
for _, stage in ipairs(stages) do
local co = coroutine.create(stage)
local results = {}
for value in current do
local success, result = coroutine.resume(co, value)
if success and result then
table.insert(results, result)
end
end
-- 将结果转换为生成器
current = coroutine.wrap(function()
for _, v in ipairs(results) do
coroutine.yield(v)
end
end)
end
return current
end
end
-- 数据处理管道
local function double(n)
coroutine.yield(n * 2)
end
local function add_ten(n)
coroutine.yield(n + 10)
end
local process = pipeline(double, add_ten)
local result = process(range(1, 5))
for n in result do
print(n) -- 12, 14, 16, 18, 20
end
-- 带优先级的任务调度器
local Scheduler = {}
function Scheduler.new()
return {
tasks = {},
current = 1
}
end
function Scheduler.add(scheduler, priority, task_fn)
table.insert(scheduler.tasks, {
priority = priority,
coroutine = coroutine.create(task_fn)
})
table.sort(scheduler.tasks, function(a, b)
return a.priority > b.priority
end)
end
function Scheduler.run(scheduler)
while #scheduler.tasks > 0 do
local task = scheduler.tasks[1]
local success, result = coroutine.resume(task.coroutine)
if coroutine.status(task.coroutine) == "死亡" then
table.remove(scheduler.tasks, 1)
else
-- 移到末尾进行轮询
table.remove(scheduler.tasks, 1)
table.insert(scheduler.tasks, task)
end
end
end
-- 使用
local sched = Scheduler.new()
Scheduler.add(sched, 1, function()
for i = 1, 3 do
print("低优先级任务 " .. i)
coroutine.yield()
end
end)
Scheduler.add(sched, 10, function()
for i = 1, 3 do
print("高优先级任务 " .. i)
coroutine.yield()
end
end)
Scheduler.run(sched)
生产者-消费者模式使用协程消除了回调复杂性,提供清晰的数据流。
异步模式
协程实现异步 I/O 模式,无需回调,提供顺序式代码外观用于异步操作。
-- 异步计时器模拟
local Async = {}
function Async.sleep(seconds)
local wake_time = os.time() + seconds
coroutine.yield(wake_time)
end
function Async.run(tasks)
local waiting = {}
-- 初始化任务
for _, task_fn in ipairs(tasks) do
local co = coroutine.create(task_fn)
table.insert(waiting, {coroutine = co, wake_time = 0})
end
-- 事件循环
while #waiting > 0 do
local current_time = os.time()
local still_waiting = {}
for _, task in ipairs(waiting) do
if current_time >= task.wake_time then
local success, wake_time = coroutine.resume(task.coroutine)
if coroutine.status(task.coroutine) ~= "死亡" then
table.insert(still_waiting, {
coroutine = task.coroutine,
wake_time = wake_time or 0
})
end
else
table.insert(still_waiting, task)
end
end
waiting = still_waiting
if #waiting > 0 then
-- 短暂休眠以避免忙等待
os.execute("sleep 0.1")
end
end
end
-- 使用
Async.run({
function()
print("任务 1 开始")
Async.sleep(1)
print("任务 1 中间")
Async.sleep(1)
print("任务 1 结束")
end,
function()
print("任务 2 开始")
Async.sleep(2)
print("任务 2 结束")
end
})
-- HTTP 请求模拟
local function http_get(url)
-- 模拟异步 HTTP 请求
coroutine.yield("waiting_for_" .. url)
return "来自 " .. url .. " 的响应"
end
local function fetch_multiple()
local urls = {
"http://api.example.com/users",
"http://api.example.com/posts",
"http://api.example.com/comments"
}
local results = {}
for _, url in ipairs(urls) do
local response = http_get(url)
table.insert(results, response)
end
return results
end
-- 类 Promise 模式
local Promise = {}
Promise.__index = Promise
function Promise.new(executor)
local self = setmetatable({
state = "pending",
value = nil,
callbacks = {}
}, Promise)
local function resolve(value)
if self.state == "pending" then
self.state = "fulfilled"
self.value = value
for _, callback in ipairs(self.callbacks) do
callback(value)
end
end
end
coroutine.resume(coroutine.create(function()
executor(resolve)
end))
return self
end
function Promise:andThen(callback)
if self.state == "fulfilled" then
callback(self.value)
else
table.insert(self.callbacks, callback)
end
return self
end
-- 使用
local p = Promise.new(function(resolve)
-- 模拟异步工作
coroutine.yield()
resolve(42)
end)
p:andThen(function(value)
print("已解决: " .. value)
end)
-- 异步/等待模式
local function async(fn)
return function(...)
local args = {...}
return coroutine.create(function()
fn(table.unpack(args))
end)
end
end
local function await(co)
local success, result = coroutine.resume(co)
return result
end
local fetch_user = async(function(id)
print("获取用户 " .. id)
coroutine.yield()
return {id = id, name = "用户 " .. id}
end)
local main = async(function()
local user = await(fetch_user(1))
print("获取用户: " .. user.name)
end)
coroutine.resume(main())
异步模式使用协程提供顺序式代码风格用于异步操作,无需回调嵌套。
状态机
协程自然地实现状态机,具有清晰的状态转换和局部状态保存。
-- 连接状态机
local function connection_state_machine()
local state = "断开连接"
return coroutine.wrap(function()
while true do
local event = coroutine.yield(state)
if state == "断开连接" then
if event == "connect" then
print("连接中...")
state = "连接中"
end
elseif state == "连接中" then
if event == "connected" then
print("已连接!")
state = "已连接"
elseif event == "error" then
print("连接失败")
state = "断开连接"
end
elseif state == "已连接" then
if event == "disconnect" then
print("断开中...")
state = "断开中"
elseif event == "send" then
print("发送数据...")
end
elseif state == "断开中" then
if event == "disconnected" then
print("已断开")
state = "断开连接"
end
end
end
end)
end
local conn = connection_state_machine()
print(conn("connect")) -- "连接中"
print(conn("connected")) -- "已连接"
print(conn("send")) -- "已连接"
print(conn("disconnect")) -- "断开中"
print(conn("disconnected")) -- "断开连接"
-- 解析器状态机
local function parse_json_string()
return coroutine.wrap(function()
local chars = {}
local escaped = false
while true do
local char = coroutine.yield()
if char == '"' and not escaped then
break
elseif char == '\\' and not escaped then
escaped = true
else
table.insert(chars, char)
escaped = false
end
end
return table.concat(chars)
end)
end
-- 游戏 AI 状态机
local function enemy_ai()
local health = 100
local target = nil
return coroutine.wrap(function()
local state = "空闲"
while health > 0 do
local input = coroutine.yield(state)
if state == "空闲" then
if input.event == "player_spotted" then
target = input.player
state = "追逐"
end
elseif state == "追逐" then
if input.event == "player_in_range" then
state = "攻击"
elseif input.event == "player_lost" then
target = nil
state = "空闲"
end
elseif state == "攻击" then
if input.event == "attack" then
print("敌人攻击!")
elseif input.event == "player_out_of_range" then
state = "追逐"
elseif input.event == "damaged" then
health = health - input.damage
if health < 30 then
state = "逃跑"
end
end
elseif state == "逃跑" then
if input.event == "safe_distance" then
state = "空闲"
end
end
end
return "死亡"
end)
end
-- 交通灯状态机
local function traffic_light()
return coroutine.wrap(function()
while true do
coroutine.yield("绿灯")
coroutine.yield("黄灯")
coroutine.yield("红灯")
end
end)
end
local light = traffic_light()
for i = 1, 6 do
print(light()) -- 绿灯, 黄灯, 红灯, 绿灯, 黄灯, 红灯
end
-- 对话系统
local function dialog_tree(tree)
return coroutine.wrap(function()
local current = tree.start
while current then
local node = tree.nodes[current]
coroutine.yield(node.text, node.choices)
local choice = coroutine.yield()
if node.choices and node.choices[choice] then
current = node.choices[choice].next
else
current = nil
end
end
end)
end
local dialog = dialog_tree({
start = "greeting",
nodes = {
greeting = {
text = "你好,旅行者!",
choices = {
{text = "你好", next = "ask_quest"},
{text = "再见", next = nil}
}
},
ask_quest = {
text = "需要任务吗?",
choices = {
{text = "是", next = "give_quest"},
{text = "不", next = nil}
}
},
give_quest = {
text = "找到丢失的剑!",
choices = {}
}
}
})
状态机使用协程自然地维护状态,无需复杂的状态跟踪结构。
最佳实践
-
使用 coroutine.wrap 用于迭代器,因为它提供更简单的接口而无需状态检查
-
检查 coroutine.resume 返回值 以处理错误和检测完成
-
避免在 C 边界间挂起,因为标准 Lua 不支持
-
通过 yield 和 resume 传递数据,而不是使用全局或上值变量
-
使用 coroutine.status 在恢复前检查协程是否死亡
-
用 coroutine.wrap 创建生成器 以获得清晰的迭代语法
-
实现适当的清理 在协程中使用 pcall 进行错误处理
-
避免嵌套 coroutine.resume 调用,因为它们使控制流复杂化
-
使用 coroutine.running 检查执行上下文并避免无效挂起
-
清晰记录挂起点 以帮助读者理解暂停点
常见陷阱
-
从主线程挂起 会导致错误,因为主线程不是协程
-
不检查 resume 成功 会错过协程内部抛出的错误
-
在循环中创建新协程 而不清理会导致内存泄漏
-
在 C 调用边界间挂起 在标准 Lua 中失败(在 LuaJIT 中有效)
-
假设协程是线程 会导致不存在的竞态条件问题
-
不处理协程完成 会导致恢复死亡协程时出错
-
过度使用协程进行简单迭代 增加复杂性而无益处
-
混合使用 coroutine.create 和 wrap 接口 导致混淆
-
忘记在调度器中恢复协程 使任务永远挂起
-
传递错误数量的参数给 resume 导致意外行为
何时使用此技能
在有益于显式控制流的协作式多任务处理中应用协程。
使用生成器和迭代器实现惰性求值或无限序列。
利用协程进行异步 I/O 模式,以避免回调复杂性并保持顺序代码风格。
用协程实现状态机用于游戏 AI、解析器或协议处理器。
在通过转换管道处理数据时使用生产者-消费者模式。
应用基于协程的调度器来管理多个并发操作的协作。