Golang中如何控制HTTP处理程序的并发
Golang中如何控制HTTP处理程序的并发 大家好,我确实需要帮助——看起来我的思路有些冲突了 :)
我有一个用Go(通过标准库)编写的HTTP服务器,其中一个处理器用于生成元数据。
func handler(w http.ResponseWriter, r *http.Request) {
path, ok := r.URL.Query()["path"]
// ....
// 检查和其他操作
err := BuildMetadata(path)
// ....
// 检查和其他操作
}
每个客户端都可以触发元数据生成,但我必须确保,对于特定的路径,同时只能有1个元数据生成进程在运行,而对于不同的路径,则可以同时运行多个。
我的 BuildMetadata() 函数签名如下:
func BuildMetadata(path string) error {
tasks := make(chan string, 1)
done := make(chan struct{})
errors := make(chan error, 1)
tasks <- path
go func(tasks chan string, done chan struct{}, errors chan error) {
fp := <-tasks
log.Printf("New task for %s metadata has been created\n", fp)
// 这里发生的是不太有趣的部分
if err := rebuildMeta(fp); err != nil {
errors <- err
}
done <- struct{}{}
}(tasks, done, errors)
select {
case err := <-errors:
return err
case <-done:
log.Printf("Metadata has been created\n")
return nil
}
}
我不确定我是否满足了自己的要求(对于特定路径,只能有一个元数据生成进程在运行)。
据我所知,每个处理器都在自己的goroutine中运行:
https://go.dev/src/net/http/server.go#L3071
我担心我在并发处理上做错了——当执行 BuildMetadata 时,每个goroutine都会独立创建一堆新的通道,不是吗?
我该如何控制这种情况,并且不重复造轮子(这对作为Go学习者的我来说是最重要的)。
我非常感谢任何建议、链接或批评——请帮助我理解Go。
谢谢。
更多关于Golang中如何控制HTTP处理程序的并发的实战教程也可以访问 https://www.itying.com/category-94-b0.html
构建一个从路径到任务通道的映射。根据路径向通道写入数据。我认为您需要的是无缓冲通道。
更多关于Golang中如何控制HTTP处理程序的并发的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
感谢Jeff的回复,非常感谢。
我希望我理解了你的意思,大概是这样的:
type Task map[string]struct{}
func BuildMetadata(path string) error {
tasks := make(chan Task)
done := make(chan struct{})
errors := make(chan error, 1)
go func(tasks chan Task, done chan struct{}, errors chan error) {
for fp := range <-tasks {
log.Printf("New task for %s metadata has been created\n", fp)
if err := f.rebuildMetadata(p); err != nil {
errors <- err
}
done <- struct{}{}
}
}(tasks, done, errors)
t := Task{path: struct{}{}}
tasks <- t
select {
case err := <-errors:
return err
case <-done:
log.Printf("Metadata has been created\n")
return nil
}
}
}
这段代码足够好,能被接受吗?(不太确定)=)
通过为每个请求创建一个新的通道和goroutine,你并没有强制执行任何互斥。
我会这样做,尽管未经测试:
type Task struct{
resultChan chan Result
errorChan chan error
taskParameters interface{}
}
var chansPerPath map[string]chan Task = make(map[string]chan Task)
func addPath(path string) {
ch, ok := chansPerPath[path]
if !ok {
pathChan = make(chan Task)
resultChan = make(chan Task)
chansPerPath[ch] = ch
go func(taskChan chanTask) {
for t:= range taskChan {
result,err := performTask(t.taskParameters)
if err!=nil {
t.errorChan <- err
} else {
t.resultChan <- result
}
}
}
}
}
// 假设每个请求都并发运行此函数。
func handlePath(path string) {
addPath(path)
task := Task{
resultChan: make(chan Result)
errorChan: make(chan error)
taskParameters: getTaskParameters(path)
}
// 每个路径最多只能处理一个任务。
chansPerPath[path] <- task
select {
case result := <-task.resultChan:
// 对结果进行处理
case err := <-task.errChan:
// 对结果进行处理
}
}
要实现特定路径的并发控制,可以使用sync.Map配合互斥锁。以下是改进方案:
import (
"sync"
"net/http"
"log"
)
var (
// 存储每个路径的互斥锁
pathLocks sync.Map
)
func handler(w http.ResponseWriter, r *http.Request) {
path, ok := r.URL.Query()["path"]
if !ok || len(path) == 0 {
http.Error(w, "path parameter required", http.StatusBadRequest)
return
}
// 获取或创建该路径的锁
mu, _ := pathLocks.LoadOrStore(path[0], &sync.Mutex{})
mutex := mu.(*sync.Mutex)
// 锁定特定路径
mutex.Lock()
defer mutex.Unlock()
err := BuildMetadata(path[0])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("Metadata built successfully"))
}
func BuildMetadata(path string) error {
log.Printf("Building metadata for path: %s", path)
// 模拟元数据构建过程
// 这里可以调用实际的rebuildMeta函数
// if err := rebuildMeta(path); err != nil {
// return err
// }
log.Printf("Metadata created for path: %s", path)
return nil
}
更高效的方案使用singleflight包,避免重复计算:
import (
"golang.org/x/sync/singleflight"
"net/http"
"log"
"time"
)
var (
metadataGroup singleflight.Group
)
func handler(w http.ResponseWriter, r *http.Request) {
path, ok := r.URL.Query()["path"]
if !ok || len(path) == 0 {
http.Error(w, "path parameter required", http.StatusBadRequest)
return
}
// 对相同路径的请求进行合并
result, err, shared := metadataGroup.Do(path[0], func() (interface{}, error) {
log.Printf("Starting metadata build for: %s", path[0])
// 模拟耗时操作
time.Sleep(2 * time.Second)
// 实际构建元数据
// if err := rebuildMeta(path[0]); err != nil {
// return nil, err
// }
log.Printf("Completed metadata build for: %s", path[0])
return "success", nil
})
log.Printf("Request for %s - shared result: %v", path[0], shared)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(result.(string)))
}
如果需要限制全局并发数,可以使用信号量:
import (
"net/http"
"log"
"golang.org/x/sync/semaphore"
"context"
"sync"
)
var (
globalSem = semaphore.NewWeighted(10) // 全局最多10个并发
pathLocks sync.Map
)
func handler(w http.ResponseWriter, r *http.Request) {
path, ok := r.URL.Query()["path"]
if !ok || len(path) == 0 {
http.Error(w, "path parameter required", http.StatusBadRequest)
return
}
// 获取全局并发许可
if err := globalSem.Acquire(context.Background(), 1); err != nil {
http.Error(w, "server busy", http.StatusServiceUnavailable)
return
}
defer globalSem.Release(1)
// 获取路径特定锁
mu, _ := pathLocks.LoadOrStore(path[0], &sync.Mutex{})
mutex := mu.(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
err := BuildMetadata(path[0])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("Metadata built successfully"))
}
这些方案确保了:
- 相同路径的请求串行执行
- 不同路径的请求可以并行执行
- 避免重复构建相同路径的元数据

