golang高效管理Goroutine并发池插件库tunny的使用
Golang高效管理Goroutine并发池插件库tunny的使用
Tunny是一个用于创建和管理goroutine池的Golang库,它允许你通过同步API限制来自任意数量goroutine的工作。
为什么使用Tunny
固定大小的goroutine池在以下场景非常有用:当工作来自任意数量的异步源,但并行处理能力有限时。例如,在处理CPU密集型的HTTP请求作业时,你可以创建一个与CPU数量匹配的池。
安装
使用go get安装:
go get github.com/Jeffail/tunny
或者使用dep:
dep ensure -add github.com/Jeffail/tunny
基本使用
对于大多数情况,你的繁重工作可以用一个简单的func()
表示,这时可以使用NewFunc
。以下是一个HTTP请求处理CPU密集型工作的示例:
package main
import (
"io/ioutil"
"net/http"
"runtime"
"github.com/Jeffail/tunny"
)
func main() {
numCPUs := runtime.NumCPU()
pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
var result []byte
// TODO: 在这里对payload进行CPU密集型操作
return result
})
defer pool.Close()
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
input, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Internal error", http.StatusInternalServerError)
}
defer r.Body.Close()
// 将工作放入池中。这个调用是同步的,会阻塞直到作业完成
result := pool.Process(input)
w.Write(result.([]byte))
})
http.ListenAndServe(":8080", nil)
}
超时处理
Tunny支持超时。你可以将上面的Process
调用替换为:
result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
http.Error(w, "Request timed out", http.StatusRequestTimeout)
}
或者使用请求的context来处理超时和截止时间:
result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
http.Error(w, "Request timed out", http.StatusRequestTimeout)
}
动态调整池大小
Tunny池的大小可以随时通过SetSize(int)
改变:
pool.SetSize(10) // 10个goroutine
pool.SetSize(100) // 100个goroutine
这个操作是线程安全的,即使其他goroutine仍在处理也可以安全调用。
带状态的Goroutine
有时Tunny池中的每个goroutine都需要自己的管理状态。在这种情况下,你应该实现tunny.Worker
接口,它包括终止、中断(在作业超时且不再需要时)和阻塞下一个作业分配直到满足条件的方法。
当使用Worker
类型创建池时,你需要提供一个构造函数来生成你的自定义实现:
pool := tunny.New(poolSize, func() Worker {
// TODO: 在这里进行每个goroutine的状态分配
return newCustomWorker()
})
这允许Tunny在池大小改变时干净地创建和销毁Worker
类型。
处理顺序
积压的作业不保证按顺序处理。由于当前通道和select块的实现方式,积压的作业栈将作为FIFO队列处理。然而,这种行为不是规范的一部分,不应依赖于此。
更多关于golang高效管理Goroutine并发池插件库tunny的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang高效管理Goroutine并发池插件库tunny的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang高效管理Goroutine并发池插件库tunny使用指南
什么是tunny
tunny是一个轻量级的Goroutine池实现,它可以帮助开发者高效管理并发任务,避免频繁创建和销毁Goroutine带来的性能开销。tunny的主要特点包括:
- 固定数量的工作Goroutine
- 任务队列机制
- 支持超时控制
- 简单的API设计
tunny基本使用
安装tunny
go get github.com/Jeffail/tunny
基本示例
package main
import (
"fmt"
"time"
"github.com/Jeffail/tunny"
)
func main() {
// 创建一个包含3个工作Goroutine的池
pool := tunny.NewFunc(3, func(payload interface{}) interface{} {
// 这里是工作函数
n := payload.(int)
result := n * n
fmt.Printf("处理 %d => %d\n", n, result)
time.Sleep(1 * time.Second) // 模拟耗时操作
return result
})
defer pool.Close()
// 提交任务到池中
for i := 1; i <= 10; i++ {
go func(n int) {
// 同步调用
result, err := pool.ProcessTimed(n, 2*time.Second)
if err != nil {
fmt.Printf("任务 %d 处理超时: %v\n", n, err)
return
}
fmt.Printf("任务 %d 完成, 结果: %d\n", n, result.(int))
}(i)
}
// 等待所有任务完成
time.Sleep(5 * time.Second)
}
tunny高级特性
1. 动态调整池大小
pool.SetSize(5) // 将池大小调整为5
2. 异步处理
go func() {
result := pool.Process(10)
fmt.Println(result)
}()
3. 超时控制
result, err := pool.ProcessTimed(5, 1*time.Second)
if err != nil {
fmt.Println("处理超时:", err)
}
4. 获取池状态
queuedJobs := pool.QueueLength() // 获取队列中等待的任务数
实际应用示例
处理HTTP请求
package main
import (
"fmt"
"net/http"
"sync"
"github.com/Jeffail/tunny"
)
func main() {
// 创建处理池
pool := tunny.NewFunc(10, func(payload interface{}) interface{} {
req := payload.(*http.Request)
// 模拟处理请求
time.Sleep(500 * time.Millisecond)
return fmt.Sprintf("已处理请求: %s", req.URL.Path)
})
defer pool.Close()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// 使用池处理请求
result, err := pool.ProcessTimed(r, 1*time.Second)
if err != nil {
http.Error(w, "处理超时", http.StatusRequestTimeout)
return
}
fmt.Fprintln(w, result)
})
fmt.Println("服务器启动在 :8080")
http.ListenAndServe(":8080", nil)
}
批量处理数据
package main
import (
"fmt"
"sync"
"time"
"github.com/Jeffail/tunny"
)
func main() {
// 创建池
pool := tunny.NewFunc(4, func(data interface{}) interface{} {
item := data.(string)
time.Sleep(500 * time.Millisecond) // 模拟处理时间
return fmt.Sprintf("处理后的-%s", item)
})
defer pool.Close()
// 待处理数据
items := []string{"A", "B", "C", "D", "E", "F", "G", "H"}
var wg sync.WaitGroup
results := make([]string, len(items))
for i, item := range items {
wg.Add(1)
go func(idx int, val string) {
defer wg.Done()
res, _ := pool.ProcessTimed(val, 1*time.Second)
results[idx] = res.(string)
}(i, item)
}
wg.Wait()
fmt.Println("处理结果:", results)
}
性能考虑
- 池大小选择:通常设置为CPU核心数的2-4倍
- 任务队列长度:默认无限制,大量任务可能导致内存问题
- 超时设置:根据任务特性设置合理超时
- 资源释放:记得调用Close()释放资源
与其他库的比较
- 相比
ants
:tunny更轻量,API更简单 - 相比
workerpool
:tunny支持超时控制 - 相比原生Goroutine:tunny避免了频繁创建销毁的开销
tunny是一个简单高效的Goroutine池实现,适合需要控制并发数量的场景,特别是当任务执行时间较长或任务量较大时,能显著提升性能。