跟着 GPT-4 从0到1学习 Golang 并发机制(二)

跟着 GPT-4 从0到1学习 Golang 并发机制(二)

·

9 min read

一、前言

话接上回《跟着 GPT-4 从0到1学习 Golang 并发机制(一)》,我想试试看 GPT-4 能不能当好一个编程老师,循循善诱,由浅入深,深入浅出…… 给我讲解下 Golang 里的并发机制。

今天继续当白纸,GPT 为笔,代码为颜料,挥笔泼墨,接着作画。

二、开聊

下文通过 DevChat 和 GPT-4 交互,如果你对 DevChat 感兴趣,可以跳转下文浏览:

2.1 Golang 中的 sync 包 - Mutex, RWMutex 和 WaitGroup

  • 我和 GPT 说:

我们继续来聊 sync 包吧,请结合代码示例介绍下 sync 包的主要功能。

  • GPT 说:

Go 语言的 sync 包提供了基本的同步原语,如互斥锁(Mutex)和读写锁(RWMutex),以及一些高级的同步原语,如 WaitGroup 和 Cond。

以下是一些使用 sync 包的示例:

  1. 使用 Mutex:Mutex 用于在多个 goroutine 之间同步访问共享资源。
package main

import (
    "fmt"
    "sync"
)

var (
    mutex   sync.Mutex
    balance int
)

func deposit(value int, wg *sync.WaitGroup) {
    mutex.Lock()
    fmt.Printf("Depositing %d to account with balance: %d\n", value, balance)
    balance += value
    mutex.Unlock()
    wg.Done()
}

func main() {
    balance = 1000
    var wg sync.WaitGroup
    wg.Add(2)
    go deposit(200, &wg)
    go deposit(100, &wg)
    wg.Wait()
    fmt.Printf("New Balance %d\n", balance)
}

在这个示例中,我们使用一个 Mutex 来同步访问一个银行账户的余额。每次存款时,我们都会锁定 Mutex,然后更新余额,最后解锁 Mutex。

  1. 使用 WaitGroup:WaitGroup 用于等待一组 goroutine 完成。
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    wg.Wait()
}

在这个示例中,我们创建了 5 个 worker goroutines,然后使用一个 WaitGroup 来等待它们全部完成。

  1. 使用 RWMutex:RWMutex 是一个读/写互斥锁。它可以被多个读者锁定或一个写者锁定,但不能同时被读者和写者锁定。
package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    rwMutex sync.RWMutex
    value   int
)

func readValue(wg *sync.WaitGroup) {
    defer wg.Done()
    rwMutex.RLock()
    fmt.Printf("Read value: %d\n", value)
    time.Sleep(1 * time.Second)
    rwMutex.RUnlock()
}

func writeValue(newValue int, wg *sync.WaitGroup) {
    defer wg.Done()
    rwMutex.Lock()
    fmt.Printf("Write value: %d\n", newValue)
    value = newValue
    rwMutex.Unlock()
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go writeValue(10, &wg)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go readValue(&wg)
    }
    wg.Wait()
}

在这个示例中,我们使用一个 RWMutex 来同步读写一个共享变量。我们创建了一个写者 goroutine 和多个读者 goroutines。写者会锁定 RWMutex 进行写操作,读者会锁定 RWMutex 进行读操作。由于 RWMutex 允许多个读者同时锁定,所以这些读者 goroutines 可以并行执行。

GPT 在这里介绍了互斥锁 Mutex、读写锁 RWMutex 和等待组 WaitGroup 的用法,都还算挺直观。不过在一开始 GPT 还提到了 Cond 条件变量,但是没有给出示例,可能是篇幅的原因,我们继续追问下。

2.2 条件变量 sync.Cond

  • 我又和 GPT 说:

在开始的时候你还提到了 sync 包中的 Cond,请继续结合代码示例介绍 Cond

  • GPT 回答:

sync.Cond 是一个条件变量,它可以让一组 goroutine 等待或者宣布某事件的发生。条件变量总是与互斥锁(sync.Mutexsync.RWMutex)一起使用。

