golang支持数据库的作业调度插件库cdule的使用

Golang支持数据库的作业调度插件库cdule的使用

简介

cdule(发音为Schedule)是一个基于Golang的调度库,支持数据库持久化。用户可以使用任何gorm.io支持的数据库。

cdule logo

安装

go get github.com/deepaksinghvi/cdule

使用说明

要使用cdule调度作业,用户需要:

  1. 配置持久化
  2. 实现cdule.Job接口
  3. 使用所需的cron表达式调度作业

作业将被持久化在jobs表中,下一次执行将被持久化在schedules表中,作业历史将被持久化并维护在job_histories表中。

配置

用户需要在项目主目录中创建resources/config.yml文件,包含以下键:

  • cduletype
  • dburl
  • cduleconsistency

cduletype用于指定是基于内存还是基于数据库的配置,可能的值是DATABASE和MEMORY。 dburl是数据库连接url。 cduleconsistency保留供将来使用。

PostgreSQL配置示例

cduletype: DATABASE
dburl: postgres://cduleuser:cdulepassword@localhost:5432/cdule?sslmode=disable
cduleconsistency: AT_MOST_ONCE

SQLite内存配置示例

cduletype: MEMORY
dburl: /Users/dsinghvi/sqlite.db
cduleconsistency: AT_MOST_ONCE

实现Job接口

var testJobData map[string]string

type TestJob struct {
	Job cdule.Job
}

func (m TestJob) Execute(jobData map[string]string) {
	log.Info("In TestJob")
	for k, v := range jobData {
		valNum, err := strconv.Atoi(v)
		if nil == err {
			jobData[k] = strconv.Itoa(valNum + 1)
		} else {
			log.Error(err)
		}
	}
	testJobData = jobData
}

func (m TestJob) JobName() string {
	return "job.TestJob"
}

func (m TestJob) GetJobData() map[string]string {
	return testJobData
}

调度作业

以下代码预期testJob将执行五次,每分钟一次,然后程序退出。TestJob的jobData map以map[string]string格式保存数据,每次执行时存储并在Execute()方法调用时更新为下一个计数器值。

cdule := cdule.Cdule{}
cdule.NewCdule()
testJob := TestJob{}
jobData := make(map[string]string)
jobData["one"] = "1"
jobData["two"] = "2"
jobData["three"] = "3"
cdule.NewJob(&testJob, jobData).Build(utils.EveryMinute)

time.Sleep(5 * time.Minute)
cdule.StopWatcher()

数据库模式

对于示例应用,使用了postgresql,但用户可以使用任何gorm.io支持的数据库。

数据库表

  • jobs: 存储唯一作业
  • job_histories: 存储带有状态结果的作业历史
  • schedules: 存储每次下一次运行的调度
  • workers: 存储工作节点及其健康检查

数据库模式

示例Cron表达式

用户可以使用预定义的cron或使用自己的标准cron表达式

EveryMinute              = "0 * * ? * *"
EveryEvenMinute          = "0 */2 * ? * *"
EveryUnEvenMinute        = "0 1/2 * ? * *"
EveryTwoMinutes          = "0 */2 * ? * *"
EveryHourAtMin153045     = "0 15,30,45 * ? * *"
EveryHour                = "0 0 * ? * *"
EveryEvenHour            = "0 0 0/2 ? * *"
EveryUnEvenHour          = "0 0 1/2 ? * *"
EveryThreeHours          = "0 0 */3 ? * *"
EveryTwelveHours         = "0 0 */12 ? * *"
EveryDayAtMidNight       = "0 0 0 * * ?"
EveryDayAtOneAM          = "0 0 1 * * ?"
EveryDayAtSixAM          = "0 0 6 * * ?"
EverySundayAtNoon        = "0 0 12 ? * "
EveryMondayAtNoon        = "0 0 12 ? *"
EveryWeekDayAtNoon       = "0 0 12 ? * MON-FRI"
EveryWeekEndAtNoon       = "0 0 12 ? * SUN,SAT"
EveryMonthOnFirstAtNoon  = "0 0 12 1 * ?"
EveryMonthOnSecondAtNoon = "0 0 12 2 * ?"

完整示例Demo

以下是一个完整的cdule使用示例:

package main

import (
	"log"
	"strconv"
	"time"

	"github.com/deepaksinghvi/cdule"
	"github.com/deepaksinghvi/cdule/pkg/utils"
)

var testJobData map[string]string

type TestJob struct {
	Job cdule.Job
}

func (m TestJob) Execute(jobData map[string]string) {
	log.Println("In TestJob")
	for k, v := range jobData {
		valNum, err := strconv.Atoi(v)
		if nil == err {
			jobData[k] = strconv.Itoa(valNum + 1)
		} else {
			log.Println(err)
		}
	}
	testJobData = jobData
	log.Printf("Job data after execution: %v\n", jobData)
}

func (m TestJob) JobName() string {
	return "job.TestJob"
}

func (m TestJob) GetJobData() map[string]string {
	return testJobData
}

func main() {
	// 初始化cdule
	cduleInstance := cdule.Cdule{}
	cduleInstance.NewCdule()

	// 创建测试作业
	testJob := TestJob{}
	
	// 初始化作业数据
	jobData := make(map[string]string)
	jobData["counter"] = "0"
	
	// 创建并调度作业,每分钟执行一次
	cduleInstance.NewJob(&testJob, jobData).Build(utils.EveryMinute)

	// 等待5分钟
	log.Println("Waiting for jobs to execute...")
	time.Sleep(5 * time.Minute)
	
	// 停止调度器
	cduleInstance.StopWatcher()
	log.Println("Scheduler stopped")
}

