golang实现slog.Handler链式处理与管道分发的插件库slog-multi的使用

Golang实现slog.Handler链式处理与管道分发的插件库slog-multi的使用

slog-multi 提供了Go结构化日志(slog)的高级组合模式。它使您能够通过结合不同的分发、路由、转换和错误处理策略的多个处理程序来构建复杂的日志工作流。

🎯 功能特性

  • 🔄 Fanout: 并行分发日志到多个处理程序
  • 🛣️ Router: 基于自定义条件路由日志
  • 🔄 Failover: 具有自动回退功能的高可用日志记录
  • ⚖️ Load Balancing: 在多个处理程序之间分配负载
  • 🔗 Pipeline: 使用中间件链转换和过滤日志
  • 🛡️ Error Recovery: 优雅处理日志记录失败

🚀 安装

go get github.com/samber/slog-multi

兼容性: Go >= 1.21

💡 使用示例

广播模式: slogmulti.Fanout()

并行分发日志到多个slog.Handler以获得最大吞吐量和冗余。

import (
    "net"
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
    "os"
    "time"
)

func main() {
    logstash, _ := net.Dial("tcp", "logstash.acme:4242")    // 使用github.com/netbrain/goautosocket实现自动重连
    datadogHandler := slogdatadog.NewDatadogHandler(slogdatadog.Option{
        APIKey: "your-api-key",
        Service: "my-service",
    })
    stderr := os.Stderr

    logger := slog.New(
        slogmulti.Fanout(
            slog.NewJSONHandler(logstash, &slog.HandlerOptions{}),  // 传递给第一个处理程序: 通过TCP的logstash
            slog.NewTextHandler(stderr, &slog.HandlerOptions{}),    // 然后传递给第二个处理程序: stderr
            datadogHandler,
            // ...
        ),
    )

    logger.
        With(
            slog.Group("user",
                slog.String("id", "user-123"),
                slog.Time("created_at", time.Now()),
            ),
        ).
        With("environment", "dev").
        With("error", fmt.Errorf("an error")).
        Error("A message")
}

路由模式: slogmulti.Router()

基于自定义条件(如日志级别、属性或业务逻辑)将日志分发到所有匹配的slog.Handler

import (
    "context"
    slogmulti "github.com/samber/slog-multi"
    slogslack "github.com/samber/slog-slack"
    "log/slog"
    "os"
)

func main() {
    slackChannelUS := slogslack.Option{Level: slog.LevelError, WebhookURL: "xxx", Channel: "supervision-us"}.NewSlackHandler()
    slackChannelEU := slogslack.Option{Level: slog.LevelError, WebhookURL: "xxx", Channel: "supervision-eu"}.NewSlackHandler()
    slackChannelAPAC := slogslack.Option{Level: slog.LevelError, WebhookURL: "xxx", Channel: "supervision-apac"}.NewSlackHandler()

    consoleHandler := slog.NewTextHandler(os.Stderr, nil)

    logger := slog.New(
        slogmulti.Router().
            Add(slackChannelUS, recordMatchRegion("us")).
            Add(slackChannelEU, recordMatchRegion("eu")).
            Add(slackChannelAPAC, recordMatchRegion("apac")).
            Add(consoleHandler, slogmulti.Level(slog.LevelInfo)).
            Handler(),
    )

    logger.
        With("region", "us").
        With("pool", "us-east-1").
        Error("Server desynchronized")
}

func recordMatchRegion(region string) func(ctx context.Context, r slog.Record) bool {
    return func(ctx context.Context, r slog.Record) bool {
        ok := false

        r.Attrs(func(attr slog.Attr) bool {
            if attr.Key == "region" && attr.Value.Kind() == slog.KindString && attr.Value.String() == region {
                ok = true
                return false
            }

            return true
        })

        return ok
    }
}

故障转移模式: slogmulti.Failover()

按顺序尝试多个处理程序直到一个成功,确保日志记录的可靠性。

import (
    "net"
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
    "os"
    "time"
)

