Golang新手如何处理并发数据库请求中的重复数据问题
Golang新手如何处理并发数据库请求中的重复数据问题 我的数据库中的数据量非常大,多个用户可能经常访问相同的数据,因此我正在尝试实现一个基本的缓存结构。所以想法是:
- 用户 A 请求参数为 (x, y, z) 的数据集——这通常需要 30 秒到 1 分钟。
- 在数据获取过程中,用户 B 也请求数据集 (x, y, z)。
- 代码应该知道已经在获取该数据,等待 A 的请求完成,然后两个用户都将得到结果。
同样地,用户 C 可能在 30 分钟后(在缓存时间限制内)请求,并立即获得 (x, y, z) 的结果,因为数据已经被检索过。我的代码已经完美处理了用户 A 和用户 C 的情况,因为数据库结果确实被缓存了。
我刚开始学习 Go 编程大约 3 天,所以还是个新手。我认为这应该通过 Mutex、锁与缓存配合来完成,但我不太确定……
以下是一些伪 Go 代码。这对于用户 C 的情况(数据已加载,我们只是从缓存中获取)运行完美。让我困惑的是用户 B 的情况……
func queryHandler(w http.ResponseWriter, r *http.Request) {
// 从请求中获取 x、y 和 z 参数
// 根据提供的参数创建数据库查询,并返回一个标识此查询的键
query,key := formQuery(x,y,z)
if (!cache.Has(key)) {
queryDatabaseAndCacheTheResult(query,key)
}
// 此时,无论结果已经在缓存中,还是我们从数据库获取,我们都应该拥有数据
processDataForDisplay(w,cache.Get(key))
}
更多关于Golang新手如何处理并发数据库请求中的重复数据问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html
访问不受限制。唯一的要求是我们不希望进行额外的数据库查询。如果我们已经在获取一个大型数据集,我不想开始第二次或第三次获取数据。
更多关于Golang新手如何处理并发数据库请求中的重复数据问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
代码应该知道它已经在获取该数据,等待用户A的请求完成,然后两个用户都将得到他们的结果。
能否进一步解释为什么两个用户不能获取相同的数据?限制访问的目的是什么?
geosoft1:
我认为事情没那么简单。
你说得对。物化视图必须通过触发器自动更新,以保持视图的最新状态。
我认为事情并非那么简单。本质上,服务器默认会缓存请求,但当数据库中的数据发生变化时,缓存必须刷新,这通常发生在下一次请求时,从而导致性能下降。我们的想法是由另一方来维护并定期更新缓存。
唯一的要求是我们不想进行额外的数据库访问。
听起来这可能是 SQL Server 上“物化视图”的一个候选方案?让服务器来处理缓存问题……
感谢Jakob。我尝试了一个使用sync.WaitGroup对象的解决方案,它似乎大部分情况下都能工作,但有一些例外情况,并且当我使用-race标志运行go时,它会抛出一些消息。我尝试实现这个方案,但遇到了一堆空指针异常。我认为可能是我对"创建新锁,将其设置在映射中和rwm"的实现不太正确。
我理解了关于第二个缓存的想法。我尝试了类似这样的方案,其中第一个缓存设置一种临时的“加载中”状态。第二个进程看到这个加载状态,因此它知道不应该去获取数据,但我遇到的问题是,它如何实现“等待”部分。它应该保持在等待状态,直到“加载中”状态消失,然后它就会知道第一个实际的数据缓存现在已经有了数据。
感谢Jakob的建议。我认为你说到点子上了。我会再多读一些关于互斥锁的资料,但你能不能提供一些伪代码,或者链接到一个示例,展示诸如“获取读锁”、“释放读锁”、“获取写锁”和“释放写锁”这些步骤。
让我困惑的部分是:进程A获取了写锁并开始获取数据。进程B开始获取相同的数据,但发现写锁已经被设置了。此时,B正在等待A释放锁,这部分的代码是什么样的?
从最初的帖子来看,这个请求似乎开销很大,可能需要长达一分钟的时间。所以我完全理解缓存后续查询的愿望。
不过我必须诚实地说,我目前还不知道如何精确地解决这个问题。
我可能会从第二个“缓存”开始,它只是将查询映射到一个 sync.Cond,当初始查询处理完毕并放入主缓存后,会在这个 sync.Cond 上调用 Broadcast()。剩下的 goroutine 就可以直接从第二个缓存中获取。
不过我从未使用过 sync.Cond,它的工作方式可能和我的理解有所不同。
好的,我完成了一个功能相近的项目,我的方法如下:
- 我使用了一个映射(map)来存储用户缓存,每个用户用一个键(key)标识,该键对应一个结构体,该结构体以某种方便的格式保存接收到的数据。
- 一个 goroutine 定期从数据源(在您的情况下是数据库)获取数据,并将结果放入
map[user_key]中。 - 一个 API 在用户请求时对外提供映射的内容。
使用这种技术,您可以让 N 个用户在过去的某个时刻(基于请求的周期)并发地访问相同的一致数据。

