golang实现消息调度到通道的定时分发插件库gosd的使用

golang实现消息调度到通道的定时分发插件库gosd的使用

概述

gosd (go-schedulable-dispatcher) 是一个用于调度消息何时分发到通道的库。它提供了一个交互式API来处理带有调度器的消息调度。

实现原理

消息被接收并处理到一个基于堆的优先级队列中。默认情况下,当消息具有相同的调度时间时,不保证顺序,但可以通过配置更改。如果保证顺序,性能会稍微降低。如果严格排序对您的应用程序不关键,建议保持默认设置。

使用示例

下面是一个完整的示例代码,展示如何使用gosd库:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/alexsniffin/gosd"
)

func main() {
	// 创建调度器实例
	dispatcher, err := gosd.NewDispatcher[string](&gosd.DispatcherConfig{
		IngressChannelSize:  100,  // 入口通道大小
		DispatchChannelSize: 100,  // 分发通道大小
		MaxMessages:         100,  // 最大消息数
		GuaranteeOrder:      false, // 是否保证顺序
	})
	if err != nil {
		panic(err)
	}

	// 启动调度器处理进程
	go dispatcher.Start()

	// 调度一个1秒后发送的消息
	dispatcher.IngressChannel() <- &gosd.ScheduledMessage[string]{
		At:      time.Now().Add(1 * time.Second), // 1秒后发送
		Message: "Hello World in 1 second!",     // 消息内容
	}

	// 等待接收消息
	msg := <-dispatcher.DispatchChannel()

	fmt.Println(msg)
	// 输出: Hello World in 1 second!

	// 关闭调度器,没有截止时间
	dispatcher.Shutdown(context.Background(), false)
}

配置选项

DispatcherConfig结构体包含以下配置选项:

  • IngressChannelSize: 入口通道大小
  • DispatchChannelSize: 分发通道大小
  • MaxMessages: 最大消息数
  • GuaranteeOrder: 是否保证消息顺序

性能基准测试

使用Go 1.19和每次迭代1000条消息进行测试,结果如下:

goos: windows
goarch: amd64
pkg: github.com/alexsniffin/gosd/v2
cpu: Intel(R) Core(TM) i7-8700K CPU @ 3.70GHz
Benchmark_integration_unordered
Benchmark_integration_unordered-12                           307           3690528 ns/op
Benchmark_integration_unorderedSmallBuffer
Benchmark_integration_unorderedSmallBuffer-12                274           4120104 ns/op
Benchmark_integration_unorderedSmallHeap
Benchmark_integration_unorderedSmallHeap-12                  348           3452703 ns/op
Benchmark_integration_ordered
Benchmark_integration_ordered-12                             135           8650709 ns/op
Benchmark_integration_orderedSmallBuffer
Benchmark_integration_orderedSmallBuffer-12                  207           5867338 ns/op
Benchmark_integration_orderedSmallHeap
Benchmark_integration_orderedSmallHeap-12                    350           3592990 ns/op
Benchmark_integration_orderedSameTime
Benchmark_integration_orderedSameTime-12                     133           8909311 ns/op

从基准测试可以看出,不保证顺序的模式(unordered)比保证顺序的模式(ordered)性能更好。


更多关于golang实现消息调度到通道的定时分发插件库gosd的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现消息调度到通道的定时分发插件库gosd的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang 实现消息定时分发的 gosd 库使用指南

gosd 是一个用于 Golang 的消息调度和定时分发到通道的插件库,它可以帮助你实现消息的延迟发送和定时调度功能。

安装 gosd

go get github.com/yourusername/gosd

基本使用示例

1. 创建调度器实例

package main

import (
	"fmt"
	"time"
	
	"github.com/yourusername/gosd"
)

func main() {
	// 创建调度器实例
	scheduler := gosd.NewScheduler()
	
	// 确保在程序退出前关闭调度器
	defer scheduler.Stop()
	
	// 创建一个消息通道
	msgChan := make(chan string, 10)
	
	// 启动调度器
	go scheduler.Run()
	
	// 添加定时任务
	scheduler.Schedule(time.Now().Add(2*time.Second), "Hello after 2 seconds", msgChan)
	scheduler.Schedule(time.Now().Add(5*time.Second), "Hello after 5 seconds", msgChan)
	
	// 接收消息
	for i := 0; i < 2; i++ {
		msg := <-msgChan
		fmt.Println("Received:", msg)
	}
}

2. 周期性任务

func periodicExample() {
	scheduler := gosd.NewScheduler()
	defer scheduler.Stop()
	
	msgChan := make(chan string, 10)
	
	go scheduler.Run()
	
	// 每隔1秒发送一次消息,共发送5次
	scheduler.SchedulePeriodic(1*time.Second, 5, "Periodic message", msgChan)
	
	for i := 0; i < 5; i++ {
		msg := <-msgChan
		fmt.Println("Periodic:", msg, "at", time.Now().Format("15:04:05"))
	}
}

高级功能

1. 自定义消息类型

type CustomMessage struct {
	ID      int
	Content string
}

func customTypeExample() {
	scheduler := gosd.NewScheduler()
	defer scheduler.Stop()
	
	customChan := make(chan CustomMessage, 10)
	
	go scheduler.Run()
	
	// 发送自定义类型消息
	scheduler.Schedule(time.Now().Add(3*time.Second), 
		CustomMessage{ID: 1, Content: "Custom content"}, 
		customChan)
	
	msg := <-customChan
	fmt.Printf("Received custom message: %+v\n", msg)
}

2. 取消已调度的任务

func cancelExample() {
	scheduler := gosd.NewScheduler()
	defer scheduler.Stop()
	
	msgChan := make(chan string, 10)
	
	go scheduler.Run()
	
	// 添加任务并获取任务ID
	taskID := scheduler.Schedule(time.Now().Add(10*time.Second), "This will be cancelled", msgChan)
	
	// 取消任务
	scheduler.Cancel(taskID)
	
	// 验证是否取消
	select {
	case msg := <-msgChan:
		fmt.Println("Unexpected message:", msg)
	case <-time.After(11 * time.Second):
		fmt.Println("Message was successfully cancelled")
	}
}

最佳实践

  1. 资源管理:确保在不再需要调度器时调用 Stop() 方法释放资源
  2. 通道缓冲:为消息通道设置适当的缓冲大小以防止阻塞
  3. 错误处理:考虑添加错误处理通道来接收调度过程中的错误
  4. 性能考量:对于大量任务,考虑使用优先级队列优化

扩展功能

如果需要更复杂的功能,可以扩展 gosd:

// 扩展调度器添加优先级功能
type PriorityScheduler struct {
	*gosd.Scheduler
	priorityQueue *PriorityQueue
}

func NewPriorityScheduler() *PriorityScheduler {
	return &PriorityScheduler{
		Scheduler:     gosd.NewScheduler(),
		priorityQueue: NewPriorityQueue(),
	}
}

// 重写Schedule方法支持优先级
func (ps *PriorityScheduler) ScheduleWithPriority(when time.Time, priority int, msg interface{}, ch interface{}) int {
	// 实现优先级调度逻辑
	// ...
}

gosd 提供了一个简单而强大的基础,你可以根据实际需求进行扩展和定制。

回到顶部