从Golang工作线程返回任务结果的实现方法

从Golang工作线程返回任务结果的实现方法 以下示例运行了3个goroutine工作协程来处理传入的任务。目前这部分工作正常。然而,我卡住的地方是如何返回每个任务的结果。我该如何实现这一点?理想情况下,一旦任务X完成,我就应该能立即得到结果。https://play.golang.org/p/o4211X0k7bp

谢谢

package main

import (
	"fmt"
	"time"
)

func main() {
	w := NewWorker()
	w.Start()

	for i := 0; i < 19; i++ {
		j := NewJob(i)
		w.Add(j)
		
		time.Sleep(time.Second)
	}

	select {}
}

// ----------------------------------------------------------------------------------------

type Job struct {
	id int
}

func NewJob(id int) *Job {
	return &Job{
		id: id,
	}
}

// ----------------------------------------------------------------------------------------

type Worker struct {
	total   int
	channel chan *Job
}

func NewWorker() *Worker {
	return &Worker{
		total:   3,
		channel: make(chan *Job, 100),
	}
}

func (w *Worker) Start() {
	for i := 1; i <= w.total; i++ {
		go w.run(i)
	}
}

func (w *Worker) Add(job *Job) {
	w.channel <- job
}

func (w *Worker) run(id int) {
	fmt.Println("worker", id, "started...")

	for {
		select {
		case job := <-w.channel:
			fmt.Println(id, "processing job", job.id)

			// This is the result
			// fmt.Sprintf("%d done @ %s", job.id, time.Now().UTC().String())
		default:
			time.Sleep(1 * time.Second)
			fmt.Println("sleeping")
		}
	}
}

更多关于从Golang工作线程返回任务结果的实现方法的实战教程也可以访问 https://www.itying.com/category-94-b0.html

9 回复

@GoingToGo 你说的“获取结果”是什么意思?

更多关于从Golang工作线程返回任务结果的实现方法的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


存在专门用于此目的的通道。

Go by Example - 工作池

@skillian

我大概在你回答前5分钟刚完成,看起来和你的方案有90%相似。不过,剩下的10%看起来比我的方案简洁得多,我会从你的示例中受益。

尽管如此,非常感谢你花费的时间。非常感激。

// 这是结果 // fmt.Sprintf("%d done @ %s", job.id, time.Now().UTC().String())

假设作业 X 已被处理,我想从工作线程返回 fmt.Sprintf("%d done @ %s", job.id, time.Now().UTC().String()) 作为结果。

@skillian

两件事。

  1. 你的工作协程在没有任务时会退出,但它们应该始终保持运行,因此应用程序永远不应该退出。
  2. 在 main() 函数中,我们如何知道结果通道是空的?我需要在这种情况下打印一些信息。

这是我目前未采纳你修改的版本:https://play.golang.org/p/AiqLLq1CLPd 只需要解决上面的第2点,但我不知道如何操作。

我也有同样的问题,并且想了解更多关于 Go 并发和通道的知识。这也是我构建自己实现的原因。

也许我的代码能帮到你:

GitHub

frankkopp/WorkerPool

一个用 GO 实现的 WorkerPool。通过在 GitHub 上创建账户来为 frankkopp/WorkerPool 的开发做出贡献。

这是一个实验性项目,旨在理解如何使用通道和协程来构建长期运行的应用程序。根据我的使用场景,以下是我正在做的事情。

这是一个命令行应用程序,它监听传入的请求,处理它们,然后返回每个请求的结果。如果有新请求进来,它会在终端上打印“in”;完成后打印“out”;在等待时(作业通道为空)打印“waiting”;以及(结果通道为空时)打印“empty”。因此,总共有四种场景。

我知道有些部分可能不太合理,但这至少更像是为了学习实践。

要实现从工作线程返回任务结果,可以使用带缓冲的结果通道。以下是修改后的代码示例:

package main

import (
	"fmt"
	"time"
)

func main() {
	w := NewWorker()
	w.Start()

	// 启动结果监听协程
	go func() {
		for result := range w.ResultChan() {
			fmt.Printf("收到结果: %s\n", result)
		}
	}()

	for i := 0; i < 19; i++ {
		j := NewJob(i)
		w.Add(j)
		
		time.Sleep(time.Second)
	}

	// 等待所有任务完成
	time.Sleep(5 * time.Second)
}

// ----------------------------------------------------------------------------------------

type Job struct {
	id int
}

func NewJob(id int) *Job {
	return &Job{
		id: id,
	}
}

