Golang中如何在使用互斥锁时通过RPC调用传递map

Golang中如何在使用互斥锁时通过RPC调用传递map 我一直在尝试编写一个键值服务器,该服务器会定期向一个维护主备k/v服务器的视图服务发送ping请求。我不断收到fatal error: concurrent map writes错误,我认为这是由于我使用映射和互斥锁的方式导致的:

func (pb *PBServer) Transfer(args *TransferArgs, reply *TransferReply) error {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	if pb.curview.Backup != pb.me {
		reply.Err = ErrWrongServer
		return nil
	}
	for k, v := range args.Database {
		pb.data[k] = {v.name}
	}
	reply.Err = OK
	return nil
}

// ping viewserver periodically.
func (pb *PBServer) tick() {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	for {
		t, err := pb.vs.Ping(pb.curview.Viewnum)
		if err != nil {
			continue
		}
		pb.curview = t

		if pb.me == pb.curview.Primary && pb.curview.Backup != "" {
			args := &TransferArgs{pb.data}
			var reply TransferReply
			ok := call(pb.curview.Backup, "PBServer.Transfer", args, &reply)
		
			if ok == false || reply.Err != OK {
				continue
			} else {
				break
			}
		} else {
			break
		}
	}
}

我使用这个call函数来执行我的rpc调用

func call(srv string, rpcname string,
	args interface{}, reply interface{}) bool {
	c, errx := rpc.Dial("unix", srv)
	if errx != nil {
		return false
	}
	defer c.Close()

	err := c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}

我怀疑可能是我试图将pb.data映射传递给Transfer的方式导致了concurrent map writes错误,但我不太确定。我是否需要做一个深拷贝,并将深拷贝传递给args,像&TransferArgs{deepcopy}这样?我应该使用指针吗?


更多关于Golang中如何在使用互斥锁时通过RPC调用传递map的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中如何在使用互斥锁时通过RPC调用传递map的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


问题出在你在持有互斥锁的情况下执行RPC调用,这会导致死锁或并发问题。当tick()函数持有pb.mu锁并调用call()时,如果RPC调用需要访问同一个映射(比如在接收方Transfer方法中),就会产生竞争条件。

以下是修正后的代码:

func (pb *PBServer) Transfer(args *TransferArgs, reply *TransferReply) error {
    pb.mu.Lock()
    defer pb.mu.Unlock()

    if pb.curview.Backup != pb.me {
        reply.Err = ErrWrongServer
        return nil
    }
    
    // 直接复制映射内容
    for k, v := range args.Database {
        pb.data[k] = v
    }
    reply.Err = OK
    return nil
}

// ping viewserver periodically.
func (pb *PBServer) tick() {
    pb.mu.Lock()
    
    // 在释放锁之前复制需要的数据
    var dataCopy map[string]string
    var backup string
    
    if pb.me == pb.curview.Primary && pb.curview.Backup != "" {
        // 创建数据的深拷贝
        dataCopy = make(map[string]string)
        for k, v := range pb.data {
            dataCopy[k] = v
        }
        backup = pb.curview.Backup
    }
    
    pb.mu.Unlock()  // 尽早释放锁

    if backup != "" {
        for {
            args := &TransferArgs{Database: dataCopy}
            var reply TransferReply
            ok := call(backup, "PBServer.Transfer", args, &reply)
            
            if ok == false || reply.Err != OK {
                continue
            } else {
                break
            }
        }
    }
    
    // 更新视图需要重新加锁
    pb.mu.Lock()
    defer pb.mu.Unlock()
    
    t, err := pb.vs.Ping(pb.curview.Viewnum)
    if err == nil {
        pb.curview = t
    }
}

关键修改:

  1. tick()中,先获取锁并复制需要的数据,然后立即释放锁
  2. 在无锁状态下执行RPC调用
  3. 创建数据的深拷贝传递给RPC参数,避免并发访问原始映射
  4. 将视图更新操作移到RPC调用之后,并重新加锁保护

对于TransferArgs结构体,确保它包含映射的副本而不是原始映射的引用:

type TransferArgs struct {
    Database map[string]string
}

这样修改后,主服务器在发送数据时不会持有锁,备份服务器在接收数据时正常加锁,避免了并发映射写入的问题。

回到顶部