Golang中如何实现并发运行调度器

Golang中如何实现并发运行调度器 我有一个公开网站,它处理数据并为这些数据运行不同的调度器以进行处理,例如将数据保存到数据库、向用户发送通知。

所有这些进程都通过调度器独立运行,并且这些进程是并发执行的。

我尝试使用 HTTP 来运行这些调度器,但使用 HTTP 的问题是系统可用的 TCP 端口不足,因为我需要处理海量数据,这些数据会在特定时间运行数百万个调度器,我也已经实现了速率限制。

我也尝试过使用 curl 来运行调度器,但它开始报错:

打开的文件过多

即使我已经将系统中的打开文件限制增加到 100 万,并且 curl 占用太多资源,所以我避免使用 curl。

为了更清楚地说明数据情况,假设我有 10,000 个调度器在并发运行,而在这些调度器内部,有 10-20 个调度器(针对每个调度器)并行运行以发送通知,我正在考虑用另一种方法来运行这些内部调度器,而不是使用 HTTP 或 curl。

注意:我必须向每个调度器传递不同的数据。并且数据是通过使用 Go 的 cron 作业进行处理的。

我正在考虑在内部运行这些调度器,我可以这样做吗?

是否有不使用 HTTP 或 curl 来运行调度器的更好解决方案?


更多关于Golang中如何实现并发运行调度器的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复

本质上就是定时任务。

更多关于Golang中如何实现并发运行调度器的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你好,Sahil,你能澄清一下“调度器”是什么吗?

那么减少工作线程数量!

我不会同时处理大约超过1000个请求。请记住,不仅你的本地主机资源有限,你正在向其发送大量请求的服务器也是如此。

甚至还没有谈及参与路由你请求的主机。

调度器是并发调用的 net/http 请求,我们无法将这些调用放入 cron 中,因为这些 net/http 调用是在每个 cron 运行时决定的。

例如,某人设置了在上午 11 点发送提醒,而另一个用户设置了在下午 1 点发送提醒,因此这些调用是在运行时决定的。

所以我需要并发调用 net/http 请求以实现并行执行。

如果我并发调用这些 net/http 请求 50000 次,那么它就会开始报错。

我的系统配置如下:

  • 内存:16GB
  • 处理器:Intel® Core™ i5-10400 CPU @ 2.90GHz
  • 每个插槽的核心数:6
  • 每个核心的线程数:2
  • 操作系统:Ubuntu 20.04.5 LTS

sahil.garg:

所有这些进程都通过调度器独立运行,并且这些进程是并发执行的

sahil.garg:

为了更清楚地说明数据,假设我有 10,000 个调度器在并发运行,在这些调度器内部,每个调度器又有 10-20 个调度器在并行运行以发送通知。我正在考虑用另一种方法来运行这些内部的调度器,而不是使用 HTTP 或 curl。

当您提到“调度器”和“进程”时,您是指实际的 /usr/sbin/cron 实例(或者如果是 Windows 服务器,则是指 Windows 任务计划程序。您运行的是什么操作系统?)生成了多个可执行文件的操作系统进程,还是指单个进程中的 goroutine?根据操作系统以及是多进程还是单进程,您可以尝试更改不同的设置来增加服务器将接受的打开连接数。服务器的内存也可能在其中扮演一个潜在的重要角色。

假设您运行的是一个相对较新的 Linux 发行版,Baeldung 有一篇关于每个用户/会话/全局最大连接数的各种设置的好文章。

在Go中实现并发调度器的最佳方式是使用goroutine和channel,配合worker pool模式。以下是针对你场景的解决方案:

package main

import (
	"fmt"
	"sync"
	"time"
)

// 调度器任务结构
type SchedulerTask struct {
	ID       int
	Data     map[string]interface{}
	Priority int
}

// 调度器处理函数
func processScheduler(task SchedulerTask) {
	fmt.Printf("处理调度器 %d: 数据=%v\n", task.ID, task.Data)
	
	// 内部并行处理(10-20个并发任务)
	var wg sync.WaitGroup
	internalWorkers := 10
	
	for i := 0; i < internalWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// 模拟发送通知等操作
			time.Sleep(100 * time.Millisecond)
			fmt.Printf("  调度器 %d - 内部任务 %d 完成\n", task.ID, workerID)
		}(i)
	}
	wg.Wait()
	
	fmt.Printf("调度器 %d 处理完成\n", task.ID)
}

