golang实现PostgreSQL高级任务调度插件pg_timetable的使用

Golang实现PostgreSQL高级任务调度插件pg_timetable的使用

简介

pg_timetable是一个高级的独立作业调度器,专为PostgreSQL设计,相比传统调度器(如cron)具有许多优势。它完全基于数据库驱动,提供多种高级功能,可以调度PostgreSQL命令、系统程序和内置操作。

主要特性

  • 任务可以排列成链式结构
  • 每个任务可以执行SQL、内置命令或可执行程序
  • 可以向任务传递参数
  • 自动重试因停机而错过的任务链
  • 支持可配置的重复执行
  • 内置任务如发送邮件、下载、导入文件等
  • 完全基于数据库驱动的配置
  • 完整的数据库驱动日志记录
  • 增强的cron风格调度
  • 可选的并发保护

安装方式

  • 官方发布包
  • Docker镜像
  • 从源代码构建

快速开始

1. 准备工作

确保PostgreSQL服务器已启动并运行,并为目标数据库创建具有CREATE权限的角色:

my_database=> CREATE ROLE scheduler PASSWORD 'somestrong';
my_database=> GRANT CREATE ON DATABASE my_database TO scheduler;

2. 创建作业示例

-- 每天00:30运行VACUUM
SELECT timetable.add_job('frequent-vacuum', '30 0 * * *', 'VACUUM');

-- 8月每天00:05运行public.my_func()
SELECT timetable.add_job('execute-func', '5 0 * 8 *', 'SELECT public.my_func()');

-- 每2小时刷新物化视图
SELECT timetable.add_job('refresh-matview', '@every 2 hours', 
  'REFRESH MATERIALIZED VIEW public.mat_view');

-- pg_timetable重启后清空日志表
SELECT timetable.add_job('clear-log', '@reboot', 'TRUNCATE public.log');

-- 每周日午夜使用reindexdb工具重建索引
SELECT timetable.add_job('reindex-job', '0 0 * * 7', 'reindexdb', job_kind := 'PROGRAM');

3. 运行pg_timetable

pg_timetable postgresql://scheduler:somestrong@localhost/my_database --clientname=vacuumer

Golang集成示例

以下是一个完整的Golang示例,展示如何通过Go程序与pg_timetable交互:

package main

import (
	"database/sql"
	"fmt"
	"log"
	
	_ "github.com/lib/pq"
)

func main() {
	// 连接到PostgreSQL数据库
	connStr := "user=scheduler password=somestrong dbname=my_database host=localhost sslmode=disable"
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 创建每日VACUUM作业
	createVacuumJob(db)
	
	// 创建每周重建索引作业
	createReindexJob(db)
	
	// 查询当前作业列表
	listJobs(db)
}

func createVacuumJob(db *sql.DB) {
	_, err := db.Exec(`
		SELECT timetable.add_job('daily-vacuum', '30 0 * * *', 'VACUUM')
	`)
	if err != nil {
		log.Printf("创建VACUUM作业失败: %v", err)
	} else {
		fmt.Println("成功创建每日VACUUM作业")
	}
}

func createReindexJob(db *sql.DB) {
	_, err := db.Exec(`
		SELECT timetable.add_job('weekly-reindex', '0 0 * * 7', 'reindexdb', 
			'["--verbose"]'::jsonb, 'PROGRAM')
	`)
	if err != nil {
		log.Printf("创建重建索引作业失败: %v", err)
	} else {
		fmt.Println("成功创建每周重建索引作业")
	}
}

func listJobs(db *sql.DB) {
	rows, err := db.Query(`
		SELECT job_id, job_name, job_schedule, job_command 
		FROM timetable.job 
		ORDER BY job_id
	`)
	if err != nil {
		log.Printf("查询作业列表失败: %v", err)
		return
	}
	defer rows.Close()

	fmt.Println("\n当前作业列表:")
	fmt.Println("ID | 名称 | 调度时间 | 命令")
	for rows.Next() {
		var id int
		var name, schedule, command string
		err = rows.Scan(&id, &name, &schedule, &command)
		if err != nil {
			log.Printf("扫描作业行失败: %v", err)
			continue
		}
		fmt.Printf("%d | %s | %s | %s\n", id, name, schedule, command)
	}
}

