使用Golang将NATS Streaming数据存储到PostgreSQL和内存的最佳实践

使用Golang将NATS Streaming数据存储到PostgreSQL和内存的最佳实践 你好。我使用带有 -store sql -sql_driver postgres -sql_source 标志的 nats-streaming-server 将数据存储在 postgres 中。 我有一项任务需要将数据写入 postgres 内存,但 nats-streaming-server 只允许将数据写入 postgres 内存。如何将数据同时写入存储和内存?谢谢

1 回复

更多关于使用Golang将NATS Streaming数据存储到PostgreSQL和内存的最佳实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


一种常见的解决方案是使用双写模式,在应用层同时写入两个存储。以下是示例实现:

package main

import (
    "database/sql"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/nats-io/stan.go"
    _ "github.com/lib/pq"
)

type StorageManager struct {
    db        *sql.DB
    memoryStore *sync.Map
    mu        sync.RWMutex
}

func NewStorageManager(dbConnStr string) (*StorageManager, error) {
    db, err := sql.Open("postgres", dbConnStr)
    if err != nil {
        return nil, err
    }

    return &StorageManager{
        db:          db,
        memoryStore: &sync.Map{},
    }, nil
}

func (sm *StorageManager) StoreMessage(channel string, msg *stan.Msg) error {
    // 准备数据
    data := map[string]interface{}{
        "sequence":    msg.Sequence,
        "timestamp":   msg.Timestamp,
        "data":        msg.Data,
        "redelivered": msg.Redelivered,
    }

    jsonData, err := json.Marshal(data)
    if err != nil {
        return err
    }

    // 并发写入两个存储
    var wg sync.WaitGroup
    wg.Add(2)

    // 写入PostgreSQL
    go func() {
        defer wg.Done()
        err := sm.writeToPostgres(channel, msg.Sequence, jsonData)
        if err != nil {
            log.Printf("PostgreSQL写入失败: %v", err)
        }
    }()

    // 写入内存
    go func() {
        defer wg.Done()
        sm.writeToMemory(channel, msg.Sequence, data)
    }()

    wg.Wait()
    return nil
}

func (sm *StorageManager) writeToPostgres(channel string, seq uint64, data []byte) error {
    query := `INSERT INTO nats_messages (channel, sequence, data, created_at) 
              VALUES ($1, $2, $3, $4) 
              ON CONFLICT (channel, sequence) DO UPDATE SET data = $3`

    _, err := sm.db.Exec(query, channel, seq, data, time.Now())
    return err
}

func (sm *StorageManager) writeToMemory(channel string, seq uint64, data map[string]interface{}) {
    key := fmt.Sprintf("%s:%d", channel, seq)
    sm.memoryStore.Store(key, data)
}

func (sm *StorageManager) GetFromMemory(channel string, seq uint64) (interface{}, bool) {
    key := fmt.Sprintf("%s:%d", channel, seq)
    return sm.memoryStore.Load(key)
}

func (sm *StorageManager) GetFromPostgres(channel string, seq uint64) ([]byte, error) {
    var data []byte
    query := `SELECT data FROM nats_messages WHERE channel = $1 AND sequence = $2`
    err := sm.db.QueryRow(query, channel, seq).Scan(&data)
    return data, err
}

func main() {
    // 初始化存储管理器
    storage, err := NewStorageManager("host=localhost port=5432 user=postgres password=secret dbname=nats sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer storage.db.Close()

    // 连接到NATS Streaming
    sc, err := stan.Connect("test-cluster", "client-123", stan.NatsURL("nats://localhost:4222"))
    if err != nil {
        log.Fatal(err)
    }
    defer sc.Close()

    // 订阅消息并存储
    _, err = sc.Subscribe("orders", func(msg *stan.Msg) {
        err := storage.StoreMessage("orders", msg)
        if err != nil {
            log.Printf("存储消息失败: %v", err)
        } else {
            log.Printf("消息已存储到PostgreSQL和内存: 序列号=%d", msg.Sequence)
        }
    }, stan.DeliverAllAvailable())
    
    if err != nil {
        log.Fatal(err)
    }

    // 保持运行
    select {}
}

PostgreSQL表结构:

CREATE TABLE nats_messages (
    id SERIAL PRIMARY KEY,
    channel VARCHAR(255) NOT NULL,
    sequence BIGINT NOT NULL,
    data JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(channel, sequence)
);

CREATE INDEX idx_nats_messages_channel_seq ON nats_messages(channel, sequence);

如果需要更高级的同步机制,可以考虑使用通道缓冲:

type MessageBuffer struct {
    pgChan chan *StorageRequest
    memChan chan *StorageRequest
    storage *StorageManager
}

type StorageRequest struct {
    Channel string
    Msg     *stan.Msg
    Data    []byte
}

func NewMessageBuffer(storage *StorageManager, bufferSize int) *MessageBuffer {
    mb := &MessageBuffer{
        pgChan:  make(chan *StorageRequest, bufferSize),
        memChan: make(chan *StorageRequest, bufferSize),
        storage: storage,
    }
    
    // 启动处理协程
    go mb.processPostgresWrites()
    go mb.processMemoryWrites()
    
    return mb
}

func (mb *MessageBuffer) QueueMessage(channel string, msg *stan.Msg, data []byte) {
    req := &StorageRequest{
        Channel: channel,
        Msg:     msg,
        Data:    data,
    }
    
    mb.pgChan <- req
    mb.memChan <- req
}

func (mb *MessageBuffer) processPostgresWrites() {
    for req := range mb.pgChan {
        err := mb.storage.writeToPostgres(req.Channel, req.Msg.Sequence, req.Data)
        if err != nil {
            log.Printf("PostgreSQL异步写入失败: %v", err)
        }
    }
}

这种模式允许你在应用层同时维护两个存储,确保数据在PostgreSQL和内存中都有一份副本。

回到顶部