GO语言并发编程

# GO语言并发编程

# 1. 并发编程概述

GO语言通过Goroutine和Channel提供了强大的并发编程能力,遵循"通过通信来共享内存,而不是通过共享内存来通信"的设计理念。

# 1.1 并发 vs 并行

  • 并发:多个任务交替执行
  • 并行:多个任务同时执行

# 2. Goroutine

# 2.1 什么是Goroutine

Goroutine是GO语言的轻量级线程,由GO运行时管理,比传统线程更轻量。

# 2.2 创建Goroutine

package main

import (
    "fmt"
    "time"
)

func sayHello(name string) {
    fmt.Printf("Hello, %s!\n", name)
}

func main() {
    // 启动一个Goroutine
    go sayHello("Alice")
    
    // 主Goroutine继续执行
    fmt.Println("Main function")
    
    // 等待一下让Goroutine有机会执行
    time.Sleep(time.Second)
}

# 2.3 多个Goroutine

func main() {
    for i := 0; i < 5; i++ {
        go func(id int) {
            fmt.Printf("Goroutine %d\n", id)
        }(i)
    }
    
    time.Sleep(time.Second)
}

# 2.4 Goroutine的生命周期

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, j)
        time.Sleep(time.Millisecond * 500)
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    // 启动3个worker
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }
    
    // 发送工作
    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 收集结果
    for a := 1; a <= 9; a++ {
        <-results
    }
}

# 3. Channel(通道)

# 3.1 创建Channel

// 无缓冲通道
ch := make(chan int)

// 有缓冲通道
ch := make(chan int, 10)

# 3.2 发送和接收

func main() {
    ch := make(chan string)
    
    // 启动Goroutine发送数据
    go func() {
        ch <- "Hello from goroutine"
    }()
    
    // 主Goroutine接收数据
    msg := <-ch
    fmt.Println(msg)
}

# 3.3 通道方向

// 只发送通道
func sendOnly(ch chan<- int) {
    ch <- 42
}

// 只接收通道
func receiveOnly(ch <-chan int) {
    value := <-ch
    fmt.Println(value)
}

// 双向通道
func bidirectional(ch chan int) {
    ch <- 42
    value := <-ch
    fmt.Println(value)
}

# 3.4 通道关闭

func producer(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch) // 关闭通道
}

func consumer(ch chan int) {
    for value := range ch { // 遍历通道直到关闭
        fmt.Println("Received:", value)
    }
}

func main() {
    ch := make(chan int)
    go producer(ch)
    consumer(ch)
}

# 4. Select语句

# 4.1 基本用法

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(time.Second)
        ch1 <- "one"
    }()
    
    go func() {
        time.Sleep(time.Second * 2)
        ch2 <- "two"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Received", msg1)
        case msg2 := <-ch2:
            fmt.Println("Received", msg2)
        case <-time.After(time.Second * 3):
            fmt.Println("Timeout")
        }
    }
}

# 4.2 非阻塞选择

func main() {
    ch := make(chan string)
    
    select {
    case msg := <-ch:
        fmt.Println("Received:", msg)
    default:
        fmt.Println("No message received")
    }
}

# 5. 同步原语

# 5.1 WaitGroup

import "sync"

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 确保在函数结束时调用Done()
    
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    
    wg.Wait() // 等待所有Goroutine完成
    fmt.Println("All workers completed")
}

# 5.2 Mutex(互斥锁)

type SafeCounter struct {
    mu    sync.Mutex
    count int
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *SafeCounter) GetCount() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

func main() {
    counter := SafeCounter{}
    
    for i := 0; i < 1000; i++ {
        go counter.Increment()
    }
    
    time.Sleep(time.Second)
    fmt.Println("Count:", counter.GetCount())
}

# 5.3 RWMutex(读写锁)

type DataStore struct {
    mu    sync.RWMutex
    data  map[string]string
}

func (ds *DataStore) Set(key, value string) {
    ds.mu.Lock()
    defer ds.mu.Unlock()
    ds.data[key] = value
}

func (ds *DataStore) Get(key string) (string, bool) {
    ds.mu.RLock()
    defer ds.mu.RUnlock()
    value, exists := ds.data[key]
    return value, exists
}

# 6. Context包

# 6.1 基本用法

import "context"

func worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Worker cancelled")
            return
        default:
            fmt.Println("Working...")
            time.Sleep(time.Millisecond * 500)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    
    go worker(ctx)
    
    time.Sleep(time.Second * 2)
    cancel() // 取消所有子Goroutine
    
    time.Sleep(time.Second)
}

# 6.2 超时控制

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
    defer cancel()
    
    go func() {
        time.Sleep(time.Second * 3)
        fmt.Println("This won't be printed")
    }()
    
    <-ctx.Done()
    fmt.Println("Context cancelled due to timeout")
}

# 7. 并发模式

# 7.1 生产者-消费者模式

func producer(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
        time.Sleep(time.Millisecond * 100)
    }
    close(ch)
}

func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for value := range ch {
        fmt.Printf("Consumer %d: %d\n", id, value)
    }
}

func main() {
    ch := make(chan int, 5)
    var wg sync.WaitGroup
    
    go producer(ch)
    
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go consumer(i, ch, &wg)
    }
    
    wg.Wait()
}

# 7.2 工作池模式

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, j)
        time.Sleep(time.Millisecond * 500)
        results <- j * 2
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3
    
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    
    // 启动workers
    for w := 1; w <= numWorkers; w++ {
        go worker(w, jobs, results)
    }
    
    // 发送jobs
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 收集结果
    for a := 1; a <= numJobs; a++ {
        <-results
    }
}

# 8. 常见陷阱和最佳实践

# 8.1 避免Goroutine泄漏

// 错误示例
func leakyFunction() {
    go func() {
        for {
            // 无限循环,没有退出条件
        }
    }()
}

// 正确示例
func correctFunction() {
    done := make(chan bool)
    go func() {
        defer close(done)
        for {
            select {
            case <-done:
                return
            default:
                // 工作
            }
        }
    }()
    
    // 在适当的时候关闭done通道
    time.Sleep(time.Second)
    close(done)
}

# 8.2 避免竞态条件

// 错误示例
var counter int

func increment() {
    counter++ // 竞态条件
}

// 正确示例
var counter int
var mu sync.Mutex

func increment() {
    mu.Lock()
    defer mu.Unlock()
    counter++
}

# 8.3 使用缓冲通道避免死锁

// 可能导致死锁
ch := make(chan int)
ch <- 1 // 阻塞,因为没有接收者

// 使用缓冲通道
ch := make(chan int, 1)
ch <- 1 // 不会阻塞

# 9. 性能考虑

# 9.1 Goroutine开销

  • 每个Goroutine大约占用2KB内存
  • 可以轻松创建数百万个Goroutine

# 9.2 通道性能

  • 无缓冲通道适合同步
  • 有缓冲通道适合异步通信
  • 避免在热点路径中使用通道

# 9.3 锁的性能

  • 优先使用RWMutex进行读多写少的场景
  • 考虑使用原子操作替代锁
  • 避免锁的嵌套