golang支持数据库的作业调度插件库cdule的使用
Golang支持数据库的作业调度插件库cdule的使用
简介
cdule(发音为Schedule)是一个基于Golang的调度库,支持数据库持久化。用户可以使用任何gorm.io支持的数据库。
安装
go get github.com/deepaksinghvi/cdule
使用说明
要使用cdule调度作业,用户需要:
- 配置持久化
- 实现cdule.Job接口
- 使用所需的cron表达式调度作业
作业将被持久化在jobs表中,下一次执行将被持久化在schedules表中,作业历史将被持久化并维护在job_histories表中。
配置
用户需要在项目主目录中创建resources/config.yml文件,包含以下键:
- cduletype
- dburl
- cduleconsistency
cduletype用于指定是基于内存还是基于数据库的配置,可能的值是DATABASE和MEMORY。 dburl是数据库连接url。 cduleconsistency保留供将来使用。
PostgreSQL配置示例
cduletype: DATABASE
dburl: postgres://cduleuser:cdulepassword@localhost:5432/cdule?sslmode=disable
cduleconsistency: AT_MOST_ONCE
SQLite内存配置示例
cduletype: MEMORY
dburl: /Users/dsinghvi/sqlite.db
cduleconsistency: AT_MOST_ONCE
实现Job接口
var testJobData map[string]string
type TestJob struct {
Job cdule.Job
}
func (m TestJob) Execute(jobData map[string]string) {
log.Info("In TestJob")
for k, v := range jobData {
valNum, err := strconv.Atoi(v)
if nil == err {
jobData[k] = strconv.Itoa(valNum + 1)
} else {
log.Error(err)
}
}
testJobData = jobData
}
func (m TestJob) JobName() string {
return "job.TestJob"
}
func (m TestJob) GetJobData() map[string]string {
return testJobData
}
调度作业
以下代码预期testJob将执行五次,每分钟一次,然后程序退出。TestJob的jobData map以map[string]string格式保存数据,每次执行时存储并在Execute()方法调用时更新为下一个计数器值。
cdule := cdule.Cdule{}
cdule.NewCdule()
testJob := TestJob{}
jobData := make(map[string]string)
jobData["one"] = "1"
jobData["two"] = "2"
jobData["three"] = "3"
cdule.NewJob(&testJob, jobData).Build(utils.EveryMinute)
time.Sleep(5 * time.Minute)
cdule.StopWatcher()
数据库模式
对于示例应用,使用了postgresql,但用户可以使用任何gorm.io支持的数据库。
数据库表
- jobs: 存储唯一作业
- job_histories: 存储带有状态结果的作业历史
- schedules: 存储每次下一次运行的调度
- workers: 存储工作节点及其健康检查
示例Cron表达式
用户可以使用预定义的cron或使用自己的标准cron表达式
EveryMinute = "0 * * ? * *"
EveryEvenMinute = "0 */2 * ? * *"
EveryUnEvenMinute = "0 1/2 * ? * *"
EveryTwoMinutes = "0 */2 * ? * *"
EveryHourAtMin153045 = "0 15,30,45 * ? * *"
EveryHour = "0 0 * ? * *"
EveryEvenHour = "0 0 0/2 ? * *"
EveryUnEvenHour = "0 0 1/2 ? * *"
EveryThreeHours = "0 0 */3 ? * *"
EveryTwelveHours = "0 0 */12 ? * *"
EveryDayAtMidNight = "0 0 0 * * ?"
EveryDayAtOneAM = "0 0 1 * * ?"
EveryDayAtSixAM = "0 0 6 * * ?"
EverySundayAtNoon = "0 0 12 ? * "
EveryMondayAtNoon = "0 0 12 ? *"
EveryWeekDayAtNoon = "0 0 12 ? * MON-FRI"
EveryWeekEndAtNoon = "0 0 12 ? * SUN,SAT"
EveryMonthOnFirstAtNoon = "0 0 12 1 * ?"
EveryMonthOnSecondAtNoon = "0 0 12 2 * ?"
完整示例Demo
以下是一个完整的cdule使用示例:
package main
import (
"log"
"strconv"
"time"
"github.com/deepaksinghvi/cdule"
"github.com/deepaksinghvi/cdule/pkg/utils"
)
var testJobData map[string]string
type TestJob struct {
Job cdule.Job
}
func (m TestJob) Execute(jobData map[string]string) {
log.Println("In TestJob")
for k, v := range jobData {
valNum, err := strconv.Atoi(v)
if nil == err {
jobData[k] = strconv.Itoa(valNum + 1)
} else {
log.Println(err)
}
}
testJobData = jobData
log.Printf("Job data after execution: %v\n", jobData)
}
func (m TestJob) JobName() string {
return "job.TestJob"
}
func (m TestJob) GetJobData() map[string]string {
return testJobData
}
func main() {
// 初始化cdule
cduleInstance := cdule.Cdule{}
cduleInstance.NewCdule()
// 创建测试作业
testJob := TestJob{}
// 初始化作业数据
jobData := make(map[string]string)
jobData["counter"] = "0"
// 创建并调度作业,每分钟执行一次
cduleInstance.NewJob(&testJob, jobData).Build(utils.EveryMinute)
// 等待5分钟
log.Println("Waiting for jobs to execute...")
time.Sleep(5 * time.Minute)
// 停止调度器
cduleInstance.StopWatcher()
log.Println("Scheduler stopped")
}
构建该库使用的技术
- 使用robfig/cron库的Cron解析器
- 使用gorm.io库的ORM
更多关于golang支持数据库的作业调度插件库cdule的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang支持数据库的作业调度插件库cdule的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang 数据库作业调度插件库 cdule 使用指南
cdule 是一个基于 Golang 的轻量级数据库作业调度库,它支持将作业信息存储在数据库中,并提供调度执行功能。下面我将介绍 cdule 的基本使用方法和示例代码。
安装 cdule
首先使用 go get 安装 cdule:
go get github.com/rohanthewiz/cdule
基本使用示例
1. 初始化 cdule
package main
import (
"log"
"github.com/rohanthewiz/cdule"
"github.com/rohanthewiz/cdule/repository"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
func main() {
// 初始化数据库连接
dsn := "host=localhost user=postgres password=postgres dbname=cdule port=5432 sslmode=disable"
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
// 初始化 cdule 存储库
repo, err := repository.NewCduleRepository(db)
if err != nil {
log.Fatalf("Failed to initialize cdule repository: %v", err)
}
// 创建 cdule 实例
cduleInstance := cdule.NewCdule(repo)
// 启动 cdule
err = cduleInstance.Run()
if err != nil {
log.Fatalf("Failed to start cdule: %v", err)
}
// 在这里可以添加作业和调度...
}
2. 定义和注册作业
首先定义一个作业处理器:
type MyJob struct {
Name string
}
func (j *MyJob) Execute() error {
log.Printf("Executing job: %s", j.Name)
// 这里实现你的作业逻辑
return nil
}
然后注册作业:
// 注册作业处理器
cduleInstance.RegisterJob("my_job_type", func() (cdule.Job, error) {
return &MyJob{Name: "Example Job"}, nil
})
// 创建作业
job := &model.Job{
Name: "example_job",
JobType: "my_job_type",
Description: "This is an example job",
}
// 创建调度
schedule := &model.Schedule{
JobName: "example_job",
CronSchedule: "*/5 * * * *", // 每5分钟执行一次
}
// 将作业和调度添加到 cdule
err = cduleInstance.AddJob(job)
if err != nil {
log.Printf("Failed to add job: %v", err)
}
err = cduleInstance.AddSchedule(schedule)
if err != nil {
log.Printf("Failed to add schedule: %v", err)
}
3. 完整示例
package main
import (
"log"
"time"
"github.com/rohanthewiz/cdule"
"github.com/rohanthewiz/cdule/model"
"github.com/rohanthewiz/cdule/repository"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// 定义作业处理器
type EmailJob struct {
Recipient string
Subject string
}
func (j *EmailJob) Execute() error {
log.Printf("Sending email to %s with subject: %s", j.Recipient, j.Subject)
// 这里实现发送邮件的逻辑
return nil
}
func main() {
// 初始化数据库连接
dsn := "host=localhost user=postgres password=postgres dbname=cdule port=5432 sslmode=disable"
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
// 自动迁移表结构
err = db.AutoMigrate(&model.Job{}, &model.Schedule{})
if err != nil {
log.Fatalf("Failed to migrate tables: %v", err)
}
// 初始化 cdule 存储库
repo, err := repository.NewCduleRepository(db)
if err != nil {
log.Fatalf("Failed to initialize cdule repository: %v", err)
}
// 创建 cdule 实例
cduleInstance := cdule.NewCdule(repo)
// 注册作业处理器
cduleInstance.RegisterJob("email_job", func() (cdule.Job, error) {
return &EmailJob{
Recipient: "user@example.com",
Subject: "Daily Report",
}, nil
})
// 创建作业
job := &model.Job{
Name: "daily_email_report",
JobType: "email_job",
Description: "Send daily report email",
}
// 创建调度
schedule := &model.Schedule{
JobName: "daily_email_report",
CronSchedule: "0 9 * * *", // 每天上午9点执行
}
// 添加作业和调度
err = cduleInstance.AddJob(job)
if err != nil {
log.Printf("Failed to add job: %v", err)
}
err = cduleInstance.AddSchedule(schedule)
if err != nil {
log.Printf("Failed to add schedule: %v", err)
}
// 启动 cdule
err = cduleInstance.Run()
if err != nil {
log.Fatalf("Failed to start cdule: %v", err)
}
// 保持程序运行
select {}
}
主要功能
- 基于数据库的作业存储:所有作业和调度信息都存储在数据库中
- Cron 表达式支持:使用标准的 cron 表达式定义调度
- 自定义作业类型:可以注册任意类型的作业处理器
- 并发安全:支持并发作业执行
注意事项
- 确保数据库连接正常
- 作业处理器需要实现
cdule.Job
接口 - 调度使用标准 cron 表达式格式
- 生产环境中需要考虑错误处理和作业重试机制
cdule 是一个轻量级的解决方案,适合需要将调度信息持久化到数据库的简单场景。对于更复杂的企业级调度需求,可以考虑使用更成熟的解决方案如 Apache Airflow 等。