从os.stdin读取日志的Golang实现方法

从os.stdin读取日志的Golang实现方法 我正在为Nginx创建一个Go-Syslog系统。Nginx将通过1515端口发送日志到syslog服务器,我有一个消费者程序用于解析日志。目前我的消费者无法从标准输入读取数据。请帮我检查一下代码?

服务器代码:

package main
import (
"fmt"
"os"
"gopkg.in/mcuadros/go-syslog.v2"
)

func main() {
channel := make(syslog.LogPartsChannel)
handler := syslog.NewChannelHandler(channel)
server := syslog.NewServer()
server.SetFormat(syslog.RFC3164)
server.SetHandler(handler)
fmt.Println("Nginx syslog server...")
server.ListenUDP("0.0.0.0:1515")
server.Boot()
go func(channel syslog.LogPartsChannel) {
if _, err := os.Stdin.Stat(); err != nil {
fmt.Println("Stdin not available:", err)
} else {
fmt.Println("Stdin available.")
}
for logParts := range channel {
fmt.Fprintln(os.Stdin, logParts["content"])
}
}(channel)
server.Wait()
}

消费者代码:

package main

import (
"flag"
"fmt"
"io"
"os"
"strings"
"github.com/satyrius/gonx"
)

var conf string
var format string
var logFile string

func init() {
flag.StringVar(&conf, "conf", "dummy", "Nginx config file (e.g. /etc/nginx/nginx.conf)")
flag.StringVar(&format, "format", "main", "Nginx log_format name")
flag.StringVar(&logFile, "log", "-", "Log file name to read. Read from STDIN if file name is '-'")
}

func main() {
flag.Parse()
// 读取指定文件或从标准输入读取
var logReader io.Reader
var err error
if logFile == "dummy" {
logReader = strings.NewReader(`89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`)
} else if logFile == "-" {
logReader = os.Stdin
} else {
file, err := os.Open(logFile)
if err != nil {
panic(err)
}
logReader = file
defer file.Close()
}

// 使用nginx配置文件按名称提取格式
var nginxConfig io.Reader
if conf == "dummy" {
nginxConfig = strings.NewReader(`            http {                 log_format   main  '$remote_addr [$time_local] "$request"';             }        `)
} else {
nginxConfigFile, err := os.Open(conf)
if err != nil {
panic(err)
}
nginxConfig = nginxConfigFile
defer nginxConfigFile.Close()
}

// 从标准输入读取并使用log_format解析日志记录
reader, err := gonx.NewNginxReader(logReader, nginxConfig, format)
if err != nil {
panic(err)
}
for {
rec, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
panic(err)
}
// 处理记录... 例如:
fmt.Printf("Parsed entry: %+v\n", rec)
}
}

更多关于从os.stdin读取日志的Golang实现方法的实战教程也可以访问 https://www.itying.com/category-94-b0.html

4 回复

感谢Giulio,我已经解决了问题

更多关于从os.stdin读取日志的Golang实现方法的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你不能使用 fmt.Printf 输出到 os.Stdin,只能输出到 os.Stdout(以及 os.Stderr)。

你想实现什么功能?

我认为你需要一个服务器,用于将来自系统日志网络套接字的每一行内容打印(到 os.Stdout),以及一个客户端,用于读取 os.Stdin 并将接收到的每一行通过系统日志网络套接字发送到服务器。是这样吗?

PS:请尝试正确格式化你的代码,或者最好使用 play.golang.org。这样我们可以更方便地查看代码。

感谢大家,

问题已经解决了。

var conf string
var format string
var nginxConfig io.Reader
func init() {
flag.StringVar(&conf, "conf", "/usr/local/etc/nginx/nginx.conf", "Nginx config file")
flag.StringVar(&format, "format", "main", "Nginx log_format name")
}

func main() {
channel := make(syslog.LogPartsChannel)
handler := syslog.NewChannelHandler(channel)

server := syslog.NewServer()
server.SetFormat(syslog.RFC3164)
server.SetHandler(handler)
fmt.Println("Xunya syslog server...")
server.ListenUDP("0.0.0.0:1515")
server.Boot()

go func(channel syslog.LogPartsChannel) {
for logParts := range channel {
logformat := logParts["content"].(string)
logReader := strings.NewReader(logformat)
nginxConfigFile, err := os.Open(conf)
if err != nil {
panic(err)
}
nginxConfig = nginxConfigFile
defer nginxConfigFile.Close()
//fmt.Println(logParts["content"])
reader, err := gonx.NewNginxReader(logReader, nginxConfig, format)
if err != nil {
panic(err)
}
for {
rec, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
panic(err)
}
// Process the record... e.g.
fmt.Printf("Parsed entry: %+v\n", rec)
}
//fmt.Println(logformat)
}
}(channel)

