golang高性能无限任务队列与并发工作池插件库kyoo的使用

Golang高性能无限任务队列与并发工作池插件库kyoo的使用

关于kyoo

kyoo是队列(queue)一词的音标转写。它提供了一个任务队列,可以容纳尽可能多的任务,仅受运行系统可用资源的限制。

该队列具有以下特点:

  • 任务队列无限制(仅受系统资源=内存限制)
  • 使用工作池并发处理任务
  • 停止队列时,待处理的任务仍会被处理

该库包含一个简单的Job接口和一个简单的FuncExecutorJob,它只是执行给定的函数并实现该接口。通过这种方式,几乎所有类型的工作负载都应该可以被处理,当然也可以添加Job接口的自定义实现。

kyoo的典型使用场景包括:

  • RabbitMQ或Amazon SQS等消息队列的消费者
  • 将Web服务器请求中耗时的处理工作卸载到后台任务
  • 各种后台处理任务,如图像优化等

示例代码

以下示例展示了一个简单的HTTP服务器,将任务卸载到持续在后台处理的任务队列中。

package main

import (
	"fmt"
	"log"
	"net/http"
	"runtime"
	"time"

	jobqueue "github.com/dirkaholic/kyoo"
	"github.com/dirkaholic/kyoo/job"
)

var queue *jobqueue.JobQueue

// HTTP处理函数,将任务提交到队列
func handler(w http.ResponseWriter, r *http.Request) {
	queue.Submit(&job.FuncExecutorJob{Func: func() error {
		return doTheHeavyBackgroundWork(r.URL.Path)
	}})
	fmt.Printf("%s - submitted %s !!\n", time.Now().String(), r.URL.Path)

	fmt.Fprint(w, "Job added to queue.")
}

