golang高性能轻量级任务调度插件库kala的使用

Golang高性能轻量级任务调度插件库Kala的使用

Kala是一个用Go编写的简单、现代且高性能的任务调度器。它具有以下特点:

  • 单一二进制文件
  • 无依赖
  • 基于HTTP的JSON API
  • 任务统计信息
  • 可配置的重试机制
  • 使用ISO 8601日期和间隔符号进行调度
  • 依赖任务
  • 支持多种数据库驱动持久化
  • Web UI界面

安装Kala

使用Go Modules安装Kala:

go get github.com/ajvb/kala

运行Kala服务:

kala serve

快速开始

启动Kala服务器(默认端口8000):

$ kala serve
INFO[0000] Preparing cache
INFO[0000] Starting server on port :8000

数据库配置

Kala默认使用BoltDB,但也支持Redis、Consul、MongoDB、Postgres和MySQL/MariaDB:

# BoltDB
kala serve --jobdb=boltdb --boltpath=/path/to/dir

# Redis
kala serve --jobdb=redis --jobdb-address=127.0.0.1:6379 --jobdb-password=password

# Consul
kala serve --jobdb=consul --jobdb-address=127.0.0.1:8500

# MongoDB
kala serve --jobdb=mongo --jobdb-address=server1.example.com,server2.example.com --jobdb-username=admin --jobdb-password=password

# Postgres
kala serve --jobdb=postgres --jobdb-address=server1.example.com/kala --jobdb-username=admin --jobdb-password=password

# MySQL/MariaDB
kala serve --jobdb=mariadb --jobdb-address=(server1.example.com)/kala --jobdb-username=admin --jobdb-password=password

使用示例

创建任务

$ curl http://127.0.0.1:8000/api/v1/job/ -d '{
    "epsilon": "PT5S", 
    "command": "bash /path/to/command.sh", 
    "name": "test_job", 
    "schedule": "R2/2023-01-01T00:00:00Z/PT10S"
}'

获取所有任务

$ curl http://127.0.0.1:8000/api/v1/job/

获取特定任务

$ curl http://127.0.0.1:8000/api/v1/job/{job_id}/

删除任务

$ curl http://127.0.0.1:8000/api/v1/job/{job_id}/ -X DELETE

手动启动任务

$ curl http://127.0.0.1:8000/api/v1/job/start/{job_id}/ -X POST

Go客户端示例

package main

import (
	"fmt"
	"time"
	
	"github.com/ajvb/kala/client"
	"github.com/ajvb/kala/job"
)

func main() {
	// 创建Kala客户端
	c := client.New("http://127.0.0.1:8000")
	
	// 创建新任务
	j := job.NewJob()
	j.Name = "test_job"
	j.Command = "echo 'Hello Kala'"
	j.Schedule = "R2/" + time.Now().Add(time.Minute).Format(time.RFC3339) + "/PT10S"
	
	// 添加任务到Kala
	id, err := c.CreateJob(j)
	if err != nil {
		fmt.Println("Error creating job:", err)
		return
	}
	
	fmt.Println("Job created with ID:", id)
	
	// 获取任务列表
	jobs, err := c.ListJobs()
	if err != nil {
		fmt.Println("Error listing jobs:", err)
		return
	}
	
	fmt.Println("Current jobs:")
	for id, job := range jobs {
		fmt.Printf("%s: %s\n", id, job.Name)
	}
}

任务JSON结构示例

{
    "name": "test_job",
    "id": "93b65499-b211-49ce-57e0-19e735cc5abd",
    "command": "bash /path/to/command.sh",
    "owner": "",
    "disabled": false,
    "dependent_jobs": null,
    "parent_jobs": null,
    "schedule": "R2/2023-01-01T00:00:00Z/PT10S",
    "retries": 0,
    "epsilon": "PT5S",
    "success_count": 0,
    "last_success": "0001-01-01T00:00:00Z",
    "error_count": 0,
    "last_error": "0001-01-01T00:00:00Z",
    "last_attempted_run": "0001-01-01T00:00:00Z",
    "next_run_at": "2023-01-01T00:00:00Z"
}

