Golang Web应用中如何调用命令行程序

Golang Web应用中如何调用命令行程序 我已经用Go语言编写了一个命令行应用程序,现在想为其添加一个Web前端界面。由于该应用程序可能需要长达10秒的运行时间,我不希望简单地将代码整合到Web框架中并在用户点击时直接调用。我希望采用某种方式将代码作为独立应用/线程/协程等进行调用,然后在任务完成后通知用户,以便他们可以返回查看结果。

显然,应用程序可能会被多次调用,为了避免服务器过载,我需要建立某种工作队列机制,将任务放入队列中,确保同时运行的任务数量不超过X个。

我能想到几种实现方式,但想知道是否已经有现成的框架可以完成这个功能。有什么建议吗?

6 回复

谢谢,我会看看的。

更多关于Golang Web应用中如何调用命令行程序的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


谢谢,我会看看的。

或者,你可以在 Lambda 中使用 JSON Web Tokens 实现基本身份验证,但你可能需要一个数据库,正如你所说,这可能是为你的用例重新发明轮子。

这听起来是其中的一部分,但是否有任何工具可以帮助追踪谁负责哪些任务?

我知道如何实现,但是否已有现成的方案来管理 Lambda 调用?不是实际调用它们,而是管理整个流程?只是想避免重复造轮子。

如果我理解您的使用场景,这听起来非常适合使用 AWS Lambda,或者更具体地说,使用 Serverless Framework

这可能符合也可能不符合以下情况:

我不想仅仅将代码合并到Web框架中,并在用户点击时调用它

但实际上,您可以做的是使用一个或多个Lambda函数作为命令行应用的HTTP API,并围绕它构建前端界面。

希望这对您有所帮助。

这听起来像是部分原因,但有没有什么方法可以帮助跟踪谁拥有哪些任务?

我想这取决于在多大程度上,本质上,向 lambda 发出请求的人可以被视为所有者

如果你指的是管理权限方面,AWS 提供了一些服务,具体取决于你希望权限在流程中的哪个位置。

如果你决定使用 Lambda,你可以通过 AWS Cognito 来控制对 REST API 的访问。

在Go语言中实现一个带有工作队列的Web应用来异步调用命令行程序是常见的需求。以下是使用标准库和常见包实现的解决方案:

核心实现方案

1. 工作队列和任务管理

package main

import (
    "context"
    "fmt"
    "log"
    "os/exec"
    "sync"
    "time"
)

type Task struct {
    ID        string
    Command   string
    Args      []string
    Status    string // "pending", "running", "completed", "failed"
    Result    string
    CreatedAt time.Time
}

type WorkerPool struct {
    maxWorkers int
    taskQueue  chan Task
    results    map[string]*Task
    mu         sync.RWMutex
    wg         sync.WaitGroup
}

func NewWorkerPool(maxWorkers int) *WorkerPool {
    return &WorkerPool{
        maxWorkers: maxWorkers,
        taskQueue:  make(chan Task, 100),
        results:    make(map[string]*Task),
    }
}

func (wp *WorkerPool) Start(ctx context.Context) {
    for i := 0; i < wp.maxWorkers; i++ {
        wp.wg.Add(1)
        go wp.worker(ctx, i)
    }
}

func (wp *WorkerPool) worker(ctx context.Context, id int) {
    defer wp.wg.Done()
    
    for {
        select {
        case task := <-wp.taskQueue:
            wp.executeTask(task)
        case <-ctx.Done():
            return
        }
    }
}

func (wp *WorkerPool) executeTask(task Task) {
    wp.mu.Lock()
    task.Status = "running"
    wp.results[task.ID] = &task
    wp.mu.Unlock()

    cmd := exec.Command(task.Command, task.Args...)
    output, err := cmd.CombinedOutput()

    wp.mu.Lock()
    defer wp.mu.Unlock()
    
    if err != nil {
        task.Status = "failed"
        task.Result = fmt.Sprintf("Error: %v\nOutput: %s", err, string(output))
    } else {
        task.Status = "completed"
        task.Result = string(output)
    }
    wp.results[task.ID] = &task
}