以下是一个使用 sync.Cond 的示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var m sync.Mutex
    c := sync.NewCond(&m)
    queue := make([]interface{}, 0, 10)

    removeFromQueue := func(delay time.Duration) {
        time.Sleep(delay)
        c.L.Lock()
        queue = queue[1:]
        fmt.Println("Removed from queue")
        c.L.Unlock()
        c.Signal()
    }

    for i := 0; i < 10; i++ {
        c.L.Lock()
        for len(queue) == 2 {
            c.Wait()
        }
        fmt.Println("Adding to queue")
        queue = append(queue, struct{}{})
        go removeFromQueue(1 * time.Second)
        c.L.Unlock()
    }
}

在这个示例中,我们创建了一个队列和一个条件变量。我们在循环中向队列添加元素,如果队列的长度达到 2,我们就调用 c.Wait() 等待条件变量的信号。在另一个 goroutine 中,我们在一段时间后从队列中移除元素,并调用 c.Signal() 发送信号,这会唤醒等待的 goroutine。

注意,调用 c.Wait() 时,我们需要持有条件变量的锁。c.Wait() 会自动释放锁,并将 goroutine 放入等待队列。当 c.Signal() 被调用时,c.Wait() 会自动重新获取锁。这是为了保证在条件变量的信号被发送和接收之间,共享数据的状态不会改变。

我猜结合生产者-消费者模式会让 Cond 更好理解,试下吧:

  • 我又和 GPT 说:

看起来 Cond 在生产者-消费者模式中会很有用,能不能给一个相关的例子?

  • GPT 回答:

当然可以。以下是一个使用 sync.Cond 实现的生产者-消费者模型的示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Item struct {
    value string
}

func produce(queue *[]*Item, lock *sync.Mutex, cond *sync.Cond) {
    for i := 0; i < 10; i++ {
        lock.Lock()
        item := &Item{value: fmt.Sprintf("item-%d", i)}
        *queue = append(*queue, item)
        fmt.Printf("Produced %s\n", item.value)
        lock.Unlock()
        cond.Signal()
        time.Sleep(1 * time.Second)
    }
}

func consume(queue *[]*Item, lock *sync.Mutex, cond *sync.Cond) {
    for {
        lock.Lock()
        for len(*queue) == 0 {
            cond.Wait()
        }
        item := (*queue)[0]
        *queue = (*queue)[1:]
        fmt.Printf("Consumed %s\n", item.value)
        lock.Unlock()
    }
}

func main() {
    var lock sync.Mutex
    cond := sync.NewCond(&lock)
    queue := make([]*Item, 0, 10)

    go produce(&queue, &lock, cond)
    go consume(&queue, &lock, cond)

    time.Sleep(15 * time.Second)
}

在这个示例中,我们创建了一个生产者 goroutine 和一个消费者 goroutine。生产者每秒产生一个新的 Item 并添加到队列中,然后通过 cond.Signal() 唤醒消费者。消费者在队列为空时通过 cond.Wait() 等待,当收到生产者的信号时,消费者从队列中取出 Item 并处理。

注意,我们在调用 cond.Wait()cond.Signal() 时都需要持有锁,这是为了保证在条件变量的信号被发送和接收之间,队列的状态不会改变。

结合这两个示例,Cond 的用法就基本清晰了。这里面有几个小细节需要留一下,一个是 GPT 提到的 c.Wait() 会自动释放锁,并将 goroutine 放入等待队列,然后当 c.Signal() 被调用时,c.Wait() 会自动重新获取锁;另外一个是:

for len(*queue) == 0 {
    cond.Wait()
}

这里的逻辑是:

  1. 如果 len(*queue) == 0 条件成立,则执行 cond.Wait()
  2. c.Signal() 被调用时,c.Wait() 方法结束阻塞,并且尝试重新获得锁;
  3. c.Wait() 成功获得锁时,继续判断 len(*queue) == 0,从而决定是继续等待还是往下执行;

如果改成:

if len(*queue) == 0 {
    cond.Wait()
}

那就少了上述第三步判断,也就是不会再次尝试检测 len(*queue) == 0。这一步检测其实是必须的,因为从 c.Signal() 被调用后到另外一个 goroutine 中的 c.Wait() 重新获取锁的过程中,保证不了没有其他 goroutine 修改了 queue。当然在当前代码示例中并不容易看出来使用 if 的问题,因为消费者只有一个。

