golang高效后台任务处理插件库goworker的使用
goworker - Golang高效后台任务处理插件库
goworker是一个兼容Resque的、基于Go的后台工作程序。它允许您使用Ruby等表达性语言将作业推入队列,同时利用Go的效率和并发性来最小化作业延迟和成本。
安装
要安装goworker,使用以下命令:
go get github.com/benmanns/goworker
然后在您的worker中导入:
import "github.com/benmanns/goworker"
快速入门
基本使用
要创建一个worker,编写一个匹配以下签名的函数:
func(string, ...interface{}) error
并使用Register
方法注册它:
goworker.Register("MyClass", myFunc)
下面是一个打印参数的简单worker示例:
package main
import (
"fmt"
"github.com/benmanns/goworker"
)
func myFunc(queue string, args ...interface{}) error {
fmt.Printf("From %s, %v\n", queue, args)
return nil
}
func init() {
goworker.Register("MyClass", myFunc)
}
func main() {
if err := goworker.Work(); err != nil {
fmt.Println("Error:", err)
}
}
共享资源的Worker
要创建共享数据库池或其他资源的worker,可以使用闭包来共享变量:
package main
import (
"fmt"
"github.com/benmanns/goworker"
)
func newMyFunc(uri string) (func(queue string, args ...interface{}) error) {
foo := NewFoo(uri)
return func(queue string, args ...interface{}) error {
foo.Bar(args)
return nil
}
}
func init() {
goworker.Register("MyClass", newMyFunc("http://www.example.com/"))
}
func main() {
if err := goworker.Work(); err != nil {
fmt.Println("Error:", err)
}
}
带配置的Worker
下面是一个带配置的worker示例:
package main
import (
"fmt"
"github.com/benmanns/goworker"
)
func myFunc(queue string, args ...interface{}) error {
fmt.Printf("From %s, %v\n", queue, args)
return nil
}
func init() {
settings := goworker.WorkerSettings{
URI: "redis://localhost:6379/",
Connections: 100,
Queues: []string{"myqueue", "delimited", "queues"},
UseNumber: true,
ExitOnComplete: false,
Concurrency: 2,
Namespace: "resque:",
Interval: 5.0,
}
goworker.SetSettings(settings)
goworker.Register("MyClass", myFunc)
}
func main() {
if err := goworker.Work(); err != nil {
fmt.Println("Error:", err)
}
}
参数处理
goworker工作函数接收它们服务的队列和一个接口切片。要将它们用作其他函数的参数,请使用Go类型断言将它们转换为可用的类型:
// 期望 (int, string, float64)
func myFunc(queue, args ...interface{}) error {
idNum, ok := args[0].(json.Number)
if !ok {
return errorInvalidParam
}
id, err := idNum.Int64()
if err != nil {
return errorInvalidParam
}
name, ok := args[1].(string)
if !ok {
return errorInvalidParam
}
weightNum, ok := args[2].(json.Number)
if !ok {
return errorInvalidParam
}
weight, err := weightNum.Float64()
if err != nil {
return errorInvalidParam
}
doSomething(id, name, weight)
return nil
}
测试
对于测试,使用redis-cli
程序将作业插入Redis队列很有帮助:
redis-cli -r 100 RPUSH resque:queue:myqueue '{"class":"MyClass","args":["hi","there"]}'
这将在myqueue
队列中插入100个MyClass
worker的作业。它等同于:
class MyClass
@queue = :myqueue
end
100.times do
Resque.enqueue MyClass, ['hi', 'there']
end
或者使用Go代码:
goworker.Enqueue(&goworker.Job{
Queue: "myqueue",
Payload: goworker.Payload{
Class: "MyClass",
Args: []interface{}{"hi", "there"},
},
})
配置选项
有几个标志控制goworker客户端的操作:
-queues="comma,delimited,queues"
- 唯一必需的标志。建议做法是使用不同的队列将Resque worker与goworker分开。-interval=5.0
- 指定在没有作业时轮询之间的等待时间。-concurrency=25
- 指定并发执行worker的数量。-connections=2
- 指定goworker将在轮询器和所有worker之间消耗的最大Redis连接数。-uri=redis://localhost:6379/
- 指定goworker从中轮询作业的Redis数据库的URI。-namespace=resque:
- 指定goworker从中检索作业并存储worker统计信息的命名空间。-exit-on-complete=false
- 当队列中没有作业剩余时退出goworker。
信号处理
要停止goworker,向进程发送QUIT
、TERM
或INT
信号。这将立即停止作业轮询。当前最多可以有$CONCURRENCY
个作业正在运行,这些作业将继续运行直到完成。
故障模式
与Resque一样,goworker不保证在进程关闭时作业的安全性。worker必须既是幂等的,又能在失败时容忍作业丢失。
如果进程被KILL
或系统故障杀死,轮询器缓冲区中可能有一个当前作业将丢失,队列或worker变量中没有任何表示。
更多关于golang高效后台任务处理插件库goworker的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang高效后台任务处理插件库goworker的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang高效后台任务处理插件库goworker使用指南
goworker是一个基于Go语言的高效后台任务处理库,它允许你将耗时任务异步处理,提高应用程序的响应速度。goworker的设计灵感来自Ruby的Resque项目,使用Redis作为消息队列。
安装goworker
首先使用go get安装goworker:
go get github.com/benmanns/goworker
基本使用
1. 定义工作函数
package main
import (
"fmt"
"time"
"github.com/benmanns/goworker"
)
func myWorker(queue string, args ...interface{}) error {
fmt.Printf("Processing job from queue %s with args %v\n", queue, args)
// 模拟耗时任务
time.Sleep(2 * time.Second)
return nil
}
func init() {
// 注册工作函数
goworker.Register("MyWorker", myWorker)
}
func main() {
// 设置Redis连接信息
goworker.SetSettings(goworker.WorkerSettings{
URI: "redis://localhost:6379/",
Connections: 10,
Queues: []string{"myqueue", "delimited", "queues"},
UseNumber: true,
ExitOnComplete: false,
Concurrency: 5,
Namespace: "resque:",
Interval: 5.0,
})
// 启动worker
if err := goworker.Work(); err != nil {
fmt.Println("Error:", err)
}
}
2. 添加任务到队列
package main
import (
"fmt"
"time"
"github.com/gomodule/redigo/redis"
)
func enqueueJob() {
conn, err := redis.Dial("tcp", "localhost:6379")
if err != nil {
panic(err)
}
defer conn.Close()
// 将任务推入队列
_, err = conn.Do("LPUSH", "resque:queue:myqueue", `{"class":"MyWorker","args":["arg1", "arg2"]}`)
if err != nil {
panic(err)
}
fmt.Println("Job enqueued successfully")
}
func main() {
enqueueJob()
}
高级特性
1. 自定义中间件
goworker支持中间件模式,可以在任务执行前后添加自定义逻辑:
func loggingMiddleware(queue string, next goworker.WorkerFunc) goworker.WorkerFunc {
return func(queue string, args ...interface{}) error {
start := time.Now()
fmt.Printf("Starting job %v at %s\n", args, start)
err := next(queue, args...)
duration := time.Since(start)
if err != nil {
fmt.Printf("Job failed after %s: %v\n", duration, err)
} else {
fmt.Printf("Job completed in %s\n", duration)
}
return err
}
}
// 注册中间件
goworker.Middleware.Append(loggingMiddleware)
2. 错误处理
func myWorkerWithErrorHandling(queue string, args ...interface{}) error {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Recovered from panic: %v\n", r)
}
}()
// 业务逻辑
if len(args) < 1 {
return fmt.Errorf("insufficient arguments")
}
// 模拟可能panic的操作
_ = args[0].(string) // 类型断言可能panic
return nil
}
3. 配置选项详解
goworker提供了丰富的配置选项:
goworker.SetSettings(goworker.WorkerSettings{
// Redis连接URI
URI: "redis://:password@localhost:6379/0",
// 最大连接数
Connections: 20,
// 监听的队列列表,按优先级排序
Queues: []string{"critical", "high", "medium", "low"},
// 并发worker数量
Concurrency: 10,
// Redis键名前缀
Namespace: "myapp:resque:",
// 轮询间隔(秒)
Interval: 2.5,
// 是否在队列为空时退出
ExitOnComplete: false,
// 是否将数字解码为json.Number而非float64
UseNumber: true,
// 跳过fork过程(在Windows上需要设置为true)
SkipFork: runtime.GOOS == "windows",
})
生产环境建议
- 监控:集成Prometheus或StatsD监控任务执行情况
- 日志:使用结构化日志库如logrus或zap记录任务执行日志
- 错误处理:实现重试机制和死信队列处理失败任务
- 资源限制:根据服务器资源合理设置并发数
- 优雅关闭:实现信号处理确保任务完成后再退出
goworker是一个简单但功能强大的后台任务处理库,特别适合需要与Redis集成的场景。对于更复杂的分布式任务处理,也可以考虑使用其他框架如Machinery或Asynq。