Golang实现守护进程中的watchdog功能

Golang实现守护进程中的watchdog功能 我需要在我的Go守护进程中实现看门狗机制。

因此,我从主函数中调用了StartUpdateCycle()函数。该函数运行一个无限for循环,但由于某些网络问题,进程会挂起且无法恢复。所以我需要监控StartUpdateCycle,如果出现挂起、恐慌、卡死或运行时错误等问题,它应该能够自行恢复。

func StartUpdateCycle(){

    //无限循环

    for {

        //连接到redis集群

        // 在Linux机器上运行命令并收集输出

        //推送到redis集群

        //休眠1分钟

    }
}
func main(){
    StartUpdateCycle()
}

更多关于Golang实现守护进程中的watchdog功能的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang实现守护进程中的watchdog功能的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中实现守护进程的watchdog功能,可以通过goroutine监控和进程重启机制来完成。以下是几种实现方案:

方案一:使用context超时控制

package main

import (
    "context"
    "log"
    "time"
)

func StartUpdateCycle(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            log.Println("Update cycle stopped by watchdog")
            return
        default:
            // 连接到redis集群
            // 运行命令并收集输出
            // 推送到redis集群
            
            // 添加超时检查点
            if !doWorkWithTimeout() {
                log.Println("Work timeout detected")
                return
            }
            
            select {
            case <-time.After(1 * time.Minute):
                continue
            case <-ctx.Done():
                return
            }
        }
    }
}

func doWorkWithTimeout() bool {
    workCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    done := make(chan bool)
    
    go func() {
        // 实际工作逻辑
        // connectRedis()
        // runCommand()
        // pushToRedis()
        done <- true
    }()
    
    select {
    case <-done:
        return true
    case <-workCtx.Done():
        log.Printf("Work timeout after 30 seconds")
        return false
    }
}

方案二:独立watchdog goroutine

package main

import (
    "log"
    "runtime"
    "time"
)

type Watchdog struct {
    timeout     time.Duration
    lastCheckin time.Time
    resetChan   chan struct{}
    stopChan    chan struct{}
}

func NewWatchdog(timeout time.Duration) *Watchdog {
    return &Watchdog{
        timeout:   timeout,
        resetChan: make(chan struct{}, 1),
        stopChan:  make(chan struct{}),
    }
}

func (w *Watchdog) Start() {
    go func() {
        for {
            select {
            case <-w.stopChan:
                return
            case <-time.After(w.timeout):
                if time.Since(w.lastCheckin) > w.timeout {
                    log.Printf("Watchdog timeout triggered, restarting")
                    restartProcess()
                }
            }
        }
    }()
}

func (w *Watchdog) Checkin() {
    w.lastCheckin = time.Now()
    select {
    case w.resetChan <- struct{}{}:
    default:
    }
}

func (w *Watchdog) Stop() {
    close(w.stopChan)
}

func StartUpdateCycle(watchdog *Watchdog) {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("Panic recovered: %v", r)
            // 记录堆栈信息
            buf := make([]byte, 1024)
            n := runtime.Stack(buf, false)
            log.Printf("Stack trace: %s", buf[:n])
        }
    }()
    
    for {
        watchdog.Checkin()
        
        // 工作逻辑,每个步骤都检查超时
        if err := connectRedisWithTimeout(); err != nil {
            log.Printf("Redis connection failed: %v", err)
            continue
        }
        
        if output, err := runCommandWithTimeout(); err != nil {
            log.Printf("Command execution failed: %v", err)
        } else {
            if err := pushToRedisWithTimeout(output); err != nil {
                log.Printf("Redis push failed: %v", err)
            }
        }
        
        watchdog.Checkin()
        time.Sleep(1 * time.Minute)
    }
}

方案三:完整守护进程重启

package main

import (
    "log"
    "os"
    "os/exec"
    "syscall"
    "time"
)

func main() {
    // 如果是子进程,运行工作循环
    if os.Getenv("WATCHDOG_CHILD") == "1" {
        runWorker()
        return
    }
    
    // 父进程作为watchdog
    for {
        cmd := exec.Command(os.Args[0])
        cmd.Env = append(os.Environ(), "WATCHDOG_CHILD=1")
        cmd.Stdout = os.Stdout
        cmd.Stderr = os.Stderr
        cmd.SysProcAttr = &syscall.SysProcAttr{
            Setpgid: true,
        }
        
        if err := cmd.Start(); err != nil {
            log.Printf("Failed to start worker: %v", err)
            time.Sleep(5 * time.Second)
            continue
        }
        
        done := make(chan error, 1)
        go func() {
            done <- cmd.Wait()
        }()
        
        select {
        case err := <-done:
            log.Printf("Worker exited with error: %v", err)
            time.Sleep(2 * time.Second)
        case <-time.After(5 * time.Minute):
            log.Printf("Worker healthy for 5 minutes, continuing")
            cmd.Process.Signal(syscall.SIGTERM)
            <-done
        }
    }
}

func runWorker() {
    // 原来的StartUpdateCycle逻辑
    for {
        // 工作逻辑
        time.Sleep(1 * time.Minute)
    }
}

方案四:使用health check端点

package main

import (
    "net/http"
    "time"
)

var lastSuccessfulUpdate time.Time

func StartUpdateCycle() {
    go func() {
        for {
            // 更新工作
            lastSuccessfulUpdate = time.Now()
            time.Sleep(1 * time.Minute)
        }
    }()
    
    // 启动健康检查服务器
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        if time.Since(lastSuccessfulUpdate) > 2*time.Minute {
            w.WriteHeader(http.StatusServiceUnavailable)
            w.Write([]byte("UNHEALTHY"))
            // 可以在这里触发重启
            go restartProcess()
            return
        }
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("HEALTHY"))
    })
    
    http.ListenAndServe(":8080", nil)
}

关键监控点实现

func connectRedisWithTimeout() error {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // Redis连接逻辑,支持context
    // client := redis.NewClient(&redis.Options{})
    // _, err := client.Ping(ctx).Result()
    
    return nil
}

func runCommandWithTimeout() (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    // 使用CommandContext
    // cmd := exec.CommandContext(ctx, "ls", "-la")
    // output, err := cmd.Output()
    
    return "", nil
}

func restartProcess() {
    // 获取当前可执行文件路径
    execPath, _ := os.Executable()
    
    // 启动新进程
    cmd := exec.Command(execPath, os.Args[1:]...)
    cmd.Stdout = os.Stdout
    cmd.Stderr = os.Stderr
    cmd.Stdin = os.Stdin
    cmd.Env = os.Environ()
    
    // 启动新进程后退出当前进程
    if err := cmd.Start(); err == nil {
        os.Exit(0)
    }
}

这些方案提供了不同级别的监控和恢复机制,可以根据具体需求选择或组合使用。方案二提供了细粒度的控制,方案三提供了进程级别的隔离和重启。

回到顶部