Go并发编程Skill go-concurrency

Go并发编程技能专注于使用Go语言的goroutines、channels和同步模式来构建高性能的并发应用程序,适用于服务器开发、数据处理、多线程编程等场景。关键词包括:Go并发、goroutines、channels、同步模式、并发编程、高性能、服务器开发、多线程。

后端开发 0 次安装 2 次浏览 更新于 3/25/2026

name: go-concurrency user-invocable: false description: 当使用Go的并发编程,包括goroutines、channels和同步模式时使用。当编写并发Go代码时使用。 allowed-tools:

  • Bash
  • Read

Go并发编程

掌握Go的并发模型,使用goroutines、channels和同步原语来构建并发应用程序。

Goroutines

创建goroutines:

package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("来自goroutine的问候")
}

func main() {
    // 启动goroutine
    go sayHello()

    // 匿名函数goroutine
    go func() {
        fmt.Println("来自匿名goroutine的问候")
    }()

    // 给goroutines时间执行
    time.Sleep(time.Second)
}

带参数的goroutines:

func printNumber(n int) {
    fmt.Println(n)
}

func main() {
    for i := 0; i < 10; i++ {
        go printNumber(i)
    }
    time.Sleep(time.Second)
}

Channels

基本channel操作:

func main() {
    // 创建无缓冲channel
    ch := make(chan int)

    // 在goroutine中发送(非阻塞)
    go func() {
        ch <- 42
    }()

    // 接收(阻塞直到值可用)
    value := <-ch
    fmt.Println(value) // 42
}

缓冲channels:

func main() {
    // 缓冲channel,容量为2
    ch := make(chan string, 2)

    // 可以发送最多2个值而不阻塞
    ch <- "第一个"
    ch <- "第二个"

    fmt.Println(<-ch) // 第一个
    fmt.Println(<-ch) // 第二个
}

Channel方向:

// 只发送channel
func send(ch chan<- int) {
    ch <- 42
}

// 只接收channel
func receive(ch <-chan int) int {
    return <-ch
}

func main() {
    ch := make(chan int)

    go send(ch)
    value := receive(ch)

    fmt.Println(value)
}

关闭channels:

func main() {
    ch := make(chan int, 3)

    ch <- 1
    ch <- 2
    ch <- 3
    close(ch) // 关闭channel

    // 接收直到channel关闭
    for value := range ch {
        fmt.Println(value)
    }

    // 检查channel是否关闭
    value, ok := <-ch
    fmt.Printf("值: %d, 打开: %v
", value, ok) // 值: 0, 打开: false
}

Select语句

多路复用channels:

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(time.Second)
        ch1 <- "来自ch1"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "来自ch2"
    }()

    // 等待两者
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println(msg1)
        case msg2 := <-ch2:
            fmt.Println(msg2)
        }
    }
}

Select带默认:

func main() {
    ch := make(chan int, 1)

    select {
    case val := <-ch:
        fmt.Println(val)
    default:
        fmt.Println("没有值就绪") // 执行
    }
}

Select带超时:

func main() {
    ch := make(chan string)

    go func() {
        time.Sleep(2 * time.Second)
        ch <- "结果"
    }()

    select {
    case msg := <-ch:
        fmt.Println(msg)
    case <-time.After(time.Second):
        fmt.Println("超时") // 1秒后执行
    }
}

Worker Pools

实现worker pool模式:

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d 处理任务 %d
", id, job)
        time.Sleep(time.Second)
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // 启动3个workers
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // 发送5个任务
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)

    // 收集结果
    for a := 1; a <= 5; a++ {
        <-results
    }
}

sync.WaitGroup

等待goroutines完成:

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 完成后递减计数器

    fmt.Printf("Worker %d 开始
", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d 完成
", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1) // 递增计数器
        go worker(i, &wg)
    }

    wg.Wait() // 等待所有完成
    fmt.Println("所有workers完成")
}

sync.Mutex

保护共享状态:

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    c.value++
    c.mu.Unlock()
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    var wg sync.WaitGroup
    counter := Counter{}

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    wg.Wait()
    fmt.Println(counter.Value()) // 1000
}

sync.RWMutex

读写锁:

type Cache struct {
    mu    sync.RWMutex
    items map[string]string
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock() // 读锁
    defer c.mu.RUnlock()
    val, ok := c.items[key]
    return val, ok
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock() // 写锁
    defer c.mu.Unlock()
    c.items[key] = value
}

func main() {
    cache := Cache{items: make(map[string]string)}

    // 多个读取者可以同时访问
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            cache.Get("key")
        }()
    }

    wg.Wait()
}

sync.Once

执行一次初始化:

var (
    instance *Database
    once     sync.Once
)

type Database struct {
    conn string
}

func GetDatabase() *Database {
    once.Do(func() {
        fmt.Println("初始化数据库")
        instance = &Database{conn: "已连接"}
    })
    return instance
}

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            db := GetDatabase() // 只初始化一次
            fmt.Println(db.conn)
        }()
    }

    wg.Wait()
}

Context包

使用context进行取消:

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d 已取消
", id)
            return
        default:
            fmt.Printf("Worker %d 工作中
", id)
            time.Sleep(500 * time.Millisecond)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    for i := 1; i <= 3; i++ {
        go worker(ctx, i)
    }

    time.Sleep(2 * time.Second)
    cancel() // 取消所有workers
    time.Sleep(time.Second)
}

带超时的context:

func slowOperation(ctx context.Context) error {
    select {
    case <-time.After(3 * time.Second):
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    ctx, cancel := context.WithTimeout(
        context.Background(),
        2*time.Second,
    )
    defer cancel()

    err := slowOperation(ctx)
    if err != nil {
        fmt.Println("操作超时:", err)
    }
}

带值的context:

func processRequest(ctx context.Context) {
    userID := ctx.Value("userID")
    fmt.Println("为用户处理:", userID)
}

func main() {
    ctx := context.WithValue(
        context.Background(),
        "userID",
        "user123",
    )
    processRequest(ctx)
}

并发代码中的错误处理

使用errgroup:

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "time"
)

func fetchUser(ctx context.Context, id int) error {
    time.Sleep(time.Second)
    if id == 3 {
        return fmt.Errorf("用户 %d 未找到", id)
    }
    fmt.Printf("已获取用户 %d
", id)
    return nil
}

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    userIDs := []int{1, 2, 3, 4, 5}

    for _, id := range userIDs {
        id := id // 捕获循环变量
        g.Go(func() error {
            return fetchUser(ctx, id)
        })
    }

    // 等待所有goroutines
    if err := g.Wait(); err != nil {
        fmt.Println("错误:", err)
    }
}

Fan-Out Fan-In模式

分发工作和收集结果:

func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func merge(cs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    wg.Add(len(cs))
    for _, c := range cs {
        go func(ch <-chan int) {
            defer wg.Done()
            for n := range ch {
                out <- n
            }
        }(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    in := generator(1, 2, 3, 4, 5)

    // Fan out
    c1 := square(in)
    c2 := square(in)

    // Fan in
    for n := range merge(c1, c2) {
        fmt.Println(n)
    }
}

何时使用此技能

使用go-concurrency当您需要:

  • 并发执行多个操作
  • 构建并发服务器或workers
  • 实现生产者-消费者模式
  • 并发处理数据流
  • 同时处理多个I/O操作
  • 实现超时和取消
  • 协调多个goroutines
  • 构建fan-out/fan-in管道
  • 安全地在goroutines之间共享状态
  • 实现速率限制或节流

最佳实践

  • 使用channels进行通信,mutexes用于状态
  • 仅从发送方关闭channels
  • 始终使用WaitGroup等待goroutines
  • 传递contexts进行取消和截止时间
  • 明智地使用缓冲channels
  • 使用mutexes保护共享状态
  • 避免goroutine泄漏,正确清理
  • 使用select带default进行非阻塞操作
  • 优先使用sync.Once进行初始化
  • 记录goroutine所有权和生命周期

常见陷阱

  • Goroutine泄漏(忘记退出)
  • 来自未保护共享状态的竞争条件
  • 由于不当channel使用导致的死锁
  • 在已关闭的channels上发送(引发panic)
  • 不检查channel关闭状态
  • 过度使用mutexes而非channels
  • 创建太多goroutines
  • 忘记调用WaitGroup.Done()
  • 将循环变量传递给goroutines
  • 不处理context取消

资源