func main() {
    // 创建到多个日志服务器的连接
    logstash1, _ := net.Dial("tcp", "logstash.eu-west-3a.internal:1000")
    logstash2, _ := net.Dial("tcp", "logstash.eu-west-3b.internal:1000")
    logstash3, _ := net.Dial("tcp", "logstash.eu-west-3c.internal:1000")

    logger := slog.New(
        slogmulti.Failover()(
            slog.HandlerOptions{}.NewJSONHandler(logstash1, nil),    // 主
            slog.HandlerOptions{}.NewJSONHandler(logstash2, nil),    // 次
            slog.HandlerOptions{}.NewJSONHandler(logstash3, nil),    // 第三
        ),
    )

    logger.
        With(
            slog.Group("user",
                slog.String("id", "user-123"),
                slog.Time("created_at", time.Now()),
            ),
        ).
        With("environment", "dev").
        With("error", fmt.Errorf("an error")).
        Error("A message")
}

负载均衡: slogmulti.Pool()

使用带随机化的轮询在多个处理程序之间分配日志记录负载。

import (
    "net"
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
    "os"
    "time"
)

func main() {
    // 创建多个日志服务器连接
    logstash1, _ := net.Dial("tcp", "logstash.eu-west-3a.internal:1000")
    logstash2, _ := net.Dial("tcp", "logstash.eu-west-3b.internal:1000")
    logstash3, _ := net.Dial("tcp", "logstash.eu-west-3c.internal:1000")

    logger := slog.New(
        slogmulti.Pool()(
            // 每个日志将随机选择一个处理程序
            slog.HandlerOptions{}.NewJSONHandler(logstash1, nil),
            slog.HandlerOptions{}.NewJSONHandler(logstash2, nil),
            slog.HandlerOptions{}.NewJSONHandler(logstash3, nil),
        ),
    )

    // 高容量日志记录
    for i := 0; i < 1000; i++ {
        logger.
            With(
                slog.Group("user",
                    slog.String("id", "user-123"),
                    slog.Time("created_at", time.Now()),
                ),
            ).
            With("environment", "dev").
            With("error", fmt.Errorf("an error")).
            Error("A message")
    }
}

错误恢复: slogmulti.RecoverHandlerError()

优雅地处理日志记录失败而不使应用程序崩溃。

import (
    "context"
    slogformatter "github.com/samber/slog-formatter"
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
    "os"
)

recovery := slogmulti.RecoverHandlerError(
    func(ctx context.Context, record slog.Record, err error) {
        // 仅当后续处理程序失败或返回错误时调用
        log.Println(err.Error())
    },
)
sink := NewSinkHandler(...)

logger := slog.New(
    slogmulti.
        Pipe(recovery).
        Handler(sink),
)

err := fmt.Errorf("an error")
logger.Error("a message",
    slog.Any("very_private_data", "abcd"),
    slog.Any("user", user),
    slog.Any("err", err))

// 输出:
// time=2023-04-10T14:00:0.000000+00:00 level=ERROR msg="a message" error.message="an error" error.type="*errors.errorString" user="John doe" very_private_data="********"

管道处理: slogmulti.Pipe()

使用中间件链转换和过滤日志。

import (
    "context"
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
    "os"
    "time"
)

func main() {
    // 第一个中间件: 将Go错误类型格式化为结构化对象
    errorFormattingMiddleware := slogmulti.NewHandleInlineMiddleware(func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error {
        record.Attrs(func(attr slog.Attr) bool {
            if attr.Key == "error" && attr.Value.Kind() == slog.KindAny {
                if err, ok := attr.Value.Any().(error); ok {
                    record.AddAttrs(
                        slog.String("error_type", "error"),
                        slog.String("error_message", err.Error()),
                    )
                }
            }
            return true
        })
        return next(ctx, record)
    })

    // 第二个中间件: 移除PII
    gdprMiddleware := slogmulti.NewHandleInlineMiddleware(func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error {
        record.Attrs(func(attr slog.Attr) bool {
            if attr.Key == "email" || attr.Key == "phone" || attr.Key == "created_at" {
                record.AddAttrs(slog.String(attr.Key, "*********"))
            }
            return true
        })
        return next(ctx, record)
    })

    // 最终处理程序
    sink := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{})

    logger := slog.New(
        slogmulti.
            Pipe(errorFormattingMiddleware).
            Pipe(gdprMiddleware).
            // ...
            Handler(sink),
    )

    logger.
        With(
            slog.Group("user",
                slog.String("id", "user-123"),
                slog.String("email", "user-123"),
                slog.Time("created_at", time.Now()),
            ),
        ).
        With("environment", "dev").
        Error("A message",
            slog.String("foo", "bar"),
            slog.Any("error", fmt.Errorf("an error")),
        )
}

