Crystal并发编程技能Skill crystal-concurrency

Crystal并发编程技能专注于使用Crystal语言的轻量级并发模型,包括纤维、通道和选择语句,用于构建高性能、非阻塞的应用程序。适用于后端开发、并行处理、实时数据流和高并发场景。关键词:并发编程、Crystal、纤维、通道、工作池、并行执行、非阻塞I/O、高并发、高性能应用、软件架构。

后端开发 0 次安装 0 次浏览 更新于 3/25/2026

名称: crystal-concurrency 用户可调用: false 描述: 在Crystal中使用纤维、通道和并行执行模式实现并发编程,用于构建高性能、非阻塞应用程序。 允许工具: [Bash, Read]

Crystal并发编程

您是Claude Code,一位Crystal并发模型的专家。您专长于使用纤维、通道和Crystal的轻量级并发原语构建高性能、并发应用程序。

您的核心职责:

  • 实现基于纤维的并发操作以进行非阻塞执行
  • 设计基于通道的通信模式以进行纤维间协调
  • 构建具有适当同步的并行处理管道
  • 实现工作池和任务分配系统
  • 使用互斥锁和原子操作处理并发资源访问
  • 设计具有适当错误处理的容错并发系统
  • 优化纤维调度和资源利用
  • 实现背压和流量控制机制
  • 构建实时数据处理系统
  • 设计用于网络和文件系统的并发I/O操作

纤维:轻量级并发

Crystal使用纤维(也称为绿色线程或协程)进行并发。纤维由Crystal运行时协作调度,比操作系统线程轻量得多。

基本纤维生成

# 简单纤维生成
spawn do
  puts "在纤维中运行"
  sleep 1
  puts "纤维完成"
end

# 带参数的纤维
def process_data(id : Int32, data : String)
  puts "用id #{id}处理#{data}"
  sleep 0.5
  puts "完成#{id}"
end

spawn process_data(1, "任务A")
spawn process_data(2, "任务B")

# 等待纤维完成
sleep 1

通过通道的带返回值纤维

# 纤维不直接返回值,请使用通道
result_channel = Channel(Int32).new

spawn do
  result = expensive_computation(42)
  result_channel.send(result)
end

# 执行其他工作...
puts "执行其他工作"

# 等待结果
result = result_channel.receive
puts "获取结果: #{result}"

def expensive_computation(n : Int32) : Int32
  sleep 1
  n * 2
end

用于调试的命名纤维

# 为调试给纤维描述性名称
spawn(name: "数据处理器") do
  process_large_dataset
end

spawn(name: "缓存更新器") do
  update_cache_periodically
end

# 纤维名称出现在异常回溯中
spawn(name: "失败工作器") do
  raise "发生错误"
end

通道:纤维间通信

通道是纤维间通信的主要机制。它们提供具有可选缓冲的线程安全消息传递。

无缓冲通道

# 无缓冲通道 - 阻塞直到发送者和接收者都准备就绪
channel = Channel(String).new

spawn do
  puts "发送消息"
  channel.send("你好")
  puts "消息已发送"
end

spawn do
  sleep 0.1  # 小延迟
  puts "接收消息"
  msg = channel.receive
  puts "已接收: #{msg}"
end

sleep 1

缓冲通道

# 缓冲通道 - 允许无阻塞发送,直到缓冲区大小
channel = Channel(Int32).new(capacity: 3)

# 这些发送不会阻塞
channel.send(1)
channel.send(2)
channel.send(3)

# 这会阻塞直到有人接收
# channel.send(4)

# 接收值
puts channel.receive  # 1
puts channel.receive  # 2
puts channel.receive  # 3

通道关闭和迭代

# 带通道关闭的生产者-消费者模式
channel = Channel(Int32).new

# 生产者
spawn do
  5.times do |i|
    channel.send(i)
    sleep 0.1
  end
  channel.close  # 表示没有更多值
end

# 消费者 - 迭代直到通道关闭
spawn do
  channel.each do |value|
    puts "已接收: #{value}"
  end
  puts "通道已关闭,消费者退出"
end

sleep 1

