Golang中如何识别并停止一个goroutine

Golang中如何识别并停止一个goroutine 大家好,

我正在尝试将一些bash脚本迁移到Go语言,因为在我看来它是一门很棒的语言。 但是我在goroutine方面遇到了一个问题,通过在这里或stackoverflow上搜索也无法解决。

以下是我的项目描述: 我有一个包含两个端点的Web服务器;一个端点每次添加一个任务,另一个端点每次停止一个任务(为简化起见,假设停止最先添加的那个)。 服务器启动时也会启动一些任务。 这里的任务是长时间运行的计算任务。 该项目能够并发(或并行;非阻塞)执行多个任务。

以下是一个示例:

func main() {

    for i := 0; i < 10; i++ {
        addTask()
    }

    http.HandleFunc("/addtask", addTask)
    http.HandleFunc("/stoptask", stopTask)

    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal(err)
    }
}

func addTask() {
    go task() // 启动一个goroutine
}

func stopTask() {
    // 这里就是谜团所在,我应该使用context.WithCancel()还是通道来停止任务,或者也许有其他方法?
}

// 这里是我试图停止的任务。应该能够在任何时候停止执行;在计算过程中或休眠期间(效果应等同于SIGKILL)
func task() {
    longComputationalCalculation() // 可能需要几分钟才能完成
    time.Sleep(time.Duration(time.Second * 30)) // 在第二次计算前休眠30秒
    longComputationalCalculation2() // 将第一次计算的结果作为参数;也可能需要几分钟才能完成
    time.Sleep(time.Duration(time.Second * 30)) // 最后休眠30秒
}

网上有很多例子使用了context.WithTimeout()或timer.Sleep()。但在这里,我有实际的计算需要执行,并且除非用户想要停止其中一个,否则它们应该继续执行。

如果使用bash,我会在一个新会话中启动这个任务,如果要终止它,我会向该会话发送一个pkill命令。

提前感谢您花时间阅读我的项目描述,如果您能指引我找到解决方案,再次表示感谢。

此致


更多关于Golang中如何识别并停止一个goroutine的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中如何识别并停止一个goroutine的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中停止goroutine的推荐方法是使用context.Contextselect语句。以下是针对你场景的解决方案:

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
)

type Task struct {
    ID      int
    ctx     context.Context
    cancel  context.CancelFunc
    running bool
}

var (
    tasks     = make(map[int]*Task)
    tasksLock sync.Mutex
    taskID    = 0
)

func main() {
    // 启动初始任务
    for i := 0; i < 10; i++ {
        addTask()
    }

    http.HandleFunc("/addtask", func(w http.ResponseWriter, r *http.Request) {
        id := addTask()
        fmt.Fprintf(w, "Task %d started", id)
    })

    http.HandleFunc("/stoptask", func(w http.ResponseWriter, r *http.Request) {
        id := stopFirstTask()
        if id != -1 {
            fmt.Fprintf(w, "Task %d stopped", id)
        } else {
            fmt.Fprintf(w, "No tasks to stop")
        }
    })

    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal(err)
    }
}

func addTask() int {
    tasksLock.Lock()
    defer tasksLock.Unlock()

    taskID++
    ctx, cancel := context.WithCancel(context.Background())
    
    task := &Task{
        ID:      taskID,
        ctx:     ctx,
        cancel:  cancel,
        running: true,
    }
    
    tasks[taskID] = task
    
    go task.run()
    
    return taskID
}

func stopFirstTask() int {
    tasksLock.Lock()
    defer tasksLock.Unlock()

    for id, task := range tasks {
        if task.running {
            task.cancel()
            delete(tasks, id)
            return id
        }
    }
    return -1
}

func (t *Task) run() {
    defer func() {
        tasksLock.Lock()
        t.running = false
        tasksLock.Unlock()
    }()

    // 在计算中定期检查context是否被取消
    if !t.longComputationalCalculation() {
        return
    }

    // 使用context-aware的sleep
    select {
    case <-time.After(30 * time.Second):
        // 继续执行
    case <-t.ctx.Done():
        return
    }

    if !t.longComputationalCalculation2() {
        return
    }

    select {
    case <-time.After(30 * time.Second):
        // 任务完成
    case <-t.ctx.Done():
        return
    }
}

func (t *Task) longComputationalCalculation() bool {
    // 模拟长时间计算,定期检查context
    for i := 0; i < 100; i++ {
        select {
        case <-t.ctx.Done():
            return false
        default:
            // 执行计算工作
            time.Sleep(50 * time.Millisecond) // 模拟计算
        }
    }
    return true
}

func (t *Task) longComputationalCalculation2() bool {
    // 另一个长时间计算
    for i := 0; i < 100; i++ {
        select {
        case <-t.ctx.Done():
            return false
        default:
            // 执行计算工作
            time.Sleep(50 * time.Millisecond) // 模拟计算
        }
    }
    return true
}

如果你需要更即时的停止(类似SIGKILL),可以在计算函数中添加更频繁的检查点:

func (t *Task) intensiveCalculation() bool {
    // 每100次迭代检查一次context
    for i := 0; i < 1000000; i++ {
        if i%100 == 0 {
            select {
            case <-t.ctx.Done():
                return false
            default:
            }
        }
        // 实际计算工作
        _ = i * i
    }
    return true
}

对于无法添加检查点的阻塞操作,可以使用带有超时的context:

func (t *Task) blockingOperation() bool {
    ctx, cancel := context.WithTimeout(t.ctx, 5*time.Second)
    defer cancel()

    // 执行可能阻塞的操作
    result := make(chan bool)
    go func() {
        // 模拟阻塞操作
        time.Sleep(10 * time.Second)
        result <- true
    }()

    select {
    case <-ctx.Done():
        return false
    case <-result:
        return true
    }
}

这个方案使用context进行优雅停止,通过定期检查context.Done()来及时响应停止请求。每个任务都有独立的context,可以单独控制停止。

回到顶部