Golang中如何管理Web服务器和长时间运行进程的goroutine
Golang中如何管理Web服务器和长时间运行进程的goroutine 问题已在 Stackoverflow 上提及。
我有一个程序,它使用 ffmpeg 将 RTSP 摄像头转换为 HLS 格式进行流传输。由于 ffmpeg 在后台运行,因此为每个 RTSP 链接创建了 goroutine。
通过以下代码添加流。
func StreamProcess(data <-chan StreamData, ctx context.Context) {
for v := range data {
ctx, _ := context.WithCancel(ctx)
go func() {
if !getStreams(v.camera_id) {
var stream StreamState
stream.camera_id = v.camera_id
stream.state = true
go Stream(v, ctx)
wg.Wait()
} else {
return
}
}()
}
运行 ffmpeg 命令的流传输函数。
func Stream(meta StreamData, ctx context.Context) error {
log.Println("Started Streaming")
ffmpegCmd := exec.Command("ffmpeg", "-i", meta.rtsp, "-pix_fmt", "yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-b:v", "600k", "-c:a", "aac", "-b:a", "160k", "-f", "rtsp", fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
output, _ := ffmpegCmd.CombinedOutput()
log.Println(string(output))
for {
select {
case <-ctx.Done():
log.Println("killing process")
ffmpegCmd.Process.Kill()
return nil
}
}}
我的目标是停止每个 os.exec 进程(ffmpeg 命令),或者至少在不关闭 Fiber 服务器的情况下,关闭 ffmpeg 命令下的所有 goroutine。
更多关于Golang中如何管理Web服务器和长时间运行进程的goroutine的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于Golang中如何管理Web服务器和长时间运行进程的goroutine的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Golang中管理Web服务器和长时间运行的goroutine,关键在于正确使用context和goroutine生命周期管理。以下是针对你代码的改进方案:
1. 使用context管理goroutine生命周期
type StreamManager struct {
streams map[string]*StreamProcess
mu sync.RWMutex
}
type StreamProcess struct {
cmd *exec.Cmd
cancel context.CancelFunc
running bool
}
func (sm *StreamManager) StartStream(meta StreamData) error {
sm.mu.Lock()
defer sm.mu.Unlock()
if _, exists := sm.streams[meta.camera_id]; exists {
return fmt.Errorf("stream already running")
}
ctx, cancel := context.WithCancel(context.Background())
cmd := exec.CommandContext(ctx, "ffmpeg",
"-i", meta.rtsp,
"-pix_fmt", "yuv420p",
"-c:v", "libx264",
"-preset", "ultrafast",
"-b:v", "600k",
"-c:a", "aac",
"-b:a", "160k",
"-f", "rtsp",
fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
sp := &StreamProcess{
cmd: cmd,
cancel: cancel,
running: true,
}
sm.streams[meta.camera_id] = sp
go func() {
defer func() {
sm.mu.Lock()
delete(sm.streams, meta.camera_id)
sm.mu.Unlock()
}()
output, err := cmd.CombinedOutput()
if err != nil {
log.Printf("FFmpeg error for camera %s: %v\nOutput: %s",
meta.camera_id, err, output)
}
}()
return nil
}
func (sm *StreamManager) StopStream(cameraID string) error {
sm.mu.Lock()
defer sm.mu.Unlock()
if sp, exists := sm.streams[cameraID]; exists {
sp.cancel()
return nil
}
return fmt.Errorf("stream not found")
}
func (sm *StreamManager) StopAll() {
sm.mu.Lock()
defer sm.mu.Unlock()
for cameraID, sp := range sm.streams {
sp.cancel()
delete(sm.streams, cameraID)
}
}
2. 集成到Fiber服务器
func main() {
streamManager := &StreamManager{
streams: make(map[string]*StreamProcess),
}
app := fiber.New()
// 启动流端点
app.Post("/stream/start", func(c *fiber.Ctx) error {
var data StreamData
if err := c.BodyParser(&data); err != nil {
return c.Status(400).JSON(fiber.Map{"error": err.Error()})
}
if err := streamManager.StartStream(data); err != nil {
return c.Status(400).JSON(fiber.Map{"error": err.Error()})
}
return c.JSON(fiber.Map{"status": "stream started"})
})
// 停止流端点
app.Post("/stream/stop/:camera_id", func(c *fiber.Ctx) error {
cameraID := c.Params("camera_id")
if err := streamManager.StopStream(cameraID); err != nil {
return c.Status(404).JSON(fiber.Map{"error": err.Error()})
}
return c.JSON(fiber.Map{"status": "stream stopped"})
})
// 优雅关闭处理
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
go func() {
<-quit
log.Println("Shutting down...")
streamManager.StopAll()
if err := app.Shutdown(); err != nil {
log.Fatal("Server shutdown error:", err)
}
}()
// 启动服务器
if err := app.Listen(":3000"); err != nil {
log.Fatal(err)
}
}
3. 改进的Stream函数
func Stream(meta StreamData, ctx context.Context) error {
log.Printf("Started Streaming for camera: %s", meta.camera_id)
ffmpegCmd := exec.CommandContext(ctx, "ffmpeg",
"-i", meta.rtsp,
"-pix_fmt", "yuv420p",
"-c:v", "libx264",
"-preset", "ultrafast",
"-b:v", "600k",
"-c:a", "aac",
"-b:a", "160k",
"-f", "rtsp",
fmt.Sprintf("rtsp://localhost:8554/%s", meta.camera_id))
// 设置输出管道以实时读取日志
stdout, _ := ffmpegCmd.StdoutPipe()
stderr, _ := ffmpegCmd.StderrPipe()
if err := ffmpegCmd.Start(); err != nil {
return fmt.Errorf("failed to start ffmpeg: %w", err)
}
// 实时读取输出
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
log.Printf("[FFmpeg stdout] %s", scanner.Text())
}
}()
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
log.Printf("[FFmpeg stderr] %s", scanner.Text())
}
}()
// 等待进程结束或context取消
done := make(chan error, 1)
go func() {
done <- ffmpegCmd.Wait()
}()
select {
case <-ctx.Done():
log.Printf("Stopping ffmpeg for camera: %s", meta.camera_id)
ffmpegCmd.Process.Kill()
<-done // 等待进程完全退出
return ctx.Err()
case err := <-done:
if err != nil {
log.Printf("FFmpeg exited with error for camera %s: %v", meta.camera_id, err)
}
return err
}
}
4. 监控goroutine状态
func (sm *StreamManager) GetStatus() map[string]bool {
sm.mu.RLock()
defer sm.mu.RUnlock()
status := make(map[string]bool)
for cameraID, sp := range sm.streams {
status[cameraID] = sp.running
}
return status
}
// 在Fiber中添加状态检查端点
app.Get("/streams/status", func(c *fiber.Ctx) error {
status := streamManager.GetStatus()
return c.JSON(status)
})
这个方案提供了:
- 使用
exec.CommandContext自动管理进程生命周期 - 通过StreamManager集中管理所有流
- 线程安全的map操作
- 优雅的服务器关闭处理
- 实时日志输出
- RESTful API控制接口
关键点是使用context来控制goroutine和子进程的生命周期,确保在需要时能够正确清理资源。