server.Wait()
}

您的代码中存在几个问题,主要是在服务器程序中错误地使用了标准输入。服务器程序应该从syslog通道接收数据,而不是向标准输入写入数据。

服务器代码修正:

package main

import (
    "fmt"
    "gopkg.in/mcuadros/go-syslog.v2"
)

func main() {
    channel := make(syslog.LogPartsChannel)
    handler := syslog.NewChannelHandler(channel)
    server := syslog.NewServer()
    server.SetFormat(syslog.RFC3164)
    server.SetHandler(handler)
    
    fmt.Println("Nginx syslog server starting...")
    server.ListenUDP("0.0.0.0:1515")
    server.Boot()
    
    go func(channel syslog.LogPartsChannel) {
        for logParts := range channel {
            // 直接处理日志内容,而不是写入stdin
            if content, ok := logParts["content"]; ok {
                fmt.Printf("Received log: %v\n", content)
            }
        }
    }(channel)
    
    server.Wait()
}

消费者代码修正:

您的消费者代码基本正确,但如果要从标准输入持续读取流式数据,需要调整读取逻辑:

package main

import (
    "bufio"
    "flag"
    "fmt"
    "io"
    "os"
    "strings"
    "github.com/satyrius/gonx"
)

var conf string
var format string
var logFile string

func init() {
    flag.StringVar(&conf, "conf", "dummy", "Nginx config file (e.g. /etc/nginx/nginx.conf)")
    flag.StringVar(&format, "format", "main", "Nginx log_format name")
    flag.StringVar(&logFile, "log", "-", "Log file name to read. Read from STDIN if file name is '-'")
}

func main() {
    flag.Parse()
    
    // 配置nginx日志格式
    var nginxConfig io.Reader
    if conf == "dummy" {
        nginxConfig = strings.NewReader(`http { log_format main '$remote_addr [$time_local] "$request"'; }`)
    } else {
        nginxConfigFile, err := os.Open(conf)
        if err != nil {
            panic(err)
        }
        nginxConfig = nginxConfigFile
        defer nginxConfigFile.Close()
    }
    
    // 从标准输入持续读取
    if logFile == "-" {
        scanner := bufio.NewScanner(os.Stdin)
        reader, err := gonx.NewNginxReader(strings.NewReader(""), nginxConfig, format)
        if err != nil {
            panic(err)
        }
        
        for scanner.Scan() {
            line := scanner.Text()
            lineReader := strings.NewReader(line)
            reader, err := gonx.NewNginxReader(lineReader, nginxConfig, format)
            if err != nil {
                fmt.Printf("Error creating reader: %v\n", err)
                continue
            }
            
            rec, err := reader.Read()
            if err != nil && err != io.EOF {
                fmt.Printf("Error parsing log: %v\n", err)
                continue
            }
            
            fmt.Printf("Parsed entry: %+v\n", rec)
        }
        
        if err := scanner.Err(); err != nil {
            fmt.Printf("Error reading stdin: %v\n", err)
        }
    } else {
        // 文件读取逻辑保持不变
        var logReader io.Reader
        if logFile == "dummy" {
            logReader = strings.NewReader(`89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`)
        } else {
            file, err := os.Open(logFile)
            if err != nil {
                panic(err)
            }
            logReader = file
            defer file.Close()
        }
        
        reader, err := gonx.NewNginxReader(logReader, nginxConfig, format)
        if err != nil {
            panic(err)
        }
        
        for {
            rec, err := reader.Read()
            if err == io.EOF {
                break
            } else if err != nil {
                panic(err)
            }
            fmt.Printf("Parsed entry: %+v\n", rec)
        }
    }
}

关键修改点:

  1. 服务器程序移除了向os.Stdin写入的错误逻辑,改为直接处理接收到的日志内容
  2. 消费者程序使用bufio.Scanner来持续从标准输入读取流式数据
  3. 为每一行日志创建新的reader进行解析,避免EOF问题

这样的架构允许服务器接收syslog消息,而消费者可以从标准输入管道接收并解析日志数据。

回到顶部