golang基于HTTP API的goroutine进程管理插件pm的使用
Golang基于HTTP API的goroutine进程管理插件pm的使用
pm是一个带有HTTP接口的进程管理器,用于检查和管API服务器程序。它可以跟踪程序中的用户定义任务(如HTTP请求),并通过HTTP接口提供相关信息。
基本使用
启动HTTP服务
首先需要启动HTTP服务来监听任务信息的请求:
go pm.ListenAndServe(":8081") // 在8081端口启动HTTP服务
跟踪任务
要跟踪的任务需要调用Start()
方法,并传入任务ID和可选属性。任务完成后需要调用Done()
:
// 开始跟踪任务
pm.Start(requestID, nil, map[string]interface{}{
"host": req.RemoteAddr, // 客户端地址
"method": req.Method, // HTTP方法
"uri": req.RequestURI, // 请求URI
})
defer pm.Done(requestID) // 确保任务结束时调用Done
更新任务状态
任务可以随时更新其状态:
pm.Status(requestID, "processing") // 更新任务状态为"processing"
HTTP API接口
启动服务后,可以通过以下HTTP接口访问任务信息:
# 获取所有运行中的进程
curl http://localhost:8081/procs/
# 获取特定进程的历史记录
curl http://localhost:8081/procs/<id>/history
响应为JSON格式,包含当前运行的进程集或指定ID的完整历史记录。
任务取消功能
可以通过HTTP接口取消任务:
curl -X DELETE http://localhost:8081/procs/<id>
注意:在Go中取消运行中的goroutine是通过panic实现的。如果不需此功能,可以在启动任何任务前禁用:
pm.SetOptions(ProclistOptions{
ForbidCancel: true // 禁用取消功能
})
完整示例
下面是一个完整的HTTP服务器示例,使用pm跟踪每个请求:
package main
import (
"fmt"
"net/http"
"github.com/VividCortex/pm"
)
func handler(w http.ResponseWriter, req *http.Request) {
// 为每个请求生成唯一ID
requestID := fmt.Sprintf("req-%d", time.Now().UnixNano())
// 开始跟踪请求
pm.Start(requestID, nil, map[string]interface{}{
"host": req.RemoteAddr,
"method": req.Method,
"uri": req.RequestURI,
})
defer pm.Done(requestID) // 确保请求结束时调用Done
// 更新状态
pm.Status(requestID, "processing")
// 模拟处理
time.Sleep(100 * time.Millisecond)
// 更新状态
pm.Status(requestID, "completed")
w.Write([]byte("Request processed"))
}
func main() {
// 启动pm HTTP服务
go pm.ListenAndServe(":8081")
// 设置HTTP服务器
http.HandleFunc("/", handler)
http.ListenAndServe(":8080", nil)
}
启动后可以通过以下命令查看运行中的请求:
curl http://localhost:8081/procs/
更多关于golang基于HTTP API的goroutine进程管理插件pm的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang基于HTTP API的goroutine进程管理插件pm的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang基于HTTP API的goroutine进程管理插件pm的使用
在Golang中,goroutine是轻量级线程,但管理和监控大量goroutine可能会变得复杂。下面我将介绍如何使用一个简单的基于HTTP API的goroutine进程管理插件(pm),并提供完整的示例代码。
基本概念
pm
插件主要提供以下功能:
- 启动/停止goroutine
- 查看运行中的goroutine状态
- 通过HTTP API进行管理
实现代码
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
)
// ProcessManager 管理所有goroutine
type ProcessManager struct {
processes map[string]*Process
mu sync.Mutex
}
// Process 表示一个goroutine进程
type Process struct {
ID string
Running bool
Stop chan struct{}
}
// NewProcessManager 创建新的进程管理器
func NewProcessManager() *ProcessManager {
return &ProcessManager{
processes: make(map[string]*Process),
}
}
// StartProcess 启动一个新进程
func (pm *ProcessManager) StartProcess(id string, task func(stop <-chan struct{})) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if _, exists := pm.processes[id]; exists {
return fmt.Errorf("process with ID %s already exists", id)
}
stop := make(chan struct{})
p := &Process{
ID: id,
Running: true,
Stop: stop,
}
pm.processes[id] = p
// 启动goroutine
go func() {
task(stop)
pm.mu.Lock()
p.Running = false
pm.mu.Unlock()
}()
return nil
}
// StopProcess 停止一个进程
func (pm *ProcessManager) StopProcess(id string) error {
pm.mu.Lock()
defer pm.mu.Unlock()
p, exists := pm.processes[id]
if !exists {
return fmt.Errorf("process with ID %s not found", id)
}
if !p.Running {
return nil
}
close(p.Stop)
p.Running = false
return nil
}
// ListProcesses 列出所有进程
func (pm *ProcessManager) ListProcesses() []Process {
pm.mu.Lock()
defer pm.mu.Unlock()
var list []Process
for _, p := range pm.processes {
list = append(list, *p)
}
return list
}
// HTTP API处理器
func (pm *ProcessManager) handleStart(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
type request struct {
ID string `json:"id"`
}
var req request
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// 示例任务 - 每秒打印一次
task := func(stop <-chan struct{}) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Printf("Process %s is running", req.ID)
case <-stop:
log.Printf("Process %s stopped", req.ID)
return
}
}
}
if err := pm.StartProcess(req.ID, task); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
fmt.Fprintf(w, "Process %s started", req.ID)
}
func (pm *ProcessManager) handleStop(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
type request struct {
ID string `json:"id"`
}
var req request
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := pm.StopProcess(req.ID); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "Process %s stopped", req.ID)
}
func (pm *ProcessManager) handleList(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
processes := pm.ListProcesses()
json.NewEncoder(w).Encode(processes)
}
func main() {
pm := NewProcessManager()
// 设置HTTP路由
http.HandleFunc("/process/start", pm.handleStart)
http.HandleFunc("/process/stop", pm.handleStop)
http.HandleFunc("/process/list", pm.handleList)
// 启动HTTP服务器
log.Println("Starting process manager API on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
使用说明
1. 启动进程
curl -X POST -H "Content-Type: application/json" -d '{"id":"task1"}' http://localhost:8080/process/start
2. 停止进程
curl -X POST -H "Content-Type: application/json" -d '{"id":"task1"}' http://localhost:8080/process/stop
3. 查看进程列表
curl http://localhost:8080/process/list
功能扩展建议
- 进程健康检查:可以定期检查goroutine是否还在运行
- 自动重启:当goroutine意外退出时自动重启
- 资源限制:限制每个goroutine的资源使用
- 日志记录:记录每个goroutine的活动日志
- 性能监控:监控每个goroutine的CPU和内存使用情况
注意事项
- 使用
sync.Mutex
确保线程安全 - 通过
channel
实现优雅停止 - 每个goroutine应该有明确的退出条件
- 避免goroutine泄漏
这个简单的pm插件提供了基本的goroutine管理功能,你可以根据实际需求进行扩展和完善。