Golang中如何将字符串转换为protoreflect.ProtoMessage或anypb
Golang中如何将字符串转换为protoreflect.ProtoMessage或anypb
我尝试通过 gRPC 网关使用服务器发送事件 eventsource,因此阅读了这篇文章和这篇文章,并得出了下面的 proto 定义:
syntax = "proto3";
package protobuf;
option go_package = "/proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/api/annotations.proto";
service Events {
rpc StreamEvents (google.protobuf.Empty) returns (stream EventSource) {
option (google.api.http) = {
get: "/v1/rawdata/stream"
};
}
}
message Empty{}
message EventSource {
string event = 1;
google.protobuf.Any data = 2;
}
并使用以下命令生成了所有必需的 gRPC 文件(消息/服务/网关):
protoc -I ./proto \
--go_out ./proto --go_opt paths=source_relative \
--go-grpc_out ./proto --go-grpc_opt paths=source_relative \
--grpc-gateway_out ./proto --grpc-gateway_opt paths=source_relative \
./proto/data.proto
并尝试在我的主应用程序中注册生成的服务和网关,如下所示:
package main
import (
"context"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
_ "github.com/mattn/go-sqlite3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "walistner/proto"
)
func main() {
gRPcPort := ":50005"
// 在 TCP 端口上创建 gRPC 监听器
lis, err := net.Listen("tcp", gRPcPort)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 创建 gRPC 服务器对象
s := grpc.NewServer()
// 将 Greeter 服务附加到服务器
pb.RegisterEventsServer(s, server{})
log.Println("Serving gRPC server on 0.0.0.0:50005")
// 启动 gRPC 服务器
grpcTerminated := make(chan struct{})
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
close(grpcTerminated) // 如果服务器在我们请求之前终止,则关闭通道
}
}()
// 创建到我们刚刚启动的 gRPC 服务器的客户端连接
// 这是 gRPC-Gateway 代理请求的地方
// gateWayTarget := fmt.Sprintf("0.0.0.0%s", gRPcPort)
conn, err := grpc.DialContext(
context.Background(),
gRPcPort,
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalln("Failed to dial server:", err)
}
gwmux := runtime.NewServeMux()
// 此处理程序用于 JSON ServerSentEvent,而不是 gRPC 的
//http.HandleFunc("/sse", passer.HandleSignal)
// 为 GET /hello/{name} 注册自定义路由
//err = gwmux.HandlePath("GET", "/sse", passer.HandleSignal)
//if err != nil {
// fmt.Println("Error:", err)
//}
// 此处理程序仅用于确认服务器已启动并运行
// 为 GET /hello/{name} 注册自定义路由
err = gwmux.HandlePath("GET", "/hello/{name}", func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
w.Write([]byte("hello " + pathParams["name"]))
})
if err != nil {
fmt.Println("Error:", err)
}
// 注册 gRPC ServerSentEvent
err = pb.RegisterEventsHandler(context.Background(), gwmux, conn)
if err != nil {
log.Fatalln("Failed to register gateway:", err)
}
gwServer := &http.Server{
Addr: ":8090",
Handler: allowCORS(gwmux),
}
log.Println("Serving gRPC-Gateway on http://localhost:8090")
fmt.Println("run POST request of: http://localhost:8090/v1/rawdata/stream")
fmt.Println("or run curl -X GET -k http://localhost:8090/v1/rawdata/stream")
log.Fatal(gwServer.ListenAndServe()) // <- 仅这一行可能就足够了,不需要后面的所有代码,
// 应用程序可能还在做其他事情,你会希望能够干净地关闭;
// 传入上下文是一个好方法..
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 确保最终调用取消函数
grpcWebTerminated := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := gwServer.ListenAndServe(); err != nil {
fmt.Printf("Web server (GRPC) shutdown: %s", err)
}
close(grpcWebTerminated) // 如果服务器在我们请求之前终止,则关闭通道
}()
// 等待 Web 服务器关闭或上下文被取消...
select {
case <-ctx.Done():
// 关闭服务器(有请求此操作的关闭命令)
case <-grpcTerminated:
// 如果发生这种情况(由于意外错误),你可能想退出
case <-grpcWebTerminated:
// 如果发生这种情况(由于意外错误),你可能想退出
}
// 等待 goroutine 完成
<-grpcTerminated
<-grpcWebTerminated
// 监听 Ctrl+C(你也可以做其他防止程序退出的操作)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
if client.IsConnected() {
passer.data <- sseData{
event: "notification",
message: "Server is shut down at the host machine...",
}
client.Disconnect()
}
}
// allowCORS 允许来自任何源的跨源资源共享。
// 在生产系统中未经考虑不要这样做。
func allowCORS(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if origin := r.Header.Get("Origin"); origin != "" {
w.Header().Set("Access-Control-Allow-Origin", origin)
}
h.ServeHTTP(w, r)
})
}
而出现问题的主要部分是 gateway 注册:
package main
import (
"fmt"
"log"
"sync"
pb "walistner/proto"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/emptypb"
)
type server struct {
pb.UnimplementedEventsServer
}
func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
if p, ok := peer.FromContext(srv.Context()); ok {
fmt.Println("Client ip is:", p.Addr.String())
}
md := metadata.New(map[string]string{"Content-Type": "text/event-stream", "Connection": "keep-alive"})
srv.SetHeader(md)
//使用等待组允许进程并发执行
var wg sync.WaitGroup
for {
incomingInput := <-passer.data
wg.Add(1)
//go func(count int64) {
go func() {
defer wg.Done()
myData := incomingInput.data <---------------这是一个字符串
// 将数据转换为 *anypb.Any
data, err := anypb.New(myData.(protoreflect.ProtoMessage)) <----- 尝试了这个但失败了
if err != nil {
//...
}
resp := pb.EventSource{
Event: incomingInput.event,
Data: data, // <---------- 这需要是 *anypb.Any 类型
}
if err := srv.Send(&resp); err != nil {
log.Printf("send error %v", err)
}
}()
}
wg.Wait()
return nil
}
我在 Go 应用程序内部的数据结构是:
type sseData struct {
event, message string
}
type DataPasser struct {
data chan sseData
logs chan string
connection chan struct{} // 用于控制允许的最大客户端连接数
}
var passer *DataPasser
func init() {
passer = &DataPasser{
data: make(chan sseData),
logs: make(chan string),
connection: make(chan struct{}, maxClients),
}
}
更多关于Golang中如何将字符串转换为protoreflect.ProtoMessage或anypb的实战教程也可以访问 https://www.itying.com/category-94-b0.html
anypb#New 接受一个 proto.Message,它是 protoreflect.ProtoMessage 的别名,该接口定义如下:
type ProtoMessage interface{
ProtoReflect() Message
}
因此,为了符合该接口,你的类型需要实现该接口并返回一个 protoreflect.Message。
更多关于Golang中如何将字符串转换为protoreflect.ProtoMessage或anypb的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Golang中将字符串转换为protoreflect.ProtoMessage或anypb.Any,需要先创建一个具体的protobuf消息类型,然后使用anypb.New()进行包装。以下是几种解决方案:
方案1:使用预定义的protobuf消息类型
首先定义一个简单的protobuf消息类型来包装字符串:
// 在proto文件中添加
message StringData {
string value = 1;
}
然后在Go代码中:
import (
"google.golang.org/protobuf/types/known/anypb"
pb "walistner/proto"
)
func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
// ... 其他代码
for {
incomingInput := <-passer.data
// 创建StringData消息
stringData := &pb.StringData{
Value: incomingInput.message, // 使用message字段
}
// 转换为anypb.Any
data, err := anypb.New(stringData)
if err != nil {
log.Printf("failed to create anypb.Any: %v", err)
continue
}
resp := pb.EventSource{
Event: incomingInput.event,
Data: data,
}
if err := srv.Send(&resp); err != nil {
log.Printf("send error %v", err)
}
}
}
方案2:动态创建protobuf消息
如果不想修改proto定义,可以使用动态消息:
import (
"fmt"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
)
func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
// ... 其他代码
for {
incomingInput := <-passer.data
// 方法1:使用structpb.Value
strValue := structpb.NewStringValue(incomingInput.message)
data, err := anypb.New(strValue)
if err != nil {
log.Printf("failed to create anypb.Any: %v", err)
continue
}
// 或者方法2:使用structpb.Struct
st := &structpb.Struct{
Fields: map[string]*structpb.Value{
"message": structpb.NewStringValue(incomingInput.message),
"event": structpb.NewStringValue(incomingInput.event),
},
}
data, err = anypb.New(st)
if err != nil {
log.Printf("failed to create anypb.Any: %v", err)
continue
}
resp := pb.EventSource{
Event: incomingInput.event,
Data: data,
}
if err := srv.Send(&resp); err != nil {
log.Printf("send error %v", err)
}
}
}
方案3:使用JSON序列化
如果数据是JSON字符串,可以这样处理:
import (
"encoding/json"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
)
func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
// ... 其他代码
for {
incomingInput := <-passer.data
// 假设incomingInput.message是JSON字符串
var jsonData map[string]interface{}
if err := json.Unmarshal([]byte(incomingInput.message), &jsonData); err == nil {
// 成功解析JSON
st, err := structpb.NewStruct(jsonData)
if err != nil {
log.Printf("failed to create struct: %v", err)
continue
}
data, err := anypb.New(st)
if err != nil {
log.Printf("failed to create anypb.Any: %v", err)
continue
}
resp := pb.EventSource{
Event: incomingInput.event,
Data: data,
}
if err := srv.Send(&resp); err != nil {
log.Printf("send error %v", err)
}
} else {
// 如果不是JSON,作为普通字符串处理
strValue := structpb.NewStringValue(incomingInput.message)
data, err := anypb.New(strValue)
if err != nil {
log.Printf("failed to create anypb.Any: %v", err)
continue
}
resp := pb.EventSource{
Event: incomingInput.event,
Data: data,
}
if err := srv.Send(&resp); err != nil {
log.Printf("send error %v", err)
}
}
}
}
方案4:创建通用的包装消息
定义一个通用的包装消息类型:
// 在proto文件中添加
message GenericData {
string type = 1;
string content = 2;
map<string, string> metadata = 3;
}
func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
// ... 其他代码
for {
incomingInput := <-passer.data
// 创建GenericData消息
genericData := &pb.GenericData{
Type: "string",
Content: incomingInput.message,
Metadata: map[string]string{
"event_type": incomingInput.event,
},
}
// 转换为anypb.Any
data, err := anypb.New(genericData)
if err != nil {
log.Printf("failed to create anypb.Any: %v", err)
continue
}
resp := pb.EventSource{
Event: incomingInput.event,
Data: data,
}
if err := srv.Send(&resp); err != nil {
log.Printf("send error %v", err)
}
}
}
关键点总结:
- anypb.New()需要protoreflect.ProtoMessage类型:不能直接传递字符串
- 需要先创建具体的protobuf消息:如StringData、structpb.Value等
- structpb包很有用:提供了动态创建protobuf消息的方法
- JSON数据可以转换为structpb.Struct:便于处理结构化数据
选择哪种方案取决于你的具体需求:
- 如果数据结构固定,使用方案1(预定义消息类型)
- 如果数据结构动态变化,使用方案2或3
- 如果需要通用包装,使用方案4

