golang高效管理Goroutine并发池插件库tunny的使用

Golang高效管理Goroutine并发池插件库tunny的使用

Tunny

Tunny是一个用于创建和管理goroutine池的Golang库,它允许你通过同步API限制来自任意数量goroutine的工作。

为什么使用Tunny

固定大小的goroutine池在以下场景非常有用:当工作来自任意数量的异步源,但并行处理能力有限时。例如,在处理CPU密集型的HTTP请求作业时,你可以创建一个与CPU数量匹配的池。

安装

使用go get安装:

go get github.com/Jeffail/tunny

或者使用dep:

dep ensure -add github.com/Jeffail/tunny

基本使用

对于大多数情况,你的繁重工作可以用一个简单的func()表示,这时可以使用NewFunc。以下是一个HTTP请求处理CPU密集型工作的示例:

package main

import (
	"io/ioutil"
	"net/http"
	"runtime"

	"github.com/Jeffail/tunny"
)

func main() {
	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte

		// TODO: 在这里对payload进行CPU密集型操作

		return result
	})
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
		}
		defer r.Body.Close()

		// 将工作放入池中。这个调用是同步的,会阻塞直到作业完成
		result := pool.Process(input)

		w.Write(result.([]byte))
	})

	http.ListenAndServe(":8080", nil)
}

超时处理

Tunny支持超时。你可以将上面的Process调用替换为:

result, err := pool.ProcessTimed(input, time.Second*5)
if err == tunny.ErrJobTimedOut {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
}

或者使用请求的context来处理超时和截止时间:

result, err := pool.ProcessCtx(r.Context(), input)
if err == context.DeadlineExceeded {
	http.Error(w, "Request timed out", http.StatusRequestTimeout)
}

动态调整池大小

Tunny池的大小可以随时通过SetSize(int)改变:

pool.SetSize(10) // 10个goroutine
pool.SetSize(100) // 100个goroutine

这个操作是线程安全的,即使其他goroutine仍在处理也可以安全调用。

带状态的Goroutine

有时Tunny池中的每个goroutine都需要自己的管理状态。在这种情况下,你应该实现tunny.Worker接口,它包括终止、中断(在作业超时且不再需要时)和阻塞下一个作业分配直到满足条件的方法。

当使用Worker类型创建池时,你需要提供一个构造函数来生成你的自定义实现:

pool := tunny.New(poolSize, func() Worker {
	// TODO: 在这里进行每个goroutine的状态分配
	return newCustomWorker()
})

这允许Tunny在池大小改变时干净地创建和销毁Worker类型。

处理顺序

积压的作业不保证按顺序处理。由于当前通道和select块的实现方式,积压的作业栈将作为FIFO队列处理。然而,这种行为不是规范的一部分,不应依赖于此。


更多关于golang高效管理Goroutine并发池插件库tunny的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效管理Goroutine并发池插件库tunny的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang高效管理Goroutine并发池插件库tunny使用指南

什么是tunny

tunny是一个轻量级的Goroutine池实现,它可以帮助开发者高效管理并发任务,避免频繁创建和销毁Goroutine带来的性能开销。tunny的主要特点包括:

  • 固定数量的工作Goroutine
  • 任务队列机制
  • 支持超时控制
  • 简单的API设计

tunny基本使用

安装tunny

go get github.com/Jeffail/tunny

基本示例

package main

import (
	"fmt"
	"time"
	"github.com/Jeffail/tunny"
)

func main() {
	// 创建一个包含3个工作Goroutine的池
	pool := tunny.NewFunc(3, func(payload interface{}) interface{} {
		// 这里是工作函数
		n := payload.(int)
		result := n * n
		fmt.Printf("处理 %d => %d\n", n, result)
		time.Sleep(1 * time.Second) // 模拟耗时操作
		return result
	})
	defer pool.Close()

	// 提交任务到池中
	for i := 1; i <= 10; i++ {
		go func(n int) {
			// 同步调用
			result, err := pool.ProcessTimed(n, 2*time.Second)
			if err != nil {
				fmt.Printf("任务 %d 处理超时: %v\n", n, err)
				return
			}
			fmt.Printf("任务 %d 完成, 结果: %d\n", n, result.(int))
		}(i)
	}

	// 等待所有任务完成
	time.Sleep(5 * time.Second)
}

tunny高级特性

1. 动态调整池大小

pool.SetSize(5) // 将池大小调整为5

2. 异步处理

go func() {
    result := pool.Process(10)
    fmt.Println(result)
}()

3. 超时控制

result, err := pool.ProcessTimed(5, 1*time.Second)
if err != nil {
    fmt.Println("处理超时:", err)
}

4. 获取池状态

queuedJobs := pool.QueueLength() // 获取队列中等待的任务数

实际应用示例

处理HTTP请求

package main

import (
	"fmt"
	"net/http"
	"sync"
	"github.com/Jeffail/tunny"
)

func main() {
	// 创建处理池
	pool := tunny.NewFunc(10, func(payload interface{}) interface{} {
		req := payload.(*http.Request)
		// 模拟处理请求
		time.Sleep(500 * time.Millisecond)
		return fmt.Sprintf("已处理请求: %s", req.URL.Path)
	})
	defer pool.Close()

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// 使用池处理请求
		result, err := pool.ProcessTimed(r, 1*time.Second)
		if err != nil {
			http.Error(w, "处理超时", http.StatusRequestTimeout)
			return
		}
		fmt.Fprintln(w, result)
	})

	fmt.Println("服务器启动在 :8080")
	http.ListenAndServe(":8080", nil)
}

批量处理数据

package main

import (
	"fmt"
	"sync"
	"time"
	"github.com/Jeffail/tunny"
)

func main() {
	// 创建池
	pool := tunny.NewFunc(4, func(data interface{}) interface{} {
		item := data.(string)
		time.Sleep(500 * time.Millisecond) // 模拟处理时间
		return fmt.Sprintf("处理后的-%s", item)
	})
	defer pool.Close()

	// 待处理数据
	items := []string{"A", "B", "C", "D", "E", "F", "G", "H"}

	var wg sync.WaitGroup
	results := make([]string, len(items))

	for i, item := range items {
		wg.Add(1)
		go func(idx int, val string) {
			defer wg.Done()
			res, _ := pool.ProcessTimed(val, 1*time.Second)
			results[idx] = res.(string)
		}(i, item)
	}

	wg.Wait()
	fmt.Println("处理结果:", results)
}

性能考虑

  1. 池大小选择:通常设置为CPU核心数的2-4倍
  2. 任务队列长度:默认无限制,大量任务可能导致内存问题
  3. 超时设置:根据任务特性设置合理超时
  4. 资源释放:记得调用Close()释放资源

与其他库的比较

  • 相比ants:tunny更轻量,API更简单
  • 相比workerpool:tunny支持超时控制
  • 相比原生Goroutine:tunny避免了频繁创建销毁的开销

tunny是一个简单高效的Goroutine池实现,适合需要控制并发数量的场景,特别是当任务执行时间较长或任务量较大时,能显著提升性能。

回到顶部