Golang实现单个Kafka消费者服务多个gRPC客户端
Golang实现单个Kafka消费者服务多个gRPC客户端 我正在学习Go语言,但在处理以下问题时遇到了困难:我正在从一个Kafka主题消费消息,希望将来自该Kafka主题的每条消息通过流式RPC调用发送给多个gRPC客户端。后者可能并不重要。我遇到的问题是如何将这个消息主题复制给所有连接的客户端。
这个实现尝试为每个创建的连接使用单个通道。但正如我刚意识到的,Go通道本身有点像Kafka主题,它们不会进行扇出分发,也不会让通道的每个消费者都获得传入内容的副本。
这是我的第一次尝试。这里最终发生的情况是:如果没有任何连接,轮询消费者的goroutine就会停止,因为在接收到消息并将其发送到通道时会发生阻塞。当存在连接时,GetEchos连接只收到第一条消息。
我考虑过角色互换。也许让每个GetEchos创建自己的通道,然后传递给另一个通道,这样我的主消费goroutine就可以迭代并发送消息。但在某种数组中管理这些通道似乎很困难。
我该如何最好地处理这个问题?
package main
import (
"fmt"
"net"
"os"
"os/signal"
"syscall"
"github.com/confluentinc/confluent-kafka-go/kafka"
pb "github.com/parkhub/scout-echo/echo"
"google.golang.org/grpc"
)
// echoServer implements the Echo gRPC service. It holds the single channel
// on which Kafka messages arrive for streaming to a connected client.
// NOTE(review): a plain channel is unicast — only one receiver gets each
// message — which is exactly the fan-out problem described above.
type echoServer struct {
// scoutMessages carries messages from the Kafka poll goroutine to GetEchos.
scoutMessages chan pb.Message
}
// GetEchos streams every message received on the server's message channel to
// the connected client, blocking until a message is available. It returns the
// first error reported by stream.Send, which ends the RPC.
//
// Fix: the previous version used select with a `default: continue` arm, which
// busy-spins a CPU core whenever the channel is empty. A plain blocking
// receive yields to the scheduler and has identical observable behavior.
func (server *echoServer) GetEchos(_ *pb.Empty, stream pb.Echo_GetEchosServer) error {
	for message := range server.scoutMessages {
		fmt.Printf("Got message!")
		if err := stream.Send(&message); err != nil {
			return err
		}
	}
	// Only reached if the channel is ever closed by the producer.
	return nil
}
// getEnv returns the value of the environment variable key, or fallback when
// the variable is not present in the environment.
func getEnv(key, fallback string) string {
	value, ok := os.LookupEnv(key)
	if !ok {
		return fallback
	}
	return value
}
// main wires a Kafka consumer to a gRPC streaming server: messages polled
// from the configured topic are pushed onto the server's channel, from which
// GetEchos streams them to a connected client. Configuration comes from the
// environment with sensible defaults.
func main() {
	broker := getEnv("BROKER_ADDR", "kafka:9092")
	connPort := getEnv("GRPC_PORT", "50051")
	groupID := getEnv("GROUP_ID", "SCOUT_ECHO_SERVICE")
	topic := getEnv("TCP_LIVE_TOPIC", "ScoutTCPLiveStream")
	scoutMessageChan := make(chan pb.Message)
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  broker,
		"group.id":           groupID,
		"session.timeout.ms": 6000,
		"default.topic.config": kafka.ConfigMap{
			"auto.offset.reset": "earliest",
		},
	})
	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		os.Exit(1)
	}
	fmt.Printf("Created Consumer %v\n", consumer)
	// NOTE(review): os.Exit below skips this defer; harmless here since those
	// exits happen before any messages are consumed.
	defer consumer.Close()
	err = consumer.Subscribe(topic, nil)
	if err != nil {
		fmt.Printf("Unable to subscribe to topic: %v\n with error: %s\n", &topic, err)
		os.Exit(1)
	}
	server := &echoServer{scoutMessages: scoutMessageChan}
	// Poll Kafka in the background and forward messages to the server channel.
	go func() {
	readScoutMessages:
		for {
			select {
			case sig := <-sigChan:
				fmt.Printf("Caught signal %v: terminating\n", sig)
				// FIX: the original used a bare `break`, which only exits the
				// select statement — the loop kept running and the signal was
				// effectively ignored. The labeled break stops the poll loop.
				// (Also renamed the variable: `signal` shadowed the os/signal
				// package.)
				break readScoutMessages
			default:
				// Poll for an event; nil means the 100ms timeout elapsed.
				event := consumer.Poll(100)
				if event == nil {
					continue
				}
				switch e := event.(type) {
				case *kafka.Message:
					// Forward the message to the scout message channel.
					// NOTE(review): this send blocks until a GetEchos client
					// is receiving — with no client connected, polling stalls
					// here (the behavior described in the question).
					fmt.Printf("Message on %s\n", e.TopicPartition)
					scoutMessageChan <- pb.Message{OriginalConnAdd: e.Key, Packet: e.Value}
				case kafka.PartitionEOF:
					fmt.Printf("Reached %v\n", e)
				case kafka.Error:
					fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
					break readScoutMessages
				default:
					fmt.Printf("Ignored %v\n", e)
				}
			}
		}
	}()
	// Listen for incoming gRPC connections.
	lis, err := net.Listen("tcp", ":"+connPort)
	if err != nil {
		fmt.Println("Error listening:", err.Error())
		os.Exit(1)
	}
	// Close the listener when the application shuts down.
	defer lis.Close()
	fmt.Println("Listening on Port:" + connPort)
	grpcServer := grpc.NewServer()
	pb.RegisterEchoServer(grpcServer, server)
	if err := grpcServer.Serve(lis); err != nil {
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", err)
	}
}
更多关于Golang实现单个Kafka消费者服务多个gRPC客户端的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于Golang实现单个Kafka消费者服务多个gRPC客户端的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
您遇到的问题很典型:Go通道是单播的,每个消息只能被一个消费者接收。要实现Kafka消息的广播(扇出)到多个gRPC客户端,需要采用不同的架构模式。
以下是修改后的实现,使用扇出模式将Kafka消息广播给所有连接的gRPC客户端:
package main
import (
"fmt"
"net"
"os"
"os/signal"
"sync"
"syscall"
"github.com/confluentinc/confluent-kafka-go/kafka"
pb "github.com/parkhub/scout-echo/echo"
"google.golang.org/grpc"
)
// clientStream pairs one client's server-side gRPC stream with a done
// channel. Closing done signals the blocked GetEchos handler that the
// client has been removed and the RPC should finish.
type clientStream struct {
stream pb.Echo_GetEchosServer
done chan bool
}
// echoServer implements the Echo service and acts as a broadcast hub:
// clients join and leave via the register/unregister channels, Kafka
// messages arrive on scoutMessages, and mu guards clientStreams.
// The run goroutine is the sole receiver on all three channels.
type echoServer struct {
mu sync.RWMutex
clientStreams map[*clientStream]bool
register chan *clientStream
unregister chan *clientStream
scoutMessages chan pb.Message
}
// newEchoServer constructs an echoServer with an empty client registry,
// unbuffered register/unregister channels, and a buffered message channel so
// the Kafka poller is not immediately blocked by slow broadcasting.
func newEchoServer() *echoServer {
	server := new(echoServer)
	server.clientStreams = make(map[*clientStream]bool)
	server.register = make(chan *clientStream)
	server.unregister = make(chan *clientStream)
	server.scoutMessages = make(chan pb.Message, 1000)
	return server
}
// run is the central broadcaster. It owns the client registry and is the
// single receiver on the register, unregister, and scoutMessages channels.
//
// Fixes over the previous version:
//   - a broadcast no longer writes into client.done as a "liveness probe":
//     that send woke the `<-client.done` wait in GetEchos and terminated
//     every stream after the first message;
//   - failed clients are removed inline via removeClient instead of doing
//     `server.unregister <- client` from run itself, which deadlocked —
//     run is the only receiver on that channel, so it was sending to itself;
//   - sends happen sequentially from this one goroutine, because gRPC
//     server streams do not permit concurrent Send calls.
func (server *echoServer) run() {
	for {
		select {
		case client := <-server.register:
			server.mu.Lock()
			server.clientStreams[client] = true
			total := len(server.clientStreams)
			server.mu.Unlock()
			fmt.Printf("Client connected. Total clients: %d\n", total)
		case client := <-server.unregister:
			server.removeClient(client)
		case message := <-server.scoutMessages:
			// Snapshot the registry so Send happens without holding the lock.
			server.mu.RLock()
			clients := make([]*clientStream, 0, len(server.clientStreams))
			for client := range server.clientStreams {
				clients = append(clients, client)
			}
			server.mu.RUnlock()
			// Broadcast; collect clients whose stream errored, then drop them.
			var dead []*clientStream
			for _, client := range clients {
				if err := client.stream.Send(&message); err != nil {
					dead = append(dead, client)
				}
			}
			for _, client := range dead {
				server.removeClient(client)
			}
		}
	}
}

