Golang中如何在使用互斥锁时通过RPC调用传递map
Golang中如何在使用互斥锁时通过RPC调用传递map
我一直在尝试编写一个键值服务器,该服务器会定期向一个维护主备k/v服务器的视图服务发送ping请求。我不断收到fatal error: concurrent map writes错误,我认为这是由于我使用映射和互斥锁的方式导致的:
func (pb *PBServer) Transfer(args *TransferArgs, reply *TransferReply) error {
pb.mu.Lock()
defer pb.mu.Unlock()
if pb.curview.Backup != pb.me {
reply.Err = ErrWrongServer
return nil
}
for k, v := range args.Database {
pb.data[k] = {v.name}
}
reply.Err = OK
return nil
}
// ping viewserver periodically.
func (pb *PBServer) tick() {
pb.mu.Lock()
defer pb.mu.Unlock()
for {
t, err := pb.vs.Ping(pb.curview.Viewnum)
if err != nil {
continue
}
pb.curview = t
if pb.me == pb.curview.Primary && pb.curview.Backup != "" {
args := &TransferArgs{pb.data}
var reply TransferReply
ok := call(pb.curview.Backup, "PBServer.Transfer", args, &reply)
if ok == false || reply.Err != OK {
continue
} else {
break
}
} else {
break
}
}
}
我使用这个call函数来执行我的rpc调用
func call(srv string, rpcname string,
args interface{}, reply interface{}) bool {
c, errx := rpc.Dial("unix", srv)
if errx != nil {
return false
}
defer c.Close()
err := c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}
我怀疑可能是我试图将pb.data映射传递给Transfer的方式导致了concurrent map writes错误,但我不太确定。我是否需要做一个深拷贝,并将深拷贝传递给args,像&TransferArgs{deepcopy}这样?我应该使用指针吗?
更多关于Golang中如何在使用互斥锁时通过RPC调用传递map的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于Golang中如何在使用互斥锁时通过RPC调用传递map的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
问题出在你在持有互斥锁的情况下执行RPC调用,这会导致死锁或并发问题。当tick()函数持有pb.mu锁并调用call()时,如果RPC调用需要访问同一个映射(比如在接收方Transfer方法中),就会产生竞争条件。
以下是修正后的代码:
func (pb *PBServer) Transfer(args *TransferArgs, reply *TransferReply) error {
pb.mu.Lock()
defer pb.mu.Unlock()
if pb.curview.Backup != pb.me {
reply.Err = ErrWrongServer
return nil
}
// 直接复制映射内容
for k, v := range args.Database {
pb.data[k] = v
}
reply.Err = OK
return nil
}
// ping viewserver periodically.
func (pb *PBServer) tick() {
pb.mu.Lock()
// 在释放锁之前复制需要的数据
var dataCopy map[string]string
var backup string
if pb.me == pb.curview.Primary && pb.curview.Backup != "" {
// 创建数据的深拷贝
dataCopy = make(map[string]string)
for k, v := range pb.data {
dataCopy[k] = v
}
backup = pb.curview.Backup
}
pb.mu.Unlock() // 尽早释放锁
if backup != "" {
for {
args := &TransferArgs{Database: dataCopy}
var reply TransferReply
ok := call(backup, "PBServer.Transfer", args, &reply)
if ok == false || reply.Err != OK {
continue
} else {
break
}
}
}
// 更新视图需要重新加锁
pb.mu.Lock()
defer pb.mu.Unlock()
t, err := pb.vs.Ping(pb.curview.Viewnum)
if err == nil {
pb.curview = t
}
}
关键修改:
- 在
tick()中,先获取锁并复制需要的数据,然后立即释放锁 - 在无锁状态下执行RPC调用
- 创建数据的深拷贝传递给RPC参数,避免并发访问原始映射
- 将视图更新操作移到RPC调用之后,并重新加锁保护
对于TransferArgs结构体,确保它包含映射的副本而不是原始映射的引用:
type TransferArgs struct {
Database map[string]string
}
这样修改后,主服务器在发送数据时不会持有锁,备份服务器在接收数据时正常加锁,避免了并发映射写入的问题。

