Go并发模式Skill go-concurrency-patterns

这个技能专注于Go语言的并发编程模式,提供实用示例和最佳实践,帮助开发者高效构建并发应用程序。涵盖goroutines、channels、同步原语、context管理等工作池、管道、优雅关闭等模式。关键词:Go并发、goroutine、channel、同步、context、工作池、竞争条件、并发模式、编程技能。

后端开发 0 次安装 0 次浏览 更新于 3/22/2026

name: Go并发模式 description: 掌握Go并发,使用goroutines、channels、同步原语和context。在构建并发Go应用、实现工作池或调试竞争条件时使用。

Go并发模式

生产级别的Go并发模式,包括goroutines、channels、同步原语和context管理。

何时使用此技能

  • 构建并发Go应用程序
  • 实现工作池和管道
  • 管理goroutine生命周期
  • 使用channels进行通信
  • 调试竞争条件
  • 实现优雅关闭

核心概念

1. Go并发原语

原语 用途
goroutine 轻量级并发执行
channel goroutines之间的通信
select 多路复用channel操作
sync.Mutex 互斥锁
sync.WaitGroup 等待goroutines完成
context.Context 取消和超时管理

2. Go并发格言

不要通过共享内存进行通信;
通过通信共享内存。

快速开始

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    results := make(chan string, 10)
    var wg sync.WaitGroup

    // 启动工作者
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(ctx, i, results, &wg)
    }

    // 完成后关闭results
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    for result := range results {
        fmt.Println(result)
    }
}

func worker(ctx context.Context, id int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()

    select {
    case <-ctx.Done():
        return
    case results <- fmt.Sprintf("工作者 %d 完成", id):
    }
}

模式

模式1: 工作池

package main

import (
    "context"
    "fmt"
    "sync"
)

type Job struct {
    ID   int
    Data string
}

type Result struct {
    JobID  int
    Output string
    Err    error
}

func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
    results := make(chan Result, len(jobs))

    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for job := range jobs {
                select {
                case <-ctx.Done():
                    return
                default:
                    result := processJob(job)
                    results <- result
                }
            }
        }(i)
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

func processJob(job Job) Result {
    // 模拟工作
    return Result{
        JobID:  job.ID,
        Output: fmt.Sprintf("已处理: %s", job.Data),
    }
}

// 使用示例
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    jobs := make(chan Job, 100)

    // 发送任务
    go func() {
        for i := 0; i < 50; i++ {
            jobs <- Job{ID: i, Data: fmt.Sprintf("任务-%d", i)}
        }
        close(jobs)
    }()

    // 使用5个工作者处理
    results := WorkerPool(ctx, 5, jobs)

    for result := range results {
        fmt.Printf("结果: %+v
", result)
    }
}

模式2: 扇出/扇入管道

package main

import (
    "context"
    "sync"
)

// 阶段1: 生成数字
func generate(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case <-ctx.Done():
                return
            case out <- n:
            }
        }
    }()
    return out
}

// 阶段2: 平方数字(可运行多个实例)
func square(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case <-ctx.Done():
                return
            case out <- n * n:
            }
        }
    }()
    return out
}

// 扇入: 合并多个通道为一个
func merge(ctx context.Context, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为每个输入通道启动输出goroutine
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case <-ctx.Done():
                return
            case out <- n:
            }
        }
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // 所有输入完成后关闭out
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 生成输入
    in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // 扇出到多个平方器
    c1 := square(ctx, in)
    c2 := square(ctx, in)
    c3 := square(ctx, in)

    // 扇入结果
    for result := range merge(ctx, c1, c2, c3) {
        fmt.Println(result)
    }
}

模式3: 使用信号量的有界并发

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/semaphore"
    "sync"
)

type RateLimitedWorker struct {
    sem *semaphore.Weighted
}

func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
    return &RateLimitedWorker{
        sem: semaphore.NewWeighted(maxConcurrent),
    }
}

func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
    var (
        wg     sync.WaitGroup
        mu     sync.Mutex
        errors []error
    )

    for _, task := range tasks {
        // 获取信号量(如果达到限制则阻塞)
        if err := w.sem.Acquire(ctx, 1); err != nil {
            return []error{err}
        }

        wg.Add(1)
        go func(t func() error) {
            defer wg.Done()
            defer w.sem.Release(1)

            if err := t(); err != nil {
                mu.Lock()
                errors = append(errors, err)
                mu.Unlock()
            }
        }(task)
    }

    wg.Wait()
    return errors
}

// 替代方案: 基于通道的信号量
type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore {
    return make(chan struct{}, n)
}

func (s Semaphore) Acquire() {
    s <- struct{}{}
}

func (s Semaphore) Release() {
    <-s
}

模式4: 优雅关闭

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

type Server struct {
    shutdown chan struct{}
    wg       sync.WaitGroup
}

func NewServer() *Server {
    return &Server{
        shutdown: make(chan struct{}),
    }
}

func (s *Server) Start(ctx context.Context) {
    // 启动工作者
    for i := 0; i < 5; i++ {
        s.wg.Add(1)
        go s.worker(ctx, i)
    }
}

