Golang中如何高效分离日志记录与应用程序

Golang中如何高效分离日志记录与应用程序 我想请教一个建议,因为我之前还没有遇到过这样的任务。

我从服务器流式传输数据,将其记录到文件中,同时进行一些实时计算(两者都是用Go语言编写的)。问题在于,每当计算中出现错误时,程序会崩溃,流式传输和记录也会随之停止。我该如何有效地将流式传输和计算应用程序分离开来,使得计算应用程序能够使用流式传输的数据?我想到的一个解决方案是使用Linux中的tail命令,但这已经是一种外部解决方案,可能带来很大的开销。非常感谢任何建议 🙏

4 回复

感谢您的建议,我会研究一下,并且我认为应该能够实现它。如果可能的话,有一个简单的例子就更好了……非常感谢!!

更多关于Golang中如何高效分离日志记录与应用程序的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


编写一个通道管道。在执行计算的 goroutine 中,使用 defer 进行 panic 恢复。如果发生 panic,记录错误并继续从通道读取,以便流处理能够继续进行。

我想感谢 @mje 为我指明了正确的方向,并补充这篇文章,它极大地帮助我解决了我的问题。非常感谢!!

在Go中实现日志记录与应用程序的分离,可以通过并发模型和管道模式来高效解决。以下是一个示例方案:

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "os"
    "sync"
    "time"
)

// 日志记录器
type Logger struct {
    file   *os.File
    writer *bufio.Writer
    ch     chan []byte
    wg     sync.WaitGroup
}

func NewLogger(filename string) (*Logger, error) {
    file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err != nil {
        return nil, err
    }
    
    l := &Logger{
        file:   file,
        writer: bufio.NewWriter(file),
        ch:     make(chan []byte, 1000), // 缓冲通道
    }
    
    l.wg.Add(1)
    go l.process()
    return l, nil
}

func (l *Logger) Write(data []byte) {
    select {
    case l.ch <- data:
    default:
        // 缓冲区满时丢弃数据或采取其他策略
        fmt.Println("日志缓冲区已满,丢弃数据")
    }
}

func (l *Logger) process() {
    defer l.wg.Done()
    
    for data := range l.ch {
        _, err := l.writer.Write(data)
        if err != nil {
            fmt.Printf("写入日志失败: %v\n", err)
        }
        l.writer.WriteByte('\n')
    }
}

func (l *Logger) Close() {
    close(l.ch)
    l.wg.Wait()
    l.writer.Flush()
    l.file.Close()
}

// 数据流处理器
type StreamProcessor struct {
    logger *Logger
    calcCh chan map[string]interface{}
    wg     sync.WaitGroup
}

func NewStreamProcessor(logger *Logger) *StreamProcessor {
    sp := &StreamProcessor{
        logger: logger,
        calcCh: make(chan map[string]interface{}, 100),
    }
    
    sp.wg.Add(1)
    go sp.calculate()
    return sp
}

func (sp *StreamProcessor) Process(data map[string]interface{}) {
    // 记录日志
    jsonData, _ := json.Marshal(data)
    sp.logger.Write(jsonData)
    
    // 发送到计算通道
    select {
    case sp.calcCh <- data:
    default:
        fmt.Println("计算缓冲区已满,丢弃数据")
    }
}

func (sp *StreamProcessor) calculate() {
    defer sp.wg.Done()
    
    for data := range sp.calcCh {
        // 模拟计算过程
        if err := performCalculation(data); err != nil {
            fmt.Printf("计算错误: %v\n", err)
            // 错误不会影响日志记录
            continue
        }
    }
}

func performCalculation(data map[string]interface{}) error {
    // 模拟可能失败的计算
    if value, ok := data["critical"].(float64); ok && value > 0.9 {
        return fmt.Errorf("计算值超出阈值: %v", value)
    }
    return nil
}

func (sp *StreamProcessor) Close() {
    close(sp.calcCh)
    sp.wg.Wait()
}

// 主应用程序
func main() {
    // 初始化日志记录器
    logger, err := NewLogger("stream.log")
    if err != nil {
        panic(err)
    }
    defer logger.Close()
    
    // 初始化流处理器
    processor := NewStreamProcessor(logger)
    defer processor.Close()
    
    // 模拟数据流
    for i := 0; i < 100; i++ {
        data := map[string]interface{}{
            "timestamp": time.Now().UnixNano(),
            "value":     float64(i) / 100.0,
            "critical":  float64(i) / 99.0,
        }
        
        processor.Process(data)
        time.Sleep(10 * time.Millisecond)
    }
}

这个方案的关键特性:

  1. 分离的goroutine:日志记录和计算在不同的goroutine中运行
  2. 缓冲通道:使用缓冲通道解耦数据流,防止阻塞
  3. 错误隔离:计算错误不会影响日志记录
  4. 优雅关闭:使用WaitGroup确保所有goroutine正确结束

如果需要更高级的解决方案,可以考虑以下扩展:

// 使用多个计算worker
type WorkerPool struct {
    workers int
    jobCh   chan map[string]interface{}
    wg      sync.WaitGroup
}

func NewWorkerPool(workers int) *WorkerPool {
    wp := &WorkerPool{
        workers: workers,
        jobCh:   make(chan map[string]interface{}, 100),
    }
    
    for i := 0; i < workers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
    return wp
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    for data := range wp.jobCh {
        fmt.Printf("Worker %d processing: %v\n", id, data)
    }
}

func (wp *WorkerPool) Submit(data map[string]interface{}) {
    wp.jobCh <- data
}

func (wp *WorkerPool) Close() {
    close(wp.jobCh)
    wp.wg.Wait()
}

这种架构确保了日志记录和计算之间的完全分离,即使计算部分崩溃,日志记录仍能继续工作。

回到顶部