Golang中如何将字符串转换为protoreflect.ProtoMessage或anypb

Golang中如何将字符串转换为protoreflect.ProtoMessage或anypb 我尝试通过 gRPC 网关使用服务器发送事件 eventsource,因此阅读了这篇文章这篇文章,并得出了下面的 proto 定义:

syntax = "proto3";

package protobuf;
option go_package = "/proto";

import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/api/annotations.proto";

service Events {
  rpc StreamEvents (google.protobuf.Empty) returns (stream EventSource) {
    option (google.api.http) = {
      get: "/v1/rawdata/stream"
    };
  }
}

message Empty{}

message EventSource {
  string event = 1;
  google.protobuf.Any data = 2;
}

并使用以下命令生成了所有必需的 gRPC 文件(消息/服务/网关):

protoc -I ./proto \
  --go_out ./proto --go_opt paths=source_relative \
  --go-grpc_out ./proto --go-grpc_opt paths=source_relative \
  --grpc-gateway_out ./proto --grpc-gateway_opt paths=source_relative \
  ./proto/data.proto

并尝试在我的主应用程序中注册生成的服务和网关,如下所示:

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	_ "github.com/mattn/go-sqlite3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "walistner/proto"
)

func main() {
	gRPcPort := ":50005"
	// 在 TCP 端口上创建 gRPC 监听器
	lis, err := net.Listen("tcp", gRPcPort)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// 创建 gRPC 服务器对象
	s := grpc.NewServer()

	// 将 Greeter 服务附加到服务器
	pb.RegisterEventsServer(s, server{})

	log.Println("Serving gRPC server on 0.0.0.0:50005")

	// 启动 gRPC 服务器
	grpcTerminated := make(chan struct{})
	go func() {
		if err := s.Serve(lis); err != nil {
			log.Fatalf("failed to serve: %v", err)
			close(grpcTerminated) // 如果服务器在我们请求之前终止,则关闭通道
		}
	}()

	// 创建到我们刚刚启动的 gRPC 服务器的客户端连接
	// 这是 gRPC-Gateway 代理请求的地方
	//	gateWayTarget := fmt.Sprintf("0.0.0.0%s", gRPcPort)
	conn, err := grpc.DialContext(
		context.Background(),
		gRPcPort,
		grpc.WithBlock(),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalln("Failed to dial server:", err)
	}

	gwmux := runtime.NewServeMux()

    // 此处理程序用于 JSON ServerSentEvent,而不是 gRPC 的
	//http.HandleFunc("/sse", passer.HandleSignal)
	// 为 GET /hello/{name} 注册自定义路由
	//err = gwmux.HandlePath("GET", "/sse", passer.HandleSignal)
	//if err != nil {
	//	fmt.Println("Error:", err)
	//}

    // 此处理程序仅用于确认服务器已启动并运行
	// 为 GET /hello/{name} 注册自定义路由
	err = gwmux.HandlePath("GET", "/hello/{name}", func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
		w.Write([]byte("hello " + pathParams["name"]))
	})
	if err != nil {
		fmt.Println("Error:", err)
	}

	// 注册 gRPC ServerSentEvent
	err = pb.RegisterEventsHandler(context.Background(), gwmux, conn)
	if err != nil {
		log.Fatalln("Failed to register gateway:", err)
	}

	gwServer := &http.Server{
		Addr:    ":8090",
		Handler: allowCORS(gwmux),
	}

	log.Println("Serving gRPC-Gateway on http://localhost:8090")

	fmt.Println("run POST request of: http://localhost:8090/v1/rawdata/stream")
	fmt.Println("or run curl -X GET -k http://localhost:8090/v1/rawdata/stream")
	log.Fatal(gwServer.ListenAndServe()) // <- 仅这一行可能就足够了,不需要后面的所有代码,

	// 应用程序可能还在做其他事情,你会希望能够干净地关闭;
	// 传入上下文是一个好方法..
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // 确保最终调用取消函数

	grpcWebTerminated := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := gwServer.ListenAndServe(); err != nil {
			fmt.Printf("Web server (GRPC) shutdown: %s", err)
		}
		close(grpcWebTerminated) // 如果服务器在我们请求之前终止,则关闭通道
	}()

	// 等待 Web 服务器关闭或上下文被取消...
	select {
	case <-ctx.Done():
		// 关闭服务器(有请求此操作的关闭命令)
	case <-grpcTerminated:
		// 如果发生这种情况(由于意外错误),你可能想退出
	case <-grpcWebTerminated:
		// 如果发生这种情况(由于意外错误),你可能想退出
	}
	// 等待 goroutine 完成
	<-grpcTerminated
	<-grpcWebTerminated

	// 监听 Ctrl+C(你也可以做其他防止程序退出的操作)
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)

	<-c
	if client.IsConnected() {
		passer.data <- sseData{
			event:   "notification",
			message: "Server is shut down at the host machine...",
		}
		client.Disconnect()
	}
}

