Golang实现RabbitMQ消费者的定时任务

Golang实现RabbitMQ消费者的定时任务 我有两个项目,第一个项目作为 RabbitMQ 的生产者,向队列发送数据。第二个项目作为消费者,需要持续监听队列中是否有新的数据,如果有,就将这些数据提交到数据库。我如何在 Go 语言中为我的第二个项目创建这个监听器?

目前我尝试了…

  1. RabbitMQ 消费者部分
  2. 使用 time.Ticker 构建一个调度器

结果: 停止服务器后,它又能正常工作,但由于这是一个基于 Web 的微服务,不可能停止服务器。

package main

import (
	"encoding/json"
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	ticker := time.NewTicker(time.Second * 5)
	go scheduler(ticker)
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	ticker.Stop()
	//os.Exit(1)
}

func scheduler(ticker *time.Ticker) {
	for ;true;<- ticker.C {
		Consumers()
	}
}

func HandleError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

type RabbitLocationData struct {
	Domain             string  `json:"domain"`
	UserId             string  `json:"user_id"`
	ClientTimeStampUTC float64  `json:"client_timestamp_utc"`
	ServerTimeStampUTC float64  `json:"server_timestamp_utc"`
	Longitude          float64 `json:"lon"`
	Latitude           float64 `json:"lat"`
}

func (rld RabbitLocationData) ToString() string {
	result := fmt.Sprintf("RabbitLocationData{userId='%s', "+
		"clientTimestampUtc=%f, "+
		"serverTimestampUtc=%f, "+
		"latitude=%f, "+
		"longitude=%f, "+
		"domain='%s'"+
		"}",
		rld.UserId,
		rld.ClientTimeStampUTC,
		rld.ServerTimeStampUTC,
		rld.Latitude,
		rld.Longitude,
		rld.Domain)
	return result
}


func Consumers() {
	// Make a connection
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	HandleError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// Create a channel
	ch, err := conn.Channel()
	HandleError(err, "Failed to open a channel")
	defer ch.Close()

	// Declare a queue
	q, err := ch.QueueDeclare(
		"hello",
		false,
		false,
		false,
		false,
		nil,
	)
	HandleError(err, "could not declare 'hello' queue")

	err = ch.Qos(1, 0, false)

	HandleError(err, "could not configure Qos")

	msgs, err := ch.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)

	HandleError(err, "could not register consumer")
	stopChannel := make(chan bool)

	go func() {
		log.Printf("Consumer ready, PID: %d", os.Getpid())
		for d := range msgs {
			log.Printf("Received a message: %s", string(d.Body))
			locationData := &RabbitLocationData{}

			err := json.Unmarshal(d.Body, locationData)

			if err != nil {
				log.Printf("Error decoding JSON: %s", err)
			}

			log.Printf("Result: %s", locationData.ToString())

			if err := d.Ack(false); err != nil {
				log.Printf("Error acknowledging message: %s", err)
			} else {
				log.Printf("Acknowledged message")
			}
		}
	}()

	log.Printf("[*] waiting for messages. To exit press CTRL+C")
	<-stopChannel
}

更多关于Golang实现RabbitMQ消费者的定时任务的实战教程也可以访问 https://www.itying.com/category-94-b0.html

4 回复

为什么你要每五秒运行一次 Consumers,而在这个函数中启动的 AMQP 消费者会持续接收 RabbitMQ 推送的消息?

只需打开一次 AMQP 连接和通道,启动消费者,在一个循环中消费由 AMQP 消息推送过来的消息,并处理每个接收到的消息。

更多关于Golang实现RabbitMQ消费者的定时任务的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我每五秒调用一次 Consumer,因为假设我向 RabbitMQ 队列推送了一些消息,那么当另一个项目运行时,它会在项目首次运行时消费那些已排队的消息。但在消费并确认这些消息后,它会等待新的消息。与此同时,队列会更新,因此第二个项目可以检查是否有新的消息可供消费。

RabbitMQ 的工作方式并非如此。如果有两个消费者从同一个队列消费消息,RabbitMQ 基本上会使用轮询算法向它们推送消息。可以配置消费者,使其只被推送最多指定数量的消息进行消费和确认(请搜索“consumer prefetch”),但基本上 RabbitMQ 会均匀地将消息推送给所有已连接的消费者。

只需编写你的应用程序,使其打开一个连接和一个通道,创建一个消费者并让它持续消费。如果你启动这个应用程序两次,RabbitMQ 将均匀地分发和推送消息给这两个消费者。