func (s *Server) worker(ctx context.Context, id int) {
    defer s.wg.Done()
    defer fmt.Printf("工作者 %d 已停止
", id)

    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            // 清理
            fmt.Printf("工作者 %d 正在清理...
", id)
            time.Sleep(500 * time.Millisecond) // 模拟清理
            return
        case <-ticker.C:
            fmt.Printf("工作者 %d 正在工作...
", id)
        }
    }
}

func (s *Server) Shutdown(timeout time.Duration) {
    // 发送关闭信号
    close(s.shutdown)

    // 超时等待
    done := make(chan struct{})
    go func() {
        s.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        fmt.Println("优雅关闭完成")
    case <-time.After(timeout):
        fmt.Println("关闭超时,强制退出")
    }
}

func main() {
    // 设置信号处理
    ctx, cancel := context.WithCancel(context.Background())

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    server := NewServer()
    server.Start(ctx)

    // 等待信号
    sig := <-sigCh
    fmt.Printf("
接收到信号: %v
", sig)

    // 取消context以停止工作者
    cancel()

    // 等待优雅关闭
    server.Shutdown(5 * time.Second)
}

模式5: 带有取消的错误组

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "net/http"
)

func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)

    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕获循环变量

        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return fmt.Errorf("为 %s 创建请求时出错: %w", url, err)
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("获取 %s 时出错: %w", url, err)
            }
            defer resp.Body.Close()

            results[i] = fmt.Sprintf("%s: %d", url, resp.StatusCode)
            return nil
        })
    }

    // 等待所有goroutines完成或一个失败
    if err := g.Wait(); err != nil {
        return nil, err // 第一个错误取消所有其他
    }

    return results, nil
}

// 带并发限制
func fetchWithLimit(ctx context.Context, urls []string, limit int) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(limit) // 最大并发goroutines数

    results := make([]string, len(urls))
    var mu sync.Mutex

    for i, url := range urls {
        i, url := i, url

        g.Go(func() error {
            result, err := fetchURL(ctx, url)
            if err != nil {
                return err
            }

            mu.Lock()
            results[i] = result
            mu.Unlock()
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }

    return results, nil
}

模式6: 使用sync.Map的并发映射

package main

import (
    "sync"
)

// 用于频繁读取、不频繁写入
type Cache struct {
    m sync.Map
}

func (c *Cache) Get(key string) (interface{}, bool) {
    return c.m.Load(key)
}

func (c *Cache) Set(key string, value interface{}) {
    c.m.Store(key, value)
}

func (c *Cache) GetOrSet(key string, value interface{}) (interface{}, bool) {
    return c.m.LoadOrStore(key, value)
}

func (c *Cache) Delete(key string) {
    c.m.Delete(key)
}

// 用于写入密集型工作负载,使用分片映射
type ShardedMap struct {
    shards    []*shard
    numShards int
}

type shard struct {
    sync.RWMutex
    data map[string]interface{}
}

func NewShardedMap(numShards int) *ShardedMap {
    m := &ShardedMap{
        shards:    make([]*shard, numShards),
        numShards: numShards,
    }
    for i := range m.shards {
        m.shards[i] = &shard{data: make(map[string]interface{})}
    }
    return m
}

func (m *ShardedMap) getShard(key string) *shard {
    // 简单哈希
    h := 0
    for _, c := range key {
        h = 31*h + int(c)
    }
    return m.shards[h%m.numShards]
}

func (m *ShardedMap) Get(key string) (interface{}, bool) {
    shard := m.getShard(key)
    shard.RLock()
    defer shard.RUnlock()
    v, ok := shard.data[key]
    return v, ok
}

func (m *ShardedMap) Set(key string, value interface{}) {
    shard := m.getShard(key)
    shard.Lock()
    defer shard.Unlock()
    shard.data[key] = value
}

模式7: 带有超时和默认的Select

func selectPatterns() {
    ch := make(chan int)

    // 超时模式
    select {
    case v := <-ch:
        fmt.Println("已接收:", v)
    case <-time.After(time.Second):
        fmt.Println("超时!")
    }

    // 非阻塞发送/接收
    select {
    case ch <- 42:
        fmt.Println("已发送")
    default:
        fmt.Println("通道已满,跳过")
    }

    // 优先级select(先检查高优先级)
    highPriority := make(chan int)
    lowPriority := make(chan int)

    for {
        select {
        case msg := <-highPriority:
            fmt.Println("高优先级:", msg)
        default:
            select {
            case msg := <-highPriority:
                fmt.Println("高优先级:", msg)
            case msg := <-lowPriority:
                fmt.Println("低优先级:", msg)
            }
        }
    }
}

竞争检测

# 使用竞争检测器运行测试
go test -race ./...

# 使用竞争检测器构建
go build -race .

# 使用竞争检测器运行
go run -race main.go

最佳实践

建议

  • 使用context - 用于取消和超时
  • 关闭channels - 仅从发送方关闭
  • 使用errgroup - 用于带错误的并发操作
  • 缓冲channels - 当你知道数量时
  • 优先使用channels - 尽可能避免mutexes

不建议

  • 不要泄露goroutines - 总是有退出路径
  • 不要从接收方关闭 - 会导致panic
  • 不要使用共享内存 - 除非必要
  • 不要忽略context取消 - 检查ctx.Done()
  • 不要用time.Sleep进行同步 - 使用适当的原语

资源