🔧 高级模式

自定义中间件

中间件必须匹配以下原型:

type Middleware func(slog.Handler) slog.Handler

内联处理程序

内联处理程序提供了一种快捷方式来实现slog.Handler而无需创建完整的结构体实现。

mdw := slogmulti.NewHandleInlineHandler(
    // 模拟"Handle()"方法
    func(ctx context.Context, groups []string, attrs []slog.Attr, record slog.Record) error {
        // 自定义逻辑
        // [...]
        return nil
    },
)

内联中间件

内联中间件提供了钩住特定方法的快捷方式。

钩住Enabled()方法

middleware := slogmulti.NewEnabledInlineMiddleware(func(ctx context.Context, level slog.Level, next func(context.Context, slog.Level) bool) bool{
    // 调用next前的自定义逻辑
    if level == slog.LevelDebug {
        return false // 跳过调试日志
    }
    return next(ctx, level)
})

钩住Handle()方法

middleware := slogmulti.NewHandleInlineMiddleware(func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error {
    // 为所有日志添加时间戳
    record.AddAttrs(slog.Time("logged_at", time.Now()))
    return next(ctx, record)
})

💡 最佳实践

性能考虑

  • 谨慎使用Fanout: 广播到多个处理程序可能影响性能
  • 实现采样: 对于高容量日志,考虑采样策略
  • 监控处理程序性能: 一些处理程序(如基于网络的)可能很慢
  • 使用缓冲: 考虑为基于网络的处理程序使用缓冲

错误处理

  • 始终使用错误恢复: 用RecoverHandlerError包装处理程序
  • 实现回退: 对关键日志使用故障转移模式
  • 监控日志记录失败: 跟踪日志记录失败以识别问题

安全和隐私

  • 编辑敏感数据: 使用中间件移除PII和秘密
  • 验证日志内容: 确保日志不包含敏感信息
  • 使用安全连接: 对于基于网络的处理程序,使用TLS

监控和可观察性

  • 添加关联ID: 在日志中包含请求ID以进行跟踪
  • 结构化日志: 一致使用slog的结构化日志功能
  • 日志级别: 对不同类型的信息使用适当的日志级别

🤝 贡献

# 安装一些开发依赖
make tools

# 运行测试
make test
# 或
make watch-test

📝 许可证

版权所有 © 2023 Samuel Berthe

本项目采用MIT许可证。


更多关于golang实现slog.Handler链式处理与管道分发的插件库slog-multi的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现slog.Handler链式处理与管道分发的插件库slog-multi的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


slog-multi: Go slog 的链式处理与管道分发插件库

slog-multi 是一个为 Go 标准库 log/slog 设计的 Handler 组合工具库,它提供了多种 Handler 组合模式,可以实现日志的链式处理、并行分发等高级功能。

主要特性

  1. 链式处理:将多个 Handler 串联起来,形成处理流水线
  2. 并行分发:将日志消息同时发送到多个 Handler
  3. 条件路由:根据条件将日志路由到不同的 Handler
  4. 中间件模式:方便地包装和扩展 Handler 功能

安装

go get github.com/samber/slog-multi@v1

基本用法

1. 链式处理 (Pipeline)

package main

import (
	"log/slog"
	"os"

	"github.com/samber/slog-multi"
)

