golang与Celery工作节点交互监控插件库celeriac的使用

Golang与Celery工作节点交互监控插件库celeriac的使用

Celeriac介绍

Celeriac是一个Golang客户端库,用于支持与Celery工作节点和任务进行交互和监控。它提供了将任务放入任务队列的功能,以及监控任务和工作节点事件的能力。

依赖项

该库依赖于以下包:

安装easyjson

$ go get -u github.com/mailru/easyjson/...

使用方法

安装:

go get github.com/svcavallar/celeriac.v1

这会导入一个名为celeriac的新命名空间。

完整示例

package main

import (
	"log"
	"os"

	"github.com/svcavallar/celeriac.v1"
)

func main() {
	taskBrokerURI := "amqp://user:pass@localhost:5672/vhost"

	// 连接到RabbitMQ任务队列
	TaskQueueMgr, err := celeriac.NewTaskQueueMgr(taskBrokerURI)
	if err != nil {
		log.Printf("Failed to connect to task queue: %v", err)
		os.Exit(-1)
	}

	log.Printf("Service connected to task queue - (URL: %s)", taskBrokerURI)

	// 监控Celery事件的goroutine
	go func() {
        for {
            select {
            default:
                ev := <-TaskQueueMgr.Monitor.EventsChannel

                if ev != nil {
                    if x, ok := ev.(*celeriac.WorkerEvent); ok {
                        log.Printf("Celery Event Channel: Worker event - %s [Hostname]: %s", x.Type, x.Hostname)
                    } else if x, ok := ev.(*celeriac.TaskEvent); ok {
                        log.Printf("Celery Event Channel: Task event - %s [ID]: %s", x.Type, x.UUID)
                    } else if x, ok := ev.(*celeriac.Event); ok {
                        log.Printf("Celery Event Channel: General event - %s [Hostname]: %s - [Data]: %v", x.Type, x.Hostname, x.Data)
                    } else {
                        log.Printf("Celery Event Channel: Unhandled event: %v", ev)
                    }
                }
            }
        }
	}()
}

分发任务

按名称分发

这将创建并分发一个包含提供数据的任务。任务将自动分配并由任务对象中返回的UUID标识。

// 分发新任务
taskName := "root.test.task"
taskData := map[string]interface{}{
	"foo": "bar"
}
routingKey := "root.test"

task, err := TaskQueueMgr.DispatchTask(taskName, taskData, routingKey)
if err != nil {
	log.Errorf("Failed to dispatch task to queue: %v", err)
}

按ID和名称分发

这将创建并分发一个包含提供数据的任务,并由用户提供的任务标识符标识。

// 分发新任务
taskID := "my_task_id_123456789"
taskName := "root.test.task"
taskData := map[string]interface{}{
	"foo": "bar"
}
routingKey := "root.test"

task, err := TaskQueueMgr.DispatchTaskWithID(taskID, taskName, taskData, routingKey)
if err != nil {
	log.Errorf("Failed to dispatch task to queue: %v", err)
}

修改task_event.go

如果修改了task_event.go中任何结构的属性,需要重新生成该文件的easyjson版本。可以通过以下命令实现:

$ easyjson -all task_eventtest.go

自动处理Redis后端结果

如果使用Redis后端存储结果,可以通过订阅Redis keyspace事件轻松处理新的/更新的条目。这可以节省轮询结果的开销,并且可以通过使用golang辅助包go-redis-event-sink方便地集成。

只需提供要监视的Celery任务命名掩码模式:celery-task-meta-*


更多关于golang与Celery工作节点交互监控插件库celeriac的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang与Celery工作节点交互监控插件库celeriac的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用celeriac监控Golang与Celery工作节点交互

celeriac是一个Golang库,用于与Celery工作节点交互并监控其状态。它允许Golang应用程序订阅Celery事件,获取任务状态和工作节点信息。

安装celeriac

go get github.com/svcodestream/celeriac

基本用法

1. 创建监控器

package main

import (
	"log"
	"time"

	"github.com/svcodestream/celeriac"
)

