Golang实现单个Kafka消费者服务多个gRPC客户端

Golang实现单个Kafka消费者服务多个gRPC客户端 我正在学习Go语言,但在处理以下问题时遇到了困难:我正在从一个Kafka主题消费消息,希望将来自该Kafka主题的每条消息通过流式RPC调用发送给多个gRPC客户端。后者可能并不重要。我遇到的问题是如何将这个消息主题复制给所有连接的客户端。

这个实现尝试为每个创建的连接使用单个通道。但正如我刚意识到的,Go通道本身有点像Kafka主题,它们不会进行扇出分发,也不会让通道的每个消费者都获得传入内容的副本。

这是我的第一次尝试。这里最终发生的情况是:如果没有任何连接,轮询消费者的goroutine就会停止,因为在接收到消息并将其发送到通道时会发生阻塞。当存在连接时,GetEchos连接只收到第一条消息。

我考虑过角色互换。也许让每个GetEchos创建自己的通道,然后传递给另一个通道,这样我的主消费goroutine就可以迭代并发送消息。但在某种数组中管理这些通道似乎很困难。

我该如何最好地处理这个问题?

package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	pb "github.com/parkhub/scout-echo/echo"
	"google.golang.org/grpc"
)

// 该服务器接收消息通道,用于向客户端流式传输
type echoServer struct {
	scoutMessages chan pb.Message
}

func (server *echoServer) GetEchos(_ *pb.Empty, stream pb.Echo_GetEchosServer) error {
	for {
		select {
		case message := <-server.scoutMessages:
			fmt.Printf("Got message!")
			if err := stream.Send(&message); err != nil {
				return err
			}

			// return nil
		default:
			continue
		}
	}
}

func getEnv(key, fallback string) string {
	// 如果环境中不存在该键
	if value, ok := os.LookupEnv(key); ok {
		return value
	}

	return fallback
}

func main() {
	broker := getEnv("BROKER_ADDR", "kafka:9092")
	connPort := getEnv("GRPC_PORT", "50051")
	groupID := getEnv("GROUP_ID", "SCOUT_ECHO_SERVICE")
	topic := getEnv("TCP_LIVE_TOPIC", "ScoutTCPLiveStream")

	scoutMessageChan := make(chan pb.Message)
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  broker,
		"group.id":           groupID,
		"session.timeout.ms": 6000,
		"default.topic.config": kafka.ConfigMap{
			"auto.offset.reset": "earliest",
		},
	})

	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %v\n", consumer)

	defer consumer.Close()

	err = consumer.Subscribe(topic, nil)

	if err != nil {
		fmt.Printf("Unable to subscribe to topic: %v\n with error: %s\n", &topic, err)
		os.Exit(1)
	}

	server := &echoServer{scoutMessages: scoutMessageChan}

	go func() {
	readScoutMessages:
		for {
			select {
			case signal := <-sigChan:
				fmt.Printf("Caught signal %v: terminating\n", signal)
				break
			default:
				// 获取事件
				event := consumer.Poll(100)
				// 如果没有任何类型的事件,则重新开始循环
				if event == nil {
					continue
				}

				switch e := event.(type) {
				case *kafka.Message:
					// 将消息发送到scout消息通道
					fmt.Printf("Message on %s\n", e.TopicPartition)

					scoutMessageChan <- pb.Message{OriginalConnAdd: e.Key, Packet: e.Value}
				case kafka.PartitionEOF:
					fmt.Printf("Reached %v\n", e)
				case kafka.Error:
					fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
					break readScoutMessages
				default:
					fmt.Printf("Ignored %v\n", e)
				}
			}
		}
	}()

	// 监听传入连接
	lis, err := net.Listen("tcp", ":"+connPort)

	if err != nil {
		fmt.Println("Error listening:", err.Error())
		os.Exit(1)
	}

	// 在应用程序关闭时关闭监听器
	defer lis.Close()

	fmt.Println("Listening on Port:" + connPort)

	grpcServer := grpc.NewServer()
	pb.RegisterEchoServer(grpcServer, server)

	if err := grpcServer.Serve(lis); err != nil {
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", err)
	}
}

更多关于Golang实现单个Kafka消费者服务多个gRPC客户端的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang实现单个Kafka消费者服务多个gRPC客户端的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


您遇到的问题很典型:Go通道是单播的,每个消息只能被一个消费者接收。要实现Kafka消息的广播(扇出)到多个gRPC客户端,需要采用不同的架构模式。

以下是修改后的实现,使用扇出模式将Kafka消息广播给所有连接的gRPC客户端:

package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	pb "github.com/parkhub/scout-echo/echo"
	"google.golang.org/grpc"
)

