Golang中如何控制HTTP处理程序的并发

Golang中如何控制HTTP处理程序的并发 大家好,我确实需要帮助——看起来我的思路有些冲突了 :)

我有一个用Go(通过标准库)编写的HTTP服务器,其中一个处理器用于生成元数据。

func handler(w http.ResponseWriter, r *http.Request) {
    path, ok := r.URL.Query()["path"]

    // ....
    // 检查和其他操作

    err := BuildMetadata(path)

    // ....
    // 检查和其他操作
}

每个客户端都可以触发元数据生成,但我必须确保,对于特定的路径同时只能有1个元数据生成进程在运行,而对于不同的路径,则可以同时运行多个。

我的 BuildMetadata() 函数签名如下:

func BuildMetadata(path string) error {

	tasks := make(chan string, 1)
	done := make(chan struct{})
	errors := make(chan error, 1)
	tasks <- path


	go func(tasks chan string, done chan struct{}, errors chan error) {
		fp := <-tasks
		log.Printf("New task for %s metadata has been created\n", fp)

        // 这里发生的是不太有趣的部分
		if err := rebuildMeta(fp); err != nil {
			errors <- err
		}
		done <- struct{}{}
	}(tasks, done, errors)

	select {
	case err := <-errors:
		return err
	case <-done:
		log.Printf("Metadata has been created\n")
		return nil
	}
}

我不确定我是否满足了自己的要求(对于特定路径,只能有一个元数据生成进程在运行)。

据我所知,每个处理器都在自己的goroutine中运行: https://go.dev/src/net/http/server.go#L3071

我担心我在并发处理上做错了——当执行 BuildMetadata 时,每个goroutine都会独立创建一堆新的通道,不是吗?

我该如何控制这种情况,并且不重复造轮子(这对作为Go学习者的我来说是最重要的)。

我非常感谢任何建议、链接或批评——请帮助我理解Go。

谢谢。


更多关于Golang中如何控制HTTP处理程序的并发的实战教程也可以访问 https://www.itying.com/category-94-b0.html

4 回复

构建一个从路径到任务通道的映射。根据路径向通道写入数据。我认为您需要的是无缓冲通道。

更多关于Golang中如何控制HTTP处理程序的并发的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


感谢Jeff的回复,非常感谢。

我希望我理解了你的意思,大概是这样的:

type Task map[string]struct{}

func BuildMetadata(path string) error {

	tasks := make(chan Task)
	done := make(chan struct{})
	errors := make(chan error, 1)

	go func(tasks chan Task, done chan struct{}, errors chan error) {
		for fp := range <-tasks {
			log.Printf("New task for %s metadata has been created\n", fp)

			if err := f.rebuildMetadata(p); err != nil {
				errors <- err
			}
			done <- struct{}{}
		}
	}(tasks, done, errors)

	t := Task{path: struct{}{}}
	tasks <- t

	select {
	case err := <-errors:
		return err
	case <-done:
		log.Printf("Metadata has been created\n")
		return nil
	}
}

}

这段代码足够好,能被接受吗?(不太确定)=)

通过为每个请求创建一个新的通道和goroutine,你并没有强制执行任何互斥。

我会这样做,尽管未经测试:

type Task struct{
	resultChan chan Result
	errorChan chan error
	taskParameters interface{}
}

var chansPerPath map[string]chan Task = make(map[string]chan Task)

func addPath(path string) {
	ch, ok := chansPerPath[path]
	if !ok {
		pathChan = make(chan Task)
		resultChan = make(chan Task)
		chansPerPath[ch] = ch
		go func(taskChan chanTask) {
			for t:= range taskChan {
				result,err := performTask(t.taskParameters)
				if err!=nil {
					t.errorChan <- err
				} else {
					t.resultChan <- result
				}
			}
		}
	}
}