// allowCORS 允许来自任何源的跨源资源共享。
// 在生产系统中未经考虑不要这样做。
func allowCORS(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if origin := r.Header.Get("Origin"); origin != "" {
			w.Header().Set("Access-Control-Allow-Origin", origin)
		}
		h.ServeHTTP(w, r)
	})
}

而出现问题的主要部分是 gateway 注册:

package main

import (
	"fmt"
	"log"
	"sync"
	pb "walistner/proto"

	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/known/anypb"
	"google.golang.org/protobuf/types/known/emptypb"
)

type server struct {
	pb.UnimplementedEventsServer
}

func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
	if p, ok := peer.FromContext(srv.Context()); ok {
		fmt.Println("Client ip is:", p.Addr.String())
	}
	md := metadata.New(map[string]string{"Content-Type": "text/event-stream", "Connection": "keep-alive"})

	srv.SetHeader(md)

	//使用等待组允许进程并发执行
	var wg sync.WaitGroup

	for {
		incomingInput := <-passer.data
		wg.Add(1)
		//go func(count int64) {
		go func() {
			defer wg.Done()
      
            myData := incomingInput.data <---------------这是一个字符串

            // 将数据转换为 *anypb.Any
			data, err := anypb.New(myData.(protoreflect.ProtoMessage)) <----- 尝试了这个但失败了
			if err != nil {
				//...
			}

			resp := pb.EventSource{
				Event: incomingInput.event,
				Data:  data, // <---------- 这需要是 *anypb.Any 类型
			}

			if err := srv.Send(&resp); err != nil {
				log.Printf("send error %v", err)
			}
		}()
	}

	wg.Wait()
	return nil
}

我在 Go 应用程序内部的数据结构是:

type sseData struct {
	event, message string
}
type DataPasser struct {
	data       chan sseData
	logs       chan string
	connection chan struct{} // 用于控制允许的最大客户端连接数
}

var passer *DataPasser

func init() {
	passer = &DataPasser{
		data:       make(chan sseData),
		logs:       make(chan string),
		connection: make(chan struct{}, maxClients),
	}
}

更多关于Golang中如何将字符串转换为protoreflect.ProtoMessage或anypb的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

anypb#New 接受一个 proto.Message,它是 protoreflect.ProtoMessage 的别名,该接口定义如下:

type ProtoMessage interface{
    ProtoReflect() Message
}

因此,为了符合该接口,你的类型需要实现该接口并返回一个 protoreflect.Message

更多关于Golang中如何将字符串转换为protoreflect.ProtoMessage或anypb的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Golang中将字符串转换为protoreflect.ProtoMessage或anypb.Any,需要先创建一个具体的protobuf消息类型,然后使用anypb.New()进行包装。以下是几种解决方案:

方案1:使用预定义的protobuf消息类型

首先定义一个简单的protobuf消息类型来包装字符串:

// 在proto文件中添加
message StringData {
  string value = 1;
}

然后在Go代码中:

import (
    "google.golang.org/protobuf/types/known/anypb"
    pb "walistner/proto"
)

func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
    // ... 其他代码
    
    for {
        incomingInput := <-passer.data
        
        // 创建StringData消息
        stringData := &pb.StringData{
            Value: incomingInput.message, // 使用message字段
        }
        
        // 转换为anypb.Any
        data, err := anypb.New(stringData)
        if err != nil {
            log.Printf("failed to create anypb.Any: %v", err)
            continue
        }
        
        resp := pb.EventSource{
            Event: incomingInput.event,
            Data:  data,
        }
        
        if err := srv.Send(&resp); err != nil {
            log.Printf("send error %v", err)
        }
    }
}

方案2:动态创建protobuf消息