看下这个示例场景吧:

  1. 消费者 A 检查队列,发现队列为空,于是调用 c.Wait() 进入等待状态。
  2. 生产者向队列中添加一个元素,然后调用 c.Signal()。 消费者 A 收到信号,唤醒并尝试重新获取锁。然而,此时消费者 B 可能抢先一步获取了锁,并从队列中取走了元素。
  3. 消费者 B 释放锁。
  4. 消费者 A 获取到锁,然后尝试从队列中取出元素。但是,此时队列可能已经为空,因为消费者 B 可能已经取走了唯一的元素。

这里需要想清楚一个小细节,就是消费者 A 调用了 c.Wait() 进入等待状态时,并不是其他消费者都必须也在等待状态。

2.3 sync.Cond 的 Broadcast() 方法

我还想问一个和 sync.Cond 相关的问题,继续:

  • 我又问 GPT:

Signal() 方法会唤醒一个随机的 goroutine,那么对应的唤醒所有等待的 goroutine 应该用什么方法呢?具体的使用场景是什么?请举例介绍

  • GPT 回答:

sync.CondBroadcast() 方法可以唤醒所有等待的 goroutine。当你有一个事件会影响所有等待的 goroutine 时,你应该使用 Broadcast()

以下是一个使用 Broadcast() 的示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, cond *sync.Cond) {
    cond.L.Lock()
    cond.Wait()
    fmt.Printf("Worker %d fired\n", id)
    cond.L.Unlock()
}

func main() {
    var wg sync.WaitGroup
    var m sync.Mutex
    cond := sync.NewCond(&m)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, cond)
        }(i)
    }

    fmt.Println("Ready...Set...Go!")
    time.Sleep(2 * time.Second)
    cond.Broadcast()
    wg.Wait()
}

在这个示例中,我们创建了 10 个 worker goroutines,它们都在等待一个 "开始" 的信号。当我们调用 cond.Broadcast() 时,所有的 worker goroutines 都会收到这个信号,并开始执行。

这个模式在你需要等待所有 goroutines 同时开始执行时非常有用,例如在并行计算或者基准测试中。

挺有趣,哈哈~

2.4 原子操作 Atomic

  • 我和 GPT 说:

请继续介绍下 Golang 的 Atomic

  • GPT 回答:

Go 语言的 sync/atomic 包提供了底层的原子级内存操作,包括对整数类型和指针的原子加载(Load)、存储(Store)、增加(Add)、比较并交换(Compare and Swap,简称 CAS)等。

以下是一些使用 sync/atomic 的示例:

  1. 原子增加:我们可以使用 atomic.AddInt64 来原子地增加一个 int64 的值。
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

var (
    sum int64
)

func worker(wg *sync.WaitGroup) {
    defer wg.Done()
    atomic.AddInt64(&sum, 1)
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1000)
    for i := 0; i < 1000; i++ {
        go worker(&wg)
    }
    wg.Wait()
    fmt.Println(sum) // Output: 1000
}

在这个示例中,我们创建了 1000 个 goroutine,每个 goroutine 都会原子地增加 sum 的值。

  1. 原子比较并交换:我们可以使用 atomic.CompareAndSwapInt64 来原子地比较并交换一个 int64 的值。
package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var value int64 = 20
    fmt.Println(atomic.CompareAndSwapInt64(&value, 20, 21)) // Output: true
    fmt.Println(value) // Output: 21
}

在这个示例中,我们原子地比较 value 是否等于 20,如果等于,就将其设置为 21。

这些原子操作在并发编程中非常有用,特别是在你需要对一个变量进行复杂的并发操作,而不想使用互斥锁时。然而,请注意,原子操作通常需要更复杂的错误处理和更细致的设计。

  • 我又问 GPT:

atomic 操作是不是只是为了简化部分的互斥锁使用场景

原子操作和互斥锁都是用于在并发环境中保护共享数据的工具,但它们的使用场景和特性是不同的。

原子操作通常用于简单的、单一的读写操作,例如增加一个计数器或者更新一个标志。原子操作的优点是它们非常快速且不会导致 goroutine 阻塞。然而,原子操作不能用于保护多个操作构成的临界区,也不能用于同步多个 goroutine 的执行顺序。

互斥锁则可以用于保护复杂的临界区,例如一个操作序列或者一个数据结构的多个字段。互斥锁也可以用于同步多个 goroutine 的执行顺序。然而,互斥锁的操作比原子操作要慢,且可能导致 goroutine 阻塞。