// Worker函数
func worker(id int, tasks <-chan SchedulerTask, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		fmt.Printf("Worker %d 开始处理任务 %d\n", id, task.ID)
		processScheduler(task)
	}
}

func main() {
	// 配置worker pool
	numWorkers := 10000 // 外部调度器并发数
	taskQueue := make(chan SchedulerTask, 100000)
	
	var wg sync.WaitGroup
	
	// 启动worker
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, taskQueue, &wg)
	}
	
	// 模拟生成调度器任务(替代cron作业)
	go func() {
		for i := 1; i <= 1000000; i++ {
			task := SchedulerTask{
				ID: i,
				Data: map[string]interface{}{
					"user_id":    i * 100,
					"timestamp":  time.Now(),
					"action":     "notification",
					"parameters": fmt.Sprintf("param_%d", i),
				},
				Priority: i % 3,
			}
			taskQueue <- task
			
			// 速率控制:每秒1000个任务
			if i%1000 == 0 {
				time.Sleep(1 * time.Second)
			}
		}
		close(taskQueue)
	}()
	
	// 等待所有worker完成
	wg.Wait()
	fmt.Println("所有调度器处理完成")
}

对于需要更精细控制的场景,可以使用带优先级的worker pool:

package main

import (
	"container/heap"
	"context"
	"fmt"
	"sync"
	"time"
)

// 优先级任务
type PriorityTask struct {
	SchedulerTask
	index int
}

// 优先级队列
type PriorityQueue []*PriorityTask

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].Priority > pq[j].Priority // 值越大优先级越高
}

func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

func (pq *PriorityQueue) Push(x interface{}) {
	n := len(*pq)
	task := x.(*PriorityTask)
	task.index = n
	*pq = append(*pq, task)
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	task := old[n-1]
	task.index = -1
	*pq = old[0 : n-1]
	return task
}

// 高级调度器
type AdvancedScheduler struct {
	taskQueue chan SchedulerTask
	priorityQueue PriorityQueue
	mu          sync.Mutex
	workers     int
	ctx         context.Context
	cancel      context.CancelFunc
}

func NewAdvancedScheduler(workers int) *AdvancedScheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &AdvancedScheduler{
		taskQueue: make(chan SchedulerTask, 10000),
		priorityQueue: make(PriorityQueue, 0),
		workers:     workers,
		ctx:         ctx,
		cancel:      cancel,
	}
}

func (s *AdvancedScheduler) Start() {
	heap.Init(&s.priorityQueue)
	
	// 启动worker
	for i := 0; i < s.workers; i++ {
		go s.worker(i)
	}
	
	// 启动调度器
	go s.dispatcher()
}

func (s *AdvancedScheduler) dispatcher() {
	for {
		select {
		case <-s.ctx.Done():
			return
		default:
			s.mu.Lock()
			if s.priorityQueue.Len() > 0 {
				task := heap.Pop(&s.priorityQueue).(*PriorityTask)
				s.mu.Unlock()
				
				// 处理任务
				processScheduler(task.SchedulerTask)
			} else {
				s.mu.Unlock()
				time.Sleep(10 * time.Millisecond)
			}
		}
	}
}

func (s *AdvancedScheduler) worker(id int) {
	for task := range s.taskQueue {
		fmt.Printf("Worker %d 接收任务 %d (优先级: %d)\n", id, task.ID, task.Priority)
		
		s.mu.Lock()
		heap.Push(&s.priorityQueue, &PriorityTask{
			SchedulerTask: task,
		})
		s.mu.Unlock()
	}
}

func (s *AdvancedScheduler) Submit(task SchedulerTask) {
	s.taskQueue <- task
}

func (s *AdvancedScheduler) Stop() {
	s.cancel()
	close(s.taskQueue)
}

