用 Golang 从0到1实现一个高性能的 Worker Pool(一) - 每天5分钟玩转 GPT 编程系列(3)

用 Golang 从0到1实现一个高性能的 Worker Pool(一) - 每天5分钟玩转 GPT 编程系列(3)

·

7 min read

1. 概述

话说,早几天一口气过了一遍 Golang 的并发机制:

这些机制是使用 Golang 并发编程的基础知识,但是要使用 Golang 编写出高效、易维护、高质量的代码,我们可以使用一些通用的“并发模式”。在 Golang 并发编程中大家经常用到的一些模式主要有:

  • Worker-Pool 模式

  • Producer-Consumer 模式

  • Pipeline 模式

  • Event-Driven 模式

  • Reactor 模式

  • Futures and Promises 模式

  • ……

Worker-Pool 模式开始聊吧,Worker-Pool 模式是一种常见的并发设计模式,主要用于控制并发任务的数量,提高系统性能,以及更有效地管理系统资源。

如果在 GitHub 上搜索 Worker Pool 的开源实现,你会看到结果琳琅满目,五花八门,各有千秋,眼花缭乱。咱干脆试试看能不能让 GPT-4 来实现一个 Worker Pool,然后对比下 Star 数较高的开源实现,看下 GPT-4 “信手拈来”的 Worker Pool 能不能在功能集、易用性、性能上相比各种千星万星 GitHub 开源流行款更胜一筹。

下文通过 DevChat 和 GPT-4 交互,如果你对 DevChat 感兴趣,可以跳转下文浏览:

2. 设计

下面先来明确这个程序需要实现哪些功能,怎么实现。

2.1 让 GPT-4 给出功能点

先问下 DevChat 我应该实现哪些功能:

  • DevChat 的回答文字版:

在Golang中实现一个高性能、功能丰富且简单易用的Worker Pool,你可能需要考虑以下功能点:

  1. 任务队列:你需要一个队列来存储待处理的任务。这个队列应该是线程安全的,以便多个goroutine可以并发地向队列中添加任务和从队列中取出任务。

  2. 工作线程:你需要一组工作线程(goroutine)来并发地执行任务。这些工作线程应该可以从任务队列中取出任务> 并执行。

  3. 线程池大小控制:你需要一种机制来控制线程池的大小,即同时运行的工作线程的数量。这可以帮助你控制并发级别,防止过度消耗系统资源。

  4. 优雅的关闭:你需要一种机制来优雅地关闭线程池,即在关闭线程池之前,确保所有已经开始的任务都已经完成。

  5. 错误处理:你需要一种机制来处理任务执行过程中的错误。这可能包括记录错误日志,或者将错误返回给任务的提交者。

  6. 任务超时控制:对于可能长时间运行的任务,你可能需要一种机制来控制任务的执行时间,如果任务运行时间超过了预设的超时时间,那么应该中止任务的执行。

  7. 任务优先级:如果你的应用场景中有不同优先级的任务,你可能需要一种机制来控制任务的执行顺序,确保高优先级的任务先于低优先级的任务执行。

  8. 任务结果获取:对于需要返回结果的任务,你需要一种机制来获取任务的执行结果。

以上就是实现一个Worker Pool可能需要考虑的功能点,具体的功能点可能会根据你的具体需求和应用场景有所不同。

酷,看起来清新脱俗,“花里胡哨”。目测完全实现这些功能涉及到的代码量会超出目前 GPT-4 的上下文限制,外加细节太多,直接让 GPT-4 上手写代码大概率拿不到满意的结果,还是继续只把 AI 当一个“编程助手”吧。

2.2 自己总结需求,再给 GPT 派活

1. 给项目起一个名字(起名字是最难的,纠结死人)

就叫 GoPool 吧,对应代码库就叫 gopool,放到 GitHub 上开源出去。

2. 用代码的方式总结需求

整个程序涉及到的几个核心对象是 Task、Worker 和 Pool。Pool 中需要维护一个 workers 栈(或者队列,不过栈更易引入“Worker 过期”等逻辑);Task 可以先用一个最简单的函数类型;tasks 似乎由 Pool 来维护会导致过多的 workers(goroutines)上下文切换,影响性能…… 不行,细节太多了,文字不足以描述我想在第一个版本中包含哪些功能(肯定不是上来就实现一个GA版本),或者我先写几行对象、接口定义,然后让 GPT 在此基础上逐步实现相关逻辑吧。

