golang基于DRMAA标准的集群调度作业提交插件库drmaa的使用

Golang基于DRMAA标准的集群调度作业提交插件库drmaa的使用

简介

go-drmaa是一个兼容DRMAA标准的Golang作业提交库。该库是对许多分布式资源管理器(集群调度器)提供的DRMAA C库实现的封装。

该库完全支持Open Cluster Scheduler和Gridware Cluster Scheduler。最初是使用Univa Grid Engine的libdrmaa.so开发的,并已在不同版本的Grid Engine、Torque和SLURM上进行了测试。

安装

首先下载包:

export GOPATH=${GOPATH:-~/src/go}
mkdir -p $GOPATH
go get -d github.com/dgruber/drmaa
cd $GOPATH/github.com/dgruber/drmaa

不同调度器的编译方式

Univa Grid Engine和原始SGE:

source /path/to/grid/engine/installation/default/settings.sh
./build.sh
cd examples/simplesubmit
go build
export LD_LIBRARY_PATH=$SGE_ROOT/lib/lx-amd64
./simplesubmit

Son of Grid Engine:

source /path/to/grid/engine/installation/default/settings.sh
./build.sh --sog
cd examples/simplesubmit
go build
export LD_LIBRARY_PATH=$SGE_ROOT/lib/lx-amd64
./simplesubmit

Torque:

./build.sh --torque
cd examples/simplesubmit
go build
./simplesubmit

SLURM:

./build.sh --slurm /usr/local

使用示例

基本使用

package main

import (
	"fmt"
	"github.com/dgruber/drmaa"
)

func main() {
	// 创建DRMAA会话
	s, err := drmaa.MakeSession()
	if err != nil {
		fmt.Printf("Error creating session: %s\n", err)
		return
	}
	// 确保会话结束时关闭
	defer s.Exit()

	// 分配作业模板
	jt, errJT := s.AllocateJobTemplate()
	if errJT != nil {
		fmt.Printf("Error during allocating a new job template: %s\n", errJT)
		return
	}
	// 防止内存泄漏,释放分配的C作业模板
	defer s.DeleteJobTemplate(&jt)

	// 设置要提交的应用程序
	jt.SetRemoteCommand("sleep")
	// 设置参数(有多个参数时使用SetArgs())
	jt.SetArg("1")

	// 提交作业
	jobID, errSubmit := s.RunJob(&jt)
	if errSubmit != nil {
		fmt.Printf("Error submitting job: %s\n", errSubmit)
		return
	}
	fmt.Printf("Job submitted with ID: %s\n", jobID)

	// 等待作业完成并获取作业信息
	jinfo, errWait := s.Wait(jobID, drmaa.TimeoutWaitForever)
	if errWait != nil {
		fmt.Printf("Error waiting for job: %s\n", errWait)
		return
	}
	fmt.Printf("Job finished with exit status: %d\n", jinfo.ExitStatus)
}

批量作业提交

// 提交1000个相同作业的实例
jobIDs, errBulkSubmit := s.RunBulkJobs(&jt, 1, 1000, 1)
if errBulkSubmit != nil {
	fmt.Printf("Error submitting bulk jobs: %s\n", errBulkSubmit)
	return
}
fmt.Printf("Submitted bulk jobs with IDs: %v\n", jobIDs)

作业控制

// 终止作业
errTerm := s.TerminateJob(jobID)
if errTerm != nil {
	fmt.Printf("Error terminating job: %s\n", errTerm)
}

关键概念

  1. DRMAA会话:应用程序需要先打开DRMAA会话才能执行DRMAA调用。会话关闭时必须调用Exit()方法。

  2. 作业模板:包含作业的规范,如要执行的命令及其参数。使用完成后需要删除以防止内存泄漏。

  3. 作业状态:可以通过相应方法改变作业状态(暂停/恢复/保持/删除)。

  4. 作业信息:JobInfo数据结构包含作业的运行时信息,如退出状态或使用的资源量。

注意事项

  1. 确保总是调用Exit()方法关闭会话,否则可能会在集群调度器端留下通信句柄。

  2. 在Go中使用defer语句管理资源释放,但注意os.Exit()调用时不会执行defer函数。

  3. 作业模板底层是C分配的,超出Go系统范围,必须确保不再使用时删除它。


更多关于golang基于DRMAA标准的集群调度作业提交插件库drmaa的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于DRMAA标准的集群调度作业提交插件库drmaa的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang 基于 DRMAA 标准的集群调度作业提交插件库 drmaa 使用指南