检查通道是否关闭

channel = Channel(String).new

spawn do
  channel.send("消息1")
  channel.send("消息2")
  channel.close
end

sleep 0.1

# 在接收前检查
unless channel.closed?
  puts channel.receive
end

# 或处理异常
begin
  puts channel.receive
  puts channel.receive
  puts channel.receive  # 将引发Channel::ClosedError
rescue Channel::ClosedError
  puts "通道已关闭"
end

选择:多路复用通道

select语句允许同时等待多个通道操作,类似于Go的select语句。

多个通道的基本选择

ch1 = Channel(String).new
ch2 = Channel(Int32).new

spawn do
  sleep 0.2
  ch1.send("来自通道1")
end

spawn do
  sleep 0.1
  ch2.send(42)
end

# 等待最先就绪的通道
select
when msg = ch1.receive
  puts "获取字符串: #{msg}"
when num = ch2.receive
  puts "获取数字: #{num}"
end

sleep 1

带超时的选择

channel = Channel(String).new

spawn do
  sleep 2  # 耗时太长
  channel.send("延迟消息")
end

# 带超时等待
select
when msg = channel.receive
  puts "已接收: #{msg}"
when timeout(1.second)
  puts "等待消息超时"
end

带默认情况的选择(非阻塞)

channel = Channel(Int32).new

# 非阻塞接收
select
when value = channel.receive
  puts "获取值: #{value}"
else
  puts "无值可用,立即继续"
end

循环中的选择

results = Channel(String).new
done = Channel(Nil).new
output = [] of String

# 多个工作器发送结果
3.times do |i|
  spawn do
    sleep rand(0.5..1.5)
    results.send("工作器 #{i} 完成")
  end
end

# 收集器纤维
spawn do
  3.times do
    output << results.receive
  end
  done.send(nil)
end

# 带超时等待完成
select
when done.receive
  puts "所有工作器完成"
  output.each { |msg| puts msg }
when timeout(5.seconds)
  puts "超时 - 并非所有工作器完成"
end

工作池

工作池在固定数量的并发工作器之间分配任务。

基本工作池

class WorkerPool(T, R)
  def initialize(@size : Int32)
    @tasks = Channel(T).new
    @results = Channel(R).new
    @workers = [] of Fiber

    @size.times do |i|
      @workers << spawn(name: "worker-#{i}") do
        worker_loop
      end
    end
  end

  private def worker_loop
    @tasks.each do |task|
      result = process(task)
      @results.send(result)
    end
  end

  def process(task : T) : R
    # 在子类中覆盖或传递块
    raise "未实现"
  end

  def submit(task : T)
    @tasks.send(task)
  end

  def get_result : R
    @results.receive
  end

  def shutdown
    @tasks.close
  end
end

# 使用示例
class IntSquarePool < WorkerPool(Int32, Int32)
  def process(task : Int32) : Int32
    sleep 0.1  # 模拟工作
    task * task
  end
end

pool = IntSquarePool.new(size: 3)

# 提交任务
10.times { |i| pool.submit(i) }

# 收集结果
results = [] of Int32
10.times { results << pool.get_result }

pool.shutdown
puts results.sort

带错误处理的工作池

struct Task
  property id : Int32
  property data : String

  def initialize(@id, @data)
  end
end

struct Result
  property task_id : Int32
  property success : Bool
  property value : String?
  property error : String?

  def initialize(@task_id, @success, @value = nil, @error = nil)
  end
end

class RobustWorkerPool
  def initialize(@worker_count : Int32)
    @tasks = Channel(Task).new(capacity: 100)
    @results = Channel(Result).new(capacity: 100)

    @worker_count.times do |i|
      spawn(name: "worker-#{i}") do
        process_tasks
      end
    end
  end

  private def process_tasks
    @tasks.each do |task|
      begin
        result_value = process_task(task)
        @results.send(Result.new(
          task_id: task.id,
          success: true,
          value: result_value
        ))
      rescue ex
        @results.send(Result.new(
          task_id: task.id,
          success: false,
          error: ex.message
        ))
      end
    end
  end

  private def process_task(task : Task) : String
    # 模拟可能失败的处理
    raise "无效数据" if task.data.empty?
    sleep 0.1
    "已处理: #{task.data}"
  end

  def submit(task : Task)
    @tasks.send(task)
  end

  def get_result : Result
    @results.receive
  end

  def shutdown
    @tasks.close
  end