于是,我写了这些代码,有效行数不到50,看起来却金光闪闪有没有?

  • pool.go
type Pool struct {
     // capacity represents the maximum number of workers the pool can have
     capacity int
     // running represents the number of workers that are currently running
     running int
     // workers represents a pool of workers that are executing tasks
     workers WorkerStack
     // lock represents a mutex to protect the workers slice
     lock sync.Mutex
 }

 func (p *Pool) Submit(task Task) {
     // TODO
 }

 func (p *Pool) Running() int {
     return p.running
 }

 func (p *Pool) Cap() int {
     return p.capacity
 }

 func (p *Pool) Release() {
     // TODO
 }
  • worker.go
type Task func()

type Worker struct {
    // pool represents a reference to the pool that owns this worker
    pool *Pool
    // task represents the task that the worker will execute
    task chan Task
    // lastUsedTime represents the last time the worker was used
    lastUsedTime time.Time
}
type WorkerStack interface {
    Push(w *Worker)
    Pop() *Worker
    Len() int
}
type workerStack struct {
    workers []*Worker
}

这个骨架是比较清晰的,基本结构体名字一出来,GitHub Copilot 就能自动补全相应属性和方法,包括注释了(没错,我写的为数不多的几十行代码里还有一大半是 Copilot 补全的)。这时候还是直接 Copilot 补全比较省口水,就不和 GPT 废话了。

3. 实现

关门,放 GPT-4!

3.1 你先随意发挥

  • 我对 GPT 说:

我要用 Golang 实现一个 worker pool 程序,项目名叫做 GoPool,代码库是 gopool。在 pool.go 和 worker.go 中我已经放了一些相关代码,请在此基础上实现 GoPool 程序。

这里需要基于选中代码/文件来聊天,DevChat 的这个小功能还是很酷:

  • GPT 回答:

首先,我们需要理解需求和已有代码:

  • 用户需要在 Golang 中实现一个名为 GoPool 的 worker pool 程序。

  • 项目的代码库名为 gopool,已有的代码文件包括 pool.go 和 worker.go。

  • pool.go 文件中定义了 Pool 结构体,包含了一些字段如 capacity(池的最大工作数)、running(当前运行的工作数)、workers(执行任务的工作池)和 lock(保护 workers 切片的互斥锁)。此外,还定义了一些方法,如 Submit(提交任务)、Running(获取当前运行的工作数)、Cap(获取池的最大工作数)和 Release(释放池)。

  • worker.go 文件中定义了 Task 类型(一个函数类型)、Worker 结构体(包含了一些字段如 pool(拥有此工作的池的引用)、task(工作将执行的任务)和 lastUsedTime(工作最后使用的时间))和 WorkerStack 接口(包含了一些方法如 Push(推入工作)、Pop(弹出工作)和 Len(获取工作数))。

基于以上信息,我们需要实现以下功能:

  1. 在 Pool 结构体的 Submit 方法中,实现任务的提交。

  2. 在 Pool 结构体的 Release 方法中,实现池的释放。

  3. 实现 WorkerStack 接口的方法。

下面是实现代码:

// pool.go
package gopool

import (
    "sync"
)

type Pool struct {
    capacity int
    running  int
    workers  WorkerStack
    lock     sync.Mutex
}

func (p *Pool) Submit(task Task) {
    p.lock.Lock()
    defer p.lock.Unlock()

    if p.running < p.capacity {
        worker := p.workers.Pop()
        if worker == nil {
            worker = &Worker{
                pool: p,
                task: make(chan Task),
            }
        }
        worker.task <- task
        p.running++
    } else {
        // TODO: handle the case when the pool is full
    }
}

func (p *Pool) Running() int {
    return p.running
}

func (p *Pool) Cap() int {
    return p.capacity
}

