golang实现slog.Handler链式处理与管道分发的插件库slog-multi的使用
Golang实现slog.Handler链式处理与管道分发的插件库slog-multi的使用
slog-multi 提供了Go结构化日志(slog)的高级组合模式。它使您能够通过结合不同的分发、路由、转换和错误处理策略的多个处理程序来构建复杂的日志工作流。
🎯 功能特性
- 🔄 Fanout: 并行分发日志到多个处理程序
- 🛣️ Router: 基于自定义条件路由日志
- 🔄 Failover: 具有自动回退功能的高可用日志记录
- ⚖️ Load Balancing: 在多个处理程序之间分配负载
- 🔗 Pipeline: 使用中间件链转换和过滤日志
- 🛡️ Error Recovery: 优雅处理日志记录失败
🚀 安装
go get github.com/samber/slog-multi
兼容性: Go >= 1.21
💡 使用示例
广播模式: slogmulti.Fanout()
并行分发日志到多个slog.Handler
以获得最大吞吐量和冗余。
import (
"net"
slogmulti "github.com/samber/slog-multi"
"log/slog"
"os"
"time"
)
func main() {
logstash, _ := net.Dial("tcp", "logstash.acme:4242") // 使用github.com/netbrain/goautosocket实现自动重连
datadogHandler := slogdatadog.NewDatadogHandler(slogdatadog.Option{
APIKey: "your-api-key",
Service: "my-service",
})
stderr := os.Stderr
logger := slog.New(
slogmulti.Fanout(
slog.NewJSONHandler(logstash, &slog.HandlerOptions{}), // 传递给第一个处理程序: 通过TCP的logstash
slog.NewTextHandler(stderr, &slog.HandlerOptions{}), // 然后传递给第二个处理程序: stderr
datadogHandler,
// ...
),
)
logger.
With(
slog.Group("user",
slog.String("id", "user-123"),
slog.Time("created_at", time.Now()),
),
).
With("environment", "dev").
With("error", fmt.Errorf("an error")).
Error("A message")
}
路由模式: slogmulti.Router()
基于自定义条件(如日志级别、属性或业务逻辑)将日志分发到所有匹配的slog.Handler
。
import (
"context"
slogmulti "github.com/samber/slog-multi"
slogslack "github.com/samber/slog-slack"
"log/slog"
"os"
)
func main() {
slackChannelUS := slogslack.Option{Level: slog.LevelError, WebhookURL: "xxx", Channel: "supervision-us"}.NewSlackHandler()
slackChannelEU := slogslack.Option{Level: slog.LevelError, WebhookURL: "xxx", Channel: "supervision-eu"}.NewSlackHandler()
slackChannelAPAC := slogslack.Option{Level: slog.LevelError, WebhookURL: "xxx", Channel: "supervision-apac"}.NewSlackHandler()
consoleHandler := slog.NewTextHandler(os.Stderr, nil)
logger := slog.New(
slogmulti.Router().
Add(slackChannelUS, recordMatchRegion("us")).
Add(slackChannelEU, recordMatchRegion("eu")).
Add(slackChannelAPAC, recordMatchRegion("apac")).
Add(consoleHandler, slogmulti.Level(slog.LevelInfo)).
Handler(),
)
logger.
With("region", "us").
With("pool", "us-east-1").
Error("Server desynchronized")
}
func recordMatchRegion(region string) func(ctx context.Context, r slog.Record) bool {
return func(ctx context.Context, r slog.Record) bool {
ok := false
r.Attrs(func(attr slog.Attr) bool {
if attr.Key == "region" && attr.Value.Kind() == slog.KindString && attr.Value.String() == region {
ok = true
return false
}
return true
})
return ok
}
}
故障转移模式: slogmulti.Failover()
按顺序尝试多个处理程序直到一个成功,确保日志记录的可靠性。
import (
"net"
slogmulti "github.com/samber/slog-multi"
"log/slog"
"os"
"time"
)
func main() {
// 创建到多个日志服务器的连接
logstash1, _ := net.Dial("tcp", "logstash.eu-west-3a.internal:1000")
logstash2, _ := net.Dial("tcp", "logstash.eu-west-3b.internal:1000")
logstash3, _ := net.Dial("tcp", "logstash.eu-west-3c.internal:1000")
logger := slog.New(
slogmulti.Failover()(
slog.HandlerOptions{}.NewJSONHandler(logstash1, nil), // 主
slog.HandlerOptions{}.NewJSONHandler(logstash2, nil), // 次
slog.HandlerOptions{}.NewJSONHandler(logstash3, nil), // 第三
),
)
logger.
With(
slog.Group("user",
slog.String("id", "user-123"),
slog.Time("created_at", time.Now()),
),
).
With("environment", "dev").
With("error", fmt.Errorf("an error")).
Error("A message")
}
负载均衡: slogmulti.Pool()
使用带随机化的轮询在多个处理程序之间分配日志记录负载。
import (
"net"
slogmulti "github.com/samber/slog-multi"
"log/slog"
"os"
"time"
)
func main() {
// 创建多个日志服务器连接
logstash1, _ := net.Dial("tcp", "logstash.eu-west-3a.internal:1000")
logstash2, _ := net.Dial("tcp", "logstash.eu-west-3b.internal:1000")
logstash3, _ := net.Dial("tcp", "logstash.eu-west-3c.internal:1000")
logger := slog.New(
slogmulti.Pool()(
// 每个日志将随机选择一个处理程序
slog.HandlerOptions{}.NewJSONHandler(logstash1, nil),
slog.HandlerOptions{}.NewJSONHandler(logstash2, nil),
slog.HandlerOptions{}.NewJSONHandler(logstash3, nil),
),
)
// 高容量日志记录
for i := 0; i < 1000; i++ {
logger.
With(
slog.Group("user",
slog.String("id", "user-123"),
slog.Time("created_at", time.Now()),
),
).
With("environment", "dev").
With("error", fmt.Errorf("an error")).
Error("A message")
}
}
错误恢复: slogmulti.RecoverHandlerError()
优雅地处理日志记录失败而不使应用程序崩溃。
import (
"context"
slogformatter "github.com/samber/slog-formatter"
slogmulti "github.com/samber/slog-multi"
"log/slog"
"os"
)
recovery := slogmulti.RecoverHandlerError(
func(ctx context.Context, record slog.Record, err error) {
// 仅当后续处理程序失败或返回错误时调用
log.Println(err.Error())
},
)
sink := NewSinkHandler(...)
logger := slog.New(
slogmulti.
Pipe(recovery).
Handler(sink),
)
err := fmt.Errorf("an error")
logger.Error("a message",
slog.Any("very_private_data", "abcd"),
slog.Any("user", user),
slog.Any("err", err))
// 输出:
// time=2023-04-10T14:00:0.000000+00:00 level=ERROR msg="a message" error.message="an error" error.type="*errors.errorString" user="John doe" very_private_data="********"
管道处理: slogmulti.Pipe()
使用中间件链转换和过滤日志。
import (
"context"
slogmulti "github.com/samber/slog-multi"
"log/slog"
"os"
"time"
)
func main() {
// 第一个中间件: 将Go错误类型格式化为结构化对象
errorFormattingMiddleware := slogmulti.NewHandleInlineMiddleware(func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error {
record.Attrs(func(attr slog.Attr) bool {
if attr.Key == "error" && attr.Value.Kind() == slog.KindAny {
if err, ok := attr.Value.Any().(error); ok {
record.AddAttrs(
slog.String("error_type", "error"),
slog.String("error_message", err.Error()),
)
}
}
return true
})
return next(ctx, record)
})
// 第二个中间件: 移除PII
gdprMiddleware := slogmulti.NewHandleInlineMiddleware(func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error {
record.Attrs(func(attr slog.Attr) bool {
if attr.Key == "email" || attr.Key == "phone" || attr.Key == "created_at" {
record.AddAttrs(slog.String(attr.Key, "*********"))
}
return true
})
return next(ctx, record)
})
// 最终处理程序
sink := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{})
logger := slog.New(
slogmulti.
Pipe(errorFormattingMiddleware).
Pipe(gdprMiddleware).
// ...
Handler(sink),
)
logger.
With(
slog.Group("user",
slog.String("id", "user-123"),
slog.String("email", "user-123"),
slog.Time("created_at", time.Now()),
),
).
With("environment", "dev").
Error("A message",
slog.String("foo", "bar"),
slog.Any("error", fmt.Errorf("an error")),
)
}
🔧 高级模式
自定义中间件
中间件必须匹配以下原型:
type Middleware func(slog.Handler) slog.Handler
内联处理程序
内联处理程序提供了一种快捷方式来实现slog.Handler
而无需创建完整的结构体实现。
mdw := slogmulti.NewHandleInlineHandler(
// 模拟"Handle()"方法
func(ctx context.Context, groups []string, attrs []slog.Attr, record slog.Record) error {
// 自定义逻辑
// [...]
return nil
},
)
内联中间件
内联中间件提供了钩住特定方法的快捷方式。
钩住Enabled()
方法
middleware := slogmulti.NewEnabledInlineMiddleware(func(ctx context.Context, level slog.Level, next func(context.Context, slog.Level) bool) bool{
// 调用next前的自定义逻辑
if level == slog.LevelDebug {
return false // 跳过调试日志
}
return next(ctx, level)
})
钩住Handle()
方法
middleware := slogmulti.NewHandleInlineMiddleware(func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error {
// 为所有日志添加时间戳
record.AddAttrs(slog.Time("logged_at", time.Now()))
return next(ctx, record)
})
💡 最佳实践
性能考虑
- 谨慎使用Fanout: 广播到多个处理程序可能影响性能
- 实现采样: 对于高容量日志,考虑采样策略
- 监控处理程序性能: 一些处理程序(如基于网络的)可能很慢
- 使用缓冲: 考虑为基于网络的处理程序使用缓冲
错误处理
- 始终使用错误恢复: 用
RecoverHandlerError
包装处理程序 - 实现回退: 对关键日志使用故障转移模式
- 监控日志记录失败: 跟踪日志记录失败以识别问题
安全和隐私
- 编辑敏感数据: 使用中间件移除PII和秘密
- 验证日志内容: 确保日志不包含敏感信息
- 使用安全连接: 对于基于网络的处理程序,使用TLS
监控和可观察性
- 添加关联ID: 在日志中包含请求ID以进行跟踪
- 结构化日志: 一致使用slog的结构化日志功能
- 日志级别: 对不同类型的信息使用适当的日志级别
🤝 贡献
- 在Twitter上联系我 @samuelberthe
- Fork 项目
- 修复问题或请求新功能
# 安装一些开发依赖
make tools
# 运行测试
make test
# 或
make watch-test
📝 许可证
版权所有 © 2023 Samuel Berthe。
本项目采用MIT许可证。
更多关于golang实现slog.Handler链式处理与管道分发的插件库slog-multi的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现slog.Handler链式处理与管道分发的插件库slog-multi的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
slog-multi: Go slog 的链式处理与管道分发插件库
slog-multi
是一个为 Go 标准库 log/slog
设计的 Handler 组合工具库,它提供了多种 Handler 组合模式,可以实现日志的链式处理、并行分发等高级功能。
主要特性
- 链式处理:将多个 Handler 串联起来,形成处理流水线
- 并行分发:将日志消息同时发送到多个 Handler
- 条件路由:根据条件将日志路由到不同的 Handler
- 中间件模式:方便地包装和扩展 Handler 功能
安装
go get github.com/samber/slog-multi@v1
基本用法
1. 链式处理 (Pipeline)
package main
import (
"log/slog"
"os"
"github.com/samber/slog-multi"
)
func main() {
// 创建多个 Handler
consoleHandler := slog.NewTextHandler(os.Stdout, nil)
fileHandler := slog.NewJSONHandler(os.Stderr, nil)
// 创建链式 Handler
handler := slogmulti.Pipeline(
slogmulti.NewMiddleware(func(next slog.Handler) slog.Handler {
return next
}),
consoleHandler,
fileHandler,
)
// 使用组合 Handler
logger := slog.New(handler)
logger.Info("hello world")
}
2. 并行分发 (Fanout)
package main
import (
"log/slog"
"os"
"github.com/samber/slog-multi"
)
func main() {
// 创建多个 Handler
consoleHandler := slog.NewTextHandler(os.Stdout, nil)
fileHandler := slog.NewJSONHandler(os.Stderr, nil)
// 创建并行分发 Handler
handler := slogmulti.Fanout(
consoleHandler,
fileHandler,
)
// 使用组合 Handler
logger := slog.New(handler)
logger.Info("hello world") // 日志会同时输出到控制台和文件
}
3. 条件路由 (Router)
package main
import (
"log/slog"
"os"
"github.com/samber/slog-multi"
)
func main() {
// 创建多个 Handler
consoleHandler := slog.NewTextHandler(os.Stdout, nil)
fileHandler := slog.NewJSONHandler(os.Stderr, nil)
// 创建条件路由 Handler
handler := slogmulti.Router().
Add(slog.LevelError, fileHandler). // 错误级别日志写入文件
Add(slog.LevelInfo, consoleHandler). // 信息级别日志输出到控制台
Else(consoleHandler) // 其他情况默认输出到控制台
// 使用组合 Handler
logger := slog.New(handler)
logger.Info("this goes to console")
logger.Error("this goes to file")
}
4. 自定义中间件
package main
import (
"context"
"log/slog"
"os"
"time"
"github.com/samber/slog-multi"
)
func TimingMiddleware(next slog.Handler) slog.Handler {
return slog.HandlerFunc(func(ctx context.Context, r slog.Record) error {
start := time.Now()
defer func() {
r.Add("duration", time.Since(start))
}()
return next.Handle(ctx, r)
})
}
func main() {
handler := slogmulti.Pipeline(
TimingMiddleware,
slog.NewTextHandler(os.Stdout, nil),
)
logger := slog.New(handler)
logger.Info("hello world") // 输出会包含执行时间
}
高级组合示例
package main
import (
"log/slog"
"os"
"github.com/samber/slog-multi"
)
func main() {
// 创建多个 Handler
consoleHandler := slog.NewTextHandler(os.Stdout, nil)
fileHandler := slog.NewJSONHandler(os.Stderr, nil)
metricsHandler := NewMetricsHandler() // 假设的自定义Handler
// 构建复杂的日志处理流程
handler := slogmulti.Router().
Add(slog.LevelError,
slogmulti.Pipeline(
AddStackTraceMiddleware,
slogmulti.Fanout(
fileHandler,
NewAlertHandler(), // 假设的告警Handler
),
),
).
Add(slog.LevelInfo,
slogmulti.Fanout(
consoleHandler,
metricsHandler,
),
).
Else(consoleHandler)
logger := slog.New(handler)
logger.Info("regular info message")
logger.Error("critical error occurred")
}
性能考虑
slog-multi
的设计考虑了性能因素:
- 零分配:在大多数情况下避免内存分配
- 轻量级:中间件和组合逻辑非常轻量
- 并行处理:Fanout 模式下的 Handler 是并行执行的
总结
slog-multi
为 Go 的标准日志库 slog
提供了强大的组合能力,使得日志处理可以像中间件管道一样灵活组合。无论是简单的日志分流,还是复杂的条件处理流程,都可以通过简洁的 API 实现。
对于需要复杂日志处理策略的项目,slog-multi
是一个值得考虑的工具,它既保持了 slog
的简洁性,又扩展了其功能性。