从Golang工作线程返回任务结果的实现方法
从Golang工作线程返回任务结果的实现方法 以下示例运行了3个goroutine工作协程来处理传入的任务。目前这部分工作正常。然而,我卡住的地方是如何返回每个任务的结果。我该如何实现这一点?理想情况下,一旦任务X完成,我就应该能立即得到结果。https://play.golang.org/p/o4211X0k7bp
谢谢
package main
import (
"fmt"
"time"
)
func main() {
w := NewWorker()
w.Start()
for i := 0; i < 19; i++ {
j := NewJob(i)
w.Add(j)
time.Sleep(time.Second)
}
select {}
}
// ----------------------------------------------------------------------------------------
type Job struct {
id int
}
func NewJob(id int) *Job {
return &Job{
id: id,
}
}
// ----------------------------------------------------------------------------------------
type Worker struct {
total int
channel chan *Job
}
func NewWorker() *Worker {
return &Worker{
total: 3,
channel: make(chan *Job, 100),
}
}
func (w *Worker) Start() {
for i := 1; i <= w.total; i++ {
go w.run(i)
}
}
func (w *Worker) Add(job *Job) {
w.channel <- job
}
func (w *Worker) run(id int) {
fmt.Println("worker", id, "started...")
for {
select {
case job := <-w.channel:
fmt.Println(id, "processing job", job.id)
// This is the result
// fmt.Sprintf("%d done @ %s", job.id, time.Now().UTC().String())
default:
time.Sleep(1 * time.Second)
fmt.Println("sleeping")
}
}
}
更多关于从Golang工作线程返回任务结果的实现方法的实战教程也可以访问 https://www.itying.com/category-94-b0.html
@GoingToGo 你说的“获取结果”是什么意思?
更多关于从Golang工作线程返回任务结果的实现方法的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
存在专门用于此目的的通道。
Go by Example - 工作池
// 这是结果 // fmt.Sprintf("%d done @ %s", job.id, time.Now().UTC().String())
假设作业 X 已被处理,我想从工作线程返回 fmt.Sprintf("%d done @ %s", job.id, time.Now().UTC().String()) 作为结果。
两件事。
- 你的工作协程在没有任务时会退出,但它们应该始终保持运行,因此应用程序永远不应该退出。
- 在 main() 函数中,我们如何知道结果通道是空的?我需要在这种情况下打印一些信息。
这是我目前未采纳你修改的版本:https://play.golang.org/p/AiqLLq1CLPd 只需要解决上面的第2点,但我不知道如何操作。
我也有同样的问题,并且想了解更多关于 Go 并发和通道的知识。这也是我构建自己实现的原因。
也许我的代码能帮到你:
frankkopp/WorkerPool
一个用 GO 实现的 WorkerPool。通过在 GitHub 上创建账户来为 frankkopp/WorkerPool 的开发做出贡献。
这是一个实验性项目,旨在理解如何使用通道和协程来构建长期运行的应用程序。根据我的使用场景,以下是我正在做的事情。
这是一个命令行应用程序,它监听传入的请求,处理它们,然后返回每个请求的结果。如果有新请求进来,它会在终端上打印“in”;完成后打印“out”;在等待时(作业通道为空)打印“waiting”;以及(结果通道为空时)打印“empty”。因此,总共有四种场景。
我知道有些部分可能不太合理,但这至少更像是为了学习实践。
要实现从工作线程返回任务结果,可以使用带缓冲的结果通道。以下是修改后的代码示例:
package main
import (
"fmt"
"time"
)
func main() {
w := NewWorker()
w.Start()
// 启动结果监听协程
go func() {
for result := range w.ResultChan() {
fmt.Printf("收到结果: %s\n", result)
}
}()
for i := 0; i < 19; i++ {
j := NewJob(i)
w.Add(j)
time.Sleep(time.Second)
}
// 等待所有任务完成
time.Sleep(5 * time.Second)
}
// ----------------------------------------------------------------------------------------
type Job struct {
id int
}
func NewJob(id int) *Job {
return &Job{
id: id,
}
}
// ----------------------------------------------------------------------------------------
type Worker struct {
total int
jobChannel chan *Job
resultChan chan string
}
func NewWorker() *Worker {
return &Worker{
total: 3,
jobChannel: make(chan *Job, 100),
resultChan: make(chan string, 100),
}
}
func (w *Worker) Start() {
for i := 1; i <= w.total; i++ {
go w.run(i)
}
}
func (w *Worker) Add(job *Job) {
w.jobChannel <- job
}
func (w *Worker) ResultChan() <-chan string {
return w.resultChan
}
func (w *Worker) run(id int) {
fmt.Printf("worker %d started...\n", id)
for {
select {
case job := <-w.jobChannel:
fmt.Printf("%d processing job %d\n", id, job.id)
// 处理任务并生成结果
result := fmt.Sprintf("任务 %d 由 worker %d 完成 @ %s",
job.id, id, time.Now().UTC().Format("15:04:05"))
// 将结果发送到结果通道
w.resultChan <- result
default:
time.Sleep(1 * time.Second)
fmt.Println("sleeping")
}
}
}
另一种更简洁的实现方式是使用带返回值的任务函数:
package main
import (
"fmt"
"time"
)
func main() {
w := NewWorker()
w.Start()
// 提交任务并获取结果通道
for i := 0; i < 10; i++ {
taskID := i
resultChan := w.Submit(func() string {
time.Sleep(500 * time.Millisecond)
return fmt.Sprintf("任务 %d 完成 @ %s",
taskID, time.Now().Format("15:04:05.000"))
})
// 异步获取结果
go func(id int, ch <-chan string) {
result := <-ch
fmt.Printf("任务 %d 结果: %s\n", id, result)
}(taskID, resultChan)
}
time.Sleep(3 * time.Second)
}
type Worker struct {
taskChan chan func() string
}
func NewWorker() *Worker {
return &Worker{
taskChan: make(chan func() string, 100),
}
}
func (w *Worker) Start() {
for i := 0; i < 3; i++ {
go func(workerID int) {
for task := range w.taskChan {
result := task()
fmt.Printf("Worker %d 处理完成\n", workerID)
// 在实际实现中,这里需要将结果传回
}
}(i)
}
}
func (w *Worker) Submit(task func() string) <-chan string {
resultChan := make(chan string, 1)
wrappedTask := func() string {
result := task()
resultChan <- result
return result
}
w.taskChan <- wrappedTask
return resultChan
}
使用 sync.WaitGroup 等待所有任务完成的完整示例:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
w := NewWorker(3)
w.Start()
var wg sync.WaitGroup
results := make(chan string, 10)
// 提交任务
for i := 0; i < 10; i++ {
wg.Add(1)
taskID := i
go func() {
defer wg.Done()
result := w.Process(taskID)
results <- result
}()
}
// 等待所有任务完成并关闭结果通道
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Println("收到结果:", result)
}
}
type Worker struct {
workers int
tasks chan int
results chan string
wg sync.WaitGroup
}
func NewWorker(workers int) *Worker {
return &Worker{
workers: workers,
tasks: make(chan int, 100),
results: make(chan string, 100),
}
}
func (w *Worker) Start() {
for i := 0; i < w.workers; i++ {
w.wg.Add(1)
go func(workerID int) {
defer w.wg.Done()
for taskID := range w.tasks {
// 模拟处理时间
time.Sleep(100 * time.Millisecond)
result := fmt.Sprintf("任务 %d 由 worker %d 完成",
taskID, workerID)
w.results <- result
}
}(i)
}
}
func (w *Worker) Process(taskID int) string {
w.tasks <- taskID
return <-w.results
}
func (w *Worker) Stop() {
close(w.tasks)
w.wg.Wait()
close(w.results)
}
这些示例展示了如何在Golang中从工作线程返回任务结果,包括使用结果通道、异步回调等模式。关键点是为每个任务创建结果通道,或者使用共享的结果通道配合任务ID来关联结果。

