Golang中如何为连接添加持久性支持

Golang中如何为连接添加持久性支持 我有一段Go代码,这是一个点对点的程序,它可以建立两个或多个节点之间的连接,并且这些节点可以交换数据。然而,问题在于,目前如果一个节点终止,所有其他节点也会停止工作。因此,针对这个问题,我想为它们添加持久性,这样如果某个节点因为X原因而终止,所有其他节点应该继续工作。

func handleStream(s net.Stream) {

	log.Println("Got a new stream!")

	// Create a buffer stream for non blocking read and write.
	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))

	go readData(rw)
	go writeData(rw)

	// stream 's' will stay open until you close it (or the other side closes it).
}
// Make a host that listens on the given multiaddress
	ha, err := makeBasicHost(*listenF, *secio, *seed)
	if err != nil {
		log.Fatal(err)
	}

	if *target == "" {
		log.Println("listening for connections")
		// Set a stream handler on host A. /p2p/1.0.0 is
		// a user-defined protocol name.
		ha.SetStreamHandler("/p2p/1.0.0", handleStream)

		select {} // hang forever
		/**** This is where the listener code ends ****/
	} else {
		ha.SetStreamHandler("/p2p/1.0.0", handleStream)

		// The following code extracts target's peer ID from the
		// given multiaddress
		ipfsaddr, err := ma.NewMultiaddr(*target)
		if err != nil {
			log.Fatalln(err)
		}

		pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
		if err != nil {
			log.Fatalln(err)
		}

		peerid, err := peer.IDB58Decode(pid)
		if err != nil {
			log.Fatalln(err)
		}

		// Decapsulate the /ipfs/<peerID> part from the target
		// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
		targetPeerAddr, _ := ma.NewMultiaddr(
			fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
		targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)

		// We have a peer ID and a targetAddr so we add it to the peerstore
		// so LibP2P knows how to contact it
		ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)

		log.Println("opening stream")
		// make a new stream from host B to host A
		// it should be handled on host A by the handler we set above because
		// we use the same /p2p/1.0.0 protocol
		s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
		if err != nil {
			log.Fatalln(err)

提前感谢。


更多关于Golang中如何为连接添加持久性支持的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中如何为连接添加持久性支持的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


要实现连接持久性,需要处理节点终止时的连接恢复机制。以下是关键改进点:

  1. 添加连接重试逻辑
func connectWithRetry(ha host.Host, peerid peer.ID, targetAddr ma.Multiaddr, maxRetries int) {
    for i := 0; i < maxRetries; i++ {
        ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
        
        s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
        if err == nil {
            go handleStream(s)
            return
        }
        
        log.Printf("连接失败,第%d次重试: %v", i+1, err)
        time.Sleep(time.Duration(i+1) * time.Second) // 指数退避
    }
    log.Println("达到最大重试次数,连接失败")
}
  1. 修改主逻辑使用重试连接
if *target != "" {
    ha.SetStreamHandler("/p2p/1.0.0", handleStream)
    
    ipfsaddr, err := ma.NewMultiaddr(*target)
    if err != nil {
        log.Fatalln(err)
    }
    
    pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
    if err != nil {
        log.Fatalln(err)
    }
    
    peerid, err := peer.IDB58Decode(pid)
    if err != nil {
        log.Fatalln(err)
    }
    
    targetPeerAddr, _ := ma.NewMultiaddr(
        fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
    targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
    
    // 使用带重试的连接
    go connectWithRetry(ha, peerid, targetAddr, 5)
}
  1. 增强流处理以检测断开
func handleStream(s net.Stream) {
    log.Println("Got a new stream!")
    
    rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
    
    // 创建上下文用于控制goroutine
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // 监控连接状态
    go func() {
        <-s.Closed()
        log.Println("连接关闭,准备重连")
        cancel()
        // 这里可以触发重连逻辑
    }()
    
    go readData(ctx, rw)
    go writeData(ctx, rw)
}
  1. 修改读写函数支持上下文
func readData(ctx context.Context, rw *bufio.ReadWriter) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            str, err := rw.ReadString('\n')
            if err != nil {
                log.Printf("读取错误: %v", err)
                return
            }
            if str != "" {
                log.Printf("收到: %s", str)
            }
        }
    }
}
  1. 添加心跳机制
func startHeartbeat(rw *bufio.ReadWriter, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for range ticker.C {
        _, err := rw.WriteString("ping\n")
        if err != nil {
            log.Println("心跳发送失败")
            return
        }
        rw.Flush()
    }
}

这些修改确保节点在连接断开时能够自动重连,同时通过心跳机制检测连接健康状态,实现真正的持久性连接。

回到顶部