func main() {
	// 创建多个 Handler
	consoleHandler := slog.NewTextHandler(os.Stdout, nil)
	fileHandler := slog.NewJSONHandler(os.Stderr, nil)

	// 创建链式 Handler
	handler := slogmulti.Pipeline(
		slogmulti.NewMiddleware(func(next slog.Handler) slog.Handler {
			return next
		}),
		consoleHandler,
		fileHandler,
	)

	// 使用组合 Handler
	logger := slog.New(handler)
	logger.Info("hello world")
}

2. 并行分发 (Fanout)

package main

import (
	"log/slog"
	"os"

	"github.com/samber/slog-multi"
)

func main() {
	// 创建多个 Handler
	consoleHandler := slog.NewTextHandler(os.Stdout, nil)
	fileHandler := slog.NewJSONHandler(os.Stderr, nil)

	// 创建并行分发 Handler
	handler := slogmulti.Fanout(
		consoleHandler,
		fileHandler,
	)

	// 使用组合 Handler
	logger := slog.New(handler)
	logger.Info("hello world") // 日志会同时输出到控制台和文件
}

3. 条件路由 (Router)

package main

import (
	"log/slog"
	"os"

	"github.com/samber/slog-multi"
)

func main() {
	// 创建多个 Handler
	consoleHandler := slog.NewTextHandler(os.Stdout, nil)
	fileHandler := slog.NewJSONHandler(os.Stderr, nil)

	// 创建条件路由 Handler
	handler := slogmulti.Router().
		Add(slog.LevelError, fileHandler).  // 错误级别日志写入文件
		Add(slog.LevelInfo, consoleHandler). // 信息级别日志输出到控制台
		Else(consoleHandler)                // 其他情况默认输出到控制台

	// 使用组合 Handler
	logger := slog.New(handler)
	logger.Info("this goes to console")
	logger.Error("this goes to file")
}

4. 自定义中间件

package main

import (
	"context"
	"log/slog"
	"os"
	"time"

	"github.com/samber/slog-multi"
)

func TimingMiddleware(next slog.Handler) slog.Handler {
	return slog.HandlerFunc(func(ctx context.Context, r slog.Record) error {
		start := time.Now()
		defer func() {
			r.Add("duration", time.Since(start))
		}()
		return next.Handle(ctx, r)
	})
}

func main() {
	handler := slogmulti.Pipeline(
		TimingMiddleware,
		slog.NewTextHandler(os.Stdout, nil),
	)

	logger := slog.New(handler)
	logger.Info("hello world") // 输出会包含执行时间
}

高级组合示例

package main

import (
	"log/slog"
	"os"

	"github.com/samber/slog-multi"
)

func main() {
	// 创建多个 Handler
	consoleHandler := slog.NewTextHandler(os.Stdout, nil)
	fileHandler := slog.NewJSONHandler(os.Stderr, nil)
	metricsHandler := NewMetricsHandler() // 假设的自定义Handler

	// 构建复杂的日志处理流程
	handler := slogmulti.Router().
		Add(slog.LevelError, 
			slogmulti.Pipeline(
				AddStackTraceMiddleware,
				slogmulti.Fanout(
					fileHandler,
					NewAlertHandler(), // 假设的告警Handler
				),
			),
		).
		Add(slog.LevelInfo,
			slogmulti.Fanout(
				consoleHandler,
				metricsHandler,
			),
		).
		Else(consoleHandler)

	logger := slog.New(handler)
	logger.Info("regular info message")
	logger.Error("critical error occurred")
}

性能考虑

slog-multi 的设计考虑了性能因素:

  1. 零分配:在大多数情况下避免内存分配
  2. 轻量级:中间件和组合逻辑非常轻量
  3. 并行处理:Fanout 模式下的 Handler 是并行执行的

总结

slog-multi 为 Go 的标准日志库 slog 提供了强大的组合能力,使得日志处理可以像中间件管道一样灵活组合。无论是简单的日志分流,还是复杂的条件处理流程,都可以通过简洁的 API 实现。

对于需要复杂日志处理策略的项目,slog-multi 是一个值得考虑的工具,它既保持了 slog 的简洁性,又扩展了其功能性。

回到顶部