func main() {
	// 创建监控器
	monitor, err := celeriac.NewMonitor(
		"amqp://guest:guest@localhost:5672//", // RabbitMQ地址
		celeriac.MonitorBufferSize(100),      // 事件缓冲区大小
	)
	if err != nil {
		log.Fatalf("Failed to create monitor: %v", err)
	}
	defer monitor.Close()

	// 启动监控器
	go monitor.Start()

	// 处理事件
	for {
		select {
		case event := <-monitor.Events():
			handleEvent(event)
		case <-time.After(30 * time.Second):
			log.Println("No events received in the last 30 seconds")
		}
	}
}

func handleEvent(event celeriac.Event) {
	switch e := event.(type) {
	case *celeriac.WorkerOnlineEvent:
		log.Printf("Worker %s came online", e.Hostname)
	case *celeriac.WorkerOfflineEvent:
		log.Printf("Worker %s went offline", e.Hostname)
	case *celeriac.TaskReceivedEvent:
		log.Printf("Task %s received by worker %s", e.UUID, e.Hostname)
	case *celeriac.TaskStartedEvent:
		log.Printf("Task %s started on worker %s", e.UUID, e.Hostname)
	case *celeriac.TaskSucceededEvent:
		log.Printf("Task %s succeeded with result: %v", e.UUID, e.Result)
	case *celeriac.TaskFailedEvent:
		log.Printf("Task %s failed: %s", e.UUID, e.Exception)
	case *celeriac.TaskRetriedEvent:
		log.Printf("Task %s retried: %s", e.UUID, e.Exception)
	default:
		log.Printf("Received unknown event: %T", e)
	}
}

2. 获取工作节点信息

// 获取所有工作节点
workers := monitor.GetWorkers()
for _, worker := range workers {
	log.Printf("Worker: %s, Active: %v, Processed: %d", 
		worker.Hostname, worker.IsActive(), worker.TotalTasksProcessed())
}

// 获取特定工作节点
if worker, exists := monitor.GetWorker("worker1@host"); exists {
	log.Printf("Worker %s last seen at %v", worker.Hostname, worker.LastSeen())
}

3. 获取任务状态

// 获取所有已知任务
tasks := monitor.GetTasks()
for _, task := range tasks {
	log.Printf("Task %s: %s, Worker: %s", task.UUID, task.State, task.Worker)
}

// 获取特定任务
if task, exists := monitor.GetTask("task-uuid-123"); exists {
	log.Printf("Task %s state: %s", task.UUID, task.State)
}

高级功能

自定义事件处理

// 创建自定义事件处理器
type CustomEventHandler struct{}

func (h *CustomEventHandler) HandleEvent(event celeriac.Event) {
	switch e := event.(type) {
	case *celeriac.TaskSucceededEvent:
		// 只处理成功任务
		log.Printf("Custom handler: Task %s succeeded", e.UUID)
	}
}

// 注册处理器
monitor.AddEventHandler(&CustomEventHandler{})

过滤特定事件

// 只接收特定类型的事件
filteredEvents := monitor.FilterEvents(
	func(event celeriac.Event) bool {
		_, isTaskEvent := event.(celeriac.TaskEvent)
		return isTaskEvent
	},
)

go func() {
	for event := range filteredEvents {
		handleEvent(event)
	}
}()

最佳实践

  1. 错误处理:始终检查监控器创建和事件处理中的错误
  2. 资源清理:使用defer确保监控器正确关闭
  3. 并发安全:celeriac是并发安全的,可以在多个goroutine中使用
  4. 性能考虑:对于高负载系统,适当调整缓冲区大小
  5. 重连逻辑:实现网络中断时的自动重连机制

注意事项

  1. celeriac需要访问Celery使用的同一个RabbitMQ实例
  2. Celery必须配置为发送事件(CELERY_SEND_EVENTS=True)
  3. 监控器不会持久化事件,重启后会丢失历史数据

通过celeriac,Golang应用程序可以有效地监控Celery集群,实现任务跟踪、工作节点监控和系统集成等功能。

回到顶部