// 假设每个请求都并发运行此函数。
func handlePath(path string) {
	addPath(path)

	task := Task{
		resultChan: make(chan Result)
		errorChan: make(chan error)
		taskParameters: getTaskParameters(path)
	}

	// 每个路径最多只能处理一个任务。
	chansPerPath[path] <- task
	select {
		case result := <-task.resultChan:
			// 对结果进行处理
		case err := <-task.errChan:
			// 对结果进行处理
	}

}

要实现特定路径的并发控制,可以使用sync.Map配合互斥锁。以下是改进方案:

import (
    "sync"
    "net/http"
    "log"
)

var (
    // 存储每个路径的互斥锁
    pathLocks sync.Map
)

func handler(w http.ResponseWriter, r *http.Request) {
    path, ok := r.URL.Query()["path"]
    if !ok || len(path) == 0 {
        http.Error(w, "path parameter required", http.StatusBadRequest)
        return
    }
    
    // 获取或创建该路径的锁
    mu, _ := pathLocks.LoadOrStore(path[0], &sync.Mutex{})
    mutex := mu.(*sync.Mutex)
    
    // 锁定特定路径
    mutex.Lock()
    defer mutex.Unlock()
    
    err := BuildMetadata(path[0])
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    w.WriteHeader(http.StatusOK)
    w.Write([]byte("Metadata built successfully"))
}

func BuildMetadata(path string) error {
    log.Printf("Building metadata for path: %s", path)
    
    // 模拟元数据构建过程
    // 这里可以调用实际的rebuildMeta函数
    // if err := rebuildMeta(path); err != nil {
    //     return err
    // }
    
    log.Printf("Metadata created for path: %s", path)
    return nil
}

更高效的方案使用singleflight包,避免重复计算:

import (
    "golang.org/x/sync/singleflight"
    "net/http"
    "log"
    "time"
)

var (
    metadataGroup singleflight.Group
)

func handler(w http.ResponseWriter, r *http.Request) {
    path, ok := r.URL.Query()["path"]
    if !ok || len(path) == 0 {
        http.Error(w, "path parameter required", http.StatusBadRequest)
        return
    }
    
    // 对相同路径的请求进行合并
    result, err, shared := metadataGroup.Do(path[0], func() (interface{}, error) {
        log.Printf("Starting metadata build for: %s", path[0])
        
        // 模拟耗时操作
        time.Sleep(2 * time.Second)
        
        // 实际构建元数据
        // if err := rebuildMeta(path[0]); err != nil {
        //     return nil, err
        // }
        
        log.Printf("Completed metadata build for: %s", path[0])
        return "success", nil
    })
    
    log.Printf("Request for %s - shared result: %v", path[0], shared)
    
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    w.WriteHeader(http.StatusOK)
    w.Write([]byte(result.(string)))
}

如果需要限制全局并发数,可以使用信号量:

import (
    "net/http"
    "log"
    "golang.org/x/sync/semaphore"
    "context"
    "sync"
)

var (
    globalSem = semaphore.NewWeighted(10) // 全局最多10个并发
    pathLocks sync.Map
)

func handler(w http.ResponseWriter, r *http.Request) {
    path, ok := r.URL.Query()["path"]
    if !ok || len(path) == 0 {
        http.Error(w, "path parameter required", http.StatusBadRequest)
        return
    }
    
    // 获取全局并发许可
    if err := globalSem.Acquire(context.Background(), 1); err != nil {
        http.Error(w, "server busy", http.StatusServiceUnavailable)
        return
    }
    defer globalSem.Release(1)
    
    // 获取路径特定锁
    mu, _ := pathLocks.LoadOrStore(path[0], &sync.Mutex{})
    mutex := mu.(*sync.Mutex)
    
    mutex.Lock()
    defer mutex.Unlock()
    
    err := BuildMetadata(path[0])
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    w.WriteHeader(http.StatusOK)
    w.Write([]byte("Metadata built successfully"))
}

这些方案确保了:

  1. 相同路径的请求串行执行
  2. 不同路径的请求可以并行执行
  3. 避免重复构建相同路径的元数据
回到顶部