func main() {
	// 创建工作队列,工作池大小为CPU核心数的2倍
	queue = jobqueue.NewJobQueue(runtime.NumCPU() * 2)
	queue.Start()

	// 设置HTTP路由并启动服务器
	http.HandleFunc("/", handler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}

// 模拟耗时后台任务
func doTheHeavyBackgroundWork(path string) error {
	time.Sleep(2 * time.Second)
	fmt.Printf("%s - processed %s !!\n", time.Now().String(), path)
	return nil
}

测试示例

可以通过向服务器发送一系列HTTP请求来测试任务卸载功能:

$ for i in {1..10}; do http http://127.0.0.1:8080/test/$i; done

服务器端的输出应该类似于这样:

2020-01-09 21:36:36.156277 +0100 CET m=+5.733617272 - submitted /test/1 !!
2020-01-09 21:36:36.443521 +0100 CET m=+6.020861136 - submitted /test/2 !!
2020-01-09 21:36:36.730535 +0100 CET m=+6.307874793 - submitted /test/3 !!
2020-01-09 21:36:37.021405 +0100 CET m=+6.598744533 - submitted /test/4 !!
2020-01-09 21:36:37.311973 +0100 CET m=+6.889312431 - submitted /test/5 !!
2020-01-09 21:36:37.609868 +0100 CET m=+7.187208115 - submitted /test/6 !!
2020-01-09 21:36:37.895222 +0100 CET m=+7.472561850 - submitted /test/7 !!
2020-01-09 21:36:38.160524 +0100 CET m=+7.737863891 - processed /test/1 !!
2020-01-09 21:36:38.171491 +0100 CET m=+7.748830724 - submitted /test/8 !!
2020-01-09 21:36:38.445832 +0100 CET m=+8.023171514 - processed /test/2 !!
2020-01-09 21:36:38.448423 +0100 CET m=+8.025762679 - submitted /test/9 !!
2020-01-09 21:36:38.730541 +0100 CET m=+8.307880933 - submitted /test/10 !!
2020-01-09 21:36:38.735158 +0100 CET m=+8.312497505 - processed /test/3 !!
2020-01-09 21:36:39.024788 +0100 CET m=+8.602128093 - processed /test/4 !!
2020-01-09 21:36:39.315991 +0100 CET m=+8.893331115 - processed /test/5 !!
2020-01-09 21:36:39.614848 +0100 CET m=+9.192187633 - processed /test/6 !!
2020-01-09 21:36:39.896692 +0100 CET m=+9.474031970 - processed /test/7 !!
2020-01-09 21:36:40.175952 +0100 CET m=+9.753291345 - processed /test/8 !!
2020-01-09 21:36:40.451877 +0100 CET m=+10.029216847 - processed /test/9 !!
2020-01-09 21:36:40.734289 +0100 CET m=+10.311628415 - processed /test/10 !!

更多示例

kyoo还提供了其他使用示例,如SQS worker等,可以参考库中的示例代码。


更多关于golang高性能无限任务队列与并发工作池插件库kyoo的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能无限任务队列与并发工作池插件库kyoo的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


kyoo - Go高性能无限任务队列与并发工作池库

kyoo 是一个轻量级的Go语言库,用于实现高性能的任务队列和并发工作池模式。下面我将详细介绍它的使用方法和示例代码。

基本概念

kyoo 的核心特点:

  • 无限任务队列容量
  • 动态工作池大小调整
  • 任务优先级支持
  • 优雅关闭机制
  • 极简API设计

安装

go get github.com/rytsh/kyoo

基础使用示例

1. 创建简单工作池

package main

import (
	"fmt"
	"github.com/rytsh/kyoo"
	"time"
)

func main() {
	// 创建一个工作池,默认5个worker
	pool := kyoo.New(5)
	
	// 提交任务
	for i := 0; i < 10; i++ {
		taskID := i
		pool.Add(func() {
			fmt.Printf("处理任务 %d\n", taskID)
			time.Sleep(1 * time.Second)
		})
	}
	
	// 等待所有任务完成
	pool.Wait()
	fmt.Println("所有任务完成")
}

2. 动态调整工作池大小

pool := kyoo.New(3) // 初始3个worker

// 动态调整worker数量
pool.Tune(5) // 增加到5个
pool.Tune(2) // 减少到2个

3. 带优先级的任务

pool := kyoo.New(3)

// 高优先级任务
pool.AddPriority(func() {
	fmt.Println("高优先级任务")
}, kyoo.High)

// 普通优先级任务(默认)
pool.AddPriority(func() {
	fmt.Println("普通优先级任务")
}, kyoo.Normal)

// 低优先级任务
pool.AddPriority(func() {
	fmt.Println("低优先级任务")
}, kyoo.Low)

高级功能示例

1. 优雅关闭

pool := kyoo.New(3)

// 提交一些任务
for i := 0; i < 10; i++ {
	pool.Add(func() {
		time.Sleep(500 * time.Millisecond)
		fmt.Println("任务完成")
	})
}

// 优雅关闭 - 等待所有任务完成
pool.Close()

2. 获取任务结果

pool := kyoo.New(3)

// 使用ResultChan获取任务结果
resultChan := make(chan int, 10)
for i := 0; i < 10; i++ {
	i := i
	pool.Add(func() {
		// 模拟工作
		time.Sleep(100 * time.Millisecond)
		resultChan <- i * 2
	})
}

// 收集结果
go func() {
	for res := range resultChan {
		fmt.Printf("结果: %d\n", res)
	}
}()

pool.Wait()
close(resultChan)

3. 错误处理

pool := kyoo.New(3)
errChan := make(chan error, 10)

for i := 0; i < 10; i++ {
	i := i
	pool.Add(func() {
		defer func() {
			if r := recover(); r != nil {
				errChan <- fmt.Errorf("任务 %d 出错: %v", i, r)
			}
		}()
		
		if i == 5 {
			panic("模拟错误")
		}
		fmt.Printf("处理任务 %d\n", i)
	})
}

go func() {
	for err := range errChan {
		fmt.Println("捕获到错误:", err)
	}
}()

pool.Wait()
close(errChan)

性能优化建议

  1. 批量提交任务:减少锁竞争

    tasks := make([]func(), 0, 100)
    for i := 0; i < 100; i++ {
        tasks = append(tasks, func() {
            // 任务逻辑
        })
    }
    pool.AddBatch(tasks...)
    
  2. 合理设置worker数量:通常设置为CPU核心数的2-3倍

  3. 使用对象池减少GC压力

    var taskPool = sync.Pool{
        New: func() interface{} {
            return new(MyTask)
        },
    }
    
    pool.Add(func() {
        task := taskPool.Get().(*MyTask)
        defer taskPool.Put(task)
        // 使用task
    })
    

与标准库sync.WaitGroup对比

kyoo相比sync.WaitGroup的优势:

  • 自动的任务分发和负载均衡
  • 内置并发控制
  • 优先级支持
  • 更丰富的管理接口

总结

kyoo是一个简单但功能强大的Go并发任务处理库,适合需要处理大量异步任务的场景。它的API设计简洁,同时提供了足够的灵活性来处理各种并发模式。

对于更复杂的场景,你还可以考虑结合其他库如:

  • github.com/gammazero/workerpool - 另一个流行的worker pool实现
  • github.com/Jeffail/tunny - 基于goroutine池的实现

希望这个介绍对你有所帮助!如果需要更高级的功能或有任何问题,可以查阅kyoo的官方文档或源代码。

回到顶部