golang与Celery工作节点交互监控插件库celeriac的使用
Golang与Celery工作节点交互监控插件库celeriac的使用
Celeriac介绍
Celeriac是一个Golang客户端库,用于支持与Celery工作节点和任务进行交互和监控。它提供了将任务放入任务队列的功能,以及监控任务和工作节点事件的能力。
依赖项
该库依赖于以下包:
- github.com/rabbitmq/amqp091-go
- github.com/sirupsen/logrus
- github.com/nu7hatch/gouuid
- github.com/mailru/easyjson
安装easyjson
$ go get -u github.com/mailru/easyjson/...
使用方法
安装:
go get github.com/svcavallar/celeriac.v1
这会导入一个名为celeriac
的新命名空间。
完整示例
package main
import (
"log"
"os"
"github.com/svcavallar/celeriac.v1"
)
func main() {
taskBrokerURI := "amqp://user:pass@localhost:5672/vhost"
// 连接到RabbitMQ任务队列
TaskQueueMgr, err := celeriac.NewTaskQueueMgr(taskBrokerURI)
if err != nil {
log.Printf("Failed to connect to task queue: %v", err)
os.Exit(-1)
}
log.Printf("Service connected to task queue - (URL: %s)", taskBrokerURI)
// 监控Celery事件的goroutine
go func() {
for {
select {
default:
ev := <-TaskQueueMgr.Monitor.EventsChannel
if ev != nil {
if x, ok := ev.(*celeriac.WorkerEvent); ok {
log.Printf("Celery Event Channel: Worker event - %s [Hostname]: %s", x.Type, x.Hostname)
} else if x, ok := ev.(*celeriac.TaskEvent); ok {
log.Printf("Celery Event Channel: Task event - %s [ID]: %s", x.Type, x.UUID)
} else if x, ok := ev.(*celeriac.Event); ok {
log.Printf("Celery Event Channel: General event - %s [Hostname]: %s - [Data]: %v", x.Type, x.Hostname, x.Data)
} else {
log.Printf("Celery Event Channel: Unhandled event: %v", ev)
}
}
}
}
}()
}
分发任务
按名称分发
这将创建并分发一个包含提供数据的任务。任务将自动分配并由任务对象中返回的UUID标识。
// 分发新任务
taskName := "root.test.task"
taskData := map[string]interface{}{
"foo": "bar"
}
routingKey := "root.test"
task, err := TaskQueueMgr.DispatchTask(taskName, taskData, routingKey)
if err != nil {
log.Errorf("Failed to dispatch task to queue: %v", err)
}
按ID和名称分发
这将创建并分发一个包含提供数据的任务,并由用户提供的任务标识符标识。
// 分发新任务
taskID := "my_task_id_123456789"
taskName := "root.test.task"
taskData := map[string]interface{}{
"foo": "bar"
}
routingKey := "root.test"
task, err := TaskQueueMgr.DispatchTaskWithID(taskID, taskName, taskData, routingKey)
if err != nil {
log.Errorf("Failed to dispatch task to queue: %v", err)
}
修改task_event.go
如果修改了task_event.go
中任何结构的属性,需要重新生成该文件的easyjson
版本。可以通过以下命令实现:
$ easyjson -all task_eventtest.go
自动处理Redis后端结果
如果使用Redis后端存储结果,可以通过订阅Redis keyspace事件轻松处理新的/更新的条目。这可以节省轮询结果的开销,并且可以通过使用golang辅助包go-redis-event-sink
方便地集成。
只需提供要监视的Celery任务命名掩码模式:celery-task-meta-*
更多关于golang与Celery工作节点交互监控插件库celeriac的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang与Celery工作节点交互监控插件库celeriac的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用celeriac监控Golang与Celery工作节点交互
celeriac是一个Golang库,用于与Celery工作节点交互并监控其状态。它允许Golang应用程序订阅Celery事件,获取任务状态和工作节点信息。
安装celeriac
go get github.com/svcodestream/celeriac
基本用法
1. 创建监控器
package main
import (
"log"
"time"
"github.com/svcodestream/celeriac"
)
func main() {
// 创建监控器
monitor, err := celeriac.NewMonitor(
"amqp://guest:guest@localhost:5672//", // RabbitMQ地址
celeriac.MonitorBufferSize(100), // 事件缓冲区大小
)
if err != nil {
log.Fatalf("Failed to create monitor: %v", err)
}
defer monitor.Close()
// 启动监控器
go monitor.Start()
// 处理事件
for {
select {
case event := <-monitor.Events():
handleEvent(event)
case <-time.After(30 * time.Second):
log.Println("No events received in the last 30 seconds")
}
}
}
func handleEvent(event celeriac.Event) {
switch e := event.(type) {
case *celeriac.WorkerOnlineEvent:
log.Printf("Worker %s came online", e.Hostname)
case *celeriac.WorkerOfflineEvent:
log.Printf("Worker %s went offline", e.Hostname)
case *celeriac.TaskReceivedEvent:
log.Printf("Task %s received by worker %s", e.UUID, e.Hostname)
case *celeriac.TaskStartedEvent:
log.Printf("Task %s started on worker %s", e.UUID, e.Hostname)
case *celeriac.TaskSucceededEvent:
log.Printf("Task %s succeeded with result: %v", e.UUID, e.Result)
case *celeriac.TaskFailedEvent:
log.Printf("Task %s failed: %s", e.UUID, e.Exception)
case *celeriac.TaskRetriedEvent:
log.Printf("Task %s retried: %s", e.UUID, e.Exception)
default:
log.Printf("Received unknown event: %T", e)
}
}
2. 获取工作节点信息
// 获取所有工作节点
workers := monitor.GetWorkers()
for _, worker := range workers {
log.Printf("Worker: %s, Active: %v, Processed: %d",
worker.Hostname, worker.IsActive(), worker.TotalTasksProcessed())
}
// 获取特定工作节点
if worker, exists := monitor.GetWorker("worker1@host"); exists {
log.Printf("Worker %s last seen at %v", worker.Hostname, worker.LastSeen())
}
3. 获取任务状态
// 获取所有已知任务
tasks := monitor.GetTasks()
for _, task := range tasks {
log.Printf("Task %s: %s, Worker: %s", task.UUID, task.State, task.Worker)
}
// 获取特定任务
if task, exists := monitor.GetTask("task-uuid-123"); exists {
log.Printf("Task %s state: %s", task.UUID, task.State)
}
高级功能
自定义事件处理
// 创建自定义事件处理器
type CustomEventHandler struct{}
func (h *CustomEventHandler) HandleEvent(event celeriac.Event) {
switch e := event.(type) {
case *celeriac.TaskSucceededEvent:
// 只处理成功任务
log.Printf("Custom handler: Task %s succeeded", e.UUID)
}
}
// 注册处理器
monitor.AddEventHandler(&CustomEventHandler{})
过滤特定事件
// 只接收特定类型的事件
filteredEvents := monitor.FilterEvents(
func(event celeriac.Event) bool {
_, isTaskEvent := event.(celeriac.TaskEvent)
return isTaskEvent
},
)
go func() {
for event := range filteredEvents {
handleEvent(event)
}
}()
最佳实践
- 错误处理:始终检查监控器创建和事件处理中的错误
- 资源清理:使用defer确保监控器正确关闭
- 并发安全:celeriac是并发安全的,可以在多个goroutine中使用
- 性能考虑:对于高负载系统,适当调整缓冲区大小
- 重连逻辑:实现网络中断时的自动重连机制
注意事项
- celeriac需要访问Celery使用的同一个RabbitMQ实例
- Celery必须配置为发送事件(CELERY_SEND_EVENTS=True)
- 监控器不会持久化事件,重启后会丢失历史数据
通过celeriac,Golang应用程序可以有效地监控Celery集群,实现任务跟踪、工作节点监控和系统集成等功能。