Golang中如何实现数据持久化

Golang中如何实现数据持久化 我正在使用Go语言构建一个点对点网络,代码位于 https://pastebin.com/wize4F80。这段代码的目的是连接一些节点并交换数据,但是当一个节点终止时,所有其他节点都会停止工作。

如何使其具有持久性,以便即使一个节点关闭,其他节点也能继续工作?

2 回复

你好 @thecow_milk,我对 libp2p 不熟悉,从代码上看不出它具体做了什么,或者问题出在哪里。

问题:

  • 当一个节点终止时,其他节点究竟是如何停止工作的?你会收到错误信息吗?它们是静默终止的吗?(我怀疑后者会发生,因为如果不出错,main() 函数末尾的 select{} 应该会让程序永远运行下去。)
  • 这里使用了哪些库?(例如,我看到了一个名为 host 的包但没有导入语句)
  • writeData() 和 readData() 函数是做什么的?(代码片段中没有包含)
  • 你是否查阅了所用包的文档,看看它们是否描述了当一个节点停止发送数据时其他节点的行为?
  • 第一个节点是如何被终止的?这种终止机制是否也有可能意外地导致其他节点停止工作?

更多关于Golang中如何实现数据持久化的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中实现数据持久化和节点容错,需要结合持久化存储和连接重试机制。以下是针对你代码的改进方案:

1. 数据持久化存储

package main

import (
    "encoding/json"
    "os"
    "sync"
)

type PersistentStore struct {
    mu    sync.RWMutex
    data  map[string]interface{}
    file  string
}

func NewPersistentStore(filename string) *PersistentStore {
    store := &PersistentStore{
        data: make(map[string]interface{}),
        file: filename,
    }
    store.load()
    return store
}

func (s *PersistentStore) Set(key string, value interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    s.data[key] = value
    return s.save()
}

func (s *PersistentStore) Get(key string) (interface{}, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    val, ok := s.data[key]
    return val, ok
}

func (s *PersistentStore) save() error {
    data, err := json.Marshal(s.data)
    if err != nil {
        return err
    }
    return os.WriteFile(s.file, data, 0644)
}

func (s *PersistentStore) load() error {
    data, err := os.ReadFile(s.file)
    if err != nil {
        if os.IsNotExist(err) {
            return nil
        }
        return err
    }
    return json.Unmarshal(data, &s.data)
}

2. 节点连接容错处理

package main

import (
    "context"
    "fmt"
    "net"
    "time"
)

type ResilientNode struct {
    address     string
    store       *PersistentStore
    connections map[string]net.Conn
    mu          sync.RWMutex
    ctx         context.Context
    cancel      context.CancelFunc
}

func NewResilientNode(addr string, store *PersistentStore) *ResilientNode {
    ctx, cancel := context.WithCancel(context.Background())
    return &ResilientNode{
        address:     addr,
        store:       store,
        connections: make(map[string]net.Conn),
        ctx:         ctx,
        cancel:      cancel,
    }
}

func (n *ResilientNode) ConnectToNode(peerAddr string) error {
    go n.maintainConnection(peerAddr)
    return nil
}

func (n *ResilientNode) maintainConnection(peerAddr string) {
    retryInterval := time.Second
    maxRetryInterval := time.Minute
    
    for {
        select {
        case <-n.ctx.Done():
            return
        default:
            conn, err := net.DialTimeout("tcp", peerAddr, 10*time.Second)
            if err != nil {
                fmt.Printf("Failed to connect to %s: %v, retrying in %v\n", 
                    peerAddr, err, retryInterval)
                time.Sleep(retryInterval)
                
                // Exponential backoff
                retryInterval *= 2
                if retryInterval > maxRetryInterval {
                    retryInterval = maxRetryInterval
                }
                continue
            }
            
            // Reset retry interval on successful connection
            retryInterval = time.Second
            
            n.mu.Lock()
            n.connections[peerAddr] = conn
            n.mu.Unlock()
            
            // Store connection info
            n.store.Set(fmt.Sprintf("peer_%s", peerAddr), 
                map[string]interface{}{
                    "address":   peerAddr,
                    "connected": true,
                    "timestamp": time.Now().Unix(),
                })
            
            // Handle connection
            n.handleConnection(conn, peerAddr)
            
            // Connection lost, retry
            n.mu.Lock()
            delete(n.connections, peerAddr)
            n.mu.Unlock()
        }
    }
}