所以,原子操作并不只是为了简化互斥锁的使用,而是为了提供一种更轻量级的同步机制。在选择使用原子操作还是互斥锁时,你需要根据你的具体需求和场景来决定。

  • GPT 回答:

原子操作和互斥锁都是用于在并发环境中保护共享数据的工具,但它们的使用场景和特性是不同的。

原子操作通常用于简单的、单一的读写操作,例如增加一个计数器或者更新一个标志。原子操作的优点是它们非常快速且不会导致 goroutine 阻塞。然而,原子操作不能用于保护多个操作构成的临界区,也不能用于同步多个 goroutine 的执行顺序。

互斥锁则可以用于保护复杂的临界区,例如一个操作序列或者一个数据结构的多个字段。互斥锁也可以用于同步多个 goroutine 的执行顺序。然而,互斥锁的操作比原子操作要慢,且可能导致 goroutine 阻塞。

所以,原子操作并不只是为了简化互斥锁的使用,而是为了提供一种更轻量级的同步机制。在选择使用原子操作还是互斥锁时,你需要根据你的具体需求和场景来决定。

2.5 context 包

  • 我又问 GPT:

请继续介绍下 Golang 里的 Context

  • GPT 回答:

Go 语言的 context 包提供了一种在 API 边界之间传递请求范围的值、取消信号和超时信息的方式。

以下是一些使用 context 的示例:

  1. 使用 WithCancel 创建可取消的 Context
package main

import (
    "context"
    "fmt"
    "time"
)

func operation(ctx context.Context) {
    select {
    case <-time.After(500 * time.Millisecond):
        fmt.Println("operation completed")
    case <-ctx.Done():
        fmt.Println("operation cancelled")
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go operation(ctx)
    time.Sleep(200 * time.Millisecond)
    cancel()
    time.Sleep(500 * time.Millisecond)
}

在这个示例中,我们创建了一个可取消的 Context,并传递给一个长时间运行的操作。我们在主 goroutine 中取消 Context,这会导致操作提前结束。

  1. 使用 WithDeadline 创建有超时的 Context
package main

import (
    "context"
    "fmt"
    "time"
)

func operation(ctx context.Context) {
    select {
    case <-time.After(1 * time.Second):
        fmt.Println("operation completed")
    case <-ctx.Done():
        fmt.Println("operation timeout")
    }
}

func main() {
    ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(500*time.Millisecond))
    go operation(ctx)
    time.Sleep(2 * time.Second)
}

在这个示例中,我们创建了一个有超时的 Context,并传递给一个长时间运行的操作。如果操作在超时时间内没有完成,Context 就会被取消,这会导致操作提前结束。

  1. 使用 WithValue 传递请求范围的值
package main

import (
    "context"
    "fmt"
)

func operation(ctx context.Context) {
    value := ctx.Value("key").(string)
    fmt.Println("operation completed with value:", value)
}

func main() {
    ctx := context.WithValue(context.Background(), "key", "some value")
    go operation(ctx)
    time.Sleep(1 * time.Second)
}

在这个示例中,我们使用 context.WithValue 创建了一个带有值的 Context,并传递给一个操作。操作可以从 Context 中获取这个> 值。

注意,context 包的设计目标是用于传递跨 API 边界的请求范围的数据,而不是在函数之间传递可选参数。如果你只是需要传递一些可选参数,你应该使用函数参数、结构体或者其他数据结构,而不是 context

挺直观,取消和超时看起来都很有用。那么“跨 API 边界的请求范围的数据”具体应该怎么理解呢?context.WithValue 真实使用场景是怎样的呢?

  • 我接着问 GPT:

请进一步解释下 context.WithValue 的使用场景,帮助我理解其和函数传参的差异。

  • GPT 回答:

context.WithValue 主要用于在 API 边界之间传递请求范围的数据。这些数据通常是和请求相关的,例如请求 ID、认证令牌、用户 ID 等。这些数据在处理请求的过程中可能会被多个函数或者多个 goroutine 使用。

以下是一个使用 context.WithValue 的示例:

package main

import (
    "context"
    "fmt"
    "net/http"
)

func middleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        ctx := context.WithValue(r.Context(), "userID", "123")
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

