Golang实现gRPC反向代理的最佳实践

Golang实现gRPC反向代理的最佳实践 我已经搜索了几个小时,但未能找到一个适用于gRPC的良好反向代理。为了更详细地说明,以下是我的问题。

详细情况如下:

  1. 一个接受gRPC请求的服务器。
  2. 一个接受客户端请求并将这些gRPC请求中继到服务器的代理。
  3. 一个连接到代理以通过gRPC获取数据的客户端。

请求的工作流程大致是:客户端 -> 代理 -> 服务器

当客户端向代理发出请求,要求通过gRPC获取一个巨大的数据集时,如果数据集大约为1GB,代理(运行在Docker容器中)的内存使用量会增加到1GB,以满足客户端的gRPC请求。如果我为运行代理的Docker容器分配较少的内存来处理该请求,容器会崩溃并出现错误代码(137),这对应于内存不足导致的终止。

代理的预期行为是,它应该能够将数据从服务器流式传输到客户端,而不会增加其自身的内存占用。

此外,当我切换到HTTP连接处理相同情况时,即使在请求数百MB的数据时,我也看不到代理的内存占用增加。

客户端示例代码:

conn, err := grpc.Dial("localhost:3080", grpc.WithInsecure(), grpc.WithDefaultCallOptions(
        grpc.MaxCallRecvMsgSize(GRPCMaxSize),
        grpc.MaxCallSendMsgSize(GRPCMaxSize),
    ))

代理示例代码(参考了grpc-proxy:https://github.com/trusch/grpc-proxy):

// To accept connections from the client
grpcServer := grpc.NewServer(
        grpc.UnknownServiceHandler(
            proxy.TransparentHandler(app.HandleGRPC)),
        grpc.MaxRecvMsgSize(GRPCMaxSize), grpc.MaxSendMsgSize(GRPCMaxSize),
    )
lis, _ := net.Listen("tcp", ":"+app.ctx.grpcPort)
grpcServer.Serve(lis)

// To relay requests to the server
conn, err := grpc.DialContext(context.Background(), "dns:///"+GrpcURL, grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithDefaultCallOptions(
        grpc.MaxCallRecvMsgSize(GRPCMaxSize),
        grpc.MaxCallSendMsgSize(GRPCMaxSize),
    ))

看起来我正在使用的代理没有按预期工作。任何关于良好反向代理的提示都将非常有帮助。


更多关于Golang实现gRPC反向代理的最佳实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang实现gRPC反向代理的最佳实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


要实现高效的gRPC反向代理,关键在于使用流式传输而非缓冲整个消息。以下是基于grpc-proxy库的改进方案:

package main

import (
    "context"
    "io"
    "net"

    "github.com/trusch/grpc-proxy/proxy"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
)

// 自定义流式处理器
type streamingHandler struct {
    director proxy.StreamDirector
}

func (h *streamingHandler) Handler(srv interface{}, stream grpc.ServerStream) error {
    ctx := stream.Context()
    
    // 获取目标连接
    backendConn, err := grpc.DialContext(ctx, "backend-server:50051",
        grpc.WithInsecure(),
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(1024*1024*1024), // 1GB
            grpc.MaxCallSendMsgSize(1024*1024*1024),
        ),
    )
    if err != nil {
        return status.Errorf(codes.Unavailable, "failed to connect to backend: %v", err)
    }
    defer backendConn.Close()

    // 创建客户端流
    clientStream, err := grpc.NewClientStream(ctx, 
        &grpc.StreamDesc{
            ServerStreams: true,
            ClientStreams: true,
        },
        backendConn,
        stream.Method(),
    )
    if err != nil {
        return status.Errorf(codes.Internal, "failed to create client stream: %v", err)
    }

    // 双向流式转发
    errChan := make(chan error, 2)
    
    // 从客户端转发到后端
    go func() {
        for {
            msg := &[]byte{}
            if err := stream.RecvMsg(msg); err != nil {
                if err == io.EOF {
                    clientStream.CloseSend()
                    break
                }
                errChan <- err
                return
            }
            if err := clientStream.SendMsg(msg); err != nil {
                errChan <- err
                return
            }
        }
        errChan <- nil
    }()

    // 从后端转发到客户端
    go func() {
        for {
            msg := &[]byte{}
            if err := clientStream.RecvMsg(msg); err != nil {
                if err == io.EOF {
                    break
                }
                errChan <- err
                return
            }
            if err := stream.SendMsg(msg); err != nil {
                errChan <- err
                return
            }
        }
        errChan <- nil
    }()

    // 等待任一方向完成
    for i := 0; i < 2; i++ {
        if err := <-errChan; err != nil {
            return err
        }
    }
    return nil
}

func main() {
    // 创建代理服务器
    proxyServer := grpc.NewServer(
        grpc.UnknownServiceHandler(
            proxy.TransparentHandler(func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
                md, _ := metadata.FromIncomingContext(ctx)
                ctx = metadata.NewOutgoingContext(ctx, md)
                
                conn, err := grpc.DialContext(ctx, "backend-server:50051",
                    grpc.WithInsecure(),
                    grpc.WithDefaultCallOptions(
                        grpc.MaxCallRecvMsgSize(1024*1024*1024),
                        grpc.MaxCallSendMsgSize(1024*1024*1024),
                    ),
                )
                return ctx, conn, err
            }),
        ),
    )

    // 启动服务器
    lis, _ := net.Listen("tcp", ":3080")
    proxyServer.Serve(lis)
}

对于大文件传输,建议使用服务器端流式RPC:

// 服务端流式处理示例
func (s *server) DownloadLargeFile(req *FileRequest, stream FileService_DownloadLargeFileServer) error {
    chunkSize := 1024 * 1024 // 1MB chunks
    data := make([]byte, chunkSize)
    
    for offset := 0; offset < int(req.FileSize); offset += chunkSize {
        // 从存储读取数据块
        chunk, err := readChunk(req.FileId, offset, chunkSize)
        if err != nil {
            return err
        }
        
        // 立即发送数据块
        if err := stream.Send(&FileChunk{
            Data: chunk,
        }); err != nil {
            return err
        }
    }
    return nil
}

// 代理中的流式转发
func forwardServerStream(serverStream grpc.ServerStream, clientStream grpc.ClientStream) error {
    for {
        chunk := &FileChunk{}
        if err := clientStream.RecvMsg(chunk); err != nil {
            if err == io.EOF {
                return nil
            }
            return err
        }
        if err := serverStream.SendMsg(chunk); err != nil {
            return err
        }
    }
}

关键配置优化:

// 调整gRPC连接参数
conn, err := grpc.Dial("backend:50051",
    grpc.WithInsecure(),
    grpc.WithDefaultCallOptions(
        grpc.MaxCallRecvMsgSize(1024*1024*1024*2), // 2GB
        grpc.MaxCallSendMsgSize(1024*1024*1024*2),
    ),
    grpc.WithInitialWindowSize(65535),    // 流控窗口
    grpc.WithInitialConnWindowSize(65535), // 连接窗口
    grpc.WithReadBufferSize(65535),        // 读缓冲区
    grpc.WithWriteBufferSize(65535),       // 写缓冲区
)

这个实现通过流式传输避免了内存缓冲,代理仅作为管道传递数据块,不会在内存中累积整个数据集。

回到顶部