构建该库使用的技术

  • 使用robfig/cron库的Cron解析器
  • 使用gorm.io库的ORM

更多关于golang支持数据库的作业调度插件库cdule的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang支持数据库的作业调度插件库cdule的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang 数据库作业调度插件库 cdule 使用指南

cdule 是一个基于 Golang 的轻量级数据库作业调度库,它支持将作业信息存储在数据库中,并提供调度执行功能。下面我将介绍 cdule 的基本使用方法和示例代码。

安装 cdule

首先使用 go get 安装 cdule:

go get github.com/rohanthewiz/cdule

基本使用示例

1. 初始化 cdule

package main

import (
	"log"
	"github.com/rohanthewiz/cdule"
	"github.com/rohanthewiz/cdule/repository"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

func main() {
	// 初始化数据库连接
	dsn := "host=localhost user=postgres password=postgres dbname=cdule port=5432 sslmode=disable"
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}

	// 初始化 cdule 存储库
	repo, err := repository.NewCduleRepository(db)
	if err != nil {
		log.Fatalf("Failed to initialize cdule repository: %v", err)
	}

	// 创建 cdule 实例
	cduleInstance := cdule.NewCdule(repo)
	
	// 启动 cdule
	err = cduleInstance.Run()
	if err != nil {
		log.Fatalf("Failed to start cdule: %v", err)
	}
	
	// 在这里可以添加作业和调度...
}

2. 定义和注册作业

首先定义一个作业处理器:

type MyJob struct {
	Name string
}

func (j *MyJob) Execute() error {
	log.Printf("Executing job: %s", j.Name)
	// 这里实现你的作业逻辑
	return nil
}

然后注册作业:

// 注册作业处理器
cduleInstance.RegisterJob("my_job_type", func() (cdule.Job, error) {
	return &MyJob{Name: "Example Job"}, nil
})

// 创建作业
job := &model.Job{
	Name:        "example_job",
	JobType:     "my_job_type",
	Description: "This is an example job",
}

// 创建调度
schedule := &model.Schedule{
	JobName:      "example_job",
	CronSchedule: "*/5 * * * *", // 每5分钟执行一次
}

// 将作业和调度添加到 cdule
err = cduleInstance.AddJob(job)
if err != nil {
	log.Printf("Failed to add job: %v", err)
}

err = cduleInstance.AddSchedule(schedule)
if err != nil {
	log.Printf("Failed to add schedule: %v", err)
}

3. 完整示例

package main

import (
	"log"
	"time"
	
	"github.com/rohanthewiz/cdule"
	"github.com/rohanthewiz/cdule/model"
	"github.com/rohanthewiz/cdule/repository"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// 定义作业处理器
type EmailJob struct {
	Recipient string
	Subject   string
}

func (j *EmailJob) Execute() error {
	log.Printf("Sending email to %s with subject: %s", j.Recipient, j.Subject)
	// 这里实现发送邮件的逻辑
	return nil
}

func main() {
	// 初始化数据库连接
	dsn := "host=localhost user=postgres password=postgres dbname=cdule port=5432 sslmode=disable"
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}

	// 自动迁移表结构
	err = db.AutoMigrate(&model.Job{}, &model.Schedule{})
	if err != nil {
		log.Fatalf("Failed to migrate tables: %v", err)
	}

	// 初始化 cdule 存储库
	repo, err := repository.NewCduleRepository(db)
	if err != nil {
		log.Fatalf("Failed to initialize cdule repository: %v", err)
	}

	// 创建 cdule 实例
	cduleInstance := cdule.NewCdule(repo)
	
	// 注册作业处理器
	cduleInstance.RegisterJob("email_job", func() (cdule.Job, error) {
		return &EmailJob{
			Recipient: "user@example.com",
			Subject:   "Daily Report",
		}, nil
	})

	// 创建作业
	job := &model.Job{
		Name:        "daily_email_report",
		JobType:     "email_job",
		Description: "Send daily report email",
	}

	// 创建调度
	schedule := &model.Schedule{
		JobName:      "daily_email_report",
		CronSchedule: "0 9 * * *", // 每天上午9点执行
	}

	// 添加作业和调度
	err = cduleInstance.AddJob(job)
	if err != nil {
		log.Printf("Failed to add job: %v", err)
	}

	err = cduleInstance.AddSchedule(schedule)
	if err != nil {
		log.Printf("Failed to add schedule: %v", err)
	}

	// 启动 cdule
	err = cduleInstance.Run()
	if err != nil {
		log.Fatalf("Failed to start cdule: %v", err)
	}

	// 保持程序运行
	select {}
}

主要功能

  1. 基于数据库的作业存储:所有作业和调度信息都存储在数据库中
  2. Cron 表达式支持:使用标准的 cron 表达式定义调度
  3. 自定义作业类型:可以注册任意类型的作业处理器
  4. 并发安全:支持并发作业执行

注意事项

  1. 确保数据库连接正常
  2. 作业处理器需要实现 cdule.Job 接口
  3. 调度使用标准 cron 表达式格式
  4. 生产环境中需要考虑错误处理和作业重试机制

cdule 是一个轻量级的解决方案,适合需要将调度信息持久化到数据库的简单场景。对于更复杂的企业级调度需求,可以考虑使用更成熟的解决方案如 Apache Airflow 等。

回到顶部