Golang实现RabbitMQ消费者的定时任务
Golang实现RabbitMQ消费者的定时任务 我有两个项目,第一个项目作为 RabbitMQ 的生产者,向队列发送数据。第二个项目作为消费者,需要持续监听队列中是否有新的数据,如果有,就将这些数据提交到数据库。我如何在 Go 语言中为我的第二个项目创建这个监听器?
目前我尝试了…
- RabbitMQ 消费者部分
- 使用 time.Ticker 构建一个调度器
结果: 停止服务器后,它又能正常工作,但由于这是一个基于 Web 的微服务,不可能停止服务器。
package main
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
ticker := time.NewTicker(time.Second * 5)
go scheduler(ticker)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
ticker.Stop()
//os.Exit(1)
}
func scheduler(ticker *time.Ticker) {
for ;true;<- ticker.C {
Consumers()
}
}
func HandleError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
type RabbitLocationData struct {
Domain string `json:"domain"`
UserId string `json:"user_id"`
ClientTimeStampUTC float64 `json:"client_timestamp_utc"`
ServerTimeStampUTC float64 `json:"server_timestamp_utc"`
Longitude float64 `json:"lon"`
Latitude float64 `json:"lat"`
}
func (rld RabbitLocationData) ToString() string {
result := fmt.Sprintf("RabbitLocationData{userId='%s', "+
"clientTimestampUtc=%f, "+
"serverTimestampUtc=%f, "+
"latitude=%f, "+
"longitude=%f, "+
"domain='%s'"+
"}",
rld.UserId,
rld.ClientTimeStampUTC,
rld.ServerTimeStampUTC,
rld.Latitude,
rld.Longitude,
rld.Domain)
return result
}
func Consumers() {
// Make a connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
HandleError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// Create a channel
ch, err := conn.Channel()
HandleError(err, "Failed to open a channel")
defer ch.Close()
// Declare a queue
q, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
HandleError(err, "could not declare 'hello' queue")
err = ch.Qos(1, 0, false)
HandleError(err, "could not configure Qos")
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
HandleError(err, "could not register consumer")
stopChannel := make(chan bool)
go func() {
log.Printf("Consumer ready, PID: %d", os.Getpid())
for d := range msgs {
log.Printf("Received a message: %s", string(d.Body))
locationData := &RabbitLocationData{}
err := json.Unmarshal(d.Body, locationData)
if err != nil {
log.Printf("Error decoding JSON: %s", err)
}
log.Printf("Result: %s", locationData.ToString())
if err := d.Ack(false); err != nil {
log.Printf("Error acknowledging message: %s", err)
} else {
log.Printf("Acknowledged message")
}
}
}()
log.Printf("[*] waiting for messages. To exit press CTRL+C")
<-stopChannel
}
更多关于Golang实现RabbitMQ消费者的定时任务的实战教程也可以访问 https://www.itying.com/category-94-b0.html
为什么你要每五秒运行一次 Consumers,而在这个函数中启动的 AMQP 消费者会持续接收 RabbitMQ 推送的消息?
只需打开一次 AMQP 连接和通道,启动消费者,在一个循环中消费由 AMQP 消息推送过来的消息,并处理每个接收到的消息。
更多关于Golang实现RabbitMQ消费者的定时任务的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
我每五秒调用一次 Consumer,因为假设我向 RabbitMQ 队列推送了一些消息,那么当另一个项目运行时,它会在项目首次运行时消费那些已排队的消息。但在消费并确认这些消息后,它会等待新的消息。与此同时,队列会更新,因此第二个项目可以检查是否有新的消息可供消费。
RabbitMQ 的工作方式并非如此。如果有两个消费者从同一个队列消费消息,RabbitMQ 基本上会使用轮询算法向它们推送消息。可以配置消费者,使其只被推送最多指定数量的消息进行消费和确认(请搜索“consumer prefetch”),但基本上 RabbitMQ 会均匀地将消息推送给所有已连接的消费者。
只需编写你的应用程序,使其打开一个连接和一个通道,创建一个消费者并让它持续消费。如果你启动这个应用程序两次,RabbitMQ 将均匀地分发和推送消息给这两个消费者。
在Go语言中实现RabbitMQ消费者时,不应该使用定时器轮询的方式。RabbitMQ的消费者设计应该是持续监听队列,而不是定时检查。以下是修正后的实现:
package main
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"log"
"os"
"os/signal"
"syscall"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
HandleError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
HandleError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
HandleError(err, "could not declare 'hello' queue")
err = ch.Qos(1, 0, false)
HandleError(err, "could not configure Qos")
msgs, err := ch.Consume(
q.Name,
"",
false, // 设置为false,手动确认消息
false,
false,
false,
nil,
)
HandleError(err, "could not register consumer")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
log.Printf("Consumer ready, PID: %d", os.Getpid())
log.Printf("[*] waiting for messages. To exit press CTRL+C")
for {
select {
case d := <-msgs:
log.Printf("Received a message: %s", string(d.Body))
locationData := &RabbitLocationData{}
err := json.Unmarshal(d.Body, locationData)
if err != nil {
log.Printf("Error decoding JSON: %s", err)
d.Nack(false, false) // 解码失败,拒绝消息
continue
}
log.Printf("Result: %s", locationData.ToString())
// 这里添加数据库插入逻辑
// err = insertToDatabase(locationData)
// if err != nil {
// log.Printf("Error inserting to database: %s", err)
// d.Nack(false, true) // 数据库失败,重新入队
// continue
// }
if err := d.Ack(false); err != nil {
log.Printf("Error acknowledging message: %s", err)
} else {
log.Printf("Acknowledged message")
}
case <-sigs:
log.Println("Received termination signal, shutting down...")
return
}
}
}
func HandleError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
type RabbitLocationData struct {
Domain string `json:"domain"`
UserId string `json:"user_id"`
ClientTimeStampUTC float64 `json:"client_timestamp_utc"`
ServerTimeStampUTC float64 `json:"server_timestamp_utc"`
Longitude float64 `json:"lon"`
Latitude float64 `json:"lat"`
}
func (rld RabbitLocationData) ToString() string {
result := fmt.Sprintf("RabbitLocationData{userId='%s', "+
"clientTimestampUtc=%f, "+
"serverTimestampUtc=%f, "+
"latitude=%f, "+
"longitude=%f, "+
"domain='%s'"+
"}",
rld.UserId,
rld.ClientTimeStampUTC,
rld.ServerTimeStampUTC,
rld.Latitude,
rld.Longitude,
rld.Domain)
return result
}
如果需要并发处理消息,可以使用goroutine池:
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
HandleError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
HandleError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
HandleError(err, "could not declare 'hello' queue")
err = ch.Qos(10, 0, false) // 增加预取数量
HandleError(err, "could not configure Qos")
msgs, err := ch.Consume(
q.Name,
"",
false,
false,
false,
false,
nil,
)
HandleError(err, "could not register consumer")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
workerCount := 5
done := make(chan bool)
for i := 0; i < workerCount; i++ {
go func(workerID int) {
for d := range msgs {
log.Printf("Worker %d received message: %s", workerID, string(d.Body))
locationData := &RabbitLocationData{}
err := json.Unmarshal(d.Body, locationData)
if err != nil {
log.Printf("Worker %d error decoding JSON: %s", workerID, err)
d.Nack(false, false)
continue
}
log.Printf("Worker %d processing: %s", workerID, locationData.ToString())
// 数据库插入逻辑
// if err := insertToDatabase(locationData); err != nil {
// d.Nack(false, true)
// continue
// }
d.Ack(false)
}
done <- true
}(i)
}
log.Printf("Started %d workers, waiting for messages...", workerCount)
<-sigs
log.Println("Shutting down workers...")
ch.Close()
for i := 0; i < workerCount; i++ {
<-done
}
}
这个实现使用了RabbitMQ的原生消费模式,消息到达时会立即被处理,不需要定时轮询。Consume方法返回的channel会持续接收消息,直到连接关闭。