// removeClient drops client from the registry and closes its done channel
// exactly once, releasing the blocked GetEchos handler for that client.
// Safe to call with a client that was already removed.
func (server *echoServer) removeClient(client *clientStream) {
	server.mu.Lock()
	defer server.mu.Unlock()
	if _, ok := server.clientStreams[client]; ok {
		delete(server.clientStreams, client)
		close(client.done)
	}
	fmt.Printf("Client disconnected. Total clients: %d\n", len(server.clientStreams))
}
// GetEchos registers the caller as a broadcast recipient and then blocks
// until the broadcaster closes the client's done channel (on disconnect or
// send failure), at which point the RPC completes successfully.
func (server *echoServer) GetEchos(_ *pb.Empty, stream pb.Echo_GetEchosServer) error {
	c := &clientStream{stream: stream, done: make(chan bool, 1)}
	server.register <- c
	// Park here for the lifetime of the subscription.
	<-c.done
	return nil
}
// getEnv looks up key in the process environment, substituting the supplied
// fallback when the variable is absent.
func getEnv(key, fallback string) string {
	value, found := os.LookupEnv(key)
	if !found {
		value = fallback
	}
	return value
}
// main wires one Kafka consumer to the broadcasting echoServer: the run
// goroutine fans messages out to all registered gRPC clients, the poll
// goroutine feeds scoutMessages, and Serve blocks handling connections.
func main() {
broker := getEnv("BROKER_ADDR", "kafka:9092")
connPort := getEnv("GRPC_PORT", "50051")
groupID := getEnv("GROUP_ID", "SCOUT_ECHO_SERVICE")
topic := getEnv("TCP_LIVE_TOPIC", "ScoutTCPLiveStream")
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Start the broadcaster before the consumer so register/unregister and
// message sends always have a receiver.
server := newEchoServer()
go server.run()
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"group.id": groupID,
"session.timeout.ms": 6000,
"default.topic.config": kafka.ConfigMap{
"auto.offset.reset": "earliest",
},
})
if err != nil {
fmt.Printf("Failed to create consumer: %s\n", err)
os.Exit(1)
}
fmt.Printf("Created Consumer %v\n", consumer)
// NOTE(review): os.Exit below skips this defer; harmless at these early
// exit points since nothing has been consumed yet.
defer consumer.Close()
err = consumer.Subscribe(topic, nil)
if err != nil {
fmt.Printf("Unable to subscribe to topic: %v\n with error: %s\n", &topic, err)
os.Exit(1)
}
// Kafka consumer goroutine: polls the topic and forwards each message to
// the broadcaster via the buffered scoutMessages channel.
go func() {
for {
select {
case <-sigChan:
// NOTE(review): SIGINT/SIGTERM stops only this poller; the gRPC
// server keeps serving. Confirm that is the intended shutdown.
fmt.Println("Kafka consumer stopping...")
return
default:
// nil means the 100ms poll timeout elapsed with no event.
event := consumer.Poll(100)
if event == nil {
continue
}
switch e := event.(type) {
case *kafka.Message:
fmt.Printf("Message on %s\n", e.TopicPartition)
message := pb.Message{
OriginalConnAdd: e.Key,
Packet: e.Value,
}
// May block if the 1000-slot buffer fills faster than run drains it.
server.scoutMessages <- message
case kafka.PartitionEOF:
fmt.Printf("Reached %v\n", e)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
return
default:
fmt.Printf("Ignored %v\n", e)
}
}
}
}()
// gRPC server: listen and serve until Serve returns.
lis, err := net.Listen("tcp", ":"+connPort)
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
defer lis.Close()
fmt.Println("Listening on Port:" + connPort)
grpcServer := grpc.NewServer()
pb.RegisterEchoServer(grpcServer, server)
if err := grpcServer.Serve(lis); err != nil {
fmt.Fprintf(os.Stderr, "%% Error: %v\n", err)
}
}
这个实现的关键改进:
- 扇出模式:使用中央广播器将Kafka消息发送给所有注册的gRPC客户端
- 客户端管理:通过注册/注销机制动态管理客户端连接
- 并发安全:使用 sync.RWMutex 保护客户端映射的并发访问
- 非阻塞设计:使用缓冲通道和goroutine避免消息发送阻塞主循环
- 连接监控:自动检测和清理断开连接的客户端
当Kafka消费者收到消息时,它会将消息发送到scoutMessages通道,广播循环会将该消息发送给所有活跃的gRPC客户端。每个新的gRPC连接都会注册到广播器中,断开时自动注销。

