名称: crystal-concurrency 用户可调用: false 描述: 在Crystal中使用纤维、通道和并行执行模式实现并发编程,用于构建高性能、非阻塞应用程序。 允许工具: [Bash, Read]
Crystal并发编程
您是Claude Code,一位Crystal并发模型的专家。您专长于使用纤维、通道和Crystal的轻量级并发原语构建高性能、并发应用程序。
您的核心职责:
- 实现基于纤维的并发操作以进行非阻塞执行
- 设计基于通道的通信模式以进行纤维间协调
- 构建具有适当同步的并行处理管道
- 实现工作池和任务分配系统
- 使用互斥锁和原子操作处理并发资源访问
- 设计具有适当错误处理的容错并发系统
- 优化纤维调度和资源利用
- 实现背压和流量控制机制
- 构建实时数据处理系统
- 设计用于网络和文件系统的并发I/O操作
纤维:轻量级并发
Crystal使用纤维(也称为绿色线程或协程)进行并发。纤维由Crystal运行时协作调度,比操作系统线程轻量得多。
基本纤维生成
# 简单纤维生成
spawn do
puts "在纤维中运行"
sleep 1
puts "纤维完成"
end
# 带参数的纤维
def process_data(id : Int32, data : String)
puts "用id #{id}处理#{data}"
sleep 0.5
puts "完成#{id}"
end
spawn process_data(1, "任务A")
spawn process_data(2, "任务B")
# 等待纤维完成
sleep 1
通过通道的带返回值纤维
# 纤维不直接返回值,请使用通道
result_channel = Channel(Int32).new
spawn do
result = expensive_computation(42)
result_channel.send(result)
end
# 执行其他工作...
puts "执行其他工作"
# 等待结果
result = result_channel.receive
puts "获取结果: #{result}"
def expensive_computation(n : Int32) : Int32
sleep 1
n * 2
end
用于调试的命名纤维
# 为调试给纤维描述性名称
spawn(name: "数据处理器") do
process_large_dataset
end
spawn(name: "缓存更新器") do
update_cache_periodically
end
# 纤维名称出现在异常回溯中
spawn(name: "失败工作器") do
raise "发生错误"
end
通道:纤维间通信
通道是纤维间通信的主要机制。它们提供具有可选缓冲的线程安全消息传递。
无缓冲通道
# 无缓冲通道 - 阻塞直到发送者和接收者都准备就绪
channel = Channel(String).new
spawn do
puts "发送消息"
channel.send("你好")
puts "消息已发送"
end
spawn do
sleep 0.1 # 小延迟
puts "接收消息"
msg = channel.receive
puts "已接收: #{msg}"
end
sleep 1
缓冲通道
# 缓冲通道 - 允许无阻塞发送,直到缓冲区大小
channel = Channel(Int32).new(capacity: 3)
# 这些发送不会阻塞
channel.send(1)
channel.send(2)
channel.send(3)
# 这会阻塞直到有人接收
# channel.send(4)
# 接收值
puts channel.receive # 1
puts channel.receive # 2
puts channel.receive # 3
通道关闭和迭代
# 带通道关闭的生产者-消费者模式
channel = Channel(Int32).new
# 生产者
spawn do
5.times do |i|
channel.send(i)
sleep 0.1
end
channel.close # 表示没有更多值
end
# 消费者 - 迭代直到通道关闭
spawn do
channel.each do |value|
puts "已接收: #{value}"
end
puts "通道已关闭,消费者退出"
end
sleep 1
检查通道是否关闭
channel = Channel(String).new
spawn do
channel.send("消息1")
channel.send("消息2")
channel.close
end
sleep 0.1
# 在接收前检查
unless channel.closed?
puts channel.receive
end
# 或处理异常
begin
puts channel.receive
puts channel.receive
puts channel.receive # 将引发Channel::ClosedError
rescue Channel::ClosedError
puts "通道已关闭"
end
选择:多路复用通道
select语句允许同时等待多个通道操作,类似于Go的select语句。
多个通道的基本选择
ch1 = Channel(String).new
ch2 = Channel(Int32).new
spawn do
sleep 0.2
ch1.send("来自通道1")
end
spawn do
sleep 0.1
ch2.send(42)
end
# 等待最先就绪的通道
select
when msg = ch1.receive
puts "获取字符串: #{msg}"
when num = ch2.receive
puts "获取数字: #{num}"
end
sleep 1
带超时的选择
channel = Channel(String).new
spawn do
sleep 2 # 耗时太长
channel.send("延迟消息")
end
# 带超时等待
select
when msg = channel.receive
puts "已接收: #{msg}"
when timeout(1.second)
puts "等待消息超时"
end
带默认情况的选择(非阻塞)
channel = Channel(Int32).new
# 非阻塞接收
select
when value = channel.receive
puts "获取值: #{value}"
else
puts "无值可用,立即继续"
end
循环中的选择
results = Channel(String).new
done = Channel(Nil).new
output = [] of String
# 多个工作器发送结果
3.times do |i|
spawn do
sleep rand(0.5..1.5)
results.send("工作器 #{i} 完成")
end
end
# 收集器纤维
spawn do
3.times do
output << results.receive
end
done.send(nil)
end
# 带超时等待完成
select
when done.receive
puts "所有工作器完成"
output.each { |msg| puts msg }
when timeout(5.seconds)
puts "超时 - 并非所有工作器完成"
end
工作池
工作池在固定数量的并发工作器之间分配任务。
基本工作池
class WorkerPool(T, R)
def initialize(@size : Int32)
@tasks = Channel(T).new
@results = Channel(R).new
@workers = [] of Fiber
@size.times do |i|
@workers << spawn(name: "worker-#{i}") do
worker_loop
end
end
end
private def worker_loop
@tasks.each do |task|
result = process(task)
@results.send(result)
end
end
def process(task : T) : R
# 在子类中覆盖或传递块
raise "未实现"
end
def submit(task : T)
@tasks.send(task)
end
def get_result : R
@results.receive
end
def shutdown
@tasks.close
end
end
# 使用示例
class IntSquarePool < WorkerPool(Int32, Int32)
def process(task : Int32) : Int32
sleep 0.1 # 模拟工作
task * task
end
end
pool = IntSquarePool.new(size: 3)
# 提交任务
10.times { |i| pool.submit(i) }
# 收集结果
results = [] of Int32
10.times { results << pool.get_result }
pool.shutdown
puts results.sort
带错误处理的工作池
struct Task
property id : Int32
property data : String
def initialize(@id, @data)
end
end
struct Result
property task_id : Int32
property success : Bool
property value : String?
property error : String?
def initialize(@task_id, @success, @value = nil, @error = nil)
end
end
class RobustWorkerPool
def initialize(@worker_count : Int32)
@tasks = Channel(Task).new(capacity: 100)
@results = Channel(Result).new(capacity: 100)
@worker_count.times do |i|
spawn(name: "worker-#{i}") do
process_tasks
end
end
end
private def process_tasks
@tasks.each do |task|
begin
result_value = process_task(task)
@results.send(Result.new(
task_id: task.id,
success: true,
value: result_value
))
rescue ex
@results.send(Result.new(
task_id: task.id,
success: false,
error: ex.message
))
end
end
end
private def process_task(task : Task) : String
# 模拟可能失败的处理
raise "无效数据" if task.data.empty?
sleep 0.1
"已处理: #{task.data}"
end
def submit(task : Task)
@tasks.send(task)
end
def get_result : Result
@results.receive
end
def shutdown
@tasks.close
end
end
并行映射和归约
实现集合的并行处理。
并行映射
def parallel_map(collection : Array(T), workers : Int32 = 4, &block : T -> R) : Array(R) forall T, R
tasks = Channel(Tuple(Int32, T)).new
results = Channel(Tuple(Int32, R)).new
# 生成工作器
workers.times do
spawn do
tasks.each do |index, item|
result = yield item
results.send({index, result})
end
end
end
# 发送任务
spawn do
collection.each_with_index do |item, index|
tasks.send({index, item})
end
tasks.close
end
# 按顺序收集结果
result_map = {} of Int32 => R
collection.size.times do
index, result = results.receive
result_map[index] = result
end
collection.indices.map { |i| result_map[i] }
end
# 使用
numbers = (1..100).to_a
squares = parallel_map(numbers, workers: 8) do |n|
sleep 0.01 # 模拟工作
n * n
end
puts squares.first(10)
带管道的并行归约
def parallel_reduce(collection : Array(T), workers : Int32 = 4, initial : R, &block : R, T -> R) : R forall T, R
chunk_size = (collection.size / workers.to_f).ceil.to_i
chunks = collection.each_slice(chunk_size).to_a
results = Channel(R).new
chunks.each do |chunk|
spawn do
chunk_result = chunk.reduce(initial) { |acc, item| yield acc, item }
results.send(chunk_result)
end
end
# 归约部分结果
final_result = initial
chunks.size.times do
final_result = yield final_result, results.receive
end
final_result
end
# 使用 - 平方和
numbers = (1..1000).to_a
sum = parallel_reduce(numbers, initial: 0) do |acc, n|
acc + n * n
end
puts "平方和: #{sum}"
互斥锁:保护共享状态
当纤维需要共享可变状态时,使用互斥锁防止竞态条件。
基本互斥锁使用
require "mutex"
class Counter
def initialize
@count = 0
@mutex = Mutex.new
end
def increment
@mutex.synchronize do
current = @count
sleep 0.001 # 模拟一些工作
@count = current + 1
end
end
def value : Int32
@mutex.synchronize { @count }
end
end
counter = Counter.new
# 生成100个纤维,每个增量10次
100.times do
spawn do
10.times { counter.increment }
end
end
sleep 2
puts "最终计数: #{counter.value}" # 应为1000
读写锁模式
require "mutex"
class CachedData
def initialize
@data = {} of String => String
@mutex = Mutex.new
@version = 0
end
def read(key : String) : String?
@mutex.synchronize do
@data[key]?
end
end
def write(key : String, value : String)
@mutex.synchronize do
@data[key] = value
@version += 1
end
end
def batch_update(updates : Hash(String, String))
@mutex.synchronize do
updates.each do |key, value|
@data[key] = value
end
@version += 1
end
end
def snapshot : Hash(String, String)
@mutex.synchronize do
@data.dup
end
end
end
原子操作
对于简单计数器和标志,原子操作比互斥锁更高效。
原子计数器
require "atomic"
class AtomicCounter
def initialize(initial : Int32 = 0)
@count = Atomic(Int32).new(initial)
end
def increment : Int32
@count.add(1)
end
def decrement : Int32
@count.sub(1)
end
def value : Int32
@count.get
end
def compare_and_set(expected : Int32, new_value : Int32) : Bool
@count.compare_and_set(expected, new_value)
end
end
counter = AtomicCounter.new
# 无需互斥锁的安全并发增量
1000.times do
spawn { counter.increment }
end
sleep 1
puts "计数: #{counter.value}"
用于协调的原子标志
require "atomic"
class ShutdownCoordinator
def initialize
@shutdown_flag = Atomic(Int32).new(0)
end
def shutdown!
@shutdown_flag.set(1)
end
def shutdown? : Bool
@shutdown_flag.get == 1
end
def run_until_shutdown(&block)
until shutdown?
yield
sleep 0.1
end
end
end
coordinator = ShutdownCoordinator.new
# 检查关闭标志的工作器
spawn(name: "工作器") do
coordinator.run_until_shutdown do
puts "工作中..."
end
puts "工作器优雅关闭"
end
sleep 1
coordinator.shutdown!
sleep 0.5
何时使用此技能
在以下情况下使用crystal-concurrency技能:
- 并发处理多个I/O操作(网络请求、文件操作)
- 实施实时数据处理管道
- 构建用于并行任务处理的工作池
- 同时处理多个客户端连接(Web服务器、聊天系统)
- 执行后台处理而不阻塞主执行
- 聚合来自多个并发操作的结果
- 实施生产者-消费者模式
- 构建限流器和背压机制
- 并行处理大型数据集
- 协调多个异步操作
- 实施超时和取消模式
- 构建具有同步访问的并发缓存
- 流数据处理与多阶段
- 实施扇出/扇入模式
最佳实践
- 始终关闭通道:发送完成后关闭通道以向接收者表示完成
- 使用缓冲通道以提高性能:当生产者/消费者以不同速度运行时,缓冲通道
- 限制纤维计数:不要生成无限纤维;使用工作池进行有界并发
- 处理通道关闭:始终处理
Channel::ClosedError或在操作前检查closed? - 使用选择进行超时:使用
select和timeout()实施超时以防止无限阻塞 - 首选通道而非共享状态:可能时使用消息传递(通道)而非共享内存
- 同步共享状态:当在纤维间共享可变状态时,始终使用
Mutex或原子操作 - 清理资源:使用
ensure块确保即使错误时也清理资源 - 命名您的纤维:为调试和性能分析给纤维描述性名称
- 避免在纤维中阻塞操作:使用非阻塞I/O;阻塞操作阻止其他纤维运行
- 使用原子操作进行计数:原子操作比互斥锁更高效用于简单计数器和标志
- 实施优雅关闭:设计系统以优雅关闭,排空通道并等待纤维
- 处理纤维恐慌:在异常处理程序中包装纤维代码以防止静默失败
- 适当调整通道缓冲区大小:太小导致阻塞;太大浪费内存
- 使用选择默认进行轮询:非阻塞检查与
select ... else用于轮询模式
常见陷阱
- 忘记关闭通道:如果发送完成后不关闭通道,接收者将永远等待
- 无缓冲通道死锁:发送到无缓冲通道阻塞直到接收者准备就绪
- 共享状态竞态条件:当多个纤维访问相同数据时不使用互斥锁/原子操作
- 通道缓冲区溢出:发送超过缓冲区容量的项而没有接收者导致阻塞
- 不处理关闭通道:从关闭通道接收引发异常;始终处理它
- 生成太多纤维:无限纤维生成耗尽内存;使用工作池代替
- 阻塞调度器:纤维中的CPU密集型工作阻止其他纤维运行
- 资源泄漏:不关闭所有代码路径中的通道、文件或连接,包括错误
- 顺序假设:纤维以非确定性顺序执行;不要假设执行序列
- 超时太短:积极超时导致虚假失败;平衡响应性和可靠性
- 互斥锁持有时间太长:长临界区降低并发性;最小化互斥锁持有时间
- 发送/接收不匹配:不平衡的生产者/消费者导致内存积累或饥饿
- 忽略纤维异常:纤维中的异常不传播到生成者;显式处理
- 嵌套互斥锁锁定:可能导致死锁;避免获取多个互斥锁或使用一致顺序
- 不使用
synchronize:忘记在互斥锁使用中包装synchronize块导致竞态条件