func (p *Pool) Release() {
    p.lock.Lock()
    defer p.lock.Unlock()

    for p.workers.Len() > 0 {
        worker := p.workers.Pop()
        close(worker.task)
    }
    p.running = 0
}
// worker.go
package gopool

import (
    "time"
)

type Task func()

type Worker struct {
    pool         *Pool
    task         chan Task
    lastUsedTime time.Time
}

type WorkerStack interface {
    Push(w *Worker)
    Pop() *Worker
    Len() int
}

type workerStack struct {
    workers []*Worker
}

func (ws *workerStack) Push(w *Worker) {
    ws.workers = append(ws.workers, w)
}

func (ws *workerStack) Pop() *Worker {
    if len(ws.workers) == 0 {
        return nil
    }
    w := ws.workers[len(ws.workers)-1]
    ws.workers = ws.workers[:len(ws.workers)-1]
    return w
}

func (ws *workerStack) Len() int {
    return len(ws.workers)
}

注意:在 Submit 方法中,当池已满时,我们需要处理这种情况。这里我没有实现,因为这取决于你想如何处理这种情况(例如,你可能想要阻塞直到有可用的工作,或者你可能想要返回一个错误)。

不管三七二十一,先把代码替换到我的源文件中再说。(不过这里实际操作的时候还是要注意,如下图所示,多数时候“插入”是最有用的;有时候“替换全文件”也不错;写到后面其实先看 Diff,然后选择性接受 GPT 的代码还是非常实用)

这一轮聊天的成果是这个 Commit

3.2 你得让 Worker 跑起来呀

  • 我对 GPT 说:

看来 Worker 还需要一个 run() 方法用来异步执行 task。当这个 task 执行完成后,Worker 应该被放回 workStack。在 Submit() 方法中将 task 给到 worker 后,顺带调用一下这个 run() 方法吧。请继续编码。

GPT 给出了加上 run() 方法后的完整源文件,贴出来有点长,直接看增加的 run() 部分吧:

func (w *Worker) run() {
     go func() {
         for task := range w.task {
             if task == nil {
                 return
             }
             task()
             w.pool.lock.Lock()
             w.pool.workers.Push(w)
             w.pool.running--
             w.pool.lock.Unlock()
         }
     }()
 }

这一轮聊天的成果是这个 Commit

3.3 你说说 Pool 满了怎么办

前面 GPT 给的代码中 Summit() 方法里有一段 TODO:

func (p *Pool) Submit(task Task) {
    // ……
    else {
        // TODO: handle the case when the pool is full
    }
}

满了怎么办?挺好办的,满了就等呀,空闲了再继续。于是,我想到了条件变量 sync.Cond。那就不拐弯抹角,让 GPT 自由发挥了,给点明确的提示吧。

  • 我对 GPT 说:

下面继续来实现 handle the case when the pool is full 相关逻辑吧。这里可以引入 sync.Cond 来实现,当 pool full 的时候就 Wait();同时 Worker 的 run() 方法中当 Worker 被放回到 Pool 中时,说明又有空闲的 Worker 可用了,这时候通过 Signal() 来唤醒 Wait() 的 goroutine。

这时候上下文越来越长了,GPT 的回答贴出来有点碍眼。直接看结果吧:

这一轮聊天的成果是这个 Commit

这时候 Submit() 方法就变成了这样:

func (p *Pool) Submit(task Task) {
     p.lock.Lock()
     defer p.lock.Unlock()

     for p.running == p.capacity {
         p.cond.Wait()
     }

     worker := p.workers.Pop()
     if worker == nil {
         worker = &Worker{
             pool: p,
             task: make(chan Task),
         }
     }
     worker.task <- task
     worker.run()
     p.running++
 }

看起来基本功能差不多了,继续添加 UT 吧。

3.4 你得把 UT 给我加上

  • 我对 GPT 说:

接下来我想添加一个 pool_test.go 来放单元测试测一下 Submit() 的功能,请给出测试代码。

  • GPT 回答:

为了测试 Submit() 方法,我们可以创建一个 Pool 对象,然后提交一些任务,检查是否所有的任务都被正确地执行了。以下是测试代码:

// pool_test.go
package gopool

import (
    "sync"
    "testing"
)

