Golang实现gRPC日志处理:从Syslog端口到键值对再到Parquet格式
大家好,
很高兴来到这里,我希望能学习和分享宝贵的经验。
我的问题是关于 Syslog 主题的。
也许有人能帮助解决以下问题:
在 gRPC 项目中
- 我希望从命令行或 Syslog 端口捕获数据,并参考 https://github.com/elastic/beats/tree/master/filebeat/input/syslog 来适配解析器。
- 然后,将所有进入 Syslog 端口的事件存储到键值存储中(我假设会用 Redis,或者大家有其他推荐吗?)
- 最后,我想将数据序列化为 Parquet 格式,并包含日期和时间,以便能够按时间间隔搜索/查询数据以获取完整路径(并且可能存储在 ClickHouse 中)。
非常希望能听到大家对此的想法和意见。
此致,
Ramin
更多关于Golang实现gRPC日志处理:从Syslog端口到键值对再到Parquet格式的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于Golang实现gRPC日志处理:从Syslog端口到键值对再到Parquet格式的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
这是一个非常典型的日志处理流水线设计。以下是如何在Go中实现这个流程的具体方案和代码示例:
1. 从Syslog端口接收数据
如果日志已经落盘,可以使用 github.com/hpcloud/tail 跟踪日志文件;下面的示例则直接监听 Syslog 的 UDP 端口:
package main
import (
"fmt"
"log"
"net"
"strings"
)
// startSyslogServer listens for syslog datagrams on the given UDP port and
// passes each received message to processSyslogMessage.
//
// It never returns under normal operation: read errors are logged and the
// loop continues, while a failure to resolve or bind the address terminates
// the process through log.Fatal/log.Fatalf.
func startSyslogServer(port string) {
	addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%s", port))
	if err != nil {
		// A nil address would make ListenUDP pick an arbitrary port, so an
		// unresolvable port has to be fatal rather than silently ignored.
		log.Fatalf("resolving syslog address for port %q: %v", port, err)
	}

	conn, err := net.ListenUDP("udp", addr)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Sized for the largest possible UDP payload so long messages are not
	// truncated by the read.
	buffer := make([]byte, 65535)
	for {
		n, _, err := conn.ReadFromUDP(buffer)
		if err != nil {
			log.Printf("Error reading from UDP: %v", err)
			continue
		}
		// string() copies the bytes, so reusing buffer on the next read is safe.
		processSyslogMessage(string(buffer[:n]))
	}
}
// processSyslogMessage parses a BSD-style (RFC 3164) syslog line of the form
//
//	[<PRI>]Mon DD HH:MM:SS hostname message...
//
// into a key/value entry with the keys "timestamp", "hostname", "message" and
// "raw", and forwards it to sendToKVStore. Messages that do not contain at
// least the three timestamp fields, a hostname and a message separator are
// dropped silently. RFC 5424 framing is not recognised by this parser.
func processSyslogMessage(msg string) {
	body := stripSyslogPriority(strings.TrimRight(msg, "\r\n"))

	// Pull off month, day, time and hostname one at a time. Runs of spaces
	// between fields are skipped because RFC 3164 pads single-digit days
	// with an extra space ("Oct  1"), which would otherwise shift every field.
	var header [4]string
	rest := body
	for i := range header {
		rest = strings.TrimLeft(rest, " ")
		sep := strings.IndexByte(rest, ' ')
		if sep < 0 {
			return
		}
		header[i], rest = rest[:sep], rest[sep+1:]
	}

	logEntry := map[string]interface{}{
		"timestamp": header[0] + " " + header[1] + " " + header[2],
		"hostname":  header[3],
		"message":   rest,
		// Keep the datagram exactly as received for later inspection.
		"raw": msg,
	}

	// Hand the entry to the storage stage of the pipeline.
	sendToKVStore(logEntry)
}

// stripSyslogPriority removes a leading "<PRI>" marker (one to three decimal
// digits in angle brackets) and returns s unchanged when none is present.
func stripSyslogPriority(s string) string {
	if !strings.HasPrefix(s, "<") {
		return s
	}
	end := strings.IndexByte(s, '>')
	if end < 2 || end > 4 {
		return s
	}
	for _, c := range s[1:end] {
		if c < '0' || c > '9' {
			return s
		}
	}
	return s[end+1:]
}
2. 存储到键值存储(Redis)
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
// rdb is the package-wide Redis client. It is nil until initRedis has run.
var rdb *redis.Client

// initRedis creates the shared Redis client for localhost:6379, using an
// empty password and database 0, and stores it in rdb.
func initRedis() {
	opts := redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	}
	rdb = redis.NewClient(&opts)
}
// sendToKVStore persists one log entry in Redis.
//
// The entry is JSON-encoded and stored under "log:<hostname>:<unix-nanos>"
// with a TTL of seven days, and the key is then added to the sorted set
// "logs:timestamps" scored by the current Unix time so that entries can be
// selected by time range. Failures are logged; nothing is returned.
//
// NOTE(review): members of "logs:timestamps" carry no TTL, so they outlive
// the keys they point at; readers must tolerate missing keys.
func sendToKVStore(logEntry map[string]interface{}) {
	ctx := context.Background()
	// One instant for both key and score keeps the two consistent.
	now := time.Now()

	// A missing or non-string hostname must not panic the ingest path.
	hostname, ok := logEntry["hostname"].(string)
	if !ok {
		hostname = "unknown"
	}
	key := fmt.Sprintf("log:%s:%d", hostname, now.UnixNano())

	data, err := json.Marshal(logEntry)
	if err != nil {
		log.Printf("Error marshaling log entry: %v", err)
		return
	}

	if err := rdb.Set(ctx, key, data, 7*24*time.Hour).Err(); err != nil {
		log.Printf("Error storing in Redis: %v", err)
		// Do not index a key that was never written.
		return
	}

	member := &redis.Z{
		Score:  float64(now.Unix()),
		Member: key,
	}
	if err := rdb.ZAdd(ctx, "logs:timestamps", member).Err(); err != nil {
		log.Printf("Error indexing log entry %s: %v", key, err)
	}
}
3. 转换为Parquet格式
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/parquet"
	"github.com/xitongsys/parquet-go/writer"
)
// LogRecord is the row type written to Parquet files by exportToParquet.
// The struct tags give each column's name and Parquet type.
type LogRecord struct {
// Timestamp is filled from parseTimestamp by batchExportToParquet.
// NOTE(review): unit/epoch depend on parseTimestamp, which is not shown here.
Timestamp int64 `parquet:"name=timestamp, type=INT64"`
// Hostname is the reporting host taken from the stored entry.
Hostname string `parquet:"name=hostname, type=UTF8"`
// Message is the log text without the syslog header fields.
Message string `parquet:"name=message, type=UTF8"`
// Severity is only supplied by the gRPC ingest path in this file.
Severity string `parquet:"name=severity, type=UTF8"`
// Facility is only supplied by the gRPC ingest path in this file.
Facility string `parquet:"name=facility, type=UTF8"`
// Raw is the unparsed message as stored under the "raw" key.
Raw string `parquet:"name=raw, type=UTF8"`
}
// exportToParquet writes records to a Snappy-compressed Parquet file at
// filename, creating any missing parent directories first.
//
// It returns the first error encountered while creating, writing, finalising
// or closing the file. When an error occurs the partially written file is
// removed so that readers never see a truncated Parquet file.
func exportToParquet(records []LogRecord, filename string) (err error) {
	// The partitioned layout (year=/month=/day=) means the directory usually
	// does not exist yet on the first export of a day.
	if err = os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
		return err
	}

	fw, err := local.NewLocalFileWriter(filename)
	if err != nil {
		return err
	}
	defer func() {
		// A failed Close can mean lost data, so surface it unless an earlier
		// error is already being returned.
		if cerr := fw.Close(); cerr != nil && err == nil {
			err = cerr
		}
		if err != nil {
			_ = os.Remove(filename) // best-effort cleanup of the partial file
		}
	}()

	// The final argument is the writer's parallelism setting.
	pw, err := writer.NewParquetWriter(fw, new(LogRecord), 4)
	if err != nil {
		return err
	}
	pw.CompressionType = parquet.CompressionCodec_SNAPPY

	for _, record := range records {
		if err = pw.Write(record); err != nil {
			return err
		}
	}

	// WriteStop flushes buffered rows and writes the file footer.
	return pw.WriteStop()
}
// batchExportToParquet exports the log entries indexed during the last hour
// from Redis into one Parquet file under a date-partitioned path
// (logs/year=YYYY/month=MM/day=DD/logs_<unix>.parquet).
//
// Keys that can no longer be read (for example because they expired) and
// entries that fail to decode are skipped. Nothing is written when no entry
// could be loaded. Errors are logged rather than returned.
func batchExportToParquet() {
	ctx := context.Background()
	// A single reference instant keeps the query window and every component
	// of the file name consistent, even across a midnight boundary.
	now := time.Now()

	oneHourAgo := now.Add(-1 * time.Hour).Unix()
	keys, err := rdb.ZRangeByScore(ctx, "logs:timestamps",
		&redis.ZRangeBy{
			Min: fmt.Sprintf("%d", oneHourAgo),
			Max: "+inf",
		}).Result()
	if err != nil {
		log.Printf("Error getting keys: %v", err)
		return
	}

	records := make([]LogRecord, 0, len(keys))
	for _, key := range keys {
		data, err := rdb.Get(ctx, key).Result()
		if err != nil {
			// The index can reference keys that are already gone.
			continue
		}

		var logEntry map[string]interface{}
		if err := json.Unmarshal([]byte(data), &logEntry); err != nil {
			log.Printf("Error decoding log entry %s: %v", key, err)
			continue
		}

		// Entries come from two producers with different key sets, so every
		// field is read defensively instead of with a bare type assertion.
		records = append(records, LogRecord{
			Timestamp: parseTimestamp(entryString(logEntry, "timestamp")),
			Hostname:  entryString(logEntry, "hostname"),
			Message:   entryString(logEntry, "message"),
			Severity:  entryString(logEntry, "severity"),
			Facility:  entryString(logEntry, "facility"),
			Raw:       entryString(logEntry, "raw"),
		})
	}

	if len(records) == 0 {
		return
	}

	// The file name embeds the time partition.
	filename := fmt.Sprintf("logs/year=%d/month=%02d/day=%02d/logs_%d.parquet",
		now.Year(),
		now.Month(),
		now.Day(),
		now.Unix())
	if err := exportToParquet(records, filename); err != nil {
		log.Printf("Error exporting %d records to %s: %v", len(records), filename, err)
	}
}

// entryString returns entry[key] when it holds a string and "" otherwise.
func entryString(entry map[string]interface{}, key string) string {
	s, _ := entry[key].(string)
	return s
}
4. gRPC服务集成
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
pb "your_project/logservice"
)
// LogServer is the gRPC handler type registered for LogService in
// startGRPCServer; it provides IngestLog and QueryLogs.
type LogServer struct {
// NOTE(review): presumably the generated stub that supplies default
// implementations for unimplemented RPCs — confirm against the generated code.
pb.UnimplementedLogServiceServer
}
// IngestLog accepts one log record over gRPC, converts it into the same
// key/value shape used by the syslog path and hands it to sendToKVStore.
//
// The response always reports success with a fresh message ID, because
// sendToKVStore does not report storage failures to its caller.
func (s *LogServer) IngestLog(ctx context.Context, req *pb.LogRequest) (*pb.LogResponse, error) {
	logEntry := map[string]interface{}{
		"timestamp": req.Timestamp,
		"hostname":  req.Hostname,
		"message":   req.Message,
		"severity":  req.Severity,
		"facility":  req.Facility,
		// The syslog path stores the wire message under "raw" and the
		// Parquet export reads that key; a gRPC record has no separate wire
		// form, so the message text is used to keep both paths uniform.
		"raw": req.Message,
	}
	sendToKVStore(logEntry)

	return &pb.LogResponse{
		Success:   true,
		MessageId: generateMessageID(),
	}, nil
}
// QueryLogs is a placeholder for the query RPC: it ignores the request and
// returns an empty response with a nil error.
func (s *LogServer) QueryLogs(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
	// Query logic still has to be implemented.
	empty := new(pb.QueryResponse)
	return empty, nil
}
// startGRPCServer binds TCP port 50051, registers a LogServer on a new gRPC
// server and serves until Serve returns. Failing to listen or to serve
// terminates the process via log.Fatalf.
func startGRPCServer() {
	listener, listenErr := net.Listen("tcp", ":50051")
	if listenErr != nil {
		log.Fatalf("failed to listen: %v", listenErr)
	}

	server := grpc.NewServer()
	pb.RegisterLogServiceServer(server, &LogServer{})
	log.Printf("gRPC server listening at %v", listener.Addr())

	serveErr := server.Serve(listener)
	if serveErr != nil {
		log.Fatalf("failed to serve: %v", serveErr)
	}
}
5. Protobuf定义
syntax = "proto3";
package logservice;
option go_package = "your_project/logservice";
// LogService exposes log ingestion and log querying over gRPC.
service LogService {
// IngestLog submits a single log record.
rpc IngestLog(LogRequest) returns (LogResponse);
// QueryLogs returns the log records selected by the request.
rpc QueryLogs(QueryRequest) returns (QueryResponse);
}
// LogRequest carries one log record to be ingested.
// NOTE(review): the expected timestamp format is not specified here — confirm
// with the consumer that parses it.
message LogRequest {
string timestamp = 1;
string hostname = 2;
string message = 3;
string severity = 4;
string facility = 5;
}
// LogResponse acknowledges an IngestLog call.
message LogResponse {
// success reports whether the record was accepted.
bool success = 1;
// message_id is the identifier assigned to the ingested record.
string message_id = 2;
}
// QueryRequest describes which log records QueryLogs should return.
// NOTE(review): presumably start_time/end_time bound the time range and
// hostname/severity act as filters; the exact semantics and time format are
// not defined in this file — confirm when the query logic is implemented.
message QueryRequest {
string start_time = 1;
string end_time = 2;
string hostname = 3;
string severity = 4;
}
// QueryResponse is the result of a QueryLogs call.
message QueryResponse {
// logs holds the returned records; it may be empty.
repeated LogEntry logs = 1;
}
// LogEntry is one log record returned inside a QueryResponse.
message LogEntry {
  string timestamp = 1;
  string hostname = 2;
  string message = 3;
  string severity = 4;
  // facility mirrors LogRequest.facility so that a queried record carries
  // every field that was ingested. Adding a new field number is
  // wire-compatible with existing clients.
  string facility = 5;
}
6. 主程序集成
package main
import (
"context"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// main wires the pipeline together: it initialises Redis, starts the gRPC
// and syslog servers in the background, runs the hourly Parquet export and
// then blocks until SIGINT or SIGTERM arrives.
//
// On shutdown the export loop is cancelled and awaited before the Redis
// client is closed, so an export that is already running is not cut off
// from its data source.
func main() {
	// Set up the Redis connection used by every stage.
	initRedis()

	// Both servers block for the lifetime of the process.
	go startGRPCServer()
	go startSyslogServer("514")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Export to Parquet once per hour until shutdown is requested.
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				batchExportToParquet()
			}
		}
	}()

	// Wait for a termination signal.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
	log.Println("Shutting down...")

	// Stop scheduling exports and let an in-flight one finish.
	cancel()
	wg.Wait()

	// Release resources.
	if rdb != nil {
		if err := rdb.Close(); err != nil {
			log.Printf("Error closing Redis client: %v", err)
		}
	}
	log.Println("Server stopped")
}
这个实现提供了完整的流水线:
- 通过UDP监听Syslog端口
- 解析并存储到Redis,包含时间索引
- 定期批量导出为Parquet格式,使用时间分区
- 通过gRPC提供日志摄入和查询接口
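Parquet文件可以直接加载到ClickHouse:可以通过 clickhouse-client 执行 `INSERT INTO 表名 FORMAT Parquet` 并把文件内容作为输入,也可以使用 `INSERT INTO 表名 SELECT * FROM file('文件路径', 'Parquet')` 语句,或者借助 clickhouse-local 工具处理。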

