使用Golang将NATS Streaming数据存储到PostgreSQL和内存的最佳实践
你好。我使用带有 -store sql -sql_driver postgres -sql_source 标志的 nats-streaming-server 将数据存储在 postgres 中。
我有一项任务,需要将数据同时写入 PostgreSQL 和内存,但 nats-streaming-server 的存储后端只能二选一:要么写入 PostgreSQL,要么写入内存。如何让数据同时保存在这两处?谢谢。
1 回复
更多关于使用Golang将NATS Streaming数据存储到PostgreSQL和内存的最佳实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
一种常见的解决方案是使用双写模式,在应用层同时写入两个存储。以下是示例实现:
package main
import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	_ "github.com/lib/pq"
	"github.com/nats-io/stan.go"
)
// StorageManager bundles the two storage backends used by this example: a
// PostgreSQL handle and an in-process map.
type StorageManager struct {
// db is the database/sql handle used to read and write the nats_messages table.
db *sql.DB
// memoryStore holds message data keyed by "channel:sequence" strings.
memoryStore *sync.Map
// mu is not referenced by any method visible in this file.
mu sync.RWMutex
}
// NewStorageManager opens a PostgreSQL handle for dbConnStr and pairs it with
// an empty in-memory store.
//
// sql.Open may only validate its arguments without contacting the server, so
// the connection is verified with Ping before the manager is returned. This
// surfaces a bad connection string or an unreachable database at startup
// instead of on the first write.
//
// It returns a non-nil error if the handle cannot be opened or the database
// cannot be reached; in that case no manager is returned and the handle is
// released.
func NewStorageManager(dbConnStr string) (*StorageManager, error) {
	db, err := sql.Open("postgres", dbConnStr)
	if err != nil {
		return nil, fmt.Errorf("opening postgres handle: %w", err)
	}
	if err := db.Ping(); err != nil {
		// The caller never receives the handle, so release it here. The Close
		// error is dropped on purpose: the Ping error is the one worth reporting.
		_ = db.Close()
		return nil, fmt.Errorf("connecting to postgres: %w", err)
	}
	return &StorageManager{
		db:          db,
		memoryStore: &sync.Map{},
	}, nil
}
// StoreMessage records msg for channel in both the in-memory store and
// PostgreSQL.
//
// The in-memory copy is the field map built below; the PostgreSQL copy is the
// JSON encoding of that same map. Both writes are always attempted, so the
// in-memory entry exists even when the database write fails.
//
// It returns an error if msg is nil, if the message cannot be encoded, or if
// the PostgreSQL write fails. The error is returned rather than logged here so
// that the caller can decide how to report or retry it.
func (sm *StorageManager) StoreMessage(channel string, msg *stan.Msg) error {
	if msg == nil {
		return fmt.Errorf("storing message on channel %q: nil message", channel)
	}

	// Collect the fields that are persisted for each message.
	data := map[string]interface{}{
		"sequence":    msg.Sequence,
		"timestamp":   msg.Timestamp,
		"data":        msg.Data,
		"redelivered": msg.Redelivered,
	}
	jsonData, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("encoding message %d on channel %q: %w", msg.Sequence, channel, err)
	}

	// The memory write is a single map store, so it runs inline; spawning
	// goroutines for it only added scheduling overhead.
	sm.writeToMemory(channel, msg.Sequence, data)

	if err := sm.writeToPostgres(channel, msg.Sequence, jsonData); err != nil {
		return fmt.Errorf("writing message %d on channel %q to postgres: %w", msg.Sequence, channel, err)
	}
	return nil
}
// writeToPostgres inserts one row into nats_messages for (channel, seq) with
// the given data and the current time. If a row with the same channel and
// sequence already exists, its data column is overwritten instead.
//
// The error from the Exec call is returned unchanged.
func (sm *StorageManager) writeToPostgres(channel string, seq uint64, data []byte) error {
	const upsert = `INSERT INTO nats_messages (channel, sequence, data, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (channel, sequence) DO UPDATE SET data = $3`

	if _, err := sm.db.Exec(upsert, channel, seq, data, time.Now()); err != nil {
		return err
	}
	return nil
}
// writeToMemory stores data in the in-memory map under the key
// "<channel>:<seq>", replacing any value already stored under that key.
func (sm *StorageManager) writeToMemory(channel string, seq uint64, data map[string]interface{}) {
	sm.memoryStore.Store(fmt.Sprintf("%s:%d", channel, seq), data)
}
// GetFromMemory looks up the value stored under the key "<channel>:<seq>" in
// the in-memory map. The boolean result reports whether the key was present.
func (sm *StorageManager) GetFromMemory(channel string, seq uint64) (interface{}, bool) {
	value, found := sm.memoryStore.Load(fmt.Sprintf("%s:%d", channel, seq))
	return value, found
}
// GetFromPostgres reads the data column of the nats_messages row matching
// channel and seq.
//
// The error produced by scanning the row is returned unchanged together with
// whatever was scanned.
func (sm *StorageManager) GetFromPostgres(channel string, seq uint64) ([]byte, error) {
	const lookup = `SELECT data FROM nats_messages WHERE channel = $1 AND sequence = $2`

	var payload []byte
	row := sm.db.QueryRow(lookup, channel, seq)
	scanErr := row.Scan(&payload)
	return payload, scanErr
}
// main runs the example and exits with a non-zero status if setup fails.
//
// The work lives in run so that deferred cleanup executes before the process
// exits; log.Fatal calls os.Exit, which skips deferred calls in the function
// that invokes it.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to PostgreSQL and NATS Streaming, stores every message received
// on the "orders" channel, and blocks until the process receives an interrupt
// or SIGTERM. The deferred Close calls then release the NATS connection and
// the database handle, in that order.
//
// It returns an error if either connection or the subscription cannot be
// established.
func run() error {
	// Initialize the storage manager.
	storage, err := NewStorageManager("host=localhost port=5432 user=postgres password=secret dbname=nats sslmode=disable")
	if err != nil {
		return fmt.Errorf("initializing storage: %w", err)
	}
	defer storage.db.Close()

	// Connect to NATS Streaming.
	sc, err := stan.Connect("test-cluster", "client-123", stan.NatsURL("nats://localhost:4222"))
	if err != nil {
		return fmt.Errorf("connecting to NATS Streaming: %w", err)
	}
	defer sc.Close()

	// Subscribe and store each delivered message.
	_, err = sc.Subscribe("orders", func(msg *stan.Msg) {
		if err := storage.StoreMessage("orders", msg); err != nil {
			log.Printf("存储消息失败: %v", err)
			return
		}
		log.Printf("消息已存储到PostgreSQL和内存: 序列号=%d", msg.Sequence)
	}, stan.DeliverAllAvailable())
	if err != nil {
		return fmt.Errorf("subscribing to orders: %w", err)
	}

	// Wait for a shutdown signal instead of blocking forever, so the deferred
	// cleanup above actually runs. The channel is buffered because
	// signal.Notify does not block when sending.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	<-stop
	return nil
}
PostgreSQL表结构:
-- One row per stored message, identified by (channel, sequence).
CREATE TABLE nats_messages (
    id SERIAL PRIMARY KEY,
    channel VARCHAR(255) NOT NULL,
    sequence BIGINT NOT NULL,
    data JSONB NOT NULL,
    -- NOTE(review): TIMESTAMP carries no time zone; consider TIMESTAMPTZ if
    -- writers may run in different zones.
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Conflict target for the ON CONFLICT (channel, sequence) upsert.
    -- PostgreSQL creates a unique index for this constraint automatically, so
    -- no separate CREATE INDEX on (channel, sequence) is needed; a second,
    -- identical index would only add write and storage overhead.
    UNIQUE(channel, sequence)
);
如果希望订阅回调不被数据库写入阻塞,可以改用带缓冲的通道,由后台协程异步完成写入:
// MessageBuffer queues storage requests on two channels, one per storage
// backend, on top of a StorageManager.
type MessageBuffer struct {
// pgChan carries requests consumed by processPostgresWrites.
pgChan chan *StorageRequest
// memChan carries the same requests for the in-memory store.
memChan chan *StorageRequest
// storage performs the actual writes.
storage *StorageManager
}
// StorageRequest is one unit of work placed on a MessageBuffer channel.
type StorageRequest struct {
// Channel is the channel name passed through to the storage write.
Channel string
// Msg is the received message; its Sequence identifies the stored entry.
Msg *stan.Msg
// Data is the payload handed to writeToPostgres.
Data []byte
}
// NewMessageBuffer creates a MessageBuffer backed by storage and starts one
// background goroutine per backend to drain its queue.
//
// bufferSize is the capacity of each queue. A negative value is treated as 0
// (unbuffered), because make panics on a negative channel capacity.
//
// The goroutines run until their channel is closed; this type does not
// currently expose a way to close them.
func NewMessageBuffer(storage *StorageManager, bufferSize int) *MessageBuffer {
	if bufferSize < 0 {
		bufferSize = 0
	}
	mb := &MessageBuffer{
		pgChan:  make(chan *StorageRequest, bufferSize),
		memChan: make(chan *StorageRequest, bufferSize),
		storage: storage,
	}
	// Start the queue consumers.
	go mb.processPostgresWrites()
	go mb.processMemoryWrites()
	return mb
}

