Golang中groupcache死锁问题分析与解决

Golang中groupcache死锁问题分析与解决 大家好,

我发现下面的代码存在死锁问题,

程序 1

package main

import (
        "context"
        "fmt"
        "log"
        "time"
        "net/http"
        "github.com/mailgun/groupcache/v2"
)

func main() {
                pool := groupcache.NewHTTPPoolOpts("http://localhost:8380", &groupcache.HTTPPoolOptions{})
                pool.Set("http://localhost:8280")
                server := http.Server{
                        Addr:    "localhost:8380",
                        Handler: pool,
                }
                go func() {
                        log.Printf("Serving....\n")
                        if err := server.ListenAndServe(); err != nil {
                                log.Fatal(err)
                        }
                }()
                defer server.Shutdown(context.Background())
        group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
                func(ctx context.Context, id string, dest groupcache.Sink) error {
                        user := User {
                                Id:             1234,
                                Name:           "Balaji ",
                                Age:            41,
                                IsSuper:        true,
                        }

                        if err := dest.SetProto(&user, time.Now().Add(time.Second*3600)); err != nil {
                                return err
                        }
                        return nil
                },
        ))

        ctx, cancel := context.WithCancel(context.Background())
        for {
            var user User

            defer cancel()

            if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {
                log.Fatal(err)
            }

            time.Sleep(time.Second*5)
            fmt.Printf("-- User --\n")
            fmt.Printf("Id: %d\n", user.Id)
            fmt.Printf("Name: %s\n", user.Name)
            fmt.Printf("Age: %d\n", user.Age)
            fmt.Printf("IsSuper: %t\n", user.IsSuper)
        }

}

程序 2

package main

import (
        "context"
        "log"
        "fmt"
        "time"
        "net/http"
        "github.com/mailgun/groupcache/v2"
)

func main() {
                pool := groupcache.NewHTTPPoolOpts("http://localhost:8280", &groupcache.HTTPPoolOptions{})
                pool.Set("http://localhost:8380")
                server := http.Server{
                        Addr:    "localhost:8280",
                        Handler: pool,
                }
                go func() {
                        log.Printf("Serving....\n")
                        if err := server.ListenAndServe(); err != nil {
                                log.Fatal(err)
                        }
                }()
                defer server.Shutdown(context.Background())
        group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
                func(ctx context.Context, id string, dest groupcache.Sink) error {
                        user := User {
                                Id:             1234,
                                Name:           "Balaji ",
                                Age:            41,
                                IsSuper:        true,
                        }

                        if err := dest.SetProto(&user, time.Now().Add(time.Second*3600)); err != nil {
                                return err
                        }
                        return nil
                },
        ))

        fmt.Println(group)
        ctx, cancel := context.WithCancel(context.Background())
        fmt.Println(ctx)
        for {
            time.Sleep(5*time.Second)
            defer cancel()
/*
            if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {
                log.Fatal(err)
            }

            time.Sleep(time.Second*5)
            fmt.Printf("-- User --\n")
            fmt.Printf("Id: %d\n", user.Id)
            fmt.Printf("Name: %s\n", user.Name)
            fmt.Printf("Age: %d\n", user.Age)
            fmt.Printf("IsSuper: %t\n", user.IsSuper)
*/
        }

}

如果你看到上面的程序2,我定义了获取器(getter),但在 for{} 循环中我什么也没做。当我先启动程序2,再启动程序1时,程序就挂起了。看起来如果任何节点中的缓存不是热的,它就会进入一个循环或死锁。那么,是不是当我们指定对等节点时,不应该以那种顺序指定?我在这上面卡了很久,非常感谢任何帮助。

在库中添加一些调试信息… 我看到请求在循环。

Prog1:
go run testcache.go user.pb.go
I am here atleast
Getting from library
Sending Peer Request
http://localhost:8280/_groupcache/users/12345
2020/12/31 23:00:55 Serving....
I am here atleast

Prog2:
go run testcache1.go user.pb.go
&{users 0x7e6800 {0 {0 0}} <nil> 3000000 {{{0 0} 0 0 0 0} 0 <nil> 0 0 0} {{{0 0} 0 0 0 0} 0 <nil> 0 0 0} 0xc000020f30 0xc000020f40 0 {0 0 0 0 0 0 0 0 0 0}}
context.Background.WithCancel
2020/12/31 23:00:33 Serving....
I am here atleast
Getting from library
Sending Peer Request
http://localhost:8380/_groupcache/users/12345
I am done