func handler(w http.ResponseWriter, r *http.Request) {
    userID := r.Context().Value("userID").(string)
    fmt.Fprintf(w, "Hello, user %s\n", userID)
}

func main() {
    http.Handle("/", middleware(http.HandlerFunc(handler)))
    http.ListenAndServe(":8080", nil)
}

在这个示例中,我们创建了一个 HTTP 中间件,它会将用户 ID 添加到请求的 Context 中。然后,我们的处理函数可以从 Context 中获取用户 ID。

与函数参数不同,Context 可以在 API 边界之间传递数据,而不需要修改函数签名。这使得我们可以在不改变函数签名的情况下,向我们的程序添加新的跨 API 的数据。

然而,context.WithValue 应该谨慎使用。它不应该被用于在函数之间传递可选参数,或者在程序内部传递数据。如果你只是需要在函数之间传递一些数据,你应该使用函数参数、结构体或者其他数据结构,而不是 context

行。不过我想起来 context.TODO(),继续问下 GPT 吧。

  • 我又问 GPT:

再介绍下 context.TODO() 和 context.Background() 吧。

  • GPT 回答:

context.Background()context.TODO() 都是用于创建新的 Context 的函数,但它们的用途略有不同。

context.Background() 返回一个空的 Context,这个 Context 不能被取消,没有值,也没有超时。它通常用作主函数、初始化函数和测试代码中的顶级 Context,也用作其他 Context 的默认值。如果你不确定应该使用哪个 Context,你可以使用 context.Background()

context.TODO() 也返回一个空的 Context。这个函数主要用在当函数需要一个 Context 参数,但是还不清楚应该使用哪个 Context 时,或者在你计划将来要添加一个 Context 到函数签名,但是还没有添加时。context.TODO() 可以提醒你将来需要修改这部分代码。

以下是一个使用 context.TODO() 的示例:

package main

import (
    "context"
    "fmt"
)

func operation(ctx context.Context) {
    // ...
}

func main() {
    // TODO: create a proper Context
    operation(context.TODO())
    fmt.Println("operation completed")
}

在这个示例中,我们使用 context.TODO() 作为一个临时的 Context。将来,我们可能会创建一个具有超时或者取消功能的 Context,并替换掉 context.TODO()

  • 我又问 GPT:

你提到了 context 的分级,请详细介绍下相关知识点

  • GPT 回答:

在 Go 语言中,Context 是可以分级的,也就是说,你可以从一个已有的 Context 创建一个新的 Context。新的 Context 会继承父 Context 的所有特性,包括取消信号、超时时间和值。同时,你还可以为新的 Context 添加额外的特性。

以下是一些创建分级 Context 的函数:

  • context.WithCancel(parent Context) (ctx Context, cancel CancelFunc):创建一个新的 Context,这个 Context 可以被取消。当父 Context 被取消时,新的 Context 也会被取消。

  • context.WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc):创建一个新的 Context,这个 > Context 会在指定的时间点自动取消。当父 Context 被取消,或者当前时间超过了 deadline 时,新的 Context 会被取消。

  • context.WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc):创建一个新的 Context,这个 Context 会在指定的时间段后自动取消。当父 Context 被取消,或者从现在开始的 timeout 时间段过去后,新的 Context 会被取消。

  • context.WithValue(parent Context, key, val interface{}) Context:创建一个新的 Context,这个 Context 会包含一个与 key 关联的值。新的 Context 会继承父 Context 的所有值,并添加一个新的值。

以下是一个使用分级 Context 的示例:

package main

import (
    "context"
    "fmt"
    "time"
)

func operation(ctx context.Context) {
    select {
    case <-time.After(1 * time.Second):
        fmt.Println("operation completed")
    case <-ctx.Done():
        fmt.Println("operation cancelled")
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    go operation(ctx)
    time.Sleep(2 * time.Second)
}

在这个示例中,我们从 context.Background() 创建了一个新的 Context,这个 Context 会在 500 毫秒后自动取消。然后,我们将这个 Context 传递给一个长时间运行的操作。当 Context 被取消时,操作会提前结束。

大致就是说当“子线程”需要管理“孙线程”的时候,可以通过分级 Context 的方式让爷爷依旧能管到孙子。

三、总结

Race Detector、Goroutine 泄露和内存模型这几个话题明天再接着聊吧。(欲催更,关注公众号“思码逸智能编程”!)