1. 概述
话说,早几天一口气过了一遍 Golang 的并发机制:
这些机制是使用 Golang 并发编程的基础知识,但是要使用 Golang 编写出高效、易维护、高质量的代码,我们可以使用一些通用的“并发模式”。在 Golang 并发编程中大家经常用到的一些模式主要有:
Worker-Pool 模式
Producer-Consumer 模式
Pipeline 模式
Event-Driven 模式
Reactor 模式
Futures and Promises 模式
……
从 Worker-Pool 模式开始聊吧,Worker-Pool 模式是一种常见的并发设计模式,主要用于控制并发任务的数量,提高系统性能,以及更有效地管理系统资源。
如果在 GitHub 上搜索 Worker Pool 的开源实现,你会看到结果琳琅满目,五花八门,各有千秋,眼花缭乱。咱干脆试试看能不能让 GPT-4 来实现一个 Worker Pool,然后对比下 Star 数较高的开源实现,看下 GPT-4 “信手拈来”的 Worker Pool 能不能在功能集、易用性、性能上相比各种千星万星 GitHub 开源流行款更胜一筹。
下文通过 DevChat 和 GPT-4 交互,如果你对 DevChat 感兴趣,可以跳转下文浏览:
2. 设计
下面先来明确这个程序需要实现哪些功能,怎么实现。
2.1 让 GPT-4 给出功能点
先问下 DevChat 我应该实现哪些功能:
- DevChat 的回答文字版:
在Golang中实现一个高性能、功能丰富且简单易用的Worker Pool,你可能需要考虑以下功能点:
任务队列:你需要一个队列来存储待处理的任务。这个队列应该是线程安全的,以便多个goroutine可以并发地向队列中添加任务和从队列中取出任务。
工作线程:你需要一组工作线程(goroutine)来并发地执行任务。这些工作线程应该可以从任务队列中取出任务> 并执行。
线程池大小控制:你需要一种机制来控制线程池的大小,即同时运行的工作线程的数量。这可以帮助你控制并发级别,防止过度消耗系统资源。
优雅的关闭:你需要一种机制来优雅地关闭线程池,即在关闭线程池之前,确保所有已经开始的任务都已经完成。
错误处理:你需要一种机制来处理任务执行过程中的错误。这可能包括记录错误日志,或者将错误返回给任务的提交者。
任务超时控制:对于可能长时间运行的任务,你可能需要一种机制来控制任务的执行时间,如果任务运行时间超过了预设的超时时间,那么应该中止任务的执行。
任务优先级:如果你的应用场景中有不同优先级的任务,你可能需要一种机制来控制任务的执行顺序,确保高优先级的任务先于低优先级的任务执行。
任务结果获取:对于需要返回结果的任务,你需要一种机制来获取任务的执行结果。
以上就是实现一个Worker Pool可能需要考虑的功能点,具体的功能点可能会根据你的具体需求和应用场景有所不同。
酷,看起来清新脱俗,“花里胡哨”。目测完全实现这些功能涉及到的代码量会超出目前 GPT-4 的上下文限制,外加细节太多,直接让 GPT-4 上手写代码大概率拿不到满意的结果,还是继续只把 AI 当一个“编程助手”吧。
2.2 自己总结需求,再给 GPT 派活
1. 给项目起一个名字(起名字是最难的,纠结死人)
就叫 GoPool 吧,对应代码库就叫 gopool,放到 GitHub 上开源出去。
2. 用代码的方式总结需求
整个程序涉及到的几个核心对象是 Task、Worker 和 Pool。Pool 中需要维护一个 workers 栈(或者队列,不过栈更易引入“Worker 过期”等逻辑);Task 可以先用一个最简单的函数类型;tasks 似乎由 Pool 来维护会导致过多的 workers(goroutines)上下文切换,影响性能…… 不行,细节太多了,文字不足以描述我想在第一个版本中包含哪些功能(肯定不是上来就实现一个GA版本),或者我先写几行对象、接口定义,然后让 GPT 在此基础上逐步实现相关逻辑吧。
于是,我写了这些代码,有效行数不到50,看起来却金光闪闪有没有?
pool.go
type Pool struct {
// capacity represents the maximum number of workers the pool can have
capacity int
// running represents the number of workers that are currently running
running int
// workers represents a pool of workers that are executing tasks
workers WorkerStack
// lock represents a mutex to protect the workers slice
lock sync.Mutex
}
func (p *Pool) Submit(task Task) {
// TODO
}
func (p *Pool) Running() int {
return p.running
}
func (p *Pool) Cap() int {
return p.capacity
}
func (p *Pool) Release() {
// TODO
}
worker.go
type Task func()
type Worker struct {
// pool represents a reference to the pool that owns this worker
pool *Pool
// task represents the task that the worker will execute
task chan Task
// lastUsedTime represents the last time the worker was used
lastUsedTime time.Time
}
type WorkerStack interface {
Push(w *Worker)
Pop() *Worker
Len() int
}
type workerStack struct {
workers []*Worker
}
这个骨架是比较清晰的,基本结构体名字一出来,GitHub Copilot 就能自动补全相应属性和方法,包括注释了(没错,我写的为数不多的几十行代码里还有一大半是 Copilot 补全的)。这时候还是直接 Copilot 补全比较省口水,就不和 GPT 废话了。
3. 实现
关门,放 GPT-4!
3.1 你先随意发挥
- 我对 GPT 说:
我要用 Golang 实现一个 worker pool 程序,项目名叫做 GoPool,代码库是 gopool。在 pool.go 和 worker.go 中我已经放了一些相关代码,请在此基础上实现 GoPool 程序。
这里需要基于选中代码/文件来聊天,DevChat 的这个小功能还是很酷:
- GPT 回答:
首先,我们需要理解需求和已有代码:
用户需要在 Golang 中实现一个名为 GoPool 的 worker pool 程序。
项目的代码库名为 gopool,已有的代码文件包括 pool.go 和 worker.go。
pool.go 文件中定义了 Pool 结构体,包含了一些字段如 capacity(池的最大工作数)、running(当前运行的工作数)、workers(执行任务的工作池)和 lock(保护 workers 切片的互斥锁)。此外,还定义了一些方法,如 Submit(提交任务)、Running(获取当前运行的工作数)、Cap(获取池的最大工作数)和 Release(释放池)。
worker.go 文件中定义了 Task 类型(一个函数类型)、Worker 结构体(包含了一些字段如 pool(拥有此工作的池的引用)、task(工作将执行的任务)和 lastUsedTime(工作最后使用的时间))和 WorkerStack 接口(包含了一些方法如 Push(推入工作)、Pop(弹出工作)和 Len(获取工作数))。
基于以上信息,我们需要实现以下功能:
在 Pool 结构体的 Submit 方法中,实现任务的提交。
在 Pool 结构体的 Release 方法中,实现池的释放。
实现 WorkerStack 接口的方法。
下面是实现代码:
// pool.go package gopool import ( "sync" ) type Pool struct { capacity int running int workers WorkerStack lock sync.Mutex } func (p *Pool) Submit(task Task) { p.lock.Lock() defer p.lock.Unlock() if p.running < p.capacity { worker := p.workers.Pop() if worker == nil { worker = &Worker{ pool: p, task: make(chan Task), } } worker.task <- task p.running++ } else { // TODO: handle the case when the pool is full } } func (p *Pool) Running() int { return p.running } func (p *Pool) Cap() int { return p.capacity } func (p *Pool) Release() { p.lock.Lock() defer p.lock.Unlock() for p.workers.Len() > 0 { worker := p.workers.Pop() close(worker.task) } p.running = 0 }
// worker.go package gopool import ( "time" ) type Task func() type Worker struct { pool *Pool task chan Task lastUsedTime time.Time } type WorkerStack interface { Push(w *Worker) Pop() *Worker Len() int } type workerStack struct { workers []*Worker } func (ws *workerStack) Push(w *Worker) { ws.workers = append(ws.workers, w) } func (ws *workerStack) Pop() *Worker { if len(ws.workers) == 0 { return nil } w := ws.workers[len(ws.workers)-1] ws.workers = ws.workers[:len(ws.workers)-1] return w } func (ws *workerStack) Len() int { return len(ws.workers) }
注意:在 Submit 方法中,当池已满时,我们需要处理这种情况。这里我没有实现,因为这取决于你想如何处理这种情况(例如,你可能想要阻塞直到有可用的工作,或者你可能想要返回一个错误)。
不管三七二十一,先把代码替换到我的源文件中再说。(不过这里实际操作的时候还是要注意,如下图所示,多数时候“插入”是最有用的;有时候“替换全文件”也不错;写到后面其实先看 Diff,然后选择性接受 GPT 的代码还是非常实用)
这一轮聊天的成果是这个 Commit。
3.2 你得让 Worker 跑起来呀
- 我对 GPT 说:
看来 Worker 还需要一个 run() 方法用来异步执行 task。当这个 task 执行完成后,Worker 应该被放回 workStack。在 Submit() 方法中将 task 给到 worker 后,顺带调用一下这个 run() 方法吧。请继续编码。
GPT 给出了加上 run()
方法后的完整源文件,贴出来有点长,直接看增加的 run()
部分吧:
func (w *Worker) run() {
go func() {
for task := range w.task {
if task == nil {
return
}
task()
w.pool.lock.Lock()
w.pool.workers.Push(w)
w.pool.running--
w.pool.lock.Unlock()
}
}()
}
这一轮聊天的成果是这个 Commit。
3.3 你说说 Pool 满了怎么办
前面 GPT 给的代码中 Summit() 方法里有一段 TODO:
func (p *Pool) Submit(task Task) {
// ……
else {
// TODO: handle the case when the pool is full
}
}
满了怎么办?挺好办的,满了就等呀,空闲了再继续。于是,我想到了条件变量 sync.Cond
。那就不拐弯抹角,让 GPT 自由发挥了,给点明确的提示吧。
- 我对 GPT 说:
下面继续来实现 handle the case when the pool is full 相关逻辑吧。这里可以引入 sync.Cond 来实现,当 pool full 的时候就 Wait();同时 Worker 的 run() 方法中当 Worker 被放回到 Pool 中时,说明又有空闲的 Worker 可用了,这时候通过 Signal() 来唤醒 Wait() 的 goroutine。
这时候上下文越来越长了,GPT 的回答贴出来有点碍眼。直接看结果吧:
这一轮聊天的成果是这个 Commit。
这时候 Submit()
方法就变成了这样:
func (p *Pool) Submit(task Task) {
p.lock.Lock()
defer p.lock.Unlock()
for p.running == p.capacity {
p.cond.Wait()
}
worker := p.workers.Pop()
if worker == nil {
worker = &Worker{
pool: p,
task: make(chan Task),
}
}
worker.task <- task
worker.run()
p.running++
}
看起来基本功能差不多了,继续添加 UT 吧。
3.4 你得把 UT 给我加上
- 我对 GPT 说:
接下来我想添加一个 pool_test.go 来放单元测试测一下 Submit() 的功能,请给出测试代码。
- GPT 回答:
为了测试
Submit()
方法,我们可以创建一个Pool
对象,然后提交一些任务,检查是否所有的任务都被正确地执行了。以下是测试代码:
// pool_test.go package gopool import ( "sync" "testing" ) func TestSubmit(t *testing.T) { var wg sync.WaitGroup p := NewPool(10) for i := 0; i < 20; i++ { wg.Add(1) task := func() { defer wg.Done() } p.Submit(task) } wg.Wait() if p.Running() != 0 { t.Errorf("Expected running workers to be 0, but got %d", p.Running()) } }
在这个测试中,我们首先创建了一个新的
Pool
对象,然后提交了 20 个任务。每个任务都会在完成时调用wg.Done()
。我们使用sync.WaitGroup
来确保所有的任务都被执行了。最后,我们检查Running()
方法的返回值是否为 0,如果不是,那么说明有一些任务没有被执行,测试就失败了。
我决定手动加上一条日志,把 for 循环改成这样:
for i := 0; i < 20; i++ {
wg.Add(1)
taskNum := i // Capture the task number
task := func() {
t.Logf("Executing task %d\n", taskNum)
defer wg.Done()
}
p.Submit(task)
}
这时候运行下单元测试,惨惨惨,死锁了。不过这时候我也不想去考 GPT 了,看到死锁我已经第一时间想到了 Channel 阻塞问题,也就是这段代码:
func (p *Pool) Submit(task Task) {
// ……
if worker == nil {
worker = &Worker{
pool: p,
task: make(chan Task),
}
}
worker.task <- task
// ……
}
这里给 task
赋值了一个 make(chan Task)
,导致下面这行 worker.task <- task
直接阻塞了。随手改一下吧:
func (p *Pool) Submit(task Task) {
// ……
if worker == nil {
worker = &Worker{
pool: p,
task: make(chan Task, 1),
}
}
worker.task <- task
// ……
}
没错,把 task: make(chan Task)
改成 task: make(chan Task, 1)
就可以了。
运行一下吧:
$ go test . -v
=== RUN TestSubmit
pool_test.go:16: Executing task 9
pool_test.go:16: Executing task 7
pool_test.go:16: Executing task 8
pool_test.go:16: Executing task 10
pool_test.go:16: Executing task 11
pool_test.go:16: Executing task 12
pool_test.go:16: Executing task 13
pool_test.go:16: Executing task 14
pool_test.go:16: Executing task 15
pool_test.go:16: Executing task 16
pool_test.go:16: Executing task 17
pool_test.go:16: Executing task 18
pool_test.go:16: Executing task 19
pool_test.go:16: Executing task 1
pool_test.go:16: Executing task 0
pool_test.go:16: Executing task 2
pool_test.go:16: Executing task 4
pool_test.go:16: Executing task 3
pool_test.go:16: Executing task 6
pool_test.go:16: Executing task 5
--- PASS: TestSubmit (0.00s)
PASS
ok github.com/devchat-ai/gopool 0.753s
这一轮聊天的成果是这个 Commit。
3.5 你再给我把文档补上
- 我对 GPT 说:
我已经初步实现了 GoPool 程序,请帮我完善 README.md 文件内容。
今天最后一轮聊天了,我决定把所有文件内容都丢给 GPT,让它自由发挥一下。
这一轮聊天的成果是这个 Commit。
4. 总结
经过一顿瞎聊,最终 GPT-4 写了差不多200行,我写了20行,Copilot 写了20行,加一起,完成了一个初级的 Golang worker pool 程序 GoPool。
本文发布时对应的代码版本看这里。
接下来几天我准备继续和 GPT-4 瞎聊,让它帮着把一个 worker pool 该有的功能都加上,并且不断优化性能,看能不能打造一款炫酷的“开源 Golang Goroutine/Worker Pool 库”。
当然,大伙有啥好想法,需求,或者 bug 反馈,欢迎直接提到 GitHub Issues
相关资源
微信公众号:微信号为“devchat-ai”,名称暂为“思码逸智能编程”
微信群:“思码逸智能编程”公众号内菜单栏点击“微信群”即可收到群二维码