func main() {
	// 创建调度器,10000个并发worker
	scheduler := NewAdvancedScheduler(10000)
	scheduler.Start()
	
	// 提交任务
	for i := 1; i <= 100000; i++ {
		task := SchedulerTask{
			ID: i,
			Data: map[string]interface{}{
				"batch":    i / 1000,
				"sequence": i % 1000,
			},
			Priority: i % 5, // 5个优先级级别
		}
		scheduler.Submit(task)
		
		// 速率限制
		if i%10000 == 0 {
			time.Sleep(1 * time.Second)
		}
	}
	
	// 等待处理完成
	time.Sleep(5 * time.Second)
	scheduler.Stop()
}

对于需要持久化和恢复的调度器,可以结合数据库:

package main

import (
	"database/sql"
	"fmt"
	"log"
	"sync"
	"time"
	
	_ "github.com/lib/pq"
)

type PersistentScheduler struct {
	db        *sql.DB
	workers   int
	taskChan  chan DBTask
	wg        sync.WaitGroup
}

type DBTask struct {
	ID        int
	Type      string
	Payload   string
	Status    string
	CreatedAt time.Time
}

func NewPersistentScheduler(dbConn string, workers int) (*PersistentScheduler, error) {
	db, err := sql.Open("postgres", dbConn)
	if err != nil {
		return nil, err
	}
	
	return &PersistentScheduler{
		db:       db,
		workers:  workers,
		taskChan: make(chan DBTask, 10000),
	}, nil
}

func (ps *PersistentScheduler) Start() {
	// 从数据库加载未完成的任务
	go ps.loadPendingTasks()
	
	// 启动worker
	for i := 0; i < ps.workers; i++ {
		ps.wg.Add(1)
		go ps.dbWorker(i)
	}
}

func (ps *PersistentScheduler) loadPendingTasks() {
	rows, err := ps.db.Query(`
		SELECT id, type, payload, status, created_at 
		FROM scheduler_tasks 
		WHERE status IN ('pending', 'processing')
		ORDER BY created_at ASC
		LIMIT 100000
	`)
	if err != nil {
		log.Printf("加载待处理任务失败: %v", err)
		return
	}
	defer rows.Close()
	
	for rows.Next() {
		var task DBTask
		if err := rows.Scan(&task.ID, &task.Type, &task.Payload, &task.Status, &task.CreatedAt); err != nil {
			log.Printf("扫描任务失败: %v", err)
			continue
		}
		ps.taskChan <- task
	}
}

func (ps *PersistentScheduler) dbWorker(id int) {
	defer ps.wg.Done()
	
	for task := range ps.taskChan {
		// 更新状态为处理中
		ps.updateTaskStatus(task.ID, "processing")
		
		// 处理任务
		if err := ps.processTask(task); err != nil {
			log.Printf("Worker %d 处理任务 %d 失败: %v", id, task.ID, err)
			ps.updateTaskStatus(task.ID, "failed")
		} else {
			ps.updateTaskStatus(task.ID, "completed")
		}
	}
}

func (ps *PersistentScheduler) processTask(task DBTask) error {
	// 模拟任务处理
	time.Sleep(50 * time.Millisecond)
	fmt.Printf("处理数据库任务 %d: %s\n", task.ID, task.Payload)
	return nil
}

func (ps *PersistentScheduler) updateTaskStatus(taskID int, status string) {
	_, err := ps.db.Exec(
		"UPDATE scheduler_tasks SET status = $1, updated_at = NOW() WHERE id = $2",
		status, taskID,
	)
	if err != nil {
		log.Printf("更新任务状态失败: %v", err)
	}
}

func (ps *PersistentScheduler) SubmitTask(taskType, payload string) error {
	_, err := ps.db.Exec(`
		INSERT INTO scheduler_tasks (type, payload, status, created_at)
		VALUES ($1, $2, 'pending', NOW())
	`, taskType, payload)
	return err
}

func (ps *PersistentScheduler) Stop() {
	close(ps.taskChan)
	ps.wg.Wait()
	ps.db.Close()
}

这些方案完全避免了HTTP和curl的限制,使用Go原生并发机制,可以高效处理数百万个调度器任务。

回到顶部