Golang实现gRPC日志处理:从Syslog端口到键值对再到Parquet格式

大家好,

很高兴来到这里,我希望能学习和分享宝贵的经验。

我的问题是关于 Syslog 主题的。

也许有人能帮助解决以下问题:

在 gRPC 项目中

  1. 我希望从命令行或 Syslog 端口捕获数据,并使用 https://github.com/elastic/beats/tree/master/filebeat/input/syslog 来适配解析器。

  2. 然后,将所有进入(例如)Syslog 端口的事件存储到键值存储中(我打算用 Redis,或者大家有更好的推荐吗?)

  3. 最后,我想将数据序列化为 parquet 格式,并包含日期和时间,以便能够按时间间隔搜索/查询数据以获取完整路径(并且可能存储在 clickhouse 中)。

非常希望能听到大家对此的想法和意见。

此致,

Ramin


更多关于Golang实现gRPC日志处理:从Syslog端口到键值对再到Parquet格式的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang实现gRPC日志处理:从Syslog端口到键值对再到Parquet格式的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


这是一个非常典型的日志处理流水线设计。以下是如何在Go中实现这个流程的具体方案和代码示例:

1. 从Syslog端口接收数据

可以使用 github.com/hpcloud/tail 跟踪(tail)日志文件,或者像下面的示例一样直接监听 Syslog UDP 端口:

package main

import (
    "fmt"
    "log"
    "net"
    "strings"
)

// startSyslogServer receives syslog datagrams on UDP port `port` (all
// interfaces) and passes each one, as a string, to processSyslogMessage.
// It blocks forever. Failing to resolve or bind the address terminates the
// process via log.Fatal; read errors are logged and the loop continues.
func startSyslogServer(port string) {
    addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("", port))
    if err != nil {
        log.Fatalf("Error resolving syslog address %q: %v", port, err)
    }
    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Size the buffer for the largest possible UDP payload. A smaller buffer
    // may silently truncate long messages: ReadFromUDP returns at most
    // len(buffer) bytes of a datagram.
    buffer := make([]byte, 65535)
    for {
        n, _, err := conn.ReadFromUDP(buffer)
        if err != nil {
            log.Printf("Error reading from UDP: %v", err)
            continue
        }
        // string(...) copies the bytes, so buffer can be reused for the next read.
        processSyslogMessage(string(buffer[:n]))
    }
}

// processSyslogMessage parses one raw syslog message and forwards the
// resulting map to sendToKVStore.
//
// The map always has "timestamp", "hostname", "message" and "raw" (the
// unmodified input). When the message starts with a valid "<PRI>" prefix,
// "facility" and "severity" are added as decimal strings
// (PRI = facility*8 + severity). Messages whose header cannot be split into
// timestamp, hostname and message are logged and dropped.
func processSyslogMessage(msg string) {
    logEntry, ok := parseSyslogMessage(msg)
    if !ok {
        log.Printf("Dropping unparsable syslog message: %q", msg)
        return
    }
    // Hand the parsed entry to the storage pipeline.
    sendToKVStore(logEntry)
}

// parseSyslogMessage splits a syslog line into the map stored by
// sendToKVStore. It accepts an RFC 3164 header ("Mmm dd hh:mm:ss host msg",
// day possibly space-padded) or an RFC 5424 header ("1 TIMESTAMP host rest").
// ok is false when the timestamp, hostname and message cannot all be found.
func parseSyslogMessage(msg string) (map[string]interface{}, bool) {
    rest := strings.TrimRight(msg, "\r\n")
    entry := map[string]interface{}{"raw": msg}

    if pri, after, ok := parseSyslogPRI(rest); ok {
        entry["facility"] = fmt.Sprint(pri / 8)
        entry["severity"] = fmt.Sprint(pri % 8)
        rest = after
    }

    // RFC 3164 timestamps span three space-separated tokens; RFC 5424
    // messages start with VERSION "1" followed by a one-token timestamp.
    timestampTokens := 3
    if strings.HasPrefix(rest, "1 ") {
        rest = rest[len("1 "):]
        timestampTokens = 1
    }

    header := strings.TrimLeft(rest, " ")
    rest = header
    for i := 0; i < timestampTokens; i++ {
        var ok bool
        if _, rest, ok = cutSyslogField(rest); !ok {
            return nil, false
        }
    }
    // Slice the timestamp out of the original text so padding such as
    // "Oct  5" is preserved exactly as sent.
    timestamp := strings.TrimRight(header[:len(header)-len(rest)], " ")

    hostname, content, ok := cutSyslogField(rest)
    if !ok {
        return nil, false
    }
    entry["timestamp"] = timestamp
    entry["hostname"] = hostname
    entry["message"] = content
    return entry, true
}

// parseSyslogPRI decodes a leading "<PRI>" (1-3 decimal digits, value
// 0-191) and returns the value and the text after '>'. ok is false, and
// rest is s unchanged, when no valid prefix is present.
func parseSyslogPRI(s string) (pri int, rest string, ok bool) {
    if !strings.HasPrefix(s, "<") {
        return 0, s, false
    }
    end := strings.IndexByte(s, '>')
    if end < 2 || end > 4 {
        return 0, s, false
    }
    for _, c := range s[1:end] {
        if c < '0' || c > '9' {
            return 0, s, false
        }
        pri = pri*10 + int(c-'0')
    }
    if pri > 191 {
        return 0, s, false
    }
    return pri, s[end+1:], true
}

// cutSyslogField skips leading spaces in s and splits off the next token,
// which must be terminated by a space. rest is everything after that space.
func cutSyslogField(s string) (field, rest string, ok bool) {
    s = strings.TrimLeft(s, " ")
    i := strings.IndexByte(s, ' ')
    if i <= 0 {
        return "", "", false
    }
    return s[:i], s[i+1:], true
}

2. 存储到键值存储(Redis)

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

// rdb is the package-wide Redis client shared by the ingest and export code.
// It stays nil until initRedis has been called.
var rdb *redis.Client

// initRedis creates the shared client for the Redis server at
// localhost:6379, database 0, with an empty password.
func initRedis() {
    opts := &redis.Options{
        DB:       0,
        Password: "",
        Addr:     "localhost:6379",
    }
    rdb = redis.NewClient(opts)
}

// sendToKVStore persists one log entry in Redis and indexes it by time.
//
// The entry is JSON-encoded and stored under "log:<hostname>:<unix-nanos>"
// with a 7-day TTL. The same key is added to the "logs:timestamps" sorted set,
// scored by Unix seconds. Index members older than the TTL are trimmed at the
// same time. All three commands run in one MULTI/EXEC transaction, so the
// index never gains a member whose value failed to store. Errors are logged,
// not returned.
func sendToKVStore(logEntry map[string]interface{}) {
    const ttl = 7 * 24 * time.Hour
    ctx := context.Background()
    // A single timestamp keeps the key suffix and the index score consistent.
    now := time.Now()

    // A missing or non-string hostname must not panic the ingest path.
    hostname, ok := logEntry["hostname"].(string)
    if !ok {
        hostname = "unknown"
    }
    key := fmt.Sprintf("log:%s:%d", hostname, now.UnixNano())

    data, err := json.Marshal(logEntry)
    if err != nil {
        log.Printf("Error marshaling log entry: %v", err)
        return
    }

    _, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
        pipe.Set(ctx, key, data, ttl)
        pipe.ZAdd(ctx, "logs:timestamps", &redis.Z{
            Score:  float64(now.Unix()),
            Member: key,
        })
        // Keys scored before now-ttl have expired; drop their index entries
        // so the sorted set does not grow without bound.
        pipe.ZRemRangeByScore(ctx, "logs:timestamps", "-inf",
            fmt.Sprintf("(%d", now.Add(-ttl).Unix()))
        return nil
    })
    if err != nil {
        log.Printf("Error storing in Redis: %v", err)
    }
}

3. 转换为Parquet格式

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "path/filepath"
    "time"

    "github.com/go-redis/redis/v8"
    "github.com/xitongsys/parquet-go-source/local"
    "github.com/xitongsys/parquet-go/parquet"
    "github.com/xitongsys/parquet-go/writer"
)

// LogRecord is one row of the exported Parquet file.
//
// The `parquet` tags follow the xitongsys/parquet-go schema syntax. Strings
// are declared as the physical BYTE_ARRAY type annotated with the UTF8
// converted type; a bare `type=UTF8` is not a physical type.
// Timestamp holds whatever integer parseTimestamp returns; its unit is not
// defined here (TODO: confirm and add a TIMESTAMP_* convertedtype).
type LogRecord struct {
    Timestamp int64  `parquet:"name=timestamp, type=INT64"`
    Hostname  string `parquet:"name=hostname, type=BYTE_ARRAY, convertedtype=UTF8"`
    Message   string `parquet:"name=message, type=BYTE_ARRAY, convertedtype=UTF8"`
    Severity  string `parquet:"name=severity, type=BYTE_ARRAY, convertedtype=UTF8"`
    Facility  string `parquet:"name=facility, type=BYTE_ARRAY, convertedtype=UTF8"`
    Raw       string `parquet:"name=raw, type=BYTE_ARRAY, convertedtype=UTF8"`
}

// exportToParquet writes records to a new Snappy-compressed Parquet file at
// filename, creating or truncating it. The parent directory must already
// exist. 4 is passed as the parquet writer's parallelism (np) argument.
//
// Errors from creating the file, writing a record, finalizing the file
// (WriteStop) or closing it are returned, wrapped with the file name.
func exportToParquet(records []LogRecord, filename string) (err error) {
    fw, err := local.NewLocalFileWriter(filename)
    if err != nil {
        return fmt.Errorf("creating parquet file %q: %w", filename, err)
    }
    // A failing Close can mean data never reached disk, so report it unless
    // an earlier error is already being returned.
    defer func() {
        if cerr := fw.Close(); cerr != nil && err == nil {
            err = fmt.Errorf("closing parquet file %q: %w", filename, cerr)
        }
    }()

    pw, err := writer.NewParquetWriter(fw, new(LogRecord), 4)
    if err != nil {
        return fmt.Errorf("creating parquet writer for %q: %w", filename, err)
    }
    pw.CompressionType = parquet.CompressionCodec_SNAPPY

    for _, record := range records {
        if err := pw.Write(record); err != nil {
            return fmt.Errorf("writing record to %q: %w", filename, err)
        }
    }

    // WriteStop flushes buffered row groups and writes the file footer.
    if err := pw.WriteStop(); err != nil {
        return fmt.Errorf("finalizing parquet file %q: %w", filename, err)
    }
    return nil
}

// batchExportToParquet exports log entries indexed in "logs:timestamps"
// during the last hour to one Parquet file. The file goes under a
// Hive-style partition directory logs/year=YYYY/month=MM/day=DD/, which is
// created if missing.
//
// Index members whose Redis key has expired are skipped. Entries that fail
// to decode are logged and skipped. Missing or non-string fields are
// exported as empty strings instead of panicking. Nothing is written when
// no entries are found. Errors are logged; nothing is returned.
func batchExportToParquet() {
    ctx := context.Background()
    // One clock reading keeps the query window and partition path consistent.
    now := time.Now()

    oneHourAgo := now.Add(-1 * time.Hour).Unix()
    keys, err := rdb.ZRangeByScore(ctx, "logs:timestamps",
        &redis.ZRangeBy{
            Min: fmt.Sprintf("%d", oneHourAgo),
            Max: "+inf",
        }).Result()
    if err != nil {
        log.Printf("Error getting keys: %v", err)
        return
    }

    records := make([]LogRecord, 0, len(keys))
    for _, key := range keys {
        data, err := rdb.Get(ctx, key).Result()
        if err != nil {
            // E.g. redis.Nil: the value's TTL expired but the index entry remains.
            continue
        }

        var logEntry map[string]interface{}
        if err := json.Unmarshal([]byte(data), &logEntry); err != nil {
            log.Printf("Skipping malformed log entry %s: %v", key, err)
            continue
        }

        records = append(records, LogRecord{
            Timestamp: parseTimestamp(logEntryString(logEntry, "timestamp")),
            Hostname:  logEntryString(logEntry, "hostname"),
            Message:   logEntryString(logEntry, "message"),
            Severity:  logEntryString(logEntry, "severity"),
            Facility:  logEntryString(logEntry, "facility"),
            Raw:       logEntryString(logEntry, "raw"),
        })
    }
    if len(records) == 0 {
        return
    }

    dir := filepath.Join("logs",
        fmt.Sprintf("year=%d", now.Year()),
        fmt.Sprintf("month=%02d", now.Month()),
        fmt.Sprintf("day=%02d", now.Day()))
    // The local file writer does not create parent directories.
    if err := os.MkdirAll(dir, 0755); err != nil {
        log.Printf("Error creating export directory %s: %v", dir, err)
        return
    }

    filename := filepath.Join(dir, fmt.Sprintf("logs_%d.parquet", now.Unix()))
    if err := exportToParquet(records, filename); err != nil {
        log.Printf("Error exporting %d records to %s: %v", len(records), filename, err)
    }
}

// logEntryString returns entry[key] when it holds a string, otherwise "".
func logEntryString(entry map[string]interface{}, key string) string {
    s, _ := entry[key].(string)
    return s
}

4. gRPC服务集成

package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"
    pb "your_project/logservice"
)

// LogServer is the gRPC LogService implementation registered in
// startGRPCServer. It embeds pb.UnimplementedLogServiceServer; presumably
// (standard protoc-gen-go-grpc behaviour) this makes RPCs not overridden here
// return an "unimplemented" error. TODO: confirm against the generated code.
type LogServer struct {
    pb.UnimplementedLogServiceServer
}

// IngestLog stores one log record received over gRPC via sendToKVStore and
// acknowledges it with an ID from generateMessageID.
//
// The stored map uses the same keys as syslog-derived entries. That includes
// "raw", set to the message text because gRPC clients send no original wire
// line, so readers that expect a string under "raw" find one.
// sendToKVStore only logs storage failures, so Success is always true.
func (s *LogServer) IngestLog(ctx context.Context, req *pb.LogRequest) (*pb.LogResponse, error) {
    logEntry := map[string]interface{}{
        "timestamp": req.Timestamp,
        "hostname":  req.Hostname,
        "message":   req.Message,
        "severity":  req.Severity,
        "facility":  req.Facility,
        "raw":       req.Message,
    }

    sendToKVStore(logEntry)

    return &pb.LogResponse{
        Success:   true,
        MessageId: generateMessageID(),
    }, nil
}

// QueryLogs is a placeholder. It ignores the request and always returns an
// empty QueryResponse with a nil error.
func (s *LogServer) QueryLogs(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
    // TODO: implement query logic.
    resp := new(pb.QueryResponse)
    return resp, nil
}

// startGRPCServer serves LogService on TCP port 50051 (all interfaces) and
// blocks until the server stops. Listen or serve failures terminate the
// process via log.Fatalf.
func startGRPCServer() {
    const listenAddr = ":50051"

    listener, err := net.Listen("tcp", listenAddr)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    server := grpc.NewServer()
    pb.RegisterLogServiceServer(server, &LogServer{})

    log.Printf("gRPC server listening at %v", listener.Addr())
    if serveErr := server.Serve(listener); serveErr != nil {
        log.Fatalf("failed to serve: %v", serveErr)
    }
}

5. Protobuf定义

syntax = "proto3";

// Schema for the log ingestion/query gRPC service implemented by LogServer.
package logservice;

option go_package = "your_project/logservice";

// LogService accepts individual log records and (eventually) answers queries.
service LogService {
    // IngestLog stores one record in the key-value store.
    rpc IngestLog(LogRequest) returns (LogResponse);
    // QueryLogs is declared here, but the Go implementation is currently a stub.
    rpc QueryLogs(QueryRequest) returns (QueryResponse);
}

// LogRequest is one log record sent by a client.
message LogRequest {
    // Free-form string; the expected format is not fixed here — TODO document.
    string timestamp = 1;
    string hostname = 2;
    string message = 3;
    string severity = 4;
    string facility = 5;
}

// LogResponse acknowledges an IngestLog call.
message LogResponse {
    bool success = 1;
    // Server-generated identifier for the stored record.
    string message_id = 2;
}

// QueryRequest filters logs. Presumably start_time/end_time bound a time
// window; their format is not defined here — TODO confirm.
message QueryRequest {
    string start_time = 1;
    string end_time = 2;
    string hostname = 3;
    string severity = 4;
}

// QueryResponse carries the matching records.
message QueryResponse {
    repeated LogEntry logs = 1;
}

// LogEntry is one record returned by QueryLogs.
message LogEntry {
    string timestamp = 1;
    string hostname = 2;
    string message = 3;
    string severity = 4;
}

6. 主程序集成

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

// main starts the pipeline and waits for a shutdown signal:
//   - it initialises the Redis client;
//   - it starts the gRPC server (:50051) and the UDP syslog listener (port 514) in background goroutines;
//   - it exports the last hour of logs to Parquet once per hour.
//
// On SIGINT or SIGTERM it stops the hourly exporter and waits for any export
// already in progress to finish. It then closes the Redis client and returns.
// The gRPC and syslog goroutines are not stopped explicitly; they end when the
// process exits.
func main() {
    // Register for signals first so an early Ctrl-C is not lost while the
    // servers are starting.
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    initRedis()

    go startGRPCServer()
    go startSyslogServer("514")

    // Cancelling ctx tells the exporter goroutine to return.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        ticker := time.NewTicker(1 * time.Hour)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                batchExportToParquet()
            }
        }
    }()

    <-sigChan
    log.Println("Shutting down...")

    cancel()
    // Let an in-flight export finish before closing the Redis client it reads from.
    wg.Wait()

    if rdb != nil {
        if err := rdb.Close(); err != nil {
            log.Printf("Error closing Redis client: %v", err)
        }
    }
    log.Println("Server stopped")
}

这个实现提供了完整的流水线:

  1. 通过UDP监听Syslog端口
  2. 解析并存储到Redis,包含时间索引
  3. 定期批量导出为Parquet格式,使用时间分区
  4. 通过gRPC提供日志摄入和查询接口

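生成的 Parquet 文件可以直接导入 ClickHouse。例如使用 `INSERT INTO logs SELECT * FROM file('logs_*.parquet', Parquet)`(文件需位于服务器的 user_files 目录下),或者 `clickhouse-client --query "INSERT INTO logs FORMAT Parquet" < logs.parquet`。也可以用 clickhouse-local 工具直接查询这些文件。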

回到顶部