DRMAA (Distributed Resource Management Application API) 是一个开放标准,用于与集群资源管理系统(如 Slurm、PBS、LSF 等)进行交互。在 Go 语言中,我们可以使用 github.com/dgruber/drmaa2os 库来实现基于 DRMAA 标准的作业提交。

安装

首先安装必要的库:

go get github.com/dgruber/drmaa2os

基本使用

1. 初始化 DRMAA 会话

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/dgruber/drmaa2os"
	"github.com/dgruber/drmaa2interface"
)

func main() {
	// 创建 DRMAA 会话管理器
	sessionManager, err := drmaa2os.NewDRMAASessionManager("")
	if err != nil {
		log.Fatalf("Failed to create session manager: %v", err)
	}

	// 创建新会话
	session, err := sessionManager.CreateSession("mysession")
	if err != nil {
		log.Fatalf("Failed to create session: %v", err)
	}
	defer session.Close()
	defer sessionManager.DestroySession("mysession")

2. 提交作业

	// 定义作业模板
	jt := drmaa2interface.JobTemplate{
		RemoteCommand: "/bin/sleep",  // 要执行的命令
		Args:         []string{"10"}, // 命令参数
		JobName:      "testjob",      // 作业名称
		OutputPath:   "/tmp/job.out", // 标准输出路径
		ErrorPath:    "/tmp/job.err", // 标准错误路径
	}

	// 提交作业
	jobID, err := session.RunJob(jt)
	if err != nil {
		log.Fatalf("Failed to submit job: %v", err)
	}
	fmt.Printf("Submitted job with ID: %s\n", jobID)

3. 监控作业状态

	// 等待作业完成
	fmt.Println("Waiting for job to complete...")
	for {
		state, err := session.JobState(jobID)
		if err != nil {
			log.Fatalf("Failed to get job state: %v", err)
		}

		if state == drmaa2interface.Done || state == drmaa2interface.Failed {
			fmt.Printf("Job finished with state: %s\n", state)
			break
		}

		time.Sleep(1 * time.Second)
	}

	// 获取作业信息
	jobInfo, err := session.JobInfo(jobID)
	if err != nil {
		log.Fatalf("Failed to get job info: %v", err)
	}
	fmt.Printf("Job info:\nExitStatus: %d\nSubmissionTime: %v\nDispatchTime: %v\nFinishTime: %v\n",
		jobInfo.ExitStatus, jobInfo.SubmissionTime, jobInfo.DispatchTime, jobInfo.FinishTime)
}

高级功能

批量作业提交

func submitBatchJobs(session drmaa2interface.Session) {
	jt := drmaa2interface.JobTemplate{
		RemoteCommand: "/bin/echo",
		Args:         []string{"Hello from DRMAA"},
		JobName:      "batchjob",
	}

	// 提交10个相同作业
	jobIDs, err := session.RunBulkJobs(jt, 1, 10, 1)
	if err != nil {
		log.Fatalf("Failed to submit batch jobs: %v", err)
	}

	fmt.Printf("Submitted batch jobs with IDs: %v\n", jobIDs)
}

作业阵列

func submitJobArray(session drmaa2interface.Session) {
	jt := drmaa2interface.JobTemplate{
		RemoteCommand: "/bin/echo",
		Args:         []string{"Hello from job array task ${DRMAA_JOB_ARRAY_TASK_ID}"},
		JobName:      "arrayjob",
	}

	// 提交作业阵列,任务ID从1到5
	jobID, err := session.RunJobArray(jt, 1, 5, 1)
	if err != nil {
		log.Fatalf("Failed to submit job array: %v", err)
	}

	fmt.Printf("Submitted job array with ID: %s\n", jobID)
}

注意事项

  1. 环境配置:使用前需要确保系统已安装并正确配置了支持的集群调度系统(如 Slurm、PBS 等),并且设置了正确的环境变量。

  2. 资源管理器选择drmaa2os 支持多种资源管理器,可以通过环境变量 DRMAA2_DRM_SYSTEM 指定,如:

    os.Setenv("DRMAA2_DRM_SYSTEM", "slurm")
    
  3. 错误处理:DRMAA 操作可能会因各种原因失败,务必检查所有错误返回值。

  4. 会话管理:确保在使用后关闭会话并清理资源,避免资源泄漏。

  5. 模板选项JobTemplate 提供了许多选项来控制作业行为,如工作目录、环境变量、资源需求等。

通过这个 DRMAA 接口库,Go 程序可以方便地与各种集群调度系统集成,实现作业的提交、监控和管理功能。

回到顶部