// ----------------------------------------------------------------------------------------

type Worker struct {
	total      int
	jobChannel chan *Job
	resultChan chan string
}

func NewWorker() *Worker {
	return &Worker{
		total:      3,
		jobChannel: make(chan *Job, 100),
		resultChan: make(chan string, 100),
	}
}

func (w *Worker) Start() {
	for i := 1; i <= w.total; i++ {
		go w.run(i)
	}
}

func (w *Worker) Add(job *Job) {
	w.jobChannel <- job
}

func (w *Worker) ResultChan() <-chan string {
	return w.resultChan
}

func (w *Worker) run(id int) {
	fmt.Printf("worker %d started...\n", id)

	for {
		select {
		case job := <-w.jobChannel:
			fmt.Printf("%d processing job %d\n", id, job.id)

			// 处理任务并生成结果
			result := fmt.Sprintf("任务 %d 由 worker %d 完成 @ %s", 
				job.id, id, time.Now().UTC().Format("15:04:05"))
			
			// 将结果发送到结果通道
			w.resultChan <- result
			
		default:
			time.Sleep(1 * time.Second)
			fmt.Println("sleeping")
		}
	}
}

另一种更简洁的实现方式是使用带返回值的任务函数:

package main

import (
	"fmt"
	"time"
)

func main() {
	w := NewWorker()
	w.Start()

	// 提交任务并获取结果通道
	for i := 0; i < 10; i++ {
		taskID := i
		resultChan := w.Submit(func() string {
			time.Sleep(500 * time.Millisecond)
			return fmt.Sprintf("任务 %d 完成 @ %s", 
				taskID, time.Now().Format("15:04:05.000"))
		})

		// 异步获取结果
		go func(id int, ch <-chan string) {
			result := <-ch
			fmt.Printf("任务 %d 结果: %s\n", id, result)
		}(taskID, resultChan)
	}

	time.Sleep(3 * time.Second)
}

type Worker struct {
	taskChan chan func() string
}

func NewWorker() *Worker {
	return &Worker{
		taskChan: make(chan func() string, 100),
	}
}

func (w *Worker) Start() {
	for i := 0; i < 3; i++ {
		go func(workerID int) {
			for task := range w.taskChan {
				result := task()
				fmt.Printf("Worker %d 处理完成\n", workerID)
				// 在实际实现中,这里需要将结果传回
			}
		}(i)
	}
}

func (w *Worker) Submit(task func() string) <-chan string {
	resultChan := make(chan string, 1)
	
	wrappedTask := func() string {
		result := task()
		resultChan <- result
		return result
	}
	
	w.taskChan <- wrappedTask
	return resultChan
}

使用 sync.WaitGroup 等待所有任务完成的完整示例:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	w := NewWorker(3)
	w.Start()

	var wg sync.WaitGroup
	results := make(chan string, 10)

	// 提交任务
	for i := 0; i < 10; i++ {
		wg.Add(1)
		taskID := i
		
		go func() {
			defer wg.Done()
			
			result := w.Process(taskID)
			results <- result
		}()
	}

	// 等待所有任务完成并关闭结果通道
	go func() {
		wg.Wait()
		close(results)
	}()

	// 收集结果
	for result := range results {
		fmt.Println("收到结果:", result)
	}
}

type Worker struct {
	workers int
	tasks   chan int
	results chan string
	wg      sync.WaitGroup
}

func NewWorker(workers int) *Worker {
	return &Worker{
		workers: workers,
		tasks:   make(chan int, 100),
		results: make(chan string, 100),
	}
}

func (w *Worker) Start() {
	for i := 0; i < w.workers; i++ {
		w.wg.Add(1)
		go func(workerID int) {
			defer w.wg.Done()
			for taskID := range w.tasks {
				// 模拟处理时间
				time.Sleep(100 * time.Millisecond)
				
				result := fmt.Sprintf("任务 %d 由 worker %d 完成", 
					taskID, workerID)
				w.results <- result
			}
		}(i)
	}
}

func (w *Worker) Process(taskID int) string {
	w.tasks <- taskID
	return <-w.results
}

func (w *Worker) Stop() {
	close(w.tasks)
	w.wg.Wait()
	close(w.results)
}

这些示例展示了如何在Golang中从工作线程返回任务结果,包括使用结果通道、异步回调等模式。关键点是为每个任务创建结果通道,或者使用共享的结果通道配合任务ID来关联结果。

回到顶部