golang高性能线程池实现插件库threadpool的使用

Golang高性能线程池实现插件库threadpool的使用

安装

go get github.com/shettyh/threadpool

线程池使用

1. 实现Runnable接口

首先需要创建一个实现了Runnable接口的任务结构体:

type MyTask struct { }
 
func (t *MyTask) Run(){
  // 在这里实现你的任务逻辑
}

2. 创建线程池实例

创建一个线程池实例,指定工作线程数量和任务队列大小:

pool := threadpool.NewThreadPool(200, 1000000)

3. 执行任务

创建任务并提交到线程池执行:

task := &MyTask{}
err := pool.Execute(task)

4. 使用Callable任务

如果需要获取任务执行结果,可以实现Callable接口:

type MyTaskCallable struct { }

func (c *MyTaskCallable) Call() interface{} {
  // 执行任务并返回结果
  return result
}

// 执行可返回结果的任务
task := &MyTaskCallable{}
future, err := pool.ExecuteFuture(task)

// 检查任务是否完成
isDone := future.IsDone() // true/false

// 获取结果(阻塞调用)
result := future.Get()

5. 关闭线程池

使用完毕后关闭线程池:

pool.Close()

定时线程池使用

1. 创建定时线程池

schedulerPool := threadpool.NewScheduledThreadPool(10)

2. 调度任务

延迟执行任务:

task := &MyTask{}
pool.ScheduleOnce(task, time.Second*20) // 当前仅支持秒级延迟

3. 关闭线程池

pool.Close()

完整示例

package main

import (
	"fmt"
	"time"
	"github.com/shettyh/threadpool"
)

// 普通任务
type SimpleTask struct{}

func (t *SimpleTask) Run() {
	fmt.Println("SimpleTask is running")
}

// 可返回结果的任务
type ResultTask struct{}

func (t *ResultTask) Call() interface{} {
	time.Sleep(1 * time.Second)
	return "Task result"
}

func main() {
	// 创建线程池
	pool := threadpool.NewThreadPool(5, 1000)
	defer pool.Close()

	// 执行普通任务
	simpleTask := &SimpleTask{}
	err := pool.Execute(simpleTask)
	if err != nil {
		fmt.Println("Execute error:", err)
	}

	// 执行可返回结果的任务
	resultTask := &ResultTask{}
	future, err := pool.ExecuteFuture(resultTask)
	if err != nil {
		fmt.Println("ExecuteFuture error:", err)
		return
	}

	// 获取结果
	result := future.Get()
	fmt.Println("Task result:", result)

	// 定时线程池示例
	scheduler := threadpool.NewScheduledThreadPool(2)
	defer scheduler.Close()

	// 延迟执行任务
	scheduler.ScheduleOnce(&SimpleTask{}, time.Second*2)
	time.Sleep(3 * time.Second)
}

这个高性能线程池库提供了简单易用的API,可以方便地实现并发任务处理和定时任务调度。通过合理配置工作线程数量和任务队列大小,可以有效地处理高并发场景下的任务执行需求。


更多关于golang高性能线程池实现插件库threadpool的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能线程池实现插件库threadpool的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang高性能线程池实现 - threadpool插件库使用指南

在Go语言中,虽然goroutine本身非常轻量级,但在某些场景下,使用线程池可以更好地控制资源使用、限制并发数量以及复用goroutine。下面我将介绍如何使用threadpool这个高性能线程池插件库。

threadpool库简介

threadpool是一个轻量级、高性能的Golang线程池实现,主要特点包括:

  • 动态调整池大小
  • 任务队列管理
  • 优雅关闭
  • 错误处理机制

安装

go get github.com/ivpusic/grpool

基本使用

1. 创建线程池

package main

import (
	"fmt"
	"github.com/ivpusic/grpool"
	"time"
)

func main() {
	// 创建一个拥有10个工作goroutine,最大1000个任务队列的池
	pool := grpool.NewPool(10, 1000)
	
	// 确保在程序结束时关闭池
	defer pool.Release()
	
	// 提交任务到池中
	for i := 0; i < 20; i++ {
		count := i
		pool.JobQueue <- func() {
			time.Sleep(1 * time.Second)
			fmt.Printf("I am worker! Number %d\n", count)
		}
	}
	
	// 等待所有任务完成
	pool.WaitAll()
}

2. 带返回值的任务

package main

import (
	"fmt"
	"github.com/ivpusic/grpool"
)

func main() {
	pool := grpool.NewPool(5, 20)
	defer pool.Release()
	
	results := make(chan int, 10)
	
	for i := 0; i < 10; i++ {
		i := i
		pool.JobQueue <- func() {
			results <- i * 2
		}
	}
	
	pool.WaitAll()
	close(results)
	
	for res := range results {
		fmt.Println("Result:", res)
	}
}

高级特性

1. 动态调整池大小

pool := grpool.NewPool(5, 100)

// 调整工作goroutine数量为10
pool.Tune(10)

2. 优雅关闭

pool := grpool.NewPool(5, 100)

// 停止接受新任务
pool.Release()

// 强制立即关闭,不等待当前任务完成
// pool.ReleaseNow()

3. 错误处理

pool := grpool.NewPool(5, 100)

pool.JobQueue <- func() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("Recovered in worker:", r)
		}
	}()
	
	// 可能引发panic的代码
	panic("something went wrong")
}

pool.WaitAll()

性能优化建议

  1. 合理设置池大小:通常设置为CPU核心数的2-5倍
  2. 任务粒度:确保每个任务有足够的工作量,避免任务过小
  3. 避免阻塞:长时间阻塞的任务会占用工作goroutine
  4. 复用池:避免频繁创建和销毁池

完整示例

package main

import (
	"fmt"
	"github.com/ivpusic/grpool"
	"sync/atomic"
	"time"
)

func main() {
	// 创建线程池
	pool := grpool.NewPool(10, 1000)
	defer pool.Release()
	
	var counter int32
	
	// 提交100个任务
	for i := 0; i < 100; i++ {
		pool.JobQueue <- func() {
			time.Sleep(100 * time.Millisecond)
			atomic.AddInt32(&counter, 1)
		}
	}
	
	// 等待所有任务完成
	pool.WaitAll()
	
	fmt.Println("Total tasks completed:", counter)
	
	// 性能测试
	start := time.Now()
	for i := 0; i < 1000; i++ {
		pool.JobQueue <- func() {
			time.Sleep(1 * time.Millisecond)
		}
	}
	pool.WaitAll()
	fmt.Printf("Processed 1000 tasks in %v\n", time.Since(start))
}

替代方案

如果threadpool库不满足需求,也可以考虑以下替代方案:

  1. ants:高性能goroutine池
  2. goworker:更复杂的任务队列实现
  3. tunny:另一种goroutine池实现

总结

threadpool库提供了简单易用的线程池实现,适合需要控制并发数量的场景。通过合理配置池大小和任务队列长度,可以在保证性能的同时避免资源耗尽。记得在程序退出时正确释放池资源,并根据实际任务特点调整参数。

希望这个指南能帮助你高效使用Golang线程池!

回到顶部