// processMemoryWrites drains memChan and writes each request to the in-memory
// store. It records the same fields that StoreMessage keeps in memory, so
// entries look the same whichever path wrote them.
func (mb *MessageBuffer) processMemoryWrites() {
	for req := range mb.memChan {
		mb.storage.writeToMemory(req.Channel, req.Msg.Sequence, map[string]interface{}{
			"sequence":    req.Msg.Sequence,
			"timestamp":   req.Msg.Timestamp,
			"data":        req.Msg.Data,
			"redelivered": req.Msg.Redelivered,
		})
	}
}
// QueueMessage wraps channel, msg and data in a single StorageRequest and
// sends that same request to the PostgreSQL queue first, then to the memory
// queue. Each send blocks while the corresponding queue is full.
func (mb *MessageBuffer) QueueMessage(channel string, msg *stan.Msg, data []byte) {
	req := &StorageRequest{Channel: channel, Msg: msg, Data: data}
	for _, queue := range []chan *StorageRequest{mb.pgChan, mb.memChan} {
		queue <- req
	}
}
// processPostgresWrites receives requests from pgChan until the channel is
// closed and writes each one to PostgreSQL. A failed write is logged and the
// loop moves on to the next request.
func (mb *MessageBuffer) processPostgresWrites() {
	for request := range mb.pgChan {
		seq := request.Msg.Sequence
		if err := mb.storage.writeToPostgres(request.Channel, seq, request.Data); err != nil {
			log.Printf("PostgreSQL异步写入失败: %v", err)
		}
	}
}
这种模式允许你在应用层同时维护两个存储,让数据在 PostgreSQL 和内存中各保留一份副本。需要注意的是,两次写入并不是原子操作:其中一方写入失败时,两个存储可能出现不一致,需要通过重试或对账来补偿。