type clientStream struct {
	stream pb.Echo_GetEchosServer
	done   chan bool
}

type echoServer struct {
	mu              sync.RWMutex
	clientStreams   map[*clientStream]bool
	register        chan *clientStream
	unregister      chan *clientStream
	scoutMessages   chan pb.Message
}

func newEchoServer() *echoServer {
	return &echoServer{
		clientStreams: make(map[*clientStream]bool),
		register:      make(chan *clientStream),
		unregister:    make(chan *clientStream),
		scoutMessages: make(chan pb.Message, 1000), // 缓冲通道避免阻塞
	}
}

func (server *echoServer) run() {
	for {
		select {
		case client := <-server.register:
			server.mu.Lock()
			server.clientStreams[client] = true
			server.mu.Unlock()
			fmt.Printf("Client connected. Total clients: %d\n", len(server.clientStreams))

		case client := <-server.unregister:
			server.mu.Lock()
			if _, ok := server.clientStreams[client]; ok {
				delete(server.clientStreams, client)
				close(client.done)
			}
			server.mu.Unlock()
			fmt.Printf("Client disconnected. Total clients: %d\n", len(server.clientStreams))

		case message := <-server.scoutMessages:
			server.mu.RLock()
			for client := range server.clientStreams {
				select {
				case client.done <- true:
					// 客户端活跃,尝试发送消息
					go func(c *clientStream, msg pb.Message) {
						if err := c.stream.Send(&msg); err != nil {
							server.unregister <- c
						}
					}(client, message)
				default:
					// 客户端不活跃,移除
					server.unregister <- client
				}
			}
			server.mu.RUnlock()
		}
	}
}

func (server *echoServer) GetEchos(_ *pb.Empty, stream pb.Echo_GetEchosServer) error {
	client := &clientStream{
		stream: stream,
		done:   make(chan bool, 1),
	}

	server.register <- client

	// 保持连接直到客户端断开或出错
	<-client.done
	return nil
}

func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}

func main() {
	broker := getEnv("BROKER_ADDR", "kafka:9092")
	connPort := getEnv("GRPC_PORT", "50051")
	groupID := getEnv("GROUP_ID", "SCOUT_ECHO_SERVICE")
	topic := getEnv("TCP_LIVE_TOPIC", "ScoutTCPLiveStream")

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	server := newEchoServer()
	go server.run()

	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  broker,
		"group.id":           groupID,
		"session.timeout.ms": 6000,
		"default.topic.config": kafka.ConfigMap{
			"auto.offset.reset": "earliest",
		},
	})

	if err != nil {
		fmt.Printf("Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %v\n", consumer)
	defer consumer.Close()

	err = consumer.Subscribe(topic, nil)
	if err != nil {
		fmt.Printf("Unable to subscribe to topic: %v\n with error: %s\n", &topic, err)
		os.Exit(1)
	}

	// Kafka消费者goroutine
	go func() {
		for {
			select {
			case <-sigChan:
				fmt.Println("Kafka consumer stopping...")
				return
			default:
				event := consumer.Poll(100)
				if event == nil {
					continue
				}

				switch e := event.(type) {
				case *kafka.Message:
					fmt.Printf("Message on %s\n", e.TopicPartition)
					message := pb.Message{
						OriginalConnAdd: e.Key,
						Packet:          e.Value,
					}
					server.scoutMessages <- message
				case kafka.PartitionEOF:
					fmt.Printf("Reached %v\n", e)
				case kafka.Error:
					fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
					return
				default:
					fmt.Printf("Ignored %v\n", e)
				}
			}
		}
	}()

	// gRPC服务器
	lis, err := net.Listen("tcp", ":"+connPort)
	if err != nil {
		fmt.Println("Error listening:", err.Error())
		os.Exit(1)
	}
	defer lis.Close()

	fmt.Println("Listening on Port:" + connPort)

	grpcServer := grpc.NewServer()
	pb.RegisterEchoServer(grpcServer, server)

	if err := grpcServer.Serve(lis); err != nil {
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", err)
	}
}

这个实现的关键改进:

  1. 扇出模式:使用中央广播器将Kafka消息发送给所有注册的gRPC客户端
  2. 客户端管理:通过注册/注销机制动态管理客户端连接
  3. 并发安全:使用sync.RWMutex保护客户端映射的并发访问
  4. 非阻塞设计:使用缓冲通道和goroutine避免消息发送阻塞主循环
  5. 连接监控:自动检测和清理断开连接的客户端

当Kafka消费者收到消息时,它会将消息发送到scoutMessages通道,广播循环会将该消息发送给所有活跃的gRPC客户端。每个新的gRPC连接都会注册到广播器中,断开时自动注销。

回到顶部