Golang实现gRPC反向代理的最佳实践
Golang实现gRPC反向代理的最佳实践 我已经搜索了几个小时,但未能找到一个适用于gRPC的良好反向代理。为了更详细地说明,以下是我的问题。
详细情况如下:
- 一个接受gRPC请求的服务器。
- 一个接受客户端请求并将这些gRPC请求中继到服务器的代理。
- 一个连接到代理以通过gRPC获取数据的客户端。
请求的工作流程大致是:客户端 -> 代理 -> 服务器
当客户端向代理发出请求,要求通过gRPC获取一个巨大的数据集时,如果数据集大约为1GB,代理(运行在Docker容器中)的内存使用量会增加到1GB,以满足客户端的gRPC请求。如果我为运行代理的Docker容器分配较少的内存来处理该请求,容器会崩溃并出现错误代码(137),这对应于内存不足导致的终止。
代理的预期行为是,它应该能够将数据从服务器流式传输到客户端,而不会增加其自身的内存占用。
此外,当我切换到HTTP连接处理相同情况时,即使在请求数百MB的数据时,我也看不到代理的内存占用增加。
客户端示例代码:
conn, err := grpc.Dial("localhost:3080", grpc.WithInsecure(), grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(GRPCMaxSize),
grpc.MaxCallSendMsgSize(GRPCMaxSize),
))
代理示例代码(参考了grpc-proxy:https://github.com/trusch/grpc-proxy):
// To accept connections from the client
grpcServer := grpc.NewServer(
grpc.UnknownServiceHandler(
proxy.TransparentHandler(app.HandleGRPC)),
grpc.MaxRecvMsgSize(GRPCMaxSize), grpc.MaxSendMsgSize(GRPCMaxSize),
)
lis, _ := net.Listen("tcp", ":"+app.ctx.grpcPort)
grpcServer.Serve(lis)
// To relay requests to the server
conn, err := grpc.DialContext(context.Background(), "dns:///"+GrpcURL, grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(GRPCMaxSize),
grpc.MaxCallSendMsgSize(GRPCMaxSize),
))
看起来我正在使用的代理没有按预期工作。任何关于良好反向代理的提示都将非常有帮助。
更多关于Golang实现gRPC反向代理的最佳实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于Golang实现gRPC反向代理的最佳实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
要实现高效的gRPC反向代理,关键在于使用流式传输而非缓冲整个消息。以下是基于grpc-proxy库的改进方案:
package main
import (
"context"
"io"
"net"
"github.com/trusch/grpc-proxy/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// 自定义流式处理器
type streamingHandler struct {
director proxy.StreamDirector
}
func (h *streamingHandler) Handler(srv interface{}, stream grpc.ServerStream) error {
ctx := stream.Context()
// 获取目标连接
backendConn, err := grpc.DialContext(ctx, "backend-server:50051",
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(1024*1024*1024), // 1GB
grpc.MaxCallSendMsgSize(1024*1024*1024),
),
)
if err != nil {
return status.Errorf(codes.Unavailable, "failed to connect to backend: %v", err)
}
defer backendConn.Close()
// 创建客户端流
clientStream, err := grpc.NewClientStream(ctx,
&grpc.StreamDesc{
ServerStreams: true,
ClientStreams: true,
},
backendConn,
stream.Method(),
)
if err != nil {
return status.Errorf(codes.Internal, "failed to create client stream: %v", err)
}
// 双向流式转发
errChan := make(chan error, 2)
// 从客户端转发到后端
go func() {
for {
msg := &[]byte{}
if err := stream.RecvMsg(msg); err != nil {
if err == io.EOF {
clientStream.CloseSend()
break
}
errChan <- err
return
}
if err := clientStream.SendMsg(msg); err != nil {
errChan <- err
return
}
}
errChan <- nil
}()
// 从后端转发到客户端
go func() {
for {
msg := &[]byte{}
if err := clientStream.RecvMsg(msg); err != nil {
if err == io.EOF {
break
}
errChan <- err
return
}
if err := stream.SendMsg(msg); err != nil {
errChan <- err
return
}
}
errChan <- nil
}()
// 等待任一方向完成
for i := 0; i < 2; i++ {
if err := <-errChan; err != nil {
return err
}
}
return nil
}
func main() {
// 创建代理服务器
proxyServer := grpc.NewServer(
grpc.UnknownServiceHandler(
proxy.TransparentHandler(func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
md, _ := metadata.FromIncomingContext(ctx)
ctx = metadata.NewOutgoingContext(ctx, md)
conn, err := grpc.DialContext(ctx, "backend-server:50051",
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(1024*1024*1024),
grpc.MaxCallSendMsgSize(1024*1024*1024),
),
)
return ctx, conn, err
}),
),
)
// 启动服务器
lis, _ := net.Listen("tcp", ":3080")
proxyServer.Serve(lis)
}
对于大文件传输,建议使用服务器端流式RPC:
// 服务端流式处理示例
func (s *server) DownloadLargeFile(req *FileRequest, stream FileService_DownloadLargeFileServer) error {
chunkSize := 1024 * 1024 // 1MB chunks
data := make([]byte, chunkSize)
for offset := 0; offset < int(req.FileSize); offset += chunkSize {
// 从存储读取数据块
chunk, err := readChunk(req.FileId, offset, chunkSize)
if err != nil {
return err
}
// 立即发送数据块
if err := stream.Send(&FileChunk{
Data: chunk,
}); err != nil {
return err
}
}
return nil
}
// 代理中的流式转发
func forwardServerStream(serverStream grpc.ServerStream, clientStream grpc.ClientStream) error {
for {
chunk := &FileChunk{}
if err := clientStream.RecvMsg(chunk); err != nil {
if err == io.EOF {
return nil
}
return err
}
if err := serverStream.SendMsg(chunk); err != nil {
return err
}
}
}
关键配置优化:
// 调整gRPC连接参数
conn, err := grpc.Dial("backend:50051",
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(1024*1024*1024*2), // 2GB
grpc.MaxCallSendMsgSize(1024*1024*1024*2),
),
grpc.WithInitialWindowSize(65535), // 流控窗口
grpc.WithInitialConnWindowSize(65535), // 连接窗口
grpc.WithReadBufferSize(65535), // 读缓冲区
grpc.WithWriteBufferSize(65535), // 写缓冲区
)
这个实现通过流式传输避免了内存缓冲,代理仅作为管道传递数据块,不会在内存中累积整个数据集。