调度字符串格式(ISO 8601)

调度字符串由三部分组成:重复次数/开始时间/运行间隔

示例:R2/2023-01-01T00:00:00Z/PT10S

重复次数

  • R - 无限重复
  • R1 - 重复1次
  • R231 - 重复231次

开始时间

使用ISO 8601格式,例如:

  • 2023-06-04T19:25:16
  • 2023-06-04T19:25:16.828696
  • 2023-06-04T19:25:16.828696-07:00

运行间隔

使用ISO 8601间隔符号:

  • PT1H - 1小时
  • P1DT1M - 1天加1分钟
  • P1W - 1周

部署方式

Supervisord配置

[program:kala]
command=kala serve
autorestart=true
stdout_logfile=/var/log/kala.stdout.log
stderr_logfile=/var/log/kala.stderr.log

Docker部署

docker build -t kala .
docker run -it -d -p 8000:8000 kala

API路由概览

任务 方法 路由
创建任务 POST /api/v1/job/
获取所有任务 GET /api/v1/job/
获取特定任务 GET /api/v1/job/{id}/
删除任务 DELETE /api/v1/job/{id}/
删除所有任务 DELETE /api/v1/job/all/
获取任务统计 GET /api/v1/job/stats/{id}/
手动启动任务 POST /api/v1/job/start/{id}/
禁用任务 POST /api/v1/job/disable/{id}/
启用任务 POST /api/v1/job/enable/{id}/
获取应用统计 GET /api/v1/stats/

调试任务

Kala提供了run命令来测试任务命令:

$ kala run "ruby /path/to/script.rb"
Command Succeeded!

如果命令失败,会显示错误信息:

$ kala run "ruby /path/to/broken_script.rb"
FATA[0000] Command Failed with err: exit status 1

依赖任务

Kala支持任务依赖关系,子任务会在父任务完成后运行。主要规则包括:

  • 遵循先进先出原则
  • 子任务必须等待父任务完成
  • 如果父任务不运行,子任务也不会运行
  • 如果子任务被禁用,父任务仍会运行
  • 如果子任务被删除,父任务仍会保留
  • 如果父任务被删除,没有其他父任务的子任务也会被删除

更多关于golang高性能轻量级任务调度插件库kala的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能轻量级任务调度插件库kala的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Kala - Golang高性能轻量级任务调度库

Kala是一个用Go编写的轻量级任务调度库,它提供了简单易用的API来执行定时任务。下面我将详细介绍Kala的使用方法和示例代码。

安装Kala

go get github.com/ajvb/kala

基本使用

1. 创建调度器

package main

import (
	"fmt"
	"time"

	"github.com/ajvb/kala/job"
	"github.com/ajvb/kala/utils/iso8601"
)

func main() {
	// 创建调度器
	scheduler := job.NewScheduler()
	
	// 启动调度器
	scheduler.Start()
	defer scheduler.Stop()
	
	// 创建任务
	job := &job.Job{
		Name:     "test_job",
		Command:  "echo 'Hello, Kala!'",
		Schedule: iso8601.MustParse("R/2015-06-04T19:25:16.828696-07:00/PT10S"),
	}
	
	// 添加任务到调度器
	scheduler.AddJob(job)
	
	// 等待一段时间查看任务执行
	time.Sleep(30 * time.Second)
}

2. 使用自定义处理函数

package main

import (
	"fmt"
	"time"

	"github.com/ajvb/kala/job"
	"github.com/ajvb/kala/utils/iso8601"
)

func myTask() {
	fmt.Println("Custom task executed at:", time.Now().Format(time.RFC3339))
}

