Golang中如何为连接添加持久性支持
Golang中如何为连接添加持久性支持 我有一段Go代码,这是一个点对点的程序,它可以建立两个或多个节点之间的连接,并且这些节点可以交换数据。然而,问题在于,目前如果一个节点终止,所有其他节点也会停止工作。因此,针对这个问题,我想为它们添加持久性,这样如果某个节点因为X原因而终止,所有其他节点应该继续工作。
func handleStream(s net.Stream) {
log.Println("Got a new stream!")
// Create a buffer stream for non blocking read and write.
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
go readData(rw)
go writeData(rw)
// stream 's' will stay open until you close it (or the other side closes it).
}
// Make a host that listens on the given multiaddress
ha, err := makeBasicHost(*listenF, *secio, *seed)
if err != nil {
log.Fatal(err)
}
if *target == "" {
log.Println("listening for connections")
// Set a stream handler on host A. /p2p/1.0.0 is
// a user-defined protocol name.
ha.SetStreamHandler("/p2p/1.0.0", handleStream)
select {} // hang forever
/**** This is where the listener code ends ****/
} else {
ha.SetStreamHandler("/p2p/1.0.0", handleStream)
// The following code extracts target's peer ID from the
// given multiaddress
ipfsaddr, err := ma.NewMultiaddr(*target)
if err != nil {
log.Fatalln(err)
}
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
log.Fatalln(err)
}
peerid, err := peer.IDB58Decode(pid)
if err != nil {
log.Fatalln(err)
}
// Decapsulate the /ipfs/<peerID> part from the target
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
targetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
// We have a peer ID and a targetAddr so we add it to the peerstore
// so LibP2P knows how to contact it
ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
log.Println("opening stream")
// make a new stream from host B to host A
// it should be handled on host A by the handler we set above because
// we use the same /p2p/1.0.0 protocol
s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
if err != nil {
log.Fatalln(err)
提前感谢。
更多关于Golang中如何为连接添加持久性支持的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于Golang中如何为连接添加持久性支持的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
要实现连接持久性,需要处理节点终止时的连接恢复机制。以下是关键改进点:
- 添加连接重试逻辑:
func connectWithRetry(ha host.Host, peerid peer.ID, targetAddr ma.Multiaddr, maxRetries int) {
for i := 0; i < maxRetries; i++ {
ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
if err == nil {
go handleStream(s)
return
}
log.Printf("连接失败,第%d次重试: %v", i+1, err)
time.Sleep(time.Duration(i+1) * time.Second) // 指数退避
}
log.Println("达到最大重试次数,连接失败")
}
- 修改主逻辑使用重试连接:
if *target != "" {
ha.SetStreamHandler("/p2p/1.0.0", handleStream)
ipfsaddr, err := ma.NewMultiaddr(*target)
if err != nil {
log.Fatalln(err)
}
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
log.Fatalln(err)
}
peerid, err := peer.IDB58Decode(pid)
if err != nil {
log.Fatalln(err)
}
targetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
// 使用带重试的连接
go connectWithRetry(ha, peerid, targetAddr, 5)
}
- 增强流处理以检测断开:
func handleStream(s net.Stream) {
log.Println("Got a new stream!")
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
// 创建上下文用于控制goroutine
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 监控连接状态
go func() {
<-s.Closed()
log.Println("连接关闭,准备重连")
cancel()
// 这里可以触发重连逻辑
}()
go readData(ctx, rw)
go writeData(ctx, rw)
}
- 修改读写函数支持上下文:
func readData(ctx context.Context, rw *bufio.ReadWriter) {
for {
select {
case <-ctx.Done():
return
default:
str, err := rw.ReadString('\n')
if err != nil {
log.Printf("读取错误: %v", err)
return
}
if str != "" {
log.Printf("收到: %s", str)
}
}
}
}
- 添加心跳机制:
func startHeartbeat(rw *bufio.ReadWriter, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
_, err := rw.WriteString("ping\n")
if err != nil {
log.Println("心跳发送失败")
return
}
rw.Flush()
}
}
这些修改确保节点在连接断开时能够自动重连,同时通过心跳机制检测连接健康状态,实现真正的持久性连接。

