Golang中如何规划并执行大量任务?
Golang中如何规划并执行大量任务? 我有一个关于服务的构想,每个客户端可以创建不限数量的任务,服务在检查后必须执行这些任务。
我该如何用 Go 语言实现这个想法?
是使用 cron 包,还是在内存中运行任务检查器?
任务存储在数据库中,因此我需要每分钟检查一次数据库,选择所有客户的任务并执行它们。
有谁了解吗?我可以听听您的意见吗?
3 回复
基本上,您可以通过cron作业或使用Go语言来实现这一点。然而,如果您想为任务使用数据库,Go语言则更为合适。
更多关于Golang中如何规划并执行大量任务?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
任务存储在数据库中,因此我需要每分钟检查一次数据库,选择所有客户的任务并执行它们。
这听起来像是一个定时任务。但也有可能WebSocket可以做同样的工作(Socket发布更改 > 订阅者收到通知)。
在Go中实现大规模任务调度和执行,推荐使用工作队列模式结合数据库持久化。以下是具体实现方案:
package main
import (
"context"
"database/sql"
"fmt"
"log"
"sync"
"time"
_ "github.com/lib/pq"
)
type Task struct {
ID int
ClientID int
Command string
Status string
CreatedAt time.Time
}
type TaskScheduler struct {
db *sql.DB
workerCount int
taskQueue chan Task
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func NewTaskScheduler(db *sql.DB, workerCount int) *TaskScheduler {
ctx, cancel := context.WithCancel(context.Background())
return &TaskScheduler{
db: db,
workerCount: workerCount,
taskQueue: make(chan Task, 1000),
ctx: ctx,
cancel: cancel,
}
}
// 数据库轮询器
func (ts *TaskScheduler) startPoller(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ts.ctx.Done():
return
case <-ticker.C:
ts.fetchPendingTasks()
}
}
}
func (ts *TaskScheduler) fetchPendingTasks() {
rows, err := ts.db.QueryContext(ts.ctx, `
UPDATE tasks
SET status = 'processing'
WHERE id IN (
SELECT id FROM tasks
WHERE status = 'pending'
AND scheduled_at <= NOW()
FOR UPDATE SKIP LOCKED
LIMIT 100
)
RETURNING id, client_id, command, created_at
`)
if err != nil {
log.Printf("查询任务失败: %v", err)
return
}
defer rows.Close()
for rows.Next() {
var task Task
if err := rows.Scan(&task.ID, &task.ClientID, &task.Command, &task.CreatedAt); err != nil {
log.Printf("扫描任务失败: %v", err)
continue
}
task.Status = "processing"
ts.taskQueue <- task
}
}
// 工作协程
func (ts *TaskScheduler) startWorker(id int) {
defer ts.wg.Done()
for {
select {
case <-ts.ctx.Done():
return
case task := <-ts.taskQueue:
ts.processTask(task)
}
}
}
func (ts *TaskScheduler) processTask(task Task) {
defer func() {
if r := recover(); r != nil {
log.Printf("任务处理异常: %v", r)
ts.updateTaskStatus(task.ID, "failed")
}
}()
// 执行任务逻辑
log.Printf("Worker处理任务 %d: %s", task.ID, task.Command)
// 模拟任务执行
time.Sleep(100 * time.Millisecond)
// 更新任务状态
ts.updateTaskStatus(task.ID, "completed")
}
func (ts *TaskScheduler) updateTaskStatus(taskID int, status string) {
_, err := ts.db.ExecContext(ts.ctx,
"UPDATE tasks SET status = $1, completed_at = NOW() WHERE id = $2",
status, taskID,
)
if err != nil {
log.Printf("更新任务状态失败: %v", err)
}
}
func (ts *TaskScheduler) Start() {
// 启动轮询器
go ts.startPoller(time.Minute)
// 启动工作协程
ts.wg.Add(ts.workerCount)
for i := 0; i < ts.workerCount; i++ {
go ts.startWorker(i)
}
}
func (ts *TaskScheduler) Stop() {
ts.cancel()
close(ts.taskQueue)
ts.wg.Wait()
}
func main() {
// 数据库连接
db, err := sql.Open("postgres", "host=localhost port=5432 user=postgres dbname=tasks sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 创建调度器
scheduler := NewTaskScheduler(db, 10)
scheduler.Start()
// 模拟运行
time.Sleep(5 * time.Minute)
scheduler.Stop()
}
对于定时检查,可以使用robfig/cron包:
import "github.com/robfig/cron/v3"
func setupCronScheduler(scheduler *TaskScheduler) {
c := cron.New()
// 每分钟执行一次任务获取
c.AddFunc("@every 1m", func() {
scheduler.fetchPendingTasks()
})
// 每小时清理已完成的任务
c.AddFunc("@hourly", func() {
cleanupOldTasks(scheduler.db)
})
c.Start()
}
func cleanupOldTasks(db *sql.DB) {
_, err := db.Exec(`
DELETE FROM tasks
WHERE status = 'completed'
AND completed_at < NOW() - INTERVAL '7 days'
`)
if err != nil {
log.Printf("清理任务失败: %v", err)
}
}
数据库表结构建议:
CREATE TABLE tasks (
id SERIAL PRIMARY KEY,
client_id INTEGER NOT NULL,
command TEXT NOT NULL,
status VARCHAR(20) DEFAULT 'pending',
scheduled_at TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMP DEFAULT NOW(),
completed_at TIMESTAMP,
INDEX idx_status_scheduled (status, scheduled_at),
INDEX idx_client_id (client_id)
);
这个方案提供了:
- 使用
FOR UPDATE SKIP LOCKED避免任务重复执行 - 工作协程池处理并发任务
- 上下文管理实现优雅关闭
- 数据库持久化保证任务不丢失
- 可配置的轮询间隔和工作协程数量