func main() {
	scheduler := job.NewScheduler()
	scheduler.Start()
	defer scheduler.Stop()

	// 创建自定义任务
	customJob := &job.Job{
		Name: "custom_function_job",
		Schedule: iso8601.MustParse("PT5S"), // 每5秒执行一次
		JobFunction: func() {
			myTask()
		},
	}

	scheduler.AddJob(customJob)

	// 等待一段时间查看任务执行
	time.Sleep(20 * time.Second)
}

高级功能

1. 一次性任务

oneTimeJob := &job.Job{
	Name:    "one_time_job",
	Command: "echo 'This will run once'",
	Schedule: &iso8601.OneTimeSchedule{
		RunAt: time.Now().Add(10 * time.Second),
	},
}
scheduler.AddJob(oneTimeJob)

2. 带重试机制的任务

retryJob := &job.Job{
	Name:     "retry_job",
	Command:  "some_command_that_might_fail",
	Schedule: iso8601.MustParse("PT1M"), // 每分钟执行一次
	Retries:  3,                         // 重试3次
}
scheduler.AddJob(retryJob)

3. 获取任务状态

// 获取所有任务
jobs := scheduler.GetAllJobs()
for _, j := range jobs {
	fmt.Printf("Job: %s, NextRun: %v\n", j.Name, j.NextRunAt)
}

// 获取单个任务
job, exists := scheduler.GetJob("test_job")
if exists {
	fmt.Println("Job found:", job.Name)
}

4. 删除任务

scheduler.DeleteJob("job_to_delete")

性能优化技巧

  1. 批量添加任务:当需要添加大量任务时,可以使用AddJobs方法批量添加
jobs := []*job.Job{
	{Name: "job1", Command: "cmd1", Schedule: iso8601.MustParse("PT10S")},
	{Name: "job2", Command: "cmd2", Schedule: iso8601.MustParse("PT20S")},
}
scheduler.AddJobs(jobs)
  1. 使用内存存储:Kala默认使用内存存储,对于不需要持久化的场景,这是最高效的选择

  2. 合理设置调度间隔:避免设置过于频繁的调度间隔,减少系统开销

实际应用示例

下面是一个完整的Web服务示例,通过HTTP API管理任务:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/ajvb/kala/job"
	"github.com/ajvb/kala/utils/iso8601"
)

var scheduler = job.NewScheduler()

func main() {
	scheduler.Start()
	defer scheduler.Stop()

	http.HandleFunc("/jobs", listJobs)
	http.HandleFunc("/jobs/add", addJob)
	http.HandleFunc("/jobs/delete", deleteJob)

	fmt.Println("Server started at :8080")
	http.ListenAndServe(":8080", nil)
}

func listJobs(w http.ResponseWriter, r *http.Request) {
	jobs := scheduler.GetAllJobs()
	json.NewEncoder(w).Encode(jobs)
}

func addJob(w http.ResponseWriter, r *http.Request) {
	var newJob struct {
		Name     string `json:"name"`
		Command  string `json:"command"`
		Schedule string `json:"schedule"`
	}
	
	if err := json.NewDecoder(r.Body).Decode(&newJob); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	job := &job.Job{
		Name:    newJob.Name,
		Command: newJob.Command,
		Schedule: iso8601.MustParse(newJob.Schedule),
	}

	scheduler.AddJob(job)
	w.WriteHeader(http.StatusCreated)
}

func deleteJob(w http.ResponseWriter, r *http.Request) {
	name := r.URL.Query().Get("name")
	if name == "" {
		http.Error(w, "name parameter is required", http.StatusBadRequest)
		return
	}

	scheduler.DeleteJob(name)
	w.WriteHeader(http.StatusOK)
}

总结

Kala是一个简单而强大的Go任务调度库,具有以下特点:

  • 轻量级,无外部依赖
  • 支持ISO8601时间格式
  • 支持一次性任务和周期性任务
  • 提供任务重试机制
  • 易于集成到现有应用中

通过合理使用Kala,你可以轻松地为你的Go应用添加可靠的任务调度功能。

回到顶部