支持的PostgreSQL版本和环境

pg_timetable支持PostgreSQL 10及以上版本,可在多种云服务和操作系统上运行:

云服务 支持 PostgreSQL版本 支持 操作系统 支持
Alibaba Cloud 18 (devel) Linux
Amazon RDS 17 (current) Darwin
Amazon Aurora 16 Windows
Azure 15 FreeBSD*
Citus Cloud 14 NetBSD*
Crunchy Bridge 13 OpenBSD*
DigitalOcean 12 Solaris*
Google Cloud 11
Heroku 10
Supabase
Tembo

*表示需要从源代码构建

总结

pg_timetable为PostgreSQL提供了强大的任务调度功能,通过Golang可以方便地与它交互。上面的示例展示了如何创建和管理调度作业,你可以根据需要扩展这些基础功能。


更多关于golang实现PostgreSQL高级任务调度插件pg_timetable的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现PostgreSQL高级任务调度插件pg_timetable的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用Golang实现PostgreSQL高级任务调度插件pg_timetable

PostgreSQL的pg_timetable是一个功能强大的任务调度扩展,允许你在数据库中直接设置和管理定时任务。下面我将介绍如何在Golang应用中使用pg_timetable。

pg_timetable简介

pg_timetable提供了以下功能:

  • 基于cron语法的任务调度
  • 任务链(一个任务完成后触发另一个任务)
  • 内置任务类型(SQL执行、发送邮件等)
  • 自定义任务支持

安装pg_timetable扩展

首先需要在PostgreSQL中安装扩展:

CREATE EXTENSION pg_timetable;

Golang集成方式

1. 直接通过SQL接口调用

最简单的方式是通过Golang的数据库驱动直接执行SQL命令来管理pg_timetable。

package main

import (
	"database/sql"
	"fmt"
	"log"
	_ "github.com/lib/pq"
)

func main() {
	// 连接数据库
	db, err := sql.Open("postgres", "user=postgres dbname=postgres sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 添加一个每分钟执行的任务
	_, err = db.Exec(`
		INSERT INTO timetable.task (name, task_sql)
		VALUES ('test_task', 'SELECT NOW()');
		
		INSERT INTO timetable.chain (
			chain_name, 
			run_at, 
			max_instances, 
			live
		) VALUES (
			'test_chain', 
			'* * * * *', 
			1, 
			TRUE
		);
		
		INSERT INTO timetable.chain_execution_config (
			chain_id, 
			task_id, 
			order_id
		) VALUES (
			currval('timetable.chain_chain_id_seq'), 
			currval('timetable.task_task_id_seq'), 
			1
		);
	`)
	if err != nil {
		log.Fatal("Failed to create scheduled task: ", err)
	}

	fmt.Println("Scheduled task created successfully")
}

2. 使用Go封装pg_timetable操作

我们可以创建一个更友好的Go封装:

package pg_timetable

import (
	"database/sql"
	"fmt"
	"time"
)

type Scheduler struct {
	db *sql.DB
}

func NewScheduler(connStr string) (*Scheduler, error) {
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		return nil, err
	}
	return &Scheduler{db: db}, nil
}

func (s *Scheduler) Close() error {
	return s.db.Close()
}