如果不想修改proto定义,可以使用动态消息:

import (
    "fmt"
    "google.golang.org/protobuf/encoding/protojson"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/reflect/protoreflect"
    "google.golang.org/protobuf/types/dynamicpb"
    "google.golang.org/protobuf/types/known/anypb"
    "google.golang.org/protobuf/types/known/structpb"
)

func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
    // ... 其他代码
    
    for {
        incomingInput := <-passer.data
        
        // 方法1:使用structpb.Value
        strValue := structpb.NewStringValue(incomingInput.message)
        data, err := anypb.New(strValue)
        if err != nil {
            log.Printf("failed to create anypb.Any: %v", err)
            continue
        }
        
        // 或者方法2:使用structpb.Struct
        st := &structpb.Struct{
            Fields: map[string]*structpb.Value{
                "message": structpb.NewStringValue(incomingInput.message),
                "event":   structpb.NewStringValue(incomingInput.event),
            },
        }
        
        data, err = anypb.New(st)
        if err != nil {
            log.Printf("failed to create anypb.Any: %v", err)
            continue
        }
        
        resp := pb.EventSource{
            Event: incomingInput.event,
            Data:  data,
        }
        
        if err := srv.Send(&resp); err != nil {
            log.Printf("send error %v", err)
        }
    }
}

方案3:使用JSON序列化

如果数据是JSON字符串,可以这样处理:

import (
    "encoding/json"
    "google.golang.org/protobuf/encoding/protojson"
    "google.golang.org/protobuf/types/known/anypb"
    "google.golang.org/protobuf/types/known/structpb"
)

func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
    // ... 其他代码
    
    for {
        incomingInput := <-passer.data
        
        // 假设incomingInput.message是JSON字符串
        var jsonData map[string]interface{}
        if err := json.Unmarshal([]byte(incomingInput.message), &jsonData); err == nil {
            // 成功解析JSON
            st, err := structpb.NewStruct(jsonData)
            if err != nil {
                log.Printf("failed to create struct: %v", err)
                continue
            }
            
            data, err := anypb.New(st)
            if err != nil {
                log.Printf("failed to create anypb.Any: %v", err)
                continue
            }
            
            resp := pb.EventSource{
                Event: incomingInput.event,
                Data:  data,
            }
            
            if err := srv.Send(&resp); err != nil {
                log.Printf("send error %v", err)
            }
        } else {
            // 如果不是JSON,作为普通字符串处理
            strValue := structpb.NewStringValue(incomingInput.message)
            data, err := anypb.New(strValue)
            if err != nil {
                log.Printf("failed to create anypb.Any: %v", err)
                continue
            }
            
            resp := pb.EventSource{
                Event: incomingInput.event,
                Data:  data,
            }
            
            if err := srv.Send(&resp); err != nil {
                log.Printf("send error %v", err)
            }
        }
    }
}

方案4:创建通用的包装消息

定义一个通用的包装消息类型:

// 在proto文件中添加
message GenericData {
  string type = 1;
  string content = 2;
  map<string, string> metadata = 3;
}
func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
    // ... 其他代码
    
    for {
        incomingInput := <-passer.data
        
        // 创建GenericData消息
        genericData := &pb.GenericData{
            Type:    "string",
            Content: incomingInput.message,
            Metadata: map[string]string{
                "event_type": incomingInput.event,
            },
        }
        
        // 转换为anypb.Any
        data, err := anypb.New(genericData)
        if err != nil {
            log.Printf("failed to create anypb.Any: %v", err)
            continue
        }
        
        resp := pb.EventSource{
            Event: incomingInput.event,
            Data:  data,
        }
        
        if err := srv.Send(&resp); err != nil {
            log.Printf("send error %v", err)
        }
    }
}

关键点总结:

  1. anypb.New()需要protoreflect.ProtoMessage类型:不能直接传递字符串
  2. 需要先创建具体的protobuf消息:如StringData、structpb.Value等
  3. structpb包很有用:提供了动态创建protobuf消息的方法
  4. JSON数据可以转换为structpb.Struct:便于处理结构化数据

选择哪种方案取决于你的具体需求:

  • 如果数据结构固定,使用方案1(预定义消息类型)
  • 如果数据结构动态变化,使用方案2或3
  • 如果需要通用包装,使用方案4
回到顶部