func (wp *WorkerPool) SubmitTask(task Task) error {
    wp.mu.Lock()
    defer wp.mu.Unlock()
    
    if _, exists := wp.results[task.ID]; exists {
        return fmt.Errorf("task ID %s already exists", task.ID)
    }
    
    task.Status = "pending"
    task.CreatedAt = time.Now()
    wp.results[task.ID] = &task
    
    select {
    case wp.taskQueue <- task:
        return nil
    default:
        return fmt.Errorf("task queue is full")
    }
}

func (wp *WorkerPool) GetTaskStatus(taskID string) (*Task, bool) {
    wp.mu.RLock()
    defer wp.mu.RUnlock()
    
    task, exists := wp.results[taskID]
    return task, exists
}

2. Web接口实现

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/google/uuid"
    "github.com/gorilla/mux"
)

type WebServer struct {
    workerPool *WorkerPool
    router     *mux.Router
}

func NewWebServer(workerPool *WorkerPool) *WebServer {
    ws := &WebServer{
        workerPool: workerPool,
        router:     mux.NewRouter(),
    }
    ws.setupRoutes()
    return ws
}

func (ws *WebServer) setupRoutes() {
    ws.router.HandleFunc("/task", ws.submitTask).Methods("POST")
    ws.router.HandleFunc("/task/{id}", ws.getTaskStatus).Methods("GET")
    ws.router.HandleFunc("/tasks", ws.listTasks).Methods("GET")
}

func (ws *WebServer) submitTask(w http.ResponseWriter, r *http.Request) {
    var request struct {
        Command string   `json:"command"`
        Args    []string `json:"args"`
    }
    
    if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
        http.Error(w, "Invalid request body", http.StatusBadRequest)
        return
    }
    
    taskID := uuid.New().String()
    task := Task{
        ID:      taskID,
        Command: request.Command,
        Args:    request.Args,
    }
    
    if err := ws.workerPool.SubmitTask(task); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    response := map[string]string{
        "task_id": taskID,
        "status":  "submitted",
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

func (ws *WebServer) getTaskStatus(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    taskID := vars["id"]
    
    task, exists := ws.workerPool.GetTaskStatus(taskID)
    if !exists {
        http.Error(w, "Task not found", http.StatusNotFound)
        return
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(task)
}

func (ws *WebServer) listTasks(w http.ResponseWriter, r *http.Request) {
    // 实现获取所有任务列表的逻辑
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(map[string]string{"message": "Task list endpoint"})
}

func (ws *WebServer) Start(addr string) error {
    return http.ListenAndServe(addr, ws.router)
}

3. 主程序入口

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    workerPool := NewWorkerPool(3) // 最多同时运行3个任务
    workerPool.Start(ctx)

    webServer := NewWebServer(workerPool)

    go func() {
        if err := webServer.Start(":8080"); err != nil {
            log.Fatalf("Web server failed: %v", err)
        }
    }()

    log.Println("Server started on :8080")

    // 等待中断信号
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
    
    cancel()
    workerPool.wg.Wait()
    log.Println("Server shutdown complete")
}

4. 使用示例

启动服务后,可以通过HTTP API提交任务:

# 提交任务
curl -X POST http://localhost:8080/task \
  -H "Content-Type: application/json" \
  -d '{
    "command": "sleep",
    "args": ["5"]
  }'

# 获取任务状态
curl http://localhost:8080/task/{task_id}

现有框架选项

对于更复杂的生产环境,可以考虑以下框架:

// 使用 machinery(基于Redis的分布式任务队列)
import (
    "github.com/RichardKnop/machinery/v1"
    "github.com/RichardKnop/machinery/v1/config"
)

func setupMachinery() (*machinery.Server, error) {
    cnf := &config.Config{
        Broker:        "redis://localhost:6379",
        ResultBackend: "redis://localhost:6379",
        DefaultQueue:  "machinery_tasks",
    }
    
    server, err := machinery.NewServer(cnf)
    if err != nil {
        return nil, err
    }
    
    return server, nil
}

这个实现提供了完整的工作队列机制、并发控制、任务状态跟踪和Web接口,可以直接集成到现有的Go Web应用中。

回到顶部