name: go-concurrency user-invocable: false description: 当使用Go的并发编程,包括goroutines、channels和同步模式时使用。当编写并发Go代码时使用。 allowed-tools:
- Bash
- Read
Go并发编程
掌握Go的并发模型,使用goroutines、channels和同步原语来构建并发应用程序。
Goroutines
创建goroutines:
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("来自goroutine的问候")
}
func main() {
// 启动goroutine
go sayHello()
// 匿名函数goroutine
go func() {
fmt.Println("来自匿名goroutine的问候")
}()
// 给goroutines时间执行
time.Sleep(time.Second)
}
带参数的goroutines:
func printNumber(n int) {
fmt.Println(n)
}
func main() {
for i := 0; i < 10; i++ {
go printNumber(i)
}
time.Sleep(time.Second)
}
Channels
基本channel操作:
func main() {
// 创建无缓冲channel
ch := make(chan int)
// 在goroutine中发送(非阻塞)
go func() {
ch <- 42
}()
// 接收(阻塞直到值可用)
value := <-ch
fmt.Println(value) // 42
}
缓冲channels:
func main() {
// 缓冲channel,容量为2
ch := make(chan string, 2)
// 可以发送最多2个值而不阻塞
ch <- "第一个"
ch <- "第二个"
fmt.Println(<-ch) // 第一个
fmt.Println(<-ch) // 第二个
}
Channel方向:
// 只发送channel
func send(ch chan<- int) {
ch <- 42
}
// 只接收channel
func receive(ch <-chan int) int {
return <-ch
}
func main() {
ch := make(chan int)
go send(ch)
value := receive(ch)
fmt.Println(value)
}
关闭channels:
func main() {
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
close(ch) // 关闭channel
// 接收直到channel关闭
for value := range ch {
fmt.Println(value)
}
// 检查channel是否关闭
value, ok := <-ch
fmt.Printf("值: %d, 打开: %v
", value, ok) // 值: 0, 打开: false
}
Select语句
多路复用channels:
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(time.Second)
ch1 <- "来自ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自ch2"
}()
// 等待两者
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
}
}
Select带默认:
func main() {
ch := make(chan int, 1)
select {
case val := <-ch:
fmt.Println(val)
default:
fmt.Println("没有值就绪") // 执行
}
}
Select带超时:
func main() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "结果"
}()
select {
case msg := <-ch:
fmt.Println(msg)
case <-time.After(time.Second):
fmt.Println("超时") // 1秒后执行
}
}
Worker Pools
实现worker pool模式:
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d 处理任务 %d
", id, job)
time.Sleep(time.Second)
results <- job * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动3个workers
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 发送5个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 收集结果
for a := 1; a <= 5; a++ {
<-results
}
}
sync.WaitGroup
等待goroutines完成:
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 完成后递减计数器
fmt.Printf("Worker %d 开始
", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成
", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // 递增计数器
go worker(i, &wg)
}
wg.Wait() // 等待所有完成
fmt.Println("所有workers完成")
}
sync.Mutex
保护共享状态:
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
c.value++
c.mu.Unlock()
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
var wg sync.WaitGroup
counter := Counter{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println(counter.Value()) // 1000
}
sync.RWMutex
读写锁:
type Cache struct {
mu sync.RWMutex
items map[string]string
}
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock() // 读锁
defer c.mu.RUnlock()
val, ok := c.items[key]
return val, ok
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁
defer c.mu.Unlock()
c.items[key] = value
}
func main() {
cache := Cache{items: make(map[string]string)}
// 多个读取者可以同时访问
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cache.Get("key")
}()
}
wg.Wait()
}
sync.Once
执行一次初始化:
var (
instance *Database
once sync.Once
)
type Database struct {
conn string
}
func GetDatabase() *Database {
once.Do(func() {
fmt.Println("初始化数据库")
instance = &Database{conn: "已连接"}
})
return instance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
db := GetDatabase() // 只初始化一次
fmt.Println(db.conn)
}()
}
wg.Wait()
}
Context包
使用context进行取消:
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d 已取消
", id)
return
default:
fmt.Printf("Worker %d 工作中
", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
time.Sleep(2 * time.Second)
cancel() // 取消所有workers
time.Sleep(time.Second)
}
带超时的context:
func slowOperation(ctx context.Context) error {
select {
case <-time.After(3 * time.Second):
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
ctx, cancel := context.WithTimeout(
context.Background(),
2*time.Second,
)
defer cancel()
err := slowOperation(ctx)
if err != nil {
fmt.Println("操作超时:", err)
}
}
带值的context:
func processRequest(ctx context.Context) {
userID := ctx.Value("userID")
fmt.Println("为用户处理:", userID)
}
func main() {
ctx := context.WithValue(
context.Background(),
"userID",
"user123",
)
processRequest(ctx)
}
并发代码中的错误处理
使用errgroup:
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"time"
)
func fetchUser(ctx context.Context, id int) error {
time.Sleep(time.Second)
if id == 3 {
return fmt.Errorf("用户 %d 未找到", id)
}
fmt.Printf("已获取用户 %d
", id)
return nil
}
func main() {
g, ctx := errgroup.WithContext(context.Background())
userIDs := []int{1, 2, 3, 4, 5}
for _, id := range userIDs {
id := id // 捕获循环变量
g.Go(func() error {
return fetchUser(ctx, id)
})
}
// 等待所有goroutines
if err := g.Wait(); err != nil {
fmt.Println("错误:", err)
}
}
Fan-Out Fan-In模式
分发工作和收集结果:
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(cs))
for _, c := range cs {
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
out <- n
}
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := generator(1, 2, 3, 4, 5)
// Fan out
c1 := square(in)
c2 := square(in)
// Fan in
for n := range merge(c1, c2) {
fmt.Println(n)
}
}
何时使用此技能
使用go-concurrency当您需要:
- 并发执行多个操作
- 构建并发服务器或workers
- 实现生产者-消费者模式
- 并发处理数据流
- 同时处理多个I/O操作
- 实现超时和取消
- 协调多个goroutines
- 构建fan-out/fan-in管道
- 安全地在goroutines之间共享状态
- 实现速率限制或节流
最佳实践
- 使用channels进行通信,mutexes用于状态
- 仅从发送方关闭channels
- 始终使用WaitGroup等待goroutines
- 传递contexts进行取消和截止时间
- 明智地使用缓冲channels
- 使用mutexes保护共享状态
- 避免goroutine泄漏,正确清理
- 使用select带default进行非阻塞操作
- 优先使用sync.Once进行初始化
- 记录goroutine所有权和生命周期
常见陷阱
- Goroutine泄漏(忘记退出)
- 来自未保护共享状态的竞争条件
- 由于不当channel使用导致的死锁
- 在已关闭的channels上发送(引发panic)
- 不检查channel关闭状态
- 过度使用mutexes而非channels
- 创建太多goroutines
- 忘记调用WaitGroup.Done()
- 将循环变量传递给goroutines
- 不处理context取消