func TestSubmit(t *testing.T) {
    var wg sync.WaitGroup
    p := NewPool(10)

    for i := 0; i < 20; i++ {
        wg.Add(1)
        task := func() {
            defer wg.Done()
        }
        p.Submit(task)
    }

    wg.Wait()

    if p.Running() != 0 {
        t.Errorf("Expected running workers to be 0, but got %d", p.Running())
    }
}

在这个测试中,我们首先创建了一个新的 Pool 对象,然后提交了 20 个任务。每个任务都会在完成时调用 wg.Done()。我们使用 sync.WaitGroup 来确保所有的任务都被执行了。最后,我们检查 Running() 方法的返回值是否为 0,如果不是,那么说明有一些任务没有被执行,测试就失败了。

我决定手动加上一条日志,把 for 循环改成这样:

    for i := 0; i < 20; i++ {
        wg.Add(1)
        taskNum := i // Capture the task number
        task := func() {
            t.Logf("Executing task %d\n", taskNum)
            defer wg.Done()
        }
        p.Submit(task)
    }

这时候运行下单元测试,惨惨惨,死锁了。不过这时候我也不想去考 GPT 了,看到死锁我已经第一时间想到了 Channel 阻塞问题,也就是这段代码:

func (p *Pool) Submit(task Task) {
    // ……
    if worker == nil {
        worker = &Worker{
            pool: p,
            task: make(chan Task),
        }
    }
    worker.task <- task
    // ……
}

这里给 task 赋值了一个 make(chan Task),导致下面这行 worker.task <- task 直接阻塞了。随手改一下吧:

func (p *Pool) Submit(task Task) {
    // ……
    if worker == nil {
        worker = &Worker{
            pool: p,
            task: make(chan Task, 1),
        }
    }
    worker.task <- task
    // ……
}

没错,把 task: make(chan Task) 改成 task: make(chan Task, 1) 就可以了。

运行一下吧:

$ go test . -v
=== RUN   TestSubmit
    pool_test.go:16: Executing task 9
    pool_test.go:16: Executing task 7
    pool_test.go:16: Executing task 8
    pool_test.go:16: Executing task 10
    pool_test.go:16: Executing task 11
    pool_test.go:16: Executing task 12
    pool_test.go:16: Executing task 13
    pool_test.go:16: Executing task 14
    pool_test.go:16: Executing task 15
    pool_test.go:16: Executing task 16
    pool_test.go:16: Executing task 17
    pool_test.go:16: Executing task 18
    pool_test.go:16: Executing task 19
    pool_test.go:16: Executing task 1
    pool_test.go:16: Executing task 0
    pool_test.go:16: Executing task 2
    pool_test.go:16: Executing task 4
    pool_test.go:16: Executing task 3
    pool_test.go:16: Executing task 6
    pool_test.go:16: Executing task 5
--- PASS: TestSubmit (0.00s)
PASS
ok      github.com/devchat-ai/gopool    0.753s

这一轮聊天的成果是这个 Commit

3.5 你再给我把文档补上

  • 我对 GPT 说:

我已经初步实现了 GoPool 程序,请帮我完善 README.md 文件内容。

今天最后一轮聊天了,我决定把所有文件内容都丢给 GPT,让它自由发挥一下。

这一轮聊天的成果是这个 Commit

4. 总结

经过一顿瞎聊,最终 GPT-4 写了差不多200行,我写了20行,Copilot 写了20行,加一起,完成了一个初级的 Golang worker pool 程序 GoPool

本文发布时对应的代码版本看这里

接下来几天我准备继续和 GPT-4 瞎聊,让它帮着把一个 worker pool 该有的功能都加上,并且不断优化性能,看能不能打造一款炫酷的“开源 Golang Goroutine/Worker Pool 库”。

当然,大伙有啥好想法,需求,或者 bug 反馈,欢迎直接提到 GitHub Issues

相关资源

  • GitHub:github.com/devchat-ai

  • 注册网站:devchat.ai

  • 微信公众号:微信号为“devchat-ai”,名称暂为“思码逸智能编程”

  • 微信群:“思码逸智能编程”公众号内菜单栏点击“微信群”即可收到群二维码

  • Discord:discord.gg/9t3yrbBUXD