golang高效后台任务处理插件库goworker的使用

goworker - Golang高效后台任务处理插件库

goworker是一个兼容Resque的、基于Go的后台工作程序。它允许您使用Ruby等表达性语言将作业推入队列,同时利用Go的效率和并发性来最小化作业延迟和成本。

安装

要安装goworker,使用以下命令:

go get github.com/benmanns/goworker

然后在您的worker中导入:

import "github.com/benmanns/goworker"

快速入门

基本使用

要创建一个worker,编写一个匹配以下签名的函数:

func(string, ...interface{}) error

并使用Register方法注册它:

goworker.Register("MyClass", myFunc)

下面是一个打印参数的简单worker示例:

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
	fmt.Printf("From %s, %v\n", queue, args)
	return nil
}

func init() {
	goworker.Register("MyClass", myFunc)
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

共享资源的Worker

要创建共享数据库池或其他资源的worker,可以使用闭包来共享变量:

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

func newMyFunc(uri string) (func(queue string, args ...interface{}) error) {
	foo := NewFoo(uri)
	return func(queue string, args ...interface{}) error {
		foo.Bar(args)
		return nil
	}
}

func init() {
	goworker.Register("MyClass", newMyFunc("http://www.example.com/"))
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

带配置的Worker

下面是一个带配置的worker示例:

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
	fmt.Printf("From %s, %v\n", queue, args)
	return nil
}

func init() {
	settings := goworker.WorkerSettings{
		URI:            "redis://localhost:6379/",
		Connections:    100,
		Queues:         []string{"myqueue", "delimited", "queues"},
		UseNumber:      true,
		ExitOnComplete: false,
		Concurrency:    2,
		Namespace:      "resque:",
		Interval:       5.0,
	}
	goworker.SetSettings(settings)
	goworker.Register("MyClass", myFunc)
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

参数处理

goworker工作函数接收它们服务的队列和一个接口切片。要将它们用作其他函数的参数,请使用Go类型断言将它们转换为可用的类型:

// 期望 (int, string, float64)
func myFunc(queue, args ...interface{}) error {
	idNum, ok := args[0].(json.Number)
	if !ok {
		return errorInvalidParam
	}
	id, err := idNum.Int64()
	if err != nil {
		return errorInvalidParam
	}
	name, ok := args[1].(string)
	if !ok {
		return errorInvalidParam
	}
	weightNum, ok := args[2].(json.Number)
	if !ok {
		return errorInvalidParam
	}
	weight, err := weightNum.Float64()
	if err != nil {
		return errorInvalidParam
	}
	doSomething(id, name, weight)
	return nil
}

测试

对于测试,使用redis-cli程序将作业插入Redis队列很有帮助:

redis-cli -r 100 RPUSH resque:queue:myqueue '{"class":"MyClass","args":["hi","there"]}'

这将在myqueue队列中插入100个MyClass worker的作业。它等同于:

class MyClass
  @queue = :myqueue
end

100.times do
  Resque.enqueue MyClass, ['hi', 'there']
end

或者使用Go代码:

goworker.Enqueue(&goworker.Job{
    Queue: "myqueue",
    Payload: goworker.Payload{
        Class: "MyClass",
        Args: []interface{}{"hi", "there"},
    },
})

配置选项

有几个标志控制goworker客户端的操作:

  • -queues="comma,delimited,queues" - 唯一必需的标志。建议做法是使用不同的队列将Resque worker与goworker分开。
  • -interval=5.0 - 指定在没有作业时轮询之间的等待时间。
  • -concurrency=25 - 指定并发执行worker的数量。
  • -connections=2 - 指定goworker将在轮询器和所有worker之间消耗的最大Redis连接数。
  • -uri=redis://localhost:6379/ - 指定goworker从中轮询作业的Redis数据库的URI。
  • -namespace=resque: - 指定goworker从中检索作业并存储worker统计信息的命名空间。
  • -exit-on-complete=false - 当队列中没有作业剩余时退出goworker。

信号处理

要停止goworker,向进程发送QUITTERMINT信号。这将立即停止作业轮询。当前最多可以有$CONCURRENCY个作业正在运行,这些作业将继续运行直到完成。

故障模式

与Resque一样,goworker不保证在进程关闭时作业的安全性。worker必须既是幂等的,又能在失败时容忍作业丢失。

如果进程被KILL或系统故障杀死,轮询器缓冲区中可能有一个当前作业将丢失,队列或worker变量中没有任何表示。


更多关于golang高效后台任务处理插件库goworker的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效后台任务处理插件库goworker的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang高效后台任务处理插件库goworker使用指南

goworker是一个基于Go语言的高效后台任务处理库,它允许你将耗时任务异步处理,提高应用程序的响应速度。goworker的设计灵感来自Ruby的Resque项目,使用Redis作为消息队列。

安装goworker

首先使用go get安装goworker:

go get github.com/benmanns/goworker

基本使用

1. 定义工作函数

package main

import (
	"fmt"
	"time"
	
	"github.com/benmanns/goworker"
)

func myWorker(queue string, args ...interface{}) error {
	fmt.Printf("Processing job from queue %s with args %v\n", queue, args)
	// 模拟耗时任务
	time.Sleep(2 * time.Second)
	return nil
}

func init() {
	// 注册工作函数
	goworker.Register("MyWorker", myWorker)
}

func main() {
	// 设置Redis连接信息
	goworker.SetSettings(goworker.WorkerSettings{
		URI:            "redis://localhost:6379/",
		Connections:    10,
		Queues:         []string{"myqueue", "delimited", "queues"},
		UseNumber:      true,
		ExitOnComplete: false,
		Concurrency:    5,
		Namespace:      "resque:",
		Interval:       5.0,
	})
	
	// 启动worker
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

2. 添加任务到队列

package main

import (
	"fmt"
	"time"
	
	"github.com/gomodule/redigo/redis"
)

func enqueueJob() {
	conn, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	
	// 将任务推入队列
	_, err = conn.Do("LPUSH", "resque:queue:myqueue", `{"class":"MyWorker","args":["arg1", "arg2"]}`)
	if err != nil {
		panic(err)
	}
	
	fmt.Println("Job enqueued successfully")
}

func main() {
	enqueueJob()
}

高级特性

1. 自定义中间件

goworker支持中间件模式,可以在任务执行前后添加自定义逻辑:

func loggingMiddleware(queue string, next goworker.WorkerFunc) goworker.WorkerFunc {
	return func(queue string, args ...interface{}) error {
		start := time.Now()
		fmt.Printf("Starting job %v at %s\n", args, start)
		
		err := next(queue, args...)
		
		duration := time.Since(start)
		if err != nil {
			fmt.Printf("Job failed after %s: %v\n", duration, err)
		} else {
			fmt.Printf("Job completed in %s\n", duration)
		}
		
		return err
	}
}

// 注册中间件
goworker.Middleware.Append(loggingMiddleware)

2. 错误处理

func myWorkerWithErrorHandling(queue string, args ...interface{}) error {
	defer func() {
		if r := recover(); r != nil {
			fmt.Printf("Recovered from panic: %v\n", r)
		}
	}()
	
	// 业务逻辑
	if len(args) < 1 {
		return fmt.Errorf("insufficient arguments")
	}
	
	// 模拟可能panic的操作
	_ = args[0].(string) // 类型断言可能panic
	
	return nil
}

3. 配置选项详解

goworker提供了丰富的配置选项:

goworker.SetSettings(goworker.WorkerSettings{
	// Redis连接URI
	URI: "redis://:password@localhost:6379/0",
	
	// 最大连接数
	Connections: 20,
	
	// 监听的队列列表,按优先级排序
	Queues: []string{"critical", "high", "medium", "low"},
	
	// 并发worker数量
	Concurrency: 10,
	
	// Redis键名前缀
	Namespace: "myapp:resque:",
	
	// 轮询间隔(秒)
	Interval: 2.5,
	
	// 是否在队列为空时退出
	ExitOnComplete: false,
	
	// 是否将数字解码为json.Number而非float64
	UseNumber: true,
	
	// 跳过fork过程(在Windows上需要设置为true)
	SkipFork: runtime.GOOS == "windows",
})

生产环境建议

  1. 监控:集成Prometheus或StatsD监控任务执行情况
  2. 日志:使用结构化日志库如logrus或zap记录任务执行日志
  3. 错误处理:实现重试机制和死信队列处理失败任务
  4. 资源限制:根据服务器资源合理设置并发数
  5. 优雅关闭:实现信号处理确保任务完成后再退出

goworker是一个简单但功能强大的后台任务处理库,特别适合需要与Redis集成的场景。对于更复杂的分布式任务处理,也可以考虑使用其他框架如Machinery或Asynq。

回到顶部