基于Postgres和Redis构建的Golang Yjs CRDT数据库服务器实现

基于Postgres和Redis构建的Golang Yjs CRDT数据库服务器实现 GitHub - kapv89/k_yrs_go: YJS CRDT Database Server over Redis, Postgres

k_yrs_go 是一个用于 Yjs 文档的数据库服务器。它构建在 Postgres 和 Redis 之上。k_yrs_go 使用二进制 Redis 队列作为 YJS 文档更新的 I/O 缓冲区,并使用以下 PG 表来存储更新:

CREATE TABLE IF NOT EXISTS k_yrs_go_yupdates_store (
    id TEXT PRIMARY KEY,
    doc_id TEXT NOT NULL,
    data BYTEA NOT NULL
);

CREATE INDEX IF NOT EXISTS k_yrs_go_yupdates_store_doc_id_idx ON k_yrs_go_yupdates_store (doc_id);

当获取文档状态时,如果某个 doc_id 的更新行数超过 100 条,k_yrs_go_yupdates_store 表中的行将进行压缩。压缩操作在一个可序列化的事务中执行,并且只有当已删除的更新数量与从数据库中获取的数量相等时,合并后的更新才会被插入到表中。

即使是读取和写入操作也发生在可序列化的事务中。根据我对数据库的所有了解,k_yrs_go 中的读取、写入和压缩操作应该彼此保持一致。

支持的最大文档大小为 1 GB。


更多关于基于Postgres和Redis构建的Golang Yjs CRDT数据库服务器实现的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于基于Postgres和Redis构建的Golang Yjs CRDT数据库服务器实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


这是一个基于Postgres和Redis构建的Golang Yjs CRDT数据库服务器的专业实现。以下是关键实现细节的分析:

架构设计分析

1. 数据存储结构

// Postgres表结构已优化存储Yjs更新
type YUpdate struct {
    ID     string `db:"id"`      // 主键
    DocID  string `db:"doc_id"`  // 文档标识符
    Data   []byte `db:"data"`    // Yjs二进制更新数据
}

2. Redis缓冲区实现

// Redis作为I/O缓冲区的典型实现
package main

import (
    "context"
    "github.com/go-redis/redis/v8"
)

type RedisBuffer struct {
    client *redis.Client
    prefix string
}

func (r *RedisBuffer) PushUpdate(ctx context.Context, docID string, update []byte) error {
    key := r.prefix + ":" + docID + ":updates"
    return r.client.RPush(ctx, key, update).Err()
}

func (r *RedisBuffer) PopUpdates(ctx context.Context, docID string, count int64) ([][]byte, error) {
    key := r.prefix + ":" + docID + ":updates"
    return r.client.LPopCount(ctx, key, int(count)).Result()
}

3. 压缩机制实现

// 文档更新压缩的核心逻辑
func compressUpdates(ctx context.Context, db *sql.DB, docID string) error {
    tx, err := db.BeginTx(ctx, &sql.TxOptions{
        Isolation: sql.LevelSerializable,
    })
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 获取需要压缩的更新
    rows, err := tx.QueryContext(ctx, 
        `SELECT id, data FROM k_yrs_go_yupdates_store 
         WHERE doc_id = $1 ORDER BY id LIMIT 101`,
        docID)
    if err != nil {
        return err
    }
    defer rows.Close()

    var updates [][]byte
    var ids []string
    count := 0
    
    for rows.Next() {
        var id string
        var data []byte
        if err := rows.Scan(&id, &data); err != nil {
            return err
        }
        updates = append(updates, data)
        ids = append(ids, id)
        count++
    }

    // 超过100条时执行压缩
    if count > 100 {
        // 合并Yjs更新
        mergedUpdate := mergeYjsUpdates(updates)
        
        // 在事务中删除旧记录并插入合并后的更新
        _, err = tx.ExecContext(ctx,
            `DELETE FROM k_yrs_go_yupdates_store WHERE id = ANY($1)`,
            pq.Array(ids))
        if err != nil {
            return err
        }

        _, err = tx.ExecContext(ctx,
            `INSERT INTO k_yrs_go_yupdates_store (id, doc_id, data) 
             VALUES ($1, $2, $3)`,
            generateUUID(), docID, mergedUpdate)
        if err != nil {
            return err
        }
    }

    return tx.Commit()
}

4. 文档状态获取

func getDocumentState(ctx context.Context, db *sql.DB, docID string) ([]byte, error) {
    tx, err := db.BeginTx(ctx, &sql.TxOptions{
        Isolation: sql.LevelSerializable,
    })
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    // 获取所有更新并应用
    rows, err := tx.QueryContext(ctx,
        `SELECT data FROM k_yrs_go_yupdates_store 
         WHERE doc_id = $1 ORDER BY id`,
        docID)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var updates [][]byte
    for rows.Next() {
        var data []byte
        if err := rows.Scan(&data); err != nil {
            return nil, err
        }
        updates = append(updates, data)
    }

    // 检查是否需要压缩
    if len(updates) > 100 {
        go func() {
            compressUpdates(context.Background(), db, docID)
        }()
    }

    // 应用Yjs更新获取最终状态
    return applyYjsUpdates(updates), nil
}

5. 写入操作实现

func writeUpdate(ctx context.Context, db *sql.DB, redisBuffer *RedisBuffer, docID string, update []byte) error {
    // 先写入Redis缓冲区
    if err := redisBuffer.PushUpdate(ctx, docID, update); err != nil {
        return err
    }

    // 在可序列化事务中写入Postgres
    tx, err := db.BeginTx(ctx, &sql.TxOptions{
        Isolation: sql.LevelSerializable,
    })
    if err != nil {
        return err
    }
    defer tx.Rollback()

    _, err = tx.ExecContext(ctx,
        `INSERT INTO k_yrs_go_yupdates_store (id, doc_id, data) 
         VALUES ($1, $2, $3)`,
        generateUUID(), docID, update)
    if err != nil {
        return err
    }

    return tx.Commit()
}

关键技术特点

  1. 可序列化事务隔离:确保读写操作的一致性
  2. Redis二进制队列:提供高性能的I/O缓冲
  3. 自动压缩机制:当更新超过100条时自动合并
  4. 索引优化doc_id索引加速文档查询
  5. 大文档支持:通过BYTEA类型支持最大1GB文档

这个实现充分利用了Postgres的事务特性和Redis的高性能缓冲,为Yjs CRDT文档提供了可靠的数据持久化方案。

回到顶部