Golang中如何实现数据持久化
Golang中如何实现数据持久化
我正在使用Go语言构建一个点对点网络,代码位于 https://pastebin.com/wize4F80。这段代码的目的是连接一些节点并交换数据,但是当一个节点终止时,所有其他节点都会停止工作。
如何使其具有持久性,以便即使一个节点关闭,其他节点也能继续工作?
2 回复
你好 @thecow_milk,我对 libp2p 不熟悉,从代码上看不出它具体做了什么,或者问题出在哪里。
问题:
- 当一个节点终止时,其他节点究竟是如何停止工作的?你会收到错误信息吗?它们是静默终止的吗?(我怀疑后者会发生,因为如果不出错,main() 函数末尾的 select{} 应该会让程序永远运行下去。)
- 这里使用了哪些库?(例如,我看到了一个名为 host 的包但没有导入语句)
- writeData() 和 readData() 函数是做什么的?(代码片段中没有包含)
- 你是否查阅了所用包的文档,看看它们是否描述了当一个节点停止发送数据时其他节点的行为?
- 第一个节点是如何被终止的?这种终止机制是否也有可能意外地导致其他节点停止工作?
更多关于Golang中如何实现数据持久化的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Go中实现数据持久化和节点容错,需要结合持久化存储和连接重试机制。以下是针对你代码的改进方案:
1. 数据持久化存储
package main
import (
"encoding/json"
"os"
"sync"
)
type PersistentStore struct {
mu sync.RWMutex
data map[string]interface{}
file string
}
func NewPersistentStore(filename string) *PersistentStore {
store := &PersistentStore{
data: make(map[string]interface{}),
file: filename,
}
store.load()
return store
}
func (s *PersistentStore) Set(key string, value interface{}) error {
s.mu.Lock()
defer s.mu.Unlock()
s.data[key] = value
return s.save()
}
func (s *PersistentStore) Get(key string) (interface{}, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
val, ok := s.data[key]
return val, ok
}
func (s *PersistentStore) save() error {
data, err := json.Marshal(s.data)
if err != nil {
return err
}
return os.WriteFile(s.file, data, 0644)
}
func (s *PersistentStore) load() error {
data, err := os.ReadFile(s.file)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
return json.Unmarshal(data, &s.data)
}
2. 节点连接容错处理
package main
import (
"context"
"fmt"
"net"
"time"
)
type ResilientNode struct {
address string
store *PersistentStore
connections map[string]net.Conn
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
func NewResilientNode(addr string, store *PersistentStore) *ResilientNode {
ctx, cancel := context.WithCancel(context.Background())
return &ResilientNode{
address: addr,
store: store,
connections: make(map[string]net.Conn),
ctx: ctx,
cancel: cancel,
}
}
func (n *ResilientNode) ConnectToNode(peerAddr string) error {
go n.maintainConnection(peerAddr)
return nil
}
func (n *ResilientNode) maintainConnection(peerAddr string) {
retryInterval := time.Second
maxRetryInterval := time.Minute
for {
select {
case <-n.ctx.Done():
return
default:
conn, err := net.DialTimeout("tcp", peerAddr, 10*time.Second)
if err != nil {
fmt.Printf("Failed to connect to %s: %v, retrying in %v\n",
peerAddr, err, retryInterval)
time.Sleep(retryInterval)
// Exponential backoff
retryInterval *= 2
if retryInterval > maxRetryInterval {
retryInterval = maxRetryInterval
}
continue
}
// Reset retry interval on successful connection
retryInterval = time.Second
n.mu.Lock()
n.connections[peerAddr] = conn
n.mu.Unlock()
// Store connection info
n.store.Set(fmt.Sprintf("peer_%s", peerAddr),
map[string]interface{}{
"address": peerAddr,
"connected": true,
"timestamp": time.Now().Unix(),
})
// Handle connection
n.handleConnection(conn, peerAddr)
// Connection lost, retry
n.mu.Lock()
delete(n.connections, peerAddr)
n.mu.Unlock()
}
}
}
func (n *ResilientNode) handleConnection(conn net.Conn, peerAddr string) {
defer conn.Close()
buffer := make([]byte, 1024)
for {
select {
case <-n.ctx.Done():
return
default:
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
n, err := conn.Read(buffer)
if err != nil {
fmt.Printf("Connection to %s lost: %v\n", peerAddr, err)
return
}
// Process received data
data := buffer[:n]
n.store.Set(fmt.Sprintf("data_from_%s", peerAddr),
map[string]interface{}{
"data": string(data),
"timestamp": time.Now().Unix(),
})
// Echo back or process data
conn.Write([]byte("ACK: " + string(data)))
}
}
}
func (n *ResilientNode) Broadcast(data []byte) {
n.mu.RLock()
defer n.mu.RUnlock()
for addr, conn := range n.connections {
go func(addr string, conn net.Conn) {
_, err := conn.Write(data)
if err != nil {
fmt.Printf("Failed to send to %s: %v\n", addr, err)
n.mu.Lock()
delete(n.connections, addr)
n.mu.Unlock()
}
}(addr, conn)
}
}
func (n *ResilientNode) Shutdown() {
n.cancel()
n.mu.Lock()
defer n.mu.Unlock()
for _, conn := range n.connections {
conn.Close()
}
}
3. 主程序集成示例
package main
import (
"fmt"
"net"
"sync"
"time"
)
func main() {
// 初始化持久化存储
store := NewPersistentStore("node_data.json")
// 启动节点
node := NewResilientNode(":8080", store)
// 恢复之前的连接
if peers, ok := store.Get("peers"); ok {
if peerList, ok := peers.([]string); ok {
for _, peer := range peerList {
go node.ConnectToNode(peer)
}
}
}
// 启动监听
go node.startServer()
// 示例:定期广播数据
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
node.Broadcast([]byte(fmt.Sprintf("Heartbeat at %v", time.Now())))
}
}
func (n *ResilientNode) startServer() {
listener, err := net.Listen("tcp", n.address)
if err != nil {
panic(err)
}
defer listener.Close()
for {
conn, err := listener.Accept()
if err != nil {
continue
}
go n.handleIncomingConnection(conn)
}
}
func (n *ResilientNode) handleIncomingConnection(conn net.Conn) {
defer conn.Close()
remoteAddr := conn.RemoteAddr().String()
n.mu.Lock()
n.connections[remoteAddr] = conn
n.mu.Unlock()
// 存储连接信息
n.store.Set(fmt.Sprintf("peer_%s", remoteAddr),
map[string]interface{}{
"address": remoteAddr,
"connected": true,
"timestamp": time.Now().Unix(),
})
// 更新对等节点列表
n.updatePeerList(remoteAddr)
// 处理连接
n.handleConnection(conn, remoteAddr)
}
func (n *ResilientNode) updatePeerList(newPeer string) {
var peers []string
if existing, ok := n.store.Get("peers"); ok {
if list, ok := existing.([]string); ok {
peers = list
}
}
// 检查是否已存在
for _, peer := range peers {
if peer == newPeer {
return
}
}
peers = append(peers, newPeer)
n.store.Set("peers", peers)
}
关键改进点:
- 持久化存储:使用JSON文件保存节点状态和接收的数据
- 连接重试机制:节点断开后自动重连,使用指数退避策略
- 连接管理:独立管理每个连接,一个节点断开不影响其他连接
- 数据恢复:重启时从持久化存储恢复连接信息
- 并发安全:使用读写锁保护共享数据
这个实现确保了即使一个节点关闭,其他节点也能继续工作,并在节点恢复后自动重新建立连接。

