Golang中如何规划并执行大量任务?

Golang中如何规划并执行大量任务? 我有一个关于服务的构想,每个客户端可以创建不限数量的任务,服务在检查后必须执行这些任务。

我该如何用 Go 语言实现这个想法?

是使用 cron 包,还是在内存中运行任务检查器?

任务存储在数据库中,因此我需要每分钟检查一次数据库,选择所有客户的任务并执行它们。

有谁了解吗?我可以听听您的意见吗?

3 回复

基本上,您可以通过cron作业或使用Go语言来实现这一点。然而,如果您想为任务使用数据库,Go语言则更为合适。

更多关于Golang中如何规划并执行大量任务?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


任务存储在数据库中,因此我需要每分钟检查一次数据库,选择所有客户的任务并执行它们。

这听起来像是一个定时任务。但也有可能WebSocket可以做同样的工作(Socket发布更改 > 订阅者收到通知)。

在Go中实现大规模任务调度和执行,推荐使用工作队列模式结合数据库持久化。以下是具体实现方案:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "sync"
    "time"

    _ "github.com/lib/pq"
)

type Task struct {
    ID        int
    ClientID  int
    Command   string
    Status    string
    CreatedAt time.Time
}

type TaskScheduler struct {
    db          *sql.DB
    workerCount int
    taskQueue   chan Task
    wg          sync.WaitGroup
    ctx         context.Context
    cancel      context.CancelFunc
}

func NewTaskScheduler(db *sql.DB, workerCount int) *TaskScheduler {
    ctx, cancel := context.WithCancel(context.Background())
    return &TaskScheduler{
        db:          db,
        workerCount: workerCount,
        taskQueue:   make(chan Task, 1000),
        ctx:         ctx,
        cancel:      cancel,
    }
}

// 数据库轮询器
func (ts *TaskScheduler) startPoller(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ts.ctx.Done():
            return
        case <-ticker.C:
            ts.fetchPendingTasks()
        }
    }
}

func (ts *TaskScheduler) fetchPendingTasks() {
    rows, err := ts.db.QueryContext(ts.ctx, `
        UPDATE tasks 
        SET status = 'processing'
        WHERE id IN (
            SELECT id FROM tasks 
            WHERE status = 'pending' 
            AND scheduled_at <= NOW()
            FOR UPDATE SKIP LOCKED
            LIMIT 100
        )
        RETURNING id, client_id, command, created_at
    `)
    if err != nil {
        log.Printf("查询任务失败: %v", err)
        return
    }
    defer rows.Close()

    for rows.Next() {
        var task Task
        if err := rows.Scan(&task.ID, &task.ClientID, &task.Command, &task.CreatedAt); err != nil {
            log.Printf("扫描任务失败: %v", err)
            continue
        }
        task.Status = "processing"
        ts.taskQueue <- task
    }
}

// 工作协程
func (ts *TaskScheduler) startWorker(id int) {
    defer ts.wg.Done()
    
    for {
        select {
        case <-ts.ctx.Done():
            return
        case task := <-ts.taskQueue:
            ts.processTask(task)
        }
    }
}

func (ts *TaskScheduler) processTask(task Task) {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("任务处理异常: %v", r)
            ts.updateTaskStatus(task.ID, "failed")
        }
    }()

    // 执行任务逻辑
    log.Printf("Worker处理任务 %d: %s", task.ID, task.Command)
    
    // 模拟任务执行
    time.Sleep(100 * time.Millisecond)
    
    // 更新任务状态
    ts.updateTaskStatus(task.ID, "completed")
}

func (ts *TaskScheduler) updateTaskStatus(taskID int, status string) {
    _, err := ts.db.ExecContext(ts.ctx, 
        "UPDATE tasks SET status = $1, completed_at = NOW() WHERE id = $2",
        status, taskID,
    )
    if err != nil {
        log.Printf("更新任务状态失败: %v", err)
    }
}

func (ts *TaskScheduler) Start() {
    // 启动轮询器
    go ts.startPoller(time.Minute)
    
    // 启动工作协程
    ts.wg.Add(ts.workerCount)
    for i := 0; i < ts.workerCount; i++ {
        go ts.startWorker(i)
    }
}

func (ts *TaskScheduler) Stop() {
    ts.cancel()
    close(ts.taskQueue)
    ts.wg.Wait()
}

func main() {
    // 数据库连接
    db, err := sql.Open("postgres", "host=localhost port=5432 user=postgres dbname=tasks sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // 创建调度器
    scheduler := NewTaskScheduler(db, 10)
    scheduler.Start()
    
    // 模拟运行
    time.Sleep(5 * time.Minute)
    scheduler.Stop()
}

对于定时检查,可以使用robfig/cron包:

import "github.com/robfig/cron/v3"

func setupCronScheduler(scheduler *TaskScheduler) {
    c := cron.New()
    
    // 每分钟执行一次任务获取
    c.AddFunc("@every 1m", func() {
        scheduler.fetchPendingTasks()
    })
    
    // 每小时清理已完成的任务
    c.AddFunc("@hourly", func() {
        cleanupOldTasks(scheduler.db)
    })
    
    c.Start()
}

func cleanupOldTasks(db *sql.DB) {
    _, err := db.Exec(`
        DELETE FROM tasks 
        WHERE status = 'completed' 
        AND completed_at < NOW() - INTERVAL '7 days'
    `)
    if err != nil {
        log.Printf("清理任务失败: %v", err)
    }
}

数据库表结构建议:

CREATE TABLE tasks (
    id SERIAL PRIMARY KEY,
    client_id INTEGER NOT NULL,
    command TEXT NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',
    scheduled_at TIMESTAMP DEFAULT NOW(),
    created_at TIMESTAMP DEFAULT NOW(),
    completed_at TIMESTAMP,
    INDEX idx_status_scheduled (status, scheduled_at),
    INDEX idx_client_id (client_id)
);

这个方案提供了:

  1. 使用FOR UPDATE SKIP LOCKED避免任务重复执行
  2. 工作协程池处理并发任务
  3. 上下文管理实现优雅关闭
  4. 数据库持久化保证任务不丢失
  5. 可配置的轮询间隔和工作协程数量
回到顶部