func (s *Scheduler) AddSQLJob(name, cronExpr, sql string) error {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// 插入任务
	var taskID int
	err = tx.QueryRow(`
		INSERT INTO timetable.task (name, task_sql)
		VALUES ($1, $2)
		RETURNING task_id`, name, sql).Scan(&taskID)
	if err != nil {
		return fmt.Errorf("insert task failed: %v", err)
	}

	// 插入任务链
	var chainID int
	err = tx.QueryRow(`
		INSERT INTO timetable.chain (
			chain_name, run_at, max_instances, live
		) VALUES (
			$1, $2, 1, TRUE
		)
		RETURNING chain_id`, name, cronExpr).Scan(&chainID)
	if err != nil {
		return fmt.Errorf("insert chain failed: %v", err)
	}

	// 关联任务和链
	_, err = tx.Exec(`
		INSERT INTO timetable.chain_execution_config (
			chain_id, task_id, order_id
		) VALUES ($1, $2, 1)`, chainID, taskID)
	if err != nil {
		return fmt.Errorf("link task to chain failed: %v", err)
	}

	return tx.Commit()
}

// 使用示例
func main() {
	scheduler, err := NewScheduler("user=postgres dbname=postgres sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer scheduler.Close()

	// 添加一个每5分钟执行的任务
	err = scheduler.AddSQLJob(
		"log_time",
		"*/5 * * * *",
		"INSERT INTO log_table (timestamp) VALUES (NOW())",
	)
	if err != nil {
		log.Fatal(err)
	}
}

3. 监控任务执行

func (s *Scheduler) GetJobStatus(name string) (lastRun time.Time, nextRun time.Time, err error) {
	query := `
		SELECT 
			MAX(execution_started), 
			MIN(run_at) 
		FROM timetable.chain_execution_stats ces
		JOIN timetable.chain c ON ces.chain_id = c.chain_id
		WHERE c.chain_name = $1
		GROUP BY c.chain_id`
	
	err = s.db.QueryRow(query, name).Scan(&lastRun, &nextRun)
	if err == sql.ErrNoRows {
		return time.Time{}, time.Time{}, fmt.Errorf("job not found")
	}
	return lastRun, nextRun, err
}

高级功能

1. 任务链

func (s *Scheduler) AddJobChain(name, cronExpr string, tasks []string) error {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// 插入任务链
	var chainID int
	err = tx.QueryRow(`
		INSERT INTO timetable.chain (
			chain_name, run_at, max_instances, live
		) VALUES ($1, $2, 1, TRUE)
		RETURNING chain_id`, name, cronExpr).Scan(&chainID)
	if err != nil {
		return err
	}

	// 插入每个任务并关联到链
	for i, taskSQL := range tasks {
		var taskID int
		err = tx.QueryRow(`
			INSERT INTO timetable.task (name, task_sql)
			VALUES ($1, $2)
			RETURNING task_id`, fmt.Sprintf("%s_step_%d", name, i+1), taskSQL).Scan(&taskID)
		if err != nil {
			return err
		}

		_, err = tx.Exec(`
			INSERT INTO timetable.chain_execution_config (
				chain_id, task_id, order_id
			) VALUES ($1, $2, $3)`, chainID, taskID, i+1)
		if err != nil {
			return err
		}
	}

	return tx.Commit()
}

2. 使用自定义任务

pg_timetable支持通过外部程序执行任务:

// 在数据库中注册外部程序任务
func (s *Scheduler) AddExternalJob(name, cronExpr, program string, args []string) error {
	_, err := s.db.Exec(`
		SELECT timetable.add_job(
			job_name => $1,
			job_schedule => $2,
			job_command => $3,
			job_args => $4
		)`, name, cronExpr, program, args)
	return err
}

注意事项

  1. 确保pg_timetable worker正在运行:

    SELECT timetable.start_worker();
    
  2. 对于生产环境,考虑设置多个worker以提高可靠性

  3. pg_timetable提供了详细的日志记录,可以通过查询timetable.execution_log来监控任务执行情况

通过以上方法,你可以在Golang应用中方便地集成PostgreSQL的高级任务调度功能,实现定时任务、任务链等复杂调度需求。

回到顶部