Golang中如何将API和调度器整合到同一个二进制文件/包

Golang中如何将API和调度器整合到同一个二进制文件/包 大家好

我正在尝试实现这样的功能:在同一个包/main中同时运行API和定时调度函数,定期执行某些操作,比如从数据库获取数据生成特定结构体数据,并使用生成的唯一请求ID调用外部API。它本身还带有一个API服务器,用于从其他服务获取数据,通过请求ID映射/参考来获取特定数据。

我能否使用两个goroutine配合waitgroup来控制它们的运行方式? 有没有什么好的方法以这种方式运行定时任务?谢谢。

1 回复

更多关于Golang中如何将API和调度器整合到同一个二进制文件/包的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go语言中,将API服务器和定时调度器整合到同一个二进制文件中是完全可行的,而且goroutine配合sync.WaitGroup是处理这种场景的常见方式。下面是一个完整的实现示例:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"
)

// 全局存储,用于请求ID到数据的映射
var (
	dataStore = make(map[string]interface{})
	storeMux  sync.RWMutex
)

// 模拟从数据库获取数据
func fetchDataFromDB() interface{} {
	return map[string]interface{}{
		"id":      fmt.Sprintf("req_%d", time.Now().Unix()),
		"data":    "sample data",
		"created": time.Now(),
	}
}

// 模拟调用外部API
func callExternalAPI(requestID string, data interface{}) error {
	log.Printf("调用外部API,请求ID: %s, 数据: %v", requestID, data)
	// 这里实现实际的HTTP调用
	return nil
}

// 定时任务函数
func schedulerTask(wg *sync.WaitGroup, stopChan <-chan struct{}) {
	defer wg.Done()
	
	ticker := time.NewTicker(30 * time.Second) // 每30秒执行一次
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			// 从数据库获取数据
			data := fetchDataFromDB()
			
			// 生成唯一请求ID
			requestID := fmt.Sprintf("req_%d", time.Now().UnixNano())
			
			// 存储数据
			storeMux.Lock()
			dataStore[requestID] = data
			storeMux.Unlock()
			
			// 调用外部API
			if err := callExternalAPI(requestID, data); err != nil {
				log.Printf("调用外部API失败: %v", err)
			}
			
			log.Printf("定时任务执行完成,请求ID: %s", requestID)
			
		case <-stopChan:
			log.Println("定时任务停止")
			return
		}
	}
}

// API处理函数 - 根据请求ID获取数据
func getDataHandler(w http.ResponseWriter, r *http.Request) {
	requestID := r.URL.Query().Get("request_id")
	if requestID == "" {
		http.Error(w, "缺少request_id参数", http.StatusBadRequest)
		return
	}
	
	storeMux.RLock()
	data, exists := dataStore[requestID]
	storeMux.RUnlock()
	
	if !exists {
		http.Error(w, "未找到对应的数据", http.StatusNotFound)
		return
	}
	
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(data)
}

// API处理函数 - 获取所有请求ID
func listRequestsHandler(w http.ResponseWriter, r *http.Request) {
	storeMux.RLock()
	defer storeMux.RUnlock()
	
	requestIDs := make([]string, 0, len(dataStore))
	for id := range dataStore {
		requestIDs = append(requestIDs, id)
	}
	
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(requestIDs)
}

func main() {
	var wg sync.WaitGroup
	stopChan := make(chan struct{})
	
	// 启动定时调度器
	wg.Add(1)
	go schedulerTask(&wg, stopChan)
	
	// 设置API路由
	http.HandleFunc("/api/data", getDataHandler)
	http.HandleFunc("/api/requests", listRequestsHandler)
	
	// 启动HTTP服务器
	server := &http.Server{
		Addr:    ":8080",
		Handler: nil,
	}
	
	// 启动HTTP服务器goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		log.Println("API服务器启动在 :8080")
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("HTTP服务器错误: %v", err)
		}
	}()
	
	// 等待中断信号
	fmt.Println("按Ctrl+C停止服务...")
	
	// 简单的信号处理
	<-make(chan struct{}) // 在实际应用中可以使用os.Signal通道
	
	// 优雅关闭
	log.Println("开始关闭服务...")
	close(stopChan)
	
	// 关闭HTTP服务器
	if err := server.Close(); err != nil {
		log.Printf("关闭HTTP服务器错误: %v", err)
	}
	
	// 等待所有goroutine完成
	wg.Wait()
	log.Println("服务已完全停止")
}

这个实现展示了:

  1. 定时调度器:使用time.Ticker每30秒执行一次任务,包括数据获取、存储和外部API调用
  2. API服务器:提供两个端点:
    • /api/data?request_id=<id> - 根据请求ID获取特定数据
    • /api/requests - 列出所有可用的请求ID
  3. 并发安全:使用sync.RWMutex保护共享的dataStore
  4. 优雅关闭:通过通道控制goroutine的停止

运行方式:

go run main.go

测试API:

# 获取所有请求ID
curl http://localhost:8080/api/requests

# 根据请求ID获取数据
curl "http://localhost:8080/api/data?request_id=req_123456789"

这种架构模式在Go中非常常见,能够有效地在同一进程中管理不同类型的任务。

回到顶部