另一个发现是,当我在程序2中移除对等节点设置(peer Set)时,它工作正常。

此致


更多关于Golang中groupcache死锁问题分析与解决的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

更多关于Golang中groupcache死锁问题分析与解决的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


这是一个典型的分布式缓存死锁问题,根源在于两个节点相互依赖形成了循环请求。让我分析一下代码中的问题:

问题分析

  1. 循环依赖:两个节点互相设置为对等节点,当缓存未命中时,会向对方请求数据
  2. Getter函数相同:两个节点的Getter都会尝试设置缓存,但都没有实际的数据源
  3. 死锁场景
    • 程序2先启动,缓存为空
    • 程序1启动并向程序2请求数据
    • 程序2的Getter被调用,但程序2又向程序1请求数据
    • 形成无限循环

解决方案

方案1:使用主从架构(推荐)

// 主节点(有实际数据源)
func main() {
    pool := groupcache.NewHTTPPoolOpts("http://localhost:8380", &groupcache.HTTPPoolOptions{})
    // 主节点不设置对等节点,或只设置从节点
    pool.Set("http://localhost:8280")
    
    group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
        func(ctx context.Context, id string, dest groupcache.Sink) error {
            // 这里应该有实际的数据获取逻辑,比如从数据库查询
            user := User{
                Id:      1234,
                Name:    "Balaji",
                Age:     41,
                IsSuper: true,
            }
            return dest.SetProto(&user, time.Now().Add(time.Second*3600))
        },
    ))
    
    // ... 服务器启动代码
}

// 从节点(只缓存,不提供数据源)
func main() {
    pool := groupcache.NewHTTPPoolOpts("http://localhost:8280", &groupcache.HTTPPoolOptions{})
    pool.Set("http://localhost:8380") // 只指向主节点
    
    group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
        func(ctx context.Context, id string, dest groupcache.Sink) error {
            // 从节点不应该有Getter逻辑,或者应该返回错误
            return fmt.Errorf("data not available on this node")
        },
    ))
    
    // ... 服务器启动代码
}

方案2:使用单节点模式

// 如果不需要分布式缓存,可以只用一个节点
func main() {
    // 不设置对等节点
    pool := groupcache.NewHTTPPoolOpts("http://localhost:8380", &groupcache.HTTPPoolOptions{})
    
    group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
        func(ctx context.Context, id string, dest groupcache.Sink) error {
            // 直接返回数据
            user := User{
                Id:      1234,
                Name:    "Balaji",
                Age:     41,
                IsSuper: true,
            }
            return dest.SetProto(&user, time.Now().Add(time.Second*3600))
        },
    ))
    
    // 正常的获取逻辑
    var user User
    if err := group.Get(context.Background(), "12345", groupcache.ProtoSink(&user)); err != nil {
        log.Fatal(err)
    }
}

方案3:避免Getter中的循环调用

// 在Getter中添加防循环逻辑
var requestInProgress = make(map[string]bool)
var mu sync.RWMutex

group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
    func(ctx context.Context, id string, dest groupcache.Sink) error {
        mu.Lock()
        if requestInProgress[id] {
            mu.Unlock()
            return fmt.Errorf("circular request detected for key: %s", id)
        }
        requestInProgress[id] = true
        mu.Unlock()
        
        defer func() {
            mu.Lock()
            delete(requestInProgress, id)
            mu.Unlock()
        }()
        
        // 实际的数据获取逻辑
        user := User{
            Id:      1234,
            Name:    "Balaji",
            Age:     41,
            IsSuper: true,
        }
        return dest.SetProto(&user, time.Now().Add(time.Second*3600))
    },
))

关键修改点

  1. 移除程序2中的对等节点设置
// 在程序2中注释掉这行
// pool.Set("http://localhost:8380")
  1. 修复程序1中的循环逻辑
// 将defer cancel()移到for循环外部
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for {
    var user User
    // 每次使用新的context或复用
    if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {
        log.Printf("Error getting user: %v", err)
        continue
    }
    // ... 处理user
    time.Sleep(time.Second * 5)
}

这个问题的本质是分布式缓存配置错误导致的循环依赖。在groupcache中,对等节点应该形成有向无环图,而不是双向依赖。

回到顶部