类似这样的代码
// var cacheLocksLock sync.Mutex 保护 cacheLocks
// var cacheLocks map[string]*sync.RWMutex 保护单个缓存键
// var cacheKey string 是当前请求的键
cacheLocksLock.Lock()
rwm, ok := cacheLocks[cacheKey]
if !ok { create new lock, set it in the map and `rwm` }
cacheLocksLock.Unlock()
// 尝试快速读取缓存,希望能成功。
rwm.RLock()
// 只要没有协程持有写锁,多个协程可以同时持有读锁。
data := cacheFor(cacheKey)
if data != nil {
rwm.RUnlock()
return data
}
rmw.RUnlock()
// 好吧,我们可能需要自己执行查询。
rwm.Lock()
// 只有一个协程可以持有写锁。
data := cacheFor(cacheKey)
if data != nil {
// 其他协程走了和我们相同的路径,但设法先获取了锁并为我们执行了查询。太好了!
rwm.Unlock()
return data
}
// 我们需要获取数据
data = performQuery()
setCachedData(cacheKey, data)
rmw.Unlock()
return data
我的做法是为每个缓存键使用一个单独的读写锁(RWMutex)。当你需要访问数据时,步骤如下:
- 获取对应缓存键的读锁。
- 获取缓存数据。
- 释放读锁。
- 检查从缓存获取的数据——它是否有效且是最新的?如果是,那么我们就完成了。如果数据没问题就返回,否则继续。
这个处理过程可以处理多个并发读取者。
但假设缓存尚未填充,所以我们继续:
- 获取该缓存键的写锁。
- 再次检查缓存?它是否有效且是最新的?如果是,那么我们就完成了。(可能有人在我们释放读锁和获取写锁的间隙,与我们并发地走了这个路径并修复了缓存——我们就是原始描述中的“用户B”。)
- 假设它仍然未被缓存,则执行查询,同时仍然持有写锁。我们就是上面提到的“用户A”。任何在我们查询期间进来请求数据的人,最终都会等待这个写锁,然后走上面提到的“B”分支。
- 更新缓存。
- 释放锁。
- 返回数据,现在数据也已缓存。
当然,你还需要一个 map[string]*sync.RWMutex // 缓存键 -> 锁,而这个映射本身也需要自己的锁。:slight_smile: 这真是锁套锁啊。
map[string]*sync.RWMutex // cache key -> lock
在并发场景下处理重复数据库请求,典型的解决方案是使用sync.Map配合sync.Once或通道。以下是针对你问题的具体实现:
type cacheEntry struct {
once sync.Once
value interface{}
err error
}
type QueryCache struct {
m sync.Map
}
func (c *QueryCache) GetOrLoad(key string, loadFn func() (interface{}, error)) (interface{}, error) {
entry, loaded := c.m.LoadOrStore(key, &cacheEntry{})
e := entry.(*cacheEntry)
if !loaded {
e.once.Do(func() {
e.value, e.err = loadFn()
})
} else {
// 等待其他goroutine完成加载
e.once.Do(func() {}) // 等待已经运行的once完成
}
return e.value, e.err
}
// 使用示例
var globalCache = &QueryCache{}
func queryHandler(w http.ResponseWriter, r *http.Request) {
// 获取参数
x := r.URL.Query().Get("x")
y := r.URL.Query().Get("y")
z := r.URL.Query().Get("z")
query, key := formQuery(x, y, z)
data, err := globalCache.GetOrLoad(key, func() (interface{}, error) {
// 这个函数只会执行一次
return queryDatabaseAndCacheTheResult(query, key)
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
processDataForDisplay(w, data)
}
func queryDatabaseAndCacheTheResult(query, key string) (interface{}, error) {
// 模拟耗时数据库查询
time.Sleep(30 * time.Second)
return fmt.Sprintf("Result for %s", query), nil
}
更完整的实现可以加入缓存过期机制:
type cacheEntryWithExpiry struct {
once sync.Once
value interface{}
err error
timestamp time.Time
expiry time.Duration
}
type QueryCacheWithExpiry struct {
m sync.Map
}
func (c *QueryCacheWithExpiry) GetOrLoad(key string, expiry time.Duration, loadFn func() (interface{}, error)) (interface{}, error) {
now := time.Now()
entry, loaded := c.m.Load(key)
if loaded {
e := entry.(*cacheEntryWithExpiry)
if now.Sub(e.timestamp) < expiry {
return e.value, e.err
}
// 缓存过期,删除旧条目
c.m.Delete(key)
}
newEntry := &cacheEntryWithExpiry{expiry: expiry}
actual, loaded := c.m.LoadOrStore(key, newEntry)
e := actual.(*cacheEntryWithExpiry)
if !loaded {
e.once.Do(func() {
e.value, e.err = loadFn()
e.timestamp = now
})
} else {
e.once.Do(func() {})
}
return e.value, e.err
}
这种实现确保了:
- 相同key的并发请求只执行一次数据库查询
- 后续请求等待第一个请求完成
- 支持缓存过期机制
- 避免了缓存击穿问题

