golang实现消息调度到通道的定时分发插件库gosd的使用
golang实现消息调度到通道的定时分发插件库gosd的使用
概述
gosd (go-schedulable-dispatcher) 是一个用于调度消息何时分发到通道的库。它提供了一个交互式API来处理带有调度器的消息调度。
实现原理
消息被接收并处理到一个基于堆的优先级队列中。默认情况下,当消息具有相同的调度时间时,不保证顺序,但可以通过配置更改。如果保证顺序,性能会稍微降低。如果严格排序对您的应用程序不关键,建议保持默认设置。
使用示例
下面是一个完整的示例代码,展示如何使用gosd库:
package main
import (
"context"
"fmt"
"time"
"github.com/alexsniffin/gosd"
)
func main() {
// 创建调度器实例
dispatcher, err := gosd.NewDispatcher[string](&gosd.DispatcherConfig{
IngressChannelSize: 100, // 入口通道大小
DispatchChannelSize: 100, // 分发通道大小
MaxMessages: 100, // 最大消息数
GuaranteeOrder: false, // 是否保证顺序
})
if err != nil {
panic(err)
}
// 启动调度器处理进程
go dispatcher.Start()
// 调度一个1秒后发送的消息
dispatcher.IngressChannel() <- &gosd.ScheduledMessage[string]{
At: time.Now().Add(1 * time.Second), // 1秒后发送
Message: "Hello World in 1 second!", // 消息内容
}
// 等待接收消息
msg := <-dispatcher.DispatchChannel()
fmt.Println(msg)
// 输出: Hello World in 1 second!
// 关闭调度器,没有截止时间
dispatcher.Shutdown(context.Background(), false)
}
配置选项
DispatcherConfig结构体包含以下配置选项:
IngressChannelSize
: 入口通道大小DispatchChannelSize
: 分发通道大小MaxMessages
: 最大消息数GuaranteeOrder
: 是否保证消息顺序
性能基准测试
使用Go 1.19和每次迭代1000条消息进行测试,结果如下:
goos: windows
goarch: amd64
pkg: github.com/alexsniffin/gosd/v2
cpu: Intel(R) Core(TM) i7-8700K CPU @ 3.70GHz
Benchmark_integration_unordered
Benchmark_integration_unordered-12 307 3690528 ns/op
Benchmark_integration_unorderedSmallBuffer
Benchmark_integration_unorderedSmallBuffer-12 274 4120104 ns/op
Benchmark_integration_unorderedSmallHeap
Benchmark_integration_unorderedSmallHeap-12 348 3452703 ns/op
Benchmark_integration_ordered
Benchmark_integration_ordered-12 135 8650709 ns/op
Benchmark_integration_orderedSmallBuffer
Benchmark_integration_orderedSmallBuffer-12 207 5867338 ns/op
Benchmark_integration_orderedSmallHeap
Benchmark_integration_orderedSmallHeap-12 350 3592990 ns/op
Benchmark_integration_orderedSameTime
Benchmark_integration_orderedSameTime-12 133 8909311 ns/op
从基准测试可以看出,不保证顺序的模式(unordered
)比保证顺序的模式(ordered
)性能更好。
更多关于golang实现消息调度到通道的定时分发插件库gosd的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang实现消息调度到通道的定时分发插件库gosd的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang 实现消息定时分发的 gosd 库使用指南
gosd 是一个用于 Golang 的消息调度和定时分发到通道的插件库,它可以帮助你实现消息的延迟发送和定时调度功能。
安装 gosd
go get github.com/yourusername/gosd
基本使用示例
1. 创建调度器实例
package main
import (
"fmt"
"time"
"github.com/yourusername/gosd"
)
func main() {
// 创建调度器实例
scheduler := gosd.NewScheduler()
// 确保在程序退出前关闭调度器
defer scheduler.Stop()
// 创建一个消息通道
msgChan := make(chan string, 10)
// 启动调度器
go scheduler.Run()
// 添加定时任务
scheduler.Schedule(time.Now().Add(2*time.Second), "Hello after 2 seconds", msgChan)
scheduler.Schedule(time.Now().Add(5*time.Second), "Hello after 5 seconds", msgChan)
// 接收消息
for i := 0; i < 2; i++ {
msg := <-msgChan
fmt.Println("Received:", msg)
}
}
2. 周期性任务
func periodicExample() {
scheduler := gosd.NewScheduler()
defer scheduler.Stop()
msgChan := make(chan string, 10)
go scheduler.Run()
// 每隔1秒发送一次消息,共发送5次
scheduler.SchedulePeriodic(1*time.Second, 5, "Periodic message", msgChan)
for i := 0; i < 5; i++ {
msg := <-msgChan
fmt.Println("Periodic:", msg, "at", time.Now().Format("15:04:05"))
}
}
高级功能
1. 自定义消息类型
type CustomMessage struct {
ID int
Content string
}
func customTypeExample() {
scheduler := gosd.NewScheduler()
defer scheduler.Stop()
customChan := make(chan CustomMessage, 10)
go scheduler.Run()
// 发送自定义类型消息
scheduler.Schedule(time.Now().Add(3*time.Second),
CustomMessage{ID: 1, Content: "Custom content"},
customChan)
msg := <-customChan
fmt.Printf("Received custom message: %+v\n", msg)
}
2. 取消已调度的任务
func cancelExample() {
scheduler := gosd.NewScheduler()
defer scheduler.Stop()
msgChan := make(chan string, 10)
go scheduler.Run()
// 添加任务并获取任务ID
taskID := scheduler.Schedule(time.Now().Add(10*time.Second), "This will be cancelled", msgChan)
// 取消任务
scheduler.Cancel(taskID)
// 验证是否取消
select {
case msg := <-msgChan:
fmt.Println("Unexpected message:", msg)
case <-time.After(11 * time.Second):
fmt.Println("Message was successfully cancelled")
}
}
最佳实践
- 资源管理:确保在不再需要调度器时调用
Stop()
方法释放资源 - 通道缓冲:为消息通道设置适当的缓冲大小以防止阻塞
- 错误处理:考虑添加错误处理通道来接收调度过程中的错误
- 性能考量:对于大量任务,考虑使用优先级队列优化
扩展功能
如果需要更复杂的功能,可以扩展 gosd:
// 扩展调度器添加优先级功能
type PriorityScheduler struct {
*gosd.Scheduler
priorityQueue *PriorityQueue
}
func NewPriorityScheduler() *PriorityScheduler {
return &PriorityScheduler{
Scheduler: gosd.NewScheduler(),
priorityQueue: NewPriorityQueue(),
}
}
// 重写Schedule方法支持优先级
func (ps *PriorityScheduler) ScheduleWithPriority(when time.Time, priority int, msg interface{}, ch interface{}) int {
// 实现优先级调度逻辑
// ...
}
gosd 提供了一个简单而强大的基础,你可以根据实际需求进行扩展和定制。