基于Postgres和Redis构建的Golang Yjs CRDT数据库服务器实现
基于Postgres和Redis构建的Golang Yjs CRDT数据库服务器实现
k_yrs_go 是一个用于 Yjs 文档的数据库服务器。它构建在 Postgres 和 Redis 之上。k_yrs_go 使用二进制 Redis 队列作为 YJS 文档更新的 I/O 缓冲区,并使用以下 PG 表来存储更新:
CREATE TABLE IF NOT EXISTS k_yrs_go_yupdates_store (
id TEXT PRIMARY KEY,
doc_id TEXT NOT NULL,
data BYTEA NOT NULL
);
CREATE INDEX IF NOT EXISTS k_yrs_go_yupdates_store_doc_id_idx ON k_yrs_go_yupdates_store (doc_id);
当获取文档状态时,如果某个 doc_id 的更新行数超过 100 条,k_yrs_go_yupdates_store 表中的行将进行压缩。压缩操作在一个可序列化的事务中执行,并且只有当已删除的更新数量与从数据库中获取的数量相等时,合并后的更新才会被插入到表中。
即使是读取和写入操作也发生在可序列化的事务中。根据我对数据库的所有了解,k_yrs_go 中的读取、写入和压缩操作应该彼此保持一致。
支持的最大文档大小为 1 GB。
更多关于基于Postgres和Redis构建的Golang Yjs CRDT数据库服务器实现的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于基于Postgres和Redis构建的Golang Yjs CRDT数据库服务器实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
这是一个基于Postgres和Redis构建的Golang Yjs CRDT数据库服务器的专业实现。以下是关键实现细节的分析:
架构设计分析
1. 数据存储结构
// Postgres表结构已优化存储Yjs更新
type YUpdate struct {
ID string `db:"id"` // 主键
DocID string `db:"doc_id"` // 文档标识符
Data []byte `db:"data"` // Yjs二进制更新数据
}
2. Redis缓冲区实现
// Redis作为I/O缓冲区的典型实现
package main
import (
"context"
"github.com/go-redis/redis/v8"
)
type RedisBuffer struct {
client *redis.Client
prefix string
}
func (r *RedisBuffer) PushUpdate(ctx context.Context, docID string, update []byte) error {
key := r.prefix + ":" + docID + ":updates"
return r.client.RPush(ctx, key, update).Err()
}
func (r *RedisBuffer) PopUpdates(ctx context.Context, docID string, count int64) ([][]byte, error) {
key := r.prefix + ":" + docID + ":updates"
return r.client.LPopCount(ctx, key, int(count)).Result()
}
3. 压缩机制实现
// 文档更新压缩的核心逻辑
func compressUpdates(ctx context.Context, db *sql.DB, docID string) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return err
}
defer tx.Rollback()
// 获取需要压缩的更新
rows, err := tx.QueryContext(ctx,
`SELECT id, data FROM k_yrs_go_yupdates_store
WHERE doc_id = $1 ORDER BY id LIMIT 101`,
docID)
if err != nil {
return err
}
defer rows.Close()
var updates [][]byte
var ids []string
count := 0
for rows.Next() {
var id string
var data []byte
if err := rows.Scan(&id, &data); err != nil {
return err
}
updates = append(updates, data)
ids = append(ids, id)
count++
}
// 超过100条时执行压缩
if count > 100 {
// 合并Yjs更新
mergedUpdate := mergeYjsUpdates(updates)
// 在事务中删除旧记录并插入合并后的更新
_, err = tx.ExecContext(ctx,
`DELETE FROM k_yrs_go_yupdates_store WHERE id = ANY($1)`,
pq.Array(ids))
if err != nil {
return err
}
_, err = tx.ExecContext(ctx,
`INSERT INTO k_yrs_go_yupdates_store (id, doc_id, data)
VALUES ($1, $2, $3)`,
generateUUID(), docID, mergedUpdate)
if err != nil {
return err
}
}
return tx.Commit()
}
4. 文档状态获取
func getDocumentState(ctx context.Context, db *sql.DB, docID string) ([]byte, error) {
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return nil, err
}
defer tx.Rollback()
// 获取所有更新并应用
rows, err := tx.QueryContext(ctx,
`SELECT data FROM k_yrs_go_yupdates_store
WHERE doc_id = $1 ORDER BY id`,
docID)
if err != nil {
return nil, err
}
defer rows.Close()
var updates [][]byte
for rows.Next() {
var data []byte
if err := rows.Scan(&data); err != nil {
return nil, err
}
updates = append(updates, data)
}
// 检查是否需要压缩
if len(updates) > 100 {
go func() {
compressUpdates(context.Background(), db, docID)
}()
}
// 应用Yjs更新获取最终状态
return applyYjsUpdates(updates), nil
}
5. 写入操作实现
func writeUpdate(ctx context.Context, db *sql.DB, redisBuffer *RedisBuffer, docID string, update []byte) error {
// 先写入Redis缓冲区
if err := redisBuffer.PushUpdate(ctx, docID, update); err != nil {
return err
}
// 在可序列化事务中写入Postgres
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return err
}
defer tx.Rollback()
_, err = tx.ExecContext(ctx,
`INSERT INTO k_yrs_go_yupdates_store (id, doc_id, data)
VALUES ($1, $2, $3)`,
generateUUID(), docID, update)
if err != nil {
return err
}
return tx.Commit()
}
关键技术特点
- 可序列化事务隔离:确保读写操作的一致性
- Redis二进制队列:提供高性能的I/O缓冲
- 自动压缩机制:当更新超过100条时自动合并
- 索引优化:
doc_id索引加速文档查询 - 大文档支持:通过BYTEA类型支持最大1GB文档
这个实现充分利用了Postgres的事务特性和Redis的高性能缓冲,为Yjs CRDT文档提供了可靠的数据持久化方案。

