Golang中如何管理Web服务器和长时间运行进程的goroutine

Golang中如何管理Web服务器和长时间运行进程的goroutine 问题已在 Stackoverflow 上提及。

我有一个程序,它使用 ffmpeg 将 RTSP 摄像头转换为 HLS 格式进行流传输。由于 ffmpeg 在后台运行,因此为每个 RTSP 链接创建了 goroutine。

通过以下代码添加流。

func StreamProcess(data <-chan StreamData, ctx context.Context) {
for v := range data {
    ctx, _ := context.WithCancel(ctx)
    go func() {
        if !getStreams(v.camera_id) {
            var stream StreamState
            stream.camera_id = v.camera_id
            stream.state = true
            go Stream(v, ctx)
            wg.Wait()
        } else {
            return
        }
    }()
}   

运行 ffmpeg 命令的流传输函数。

func Stream(meta StreamData, ctx context.Context) error {
    log.Println("Started Streaming")
    ffmpegCmd := exec.Command("ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
    output, _ := ffmpegCmd.CombinedOutput()

    log.Println(string(output))

    for {
        select {
        case <-ctx.Done():
           log.Println("killing process")
           ffmpegCmd.Process.Kill()
           return nil
        }
    }}

我的目标是停止每个 os.exec 进程(ffmpeg 命令),或者至少在不关闭 Fiber 服务器的情况下,关闭 ffmpeg 命令下的所有 goroutine。


更多关于Golang中如何管理Web服务器和长时间运行进程的goroutine的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中如何管理Web服务器和长时间运行进程的goroutine的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Golang中管理Web服务器和长时间运行的goroutine,关键在于正确使用context和goroutine生命周期管理。以下是针对你代码的改进方案:

1. 使用context管理goroutine生命周期

type StreamManager struct {
    streams map[string]*StreamProcess
    mu      sync.RWMutex
}

type StreamProcess struct {
    cmd     *exec.Cmd
    cancel  context.CancelFunc
    running bool
}

func (sm *StreamManager) StartStream(meta StreamData) error {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    
    if _, exists := sm.streams[meta.camera_id]; exists {
        return fmt.Errorf("stream already running")
    }
    
    ctx, cancel := context.WithCancel(context.Background())
    
    cmd := exec.CommandContext(ctx, "ffmpeg", 
        "-i", meta.rtsp,
        "-pix_fmt", "yuv420p",
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-b:v", "600k",
        "-c:a", "aac",
        "-b:a", "160k",
        "-f", "rtsp",
        fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
    
    sp := &StreamProcess{
        cmd:    cmd,
        cancel: cancel,
        running: true,
    }
    
    sm.streams[meta.camera_id] = sp
    
    go func() {
        defer func() {
            sm.mu.Lock()
            delete(sm.streams, meta.camera_id)
            sm.mu.Unlock()
        }()
        
        output, err := cmd.CombinedOutput()
        if err != nil {
            log.Printf("FFmpeg error for camera %s: %v\nOutput: %s", 
                meta.camera_id, err, output)
        }
    }()
    
    return nil
}

func (sm *StreamManager) StopStream(cameraID string) error {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    
    if sp, exists := sm.streams[cameraID]; exists {
        sp.cancel()
        return nil
    }
    
    return fmt.Errorf("stream not found")
}

func (sm *StreamManager) StopAll() {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    
    for cameraID, sp := range sm.streams {
        sp.cancel()
        delete(sm.streams, cameraID)
    }
}

2. 集成到Fiber服务器

func main() {
    streamManager := &StreamManager{
        streams: make(map[string]*StreamProcess),
    }
    
    app := fiber.New()
    
    // 启动流端点
    app.Post("/stream/start", func(c *fiber.Ctx) error {
        var data StreamData
        if err := c.BodyParser(&data); err != nil {
            return c.Status(400).JSON(fiber.Map{"error": err.Error()})
        }
        
        if err := streamManager.StartStream(data); err != nil {
            return c.Status(400).JSON(fiber.Map{"error": err.Error()})
        }
        
        return c.JSON(fiber.Map{"status": "stream started"})
    })
    
    // 停止流端点
    app.Post("/stream/stop/:camera_id", func(c *fiber.Ctx) error {
        cameraID := c.Params("camera_id")
        
        if err := streamManager.StopStream(cameraID); err != nil {
            return c.Status(404).JSON(fiber.Map{"error": err.Error()})
        }
        
        return c.JSON(fiber.Map{"status": "stream stopped"})
    })
    
    // 优雅关闭处理
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt)
    
    go func() {
        <-quit
        log.Println("Shutting down...")
        streamManager.StopAll()
        if err := app.Shutdown(); err != nil {
            log.Fatal("Server shutdown error:", err)
        }
    }()
    
    // 启动服务器
    if err := app.Listen(":3000"); err != nil {
        log.Fatal(err)
    }
}

3. 改进的Stream函数

func Stream(meta StreamData, ctx context.Context) error {
    log.Printf("Started Streaming for camera: %s", meta.camera_id)
    
    ffmpegCmd := exec.CommandContext(ctx, "ffmpeg",
        "-i", meta.rtsp,
        "-pix_fmt", "yuv420p",
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-b:v", "600k",
        "-c:a", "aac",
        "-b:a", "160k",
        "-f", "rtsp",
        fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
    
    // 设置输出管道以实时读取日志
    stdout, _ := ffmpegCmd.StdoutPipe()
    stderr, _ := ffmpegCmd.StderrPipe()
    
    if err := ffmpegCmd.Start(); err != nil {
        return fmt.Errorf("failed to start ffmpeg: %w", err)
    }
    
    // 实时读取输出
    go func() {
        scanner := bufio.NewScanner(stdout)
        for scanner.Scan() {
            log.Printf("[FFmpeg stdout] %s", scanner.Text())
        }
    }()
    
    go func() {
        scanner := bufio.NewScanner(stderr)
        for scanner.Scan() {
            log.Printf("[FFmpeg stderr] %s", scanner.Text())
        }
    }()
    
    // 等待进程结束或context取消
    done := make(chan error, 1)
    go func() {
        done <- ffmpegCmd.Wait()
    }()
    
    select {
    case <-ctx.Done():
        log.Printf("Stopping ffmpeg for camera: %s", meta.camera_id)
        ffmpegCmd.Process.Kill()
        <-done // 等待进程完全退出
        return ctx.Err()
    case err := <-done:
        if err != nil {
            log.Printf("FFmpeg exited with error for camera %s: %v", meta.camera_id, err)
        }
        return err
    }
}

4. 监控goroutine状态

func (sm *StreamManager) GetStatus() map[string]bool {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    
    status := make(map[string]bool)
    for cameraID, sp := range sm.streams {
        status[cameraID] = sp.running
    }
    return status
}

// 在Fiber中添加状态检查端点
app.Get("/streams/status", func(c *fiber.Ctx) error {
    status := streamManager.GetStatus()
    return c.JSON(status)
})

这个方案提供了:

  1. 使用exec.CommandContext自动管理进程生命周期
  2. 通过StreamManager集中管理所有流
  3. 线程安全的map操作
  4. 优雅的服务器关闭处理
  5. 实时日志输出
  6. RESTful API控制接口

关键点是使用context来控制goroutine和子进程的生命周期,确保在需要时能够正确清理资源。

回到顶部