在Go语言中实现RabbitMQ消费者时,不应该使用定时器轮询的方式。RabbitMQ的消费者设计应该是持续监听队列,而不是定时检查。以下是修正后的实现:

package main

import (
	"encoding/json"
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	HandleError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	HandleError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello",
		false,
		false,
		false,
		false,
		nil,
	)
	HandleError(err, "could not declare 'hello' queue")

	err = ch.Qos(1, 0, false)
	HandleError(err, "could not configure Qos")

	msgs, err := ch.Consume(
		q.Name,
		"",
		false, // 设置为false,手动确认消息
		false,
		false,
		false,
		nil,
	)
	HandleError(err, "could not register consumer")

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	log.Printf("Consumer ready, PID: %d", os.Getpid())
	log.Printf("[*] waiting for messages. To exit press CTRL+C")

	for {
		select {
		case d := <-msgs:
			log.Printf("Received a message: %s", string(d.Body))
			locationData := &RabbitLocationData{}

			err := json.Unmarshal(d.Body, locationData)
			if err != nil {
				log.Printf("Error decoding JSON: %s", err)
				d.Nack(false, false) // 解码失败,拒绝消息
				continue
			}

			log.Printf("Result: %s", locationData.ToString())

			// 这里添加数据库插入逻辑
			// err = insertToDatabase(locationData)
			// if err != nil {
			//     log.Printf("Error inserting to database: %s", err)
			//     d.Nack(false, true) // 数据库失败,重新入队
			//     continue
			// }

			if err := d.Ack(false); err != nil {
				log.Printf("Error acknowledging message: %s", err)
			} else {
				log.Printf("Acknowledged message")
			}

		case <-sigs:
			log.Println("Received termination signal, shutting down...")
			return
		}
	}
}

func HandleError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

type RabbitLocationData struct {
	Domain             string  `json:"domain"`
	UserId             string  `json:"user_id"`
	ClientTimeStampUTC float64 `json:"client_timestamp_utc"`
	ServerTimeStampUTC float64 `json:"server_timestamp_utc"`
	Longitude          float64 `json:"lon"`
	Latitude           float64 `json:"lat"`
}

func (rld RabbitLocationData) ToString() string {
	result := fmt.Sprintf("RabbitLocationData{userId='%s', "+
		"clientTimestampUtc=%f, "+
		"serverTimestampUtc=%f, "+
		"latitude=%f, "+
		"longitude=%f, "+
		"domain='%s'"+
		"}",
		rld.UserId,
		rld.ClientTimeStampUTC,
		rld.ServerTimeStampUTC,
		rld.Latitude,
		rld.Longitude,
		rld.Domain)
	return result
}

如果需要并发处理消息,可以使用goroutine池:

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	HandleError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	HandleError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello",
		false,
		false,
		false,
		false,
		nil,
	)
	HandleError(err, "could not declare 'hello' queue")

	err = ch.Qos(10, 0, false) // 增加预取数量
	HandleError(err, "could not configure Qos")

	msgs, err := ch.Consume(
		q.Name,
		"",
		false,
		false,
		false,
		false,
		nil,
	)
	HandleError(err, "could not register consumer")

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	workerCount := 5
	done := make(chan bool)

	for i := 0; i < workerCount; i++ {
		go func(workerID int) {
			for d := range msgs {
				log.Printf("Worker %d received message: %s", workerID, string(d.Body))
				
				locationData := &RabbitLocationData{}
				err := json.Unmarshal(d.Body, locationData)
				
				if err != nil {
					log.Printf("Worker %d error decoding JSON: %s", workerID, err)
					d.Nack(false, false)
					continue
				}

				log.Printf("Worker %d processing: %s", workerID, locationData.ToString())
				
				// 数据库插入逻辑
				// if err := insertToDatabase(locationData); err != nil {
				//     d.Nack(false, true)
				//     continue
				// }
				
				d.Ack(false)
			}
			done <- true
		}(i)
	}

	log.Printf("Started %d workers, waiting for messages...", workerCount)

	<-sigs
	log.Println("Shutting down workers...")
	ch.Close()
	
	for i := 0; i < workerCount; i++ {
		<-done
	}
}

这个实现使用了RabbitMQ的原生消费模式,消息到达时会立即被处理,不需要定时轮询。Consume方法返回的channel会持续接收消息,直到连接关闭。

回到顶部