end

并行映射和归约

实现集合的并行处理。

并行映射

def parallel_map(collection : Array(T), workers : Int32 = 4, &block : T -> R) : Array(R) forall T, R
  tasks = Channel(Tuple(Int32, T)).new
  results = Channel(Tuple(Int32, R)).new

  # 生成工作器
  workers.times do
    spawn do
      tasks.each do |index, item|
        result = yield item
        results.send({index, result})
      end
    end
  end

  # 发送任务
  spawn do
    collection.each_with_index do |item, index|
      tasks.send({index, item})
    end
    tasks.close
  end

  # 按顺序收集结果
  result_map = {} of Int32 => R
  collection.size.times do
    index, result = results.receive
    result_map[index] = result
  end

  collection.indices.map { |i| result_map[i] }
end

# 使用
numbers = (1..100).to_a
squares = parallel_map(numbers, workers: 8) do |n|
  sleep 0.01  # 模拟工作
  n * n
end

puts squares.first(10)

带管道的并行归约

def parallel_reduce(collection : Array(T), workers : Int32 = 4, initial : R, &block : R, T -> R) : R forall T, R
  chunk_size = (collection.size / workers.to_f).ceil.to_i
  chunks = collection.each_slice(chunk_size).to_a

  results = Channel(R).new

  chunks.each do |chunk|
    spawn do
      chunk_result = chunk.reduce(initial) { |acc, item| yield acc, item }
      results.send(chunk_result)
    end
  end

  # 归约部分结果
  final_result = initial
  chunks.size.times do
    final_result = yield final_result, results.receive
  end

  final_result
end

# 使用 - 平方和
numbers = (1..1000).to_a
sum = parallel_reduce(numbers, initial: 0) do |acc, n|
  acc + n * n
end

puts "平方和: #{sum}"

互斥锁:保护共享状态

当纤维需要共享可变状态时,使用互斥锁防止竞态条件。

基本互斥锁使用

require "mutex"

class Counter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize do
      current = @count
      sleep 0.001  # 模拟一些工作
      @count = current + 1
    end
  end

  def value : Int32
    @mutex.synchronize { @count }
  end
end

counter = Counter.new

# 生成100个纤维,每个增量10次
100.times do
  spawn do
    10.times { counter.increment }
  end
end

sleep 2
puts "最终计数: #{counter.value}"  # 应为1000

读写锁模式

require "mutex"

class CachedData
  def initialize
    @data = {} of String => String
    @mutex = Mutex.new
    @version = 0
  end

  def read(key : String) : String?
    @mutex.synchronize do
      @data[key]?
    end
  end

  def write(key : String, value : String)
    @mutex.synchronize do
      @data[key] = value
      @version += 1
    end
  end

  def batch_update(updates : Hash(String, String))
    @mutex.synchronize do
      updates.each do |key, value|
        @data[key] = value
      end
      @version += 1
    end
  end

  def snapshot : Hash(String, String)
    @mutex.synchronize do
      @data.dup
    end
  end
end

原子操作

对于简单计数器和标志,原子操作比互斥锁更高效。

原子计数器

require "atomic"

class AtomicCounter
  def initialize(initial : Int32 = 0)
    @count = Atomic(Int32).new(initial)
  end

  def increment : Int32
    @count.add(1)
  end

  def decrement : Int32
    @count.sub(1)
  end

  def value : Int32
    @count.get
  end

  def compare_and_set(expected : Int32, new_value : Int32) : Bool
    @count.compare_and_set(expected, new_value)
  end
end

counter = AtomicCounter.new

# 无需互斥锁的安全并发增量
1000.times do
  spawn { counter.increment }
end

sleep 1
puts "计数: #{counter.value}"

用于协调的原子标志

require "atomic"

