在Go中实现并发调度器的最佳方式是使用goroutine和channel,配合worker pool模式。以下是针对你场景的解决方案:
package main
import (
"fmt"
"sync"
"time"
)
// 调度器任务结构
type SchedulerTask struct {
ID int
Data map[string]interface{}
Priority int
}
// 调度器处理函数
func processScheduler(task SchedulerTask) {
fmt.Printf("处理调度器 %d: 数据=%v\n", task.ID, task.Data)
// 内部并行处理(10-20个并发任务)
var wg sync.WaitGroup
internalWorkers := 10
for i := 0; i < internalWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
// 模拟发送通知等操作
time.Sleep(100 * time.Millisecond)
fmt.Printf(" 调度器 %d - 内部任务 %d 完成\n", task.ID, workerID)
}(i)
}
wg.Wait()
fmt.Printf("调度器 %d 处理完成\n", task.ID)
}
// Worker函数
func worker(id int, tasks <-chan SchedulerTask, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
fmt.Printf("Worker %d 开始处理任务 %d\n", id, task.ID)
processScheduler(task)
}
}
func main() {
// 配置worker pool
numWorkers := 10000 // 外部调度器并发数
taskQueue := make(chan SchedulerTask, 100000)
var wg sync.WaitGroup
// 启动worker
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, taskQueue, &wg)
}
// 模拟生成调度器任务(替代cron作业)
go func() {
for i := 1; i <= 1000000; i++ {
task := SchedulerTask{
ID: i,
Data: map[string]interface{}{
"user_id": i * 100,
"timestamp": time.Now(),
"action": "notification",
"parameters": fmt.Sprintf("param_%d", i),
},
Priority: i % 3,
}
taskQueue <- task
// 速率控制:每秒1000个任务
if i%1000 == 0 {
time.Sleep(1 * time.Second)
}
}
close(taskQueue)
}()
// 等待所有worker完成
wg.Wait()
fmt.Println("所有调度器处理完成")
}
对于需要更精细控制的场景,可以使用带优先级的worker pool:
package main
import (
"container/heap"
"context"
"fmt"
"sync"
"time"
)
// 优先级任务
type PriorityTask struct {
SchedulerTask
index int
}
// 优先级队列
type PriorityQueue []*PriorityTask
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
return pq[i].Priority > pq[j].Priority // 值越大优先级越高
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
task := x.(*PriorityTask)
task.index = n
*pq = append(*pq, task)
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
task := old[n-1]
task.index = -1
*pq = old[0 : n-1]
return task
}
// 高级调度器
type AdvancedScheduler struct {
taskQueue chan SchedulerTask
priorityQueue PriorityQueue
mu sync.Mutex
workers int
ctx context.Context
cancel context.CancelFunc
}
func NewAdvancedScheduler(workers int) *AdvancedScheduler {
ctx, cancel := context.WithCancel(context.Background())
return &AdvancedScheduler{
taskQueue: make(chan SchedulerTask, 10000),
priorityQueue: make(PriorityQueue, 0),
workers: workers,
ctx: ctx,
cancel: cancel,
}
}
func (s *AdvancedScheduler) Start() {
heap.Init(&s.priorityQueue)
// 启动worker
for i := 0; i < s.workers; i++ {
go s.worker(i)
}
// 启动调度器
go s.dispatcher()
}
func (s *AdvancedScheduler) dispatcher() {
for {
select {
case <-s.ctx.Done():
return
default:
s.mu.Lock()
if s.priorityQueue.Len() > 0 {
task := heap.Pop(&s.priorityQueue).(*PriorityTask)
s.mu.Unlock()
// 处理任务
processScheduler(task.SchedulerTask)
} else {
s.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
}
}
}
func (s *AdvancedScheduler) worker(id int) {
for task := range s.taskQueue {
fmt.Printf("Worker %d 接收任务 %d (优先级: %d)\n", id, task.ID, task.Priority)
s.mu.Lock()
heap.Push(&s.priorityQueue, &PriorityTask{
SchedulerTask: task,
})
s.mu.Unlock()
}
}
func (s *AdvancedScheduler) Submit(task SchedulerTask) {
s.taskQueue <- task
}
func (s *AdvancedScheduler) Stop() {
s.cancel()
close(s.taskQueue)
}
func main() {
// 创建调度器,10000个并发worker
scheduler := NewAdvancedScheduler(10000)
scheduler.Start()
// 提交任务
for i := 1; i <= 100000; i++ {
task := SchedulerTask{
ID: i,
Data: map[string]interface{}{
"batch": i / 1000,
"sequence": i % 1000,
},
Priority: i % 5, // 5个优先级级别
}
scheduler.Submit(task)
// 速率限制
if i%10000 == 0 {
time.Sleep(1 * time.Second)
}
}
// 等待处理完成
time.Sleep(5 * time.Second)
scheduler.Stop()
}
对于需要持久化和恢复的调度器,可以结合数据库:
package main
import (
"database/sql"
"fmt"
"log"
"sync"
"time"
_ "github.com/lib/pq"
)
type PersistentScheduler struct {
db *sql.DB
workers int
taskChan chan DBTask
wg sync.WaitGroup
}
type DBTask struct {
ID int
Type string
Payload string
Status string
CreatedAt time.Time
}
func NewPersistentScheduler(dbConn string, workers int) (*PersistentScheduler, error) {
db, err := sql.Open("postgres", dbConn)
if err != nil {
return nil, err
}
return &PersistentScheduler{
db: db,
workers: workers,
taskChan: make(chan DBTask, 10000),
}, nil
}
func (ps *PersistentScheduler) Start() {
// 从数据库加载未完成的任务
go ps.loadPendingTasks()
// 启动worker
for i := 0; i < ps.workers; i++ {
ps.wg.Add(1)
go ps.dbWorker(i)
}
}
func (ps *PersistentScheduler) loadPendingTasks() {
rows, err := ps.db.Query(`
SELECT id, type, payload, status, created_at
FROM scheduler_tasks
WHERE status IN ('pending', 'processing')
ORDER BY created_at ASC
LIMIT 100000
`)
if err != nil {
log.Printf("加载待处理任务失败: %v", err)
return
}
defer rows.Close()
for rows.Next() {
var task DBTask
if err := rows.Scan(&task.ID, &task.Type, &task.Payload, &task.Status, &task.CreatedAt); err != nil {
log.Printf("扫描任务失败: %v", err)
continue
}
ps.taskChan <- task
}
}
func (ps *PersistentScheduler) dbWorker(id int) {
defer ps.wg.Done()
for task := range ps.taskChan {
// 更新状态为处理中
ps.updateTaskStatus(task.ID, "processing")
// 处理任务
if err := ps.processTask(task); err != nil {
log.Printf("Worker %d 处理任务 %d 失败: %v", id, task.ID, err)
ps.updateTaskStatus(task.ID, "failed")
} else {
ps.updateTaskStatus(task.ID, "completed")
}
}
}
func (ps *PersistentScheduler) processTask(task DBTask) error {
// 模拟任务处理
time.Sleep(50 * time.Millisecond)
fmt.Printf("处理数据库任务 %d: %s\n", task.ID, task.Payload)
return nil
}
func (ps *PersistentScheduler) updateTaskStatus(taskID int, status string) {
_, err := ps.db.Exec(
"UPDATE scheduler_tasks SET status = $1, updated_at = NOW() WHERE id = $2",
status, taskID,
)
if err != nil {
log.Printf("更新任务状态失败: %v", err)
}
}
func (ps *PersistentScheduler) SubmitTask(taskType, payload string) error {
_, err := ps.db.Exec(`
INSERT INTO scheduler_tasks (type, payload, status, created_at)
VALUES ($1, $2, 'pending', NOW())
`, taskType, payload)
return err
}
func (ps *PersistentScheduler) Stop() {
close(ps.taskChan)
ps.wg.Wait()
ps.db.Close()
}
这些方案完全避免了HTTP和curl的限制,使用Go原生并发机制,可以高效处理数百万个调度器任务。