func (n *ResilientNode) handleConnection(conn net.Conn, peerAddr string) {
    defer conn.Close()
    
    buffer := make([]byte, 1024)
    for {
        select {
        case <-n.ctx.Done():
            return
        default:
            conn.SetReadDeadline(time.Now().Add(30 * time.Second))
            n, err := conn.Read(buffer)
            if err != nil {
                fmt.Printf("Connection to %s lost: %v\n", peerAddr, err)
                return
            }
            
            // Process received data
            data := buffer[:n]
            n.store.Set(fmt.Sprintf("data_from_%s", peerAddr), 
                map[string]interface{}{
                    "data":      string(data),
                    "timestamp": time.Now().Unix(),
                })
            
            // Echo back or process data
            conn.Write([]byte("ACK: " + string(data)))
        }
    }
}

func (n *ResilientNode) Broadcast(data []byte) {
    n.mu.RLock()
    defer n.mu.RUnlock()
    
    for addr, conn := range n.connections {
        go func(addr string, conn net.Conn) {
            _, err := conn.Write(data)
            if err != nil {
                fmt.Printf("Failed to send to %s: %v\n", addr, err)
                n.mu.Lock()
                delete(n.connections, addr)
                n.mu.Unlock()
            }
        }(addr, conn)
    }
}

func (n *ResilientNode) Shutdown() {
    n.cancel()
    n.mu.Lock()
    defer n.mu.Unlock()
    
    for _, conn := range n.connections {
        conn.Close()
    }
}

3. 主程序集成示例

package main

import (
    "fmt"
    "net"
    "sync"
    "time"
)

func main() {
    // 初始化持久化存储
    store := NewPersistentStore("node_data.json")
    
    // 启动节点
    node := NewResilientNode(":8080", store)
    
    // 恢复之前的连接
    if peers, ok := store.Get("peers"); ok {
        if peerList, ok := peers.([]string); ok {
            for _, peer := range peerList {
                go node.ConnectToNode(peer)
            }
        }
    }
    
    // 启动监听
    go node.startServer()
    
    // 示例:定期广播数据
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        node.Broadcast([]byte(fmt.Sprintf("Heartbeat at %v", time.Now())))
    }
}

func (n *ResilientNode) startServer() {
    listener, err := net.Listen("tcp", n.address)
    if err != nil {
        panic(err)
    }
    defer listener.Close()
    
    for {
        conn, err := listener.Accept()
        if err != nil {
            continue
        }
        
        go n.handleIncomingConnection(conn)
    }
}

func (n *ResilientNode) handleIncomingConnection(conn net.Conn) {
    defer conn.Close()
    
    remoteAddr := conn.RemoteAddr().String()
    
    n.mu.Lock()
    n.connections[remoteAddr] = conn
    n.mu.Unlock()
    
    // 存储连接信息
    n.store.Set(fmt.Sprintf("peer_%s", remoteAddr), 
        map[string]interface{}{
            "address":   remoteAddr,
            "connected": true,
            "timestamp": time.Now().Unix(),
        })
    
    // 更新对等节点列表
    n.updatePeerList(remoteAddr)
    
    // 处理连接
    n.handleConnection(conn, remoteAddr)
}

func (n *ResilientNode) updatePeerList(newPeer string) {
    var peers []string
    if existing, ok := n.store.Get("peers"); ok {
        if list, ok := existing.([]string); ok {
            peers = list
        }
    }
    
    // 检查是否已存在
    for _, peer := range peers {
        if peer == newPeer {
            return
        }
    }
    
    peers = append(peers, newPeer)
    n.store.Set("peers", peers)
}

关键改进点:

  1. 持久化存储:使用JSON文件保存节点状态和接收的数据
  2. 连接重试机制:节点断开后自动重连,使用指数退避策略
  3. 连接管理:独立管理每个连接,一个节点断开不影响其他连接
  4. 数据恢复:重启时从持久化存储恢复连接信息
  5. 并发安全:使用读写锁保护共享数据

这个实现确保了即使一个节点关闭,其他节点也能继续工作,并在节点恢复后自动重新建立连接。

回到顶部