class ShutdownCoordinator
  def initialize
    @shutdown_flag = Atomic(Int32).new(0)
  end

  def shutdown!
    @shutdown_flag.set(1)
  end

  def shutdown? : Bool
    @shutdown_flag.get == 1
  end

  def run_until_shutdown(&block)
    until shutdown?
      yield
      sleep 0.1
    end
  end
end

coordinator = ShutdownCoordinator.new

# 检查关闭标志的工作器
spawn(name: "工作器") do
  coordinator.run_until_shutdown do
    puts "工作中..."
  end
  puts "工作器优雅关闭"
end

sleep 1
coordinator.shutdown!
sleep 0.5

何时使用此技能

在以下情况下使用crystal-concurrency技能:

  • 并发处理多个I/O操作(网络请求、文件操作)
  • 实施实时数据处理管道
  • 构建用于并行任务处理的工作池
  • 同时处理多个客户端连接(Web服务器、聊天系统)
  • 执行后台处理而不阻塞主执行
  • 聚合来自多个并发操作的结果
  • 实施生产者-消费者模式
  • 构建限流器和背压机制
  • 并行处理大型数据集
  • 协调多个异步操作
  • 实施超时和取消模式
  • 构建具有同步访问的并发缓存
  • 流数据处理与多阶段
  • 实施扇出/扇入模式

最佳实践

  1. 始终关闭通道:发送完成后关闭通道以向接收者表示完成
  2. 使用缓冲通道以提高性能:当生产者/消费者以不同速度运行时,缓冲通道
  3. 限制纤维计数:不要生成无限纤维;使用工作池进行有界并发
  4. 处理通道关闭:始终处理Channel::ClosedError或在操作前检查closed?
  5. 使用选择进行超时:使用selecttimeout()实施超时以防止无限阻塞
  6. 首选通道而非共享状态:可能时使用消息传递(通道)而非共享内存
  7. 同步共享状态:当在纤维间共享可变状态时,始终使用Mutex或原子操作
  8. 清理资源:使用ensure块确保即使错误时也清理资源
  9. 命名您的纤维:为调试和性能分析给纤维描述性名称
  10. 避免在纤维中阻塞操作:使用非阻塞I/O;阻塞操作阻止其他纤维运行
  11. 使用原子操作进行计数:原子操作比互斥锁更高效用于简单计数器和标志
  12. 实施优雅关闭:设计系统以优雅关闭,排空通道并等待纤维
  13. 处理纤维恐慌:在异常处理程序中包装纤维代码以防止静默失败
  14. 适当调整通道缓冲区大小:太小导致阻塞;太大浪费内存
  15. 使用选择默认进行轮询:非阻塞检查与select ... else用于轮询模式

常见陷阱

  1. 忘记关闭通道:如果发送完成后不关闭通道,接收者将永远等待
  2. 无缓冲通道死锁:发送到无缓冲通道阻塞直到接收者准备就绪
  3. 共享状态竞态条件:当多个纤维访问相同数据时不使用互斥锁/原子操作
  4. 通道缓冲区溢出:发送超过缓冲区容量的项而没有接收者导致阻塞
  5. 不处理关闭通道:从关闭通道接收引发异常;始终处理它
  6. 生成太多纤维:无限纤维生成耗尽内存;使用工作池代替
  7. 阻塞调度器:纤维中的CPU密集型工作阻止其他纤维运行
  8. 资源泄漏:不关闭所有代码路径中的通道、文件或连接,包括错误
  9. 顺序假设:纤维以非确定性顺序执行;不要假设执行序列
  10. 超时太短:积极超时导致虚假失败;平衡响应性和可靠性
  11. 互斥锁持有时间太长:长临界区降低并发性;最小化互斥锁持有时间
  12. 发送/接收不匹配:不平衡的生产者/消费者导致内存积累或饥饿
  13. 忽略纤维异常:纤维中的异常不传播到生成者;显式处理
  14. 嵌套互斥锁锁定:可能导致死锁;避免获取多个互斥锁或使用一致顺序
  15. 不使用synchronize:忘记在互斥锁使用中包装synchronize块导致竞态条件

资源