Rust实时数据同步库dittolive-ditto的使用,实现跨平台设备间高效数据同步与协作

Rust实时数据同步库dittolive-ditto的使用,实现跨平台设备间高效数据同步与协作

什么是Ditto?

Ditto是一个跨平台的对等网络(P2P)数据库,允许应用程序在有或没有互联网连接的情况下进行数据同步。

安装Ditto到您的应用程序中,然后使用API读写数据到其存储系统,它会自动将任何更改同步到其他设备。与其他同步解决方案不同,Ditto专为"对等网络"场景设计,可以在没有互联网连接时直接与其他设备通信。

此外,Ditto自动管理使用多种网络传输(如蓝牙、P2P Wi-Fi和局域网)的复杂性,来查找和连接其他设备并同步任何更改。

快速入门示例

以下是使用Ditto Playground模式的快速入门示例:

use std::sync::Arc;
use dittolive_ditto::prelude::*;

fn main() -> anyhow::Result<()> {
    let app_id = AppId::from_env("DITTO_APP_ID")?;
    let playground_token = std::env::var("DITTO_PLAYGROUND_TOKEN")?;
    let cloud_sync = true;
    let custom auth_url = None;

    // 初始化Ditto
    let ditto = Ditto::builder()
        .with_root(Arc::new(PersistentRoot::from_current_exe()?))
        .with_identity(|ditto_root| {
            identity::OnlinePlayground::new(
                ditto_root,
                app_id,
                playground_token,
                cloud_sync,
                custom_auth_url,
            )
        })?
        .build()?;

    // 开始与对等设备同步
    ditto.start_sync()?;

    Ok(())
}

使用DQL(Ditto查询语言)写入数据

使用DQL写入数据的首选方法如下:

use dittolive_ditto::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Car {
    color: String,
    make: String,
}

async fn dql_insert_car(ditto: &Ditto, car: &Car) -> anyhow::Result<()> {
    let store = ditto.store();
    let query_result = store.execute(
        // `cars`是集合名称
        "INSERT INTO cars DOCUMENTS (:newCar)",
        Some(serde_json::json!({
            "newCar": car
        }).into())
    ).await?;

    // 可选:查看插入的项目数量
    let item_count = query_result.item_count();

    // 可选:检查每个插入的项目
    for query_item in query_result.iter() {
        println!("Inserted: {}", query_item.json_string());
    }

    Ok(())
}

// 调用方法:
async fn call_dql_insert(ditto: Ditto) -> anyhow::Result<()> {
    let my_car = Car {
        color: "blue".to_string(),
        make: "ford".to_string(),
    };
    dql_insert_car(&ditto, &my_car).await?;
    Ok(())
}

使用DQL读取数据

use dittolive_ditto::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Car {
    color: String,
    make: String,
}

async fn dql_select_cars(ditto: &Ditto, color: &str) -> anyhow::Result<Vec<Car>> {
    let store = ditto.store();
    let query_result = store.execute(
        "SELECT * FROM cars WHERE color = :myColor",
        Some(serde_json::json!({
            "myColor": color
        }).into())
    ).await?;

    let cars = query_result.iter()
        .map(|query_item| query_item.deserialize_value::<Car>())
        .collect::<Result<Vec<Car>, _>>()?;

    Ok(cars)
}

// 调用方法:
async fn call_dql_select(ditto: Ditto) -> anyhow::Result<()> {
    let cars: Vec<Car> = dql_select_cars(&ditto, "blue").await?;
    Ok(())
}

完整示例

以下是一个完整的跨平台数据同步示例:

use std::sync::Arc;
use dittolive_ditto::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::time::{sleep, Duration};

#[derive(Serialize, Deserialize, Debug)]
struct Message {
    text: String,
    timestamp: i64,
    sender: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 初始化Ditto
    let app_id = AppId::from_env("DITTO_APP_ID")?;
    let playground_token = std::env::var("DITTO_PLAYGROUND_TOKEN")?;
    
    let ditto = Ditto::builder()
        .with_root(Arc::new(PPersistentRoot::from_current_exe()?))
        .with_identity(|ditto_root| {
            identity::OnlinePlayground::new(
                ditto_root,
                app_id,
                playground_token,
                true,  // 启用云同步
                None,  // 不使用自定义认证URL
            )
        })?
        .build()?;
    
    // 开始同步
    ditto.start_sync()?;
    
    // 创建消息集合的实时订阅
    let mut live_query = ditto.store()
        .execute("SELECT * FROM messages ORDER BY timestamp DESC", None)
        .await?
        .live_query();
    
    // 在单独的任务中处理接收到的消息
    tokio::spawn(async move {
        while let Some(event) = live_query.next().await {
            for item in event.iter() {
                if let Ok(msg) = item.deserialize_value::<Message>() {
                    println!("[{}] {}: {}", 
                        msg.timestamp, 
                        msg.sender, 
                        msg.text);
                }
            }
        }
    });
    
    // 模拟发送消息
    for i in 0..5 {
        let message = Message {
            text: format!("Hello from device {}", i),
            timestamp: chrono::Utc::now().timestamp(),
            sender: std::env::var("USER").unwrap_or("unknown".to_string()),
        };
        
        ditto.store()
            .execute(
                "INSERT INTO messages DOCUMENTS (:msg)",
                Some(serde_json::json!({ "msg": message }).into()),
            )
            .await?;
        
        sleep(Duration::from_secs(1)).await;
    }
    
    // 保持程序运行以继续接收消息
    loop {
        sleep(Duration::from_secs(10)).await;
    }
}

依赖配置

在Cargo.toml中添加以下依赖:

[dependencies]
dittolive-ditto = "4.11.2"
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.120"
tokio = { version = "1.0", features = ["full"] }
chrono = "0.4"

注意事项

  1. 需要设置环境变量DITTO_APP_IDDITTO_PLAYGROUND_TOKEN
  2. 此示例展示了跨设备实时消息同步功能
  3. 数据会自动通过蓝牙、Wi-Fi或互联网在所有连接的设备间同步
  4. 即使在离线状态下,数据也会在设备重新连接时自动同步

Ditto特别适合需要实时数据同步和协作的跨平台应用场景,如聊天应用、多人协作编辑、物联网设备数据同步等。


1 回复

Rust实时数据同步库dittolive-ditto的使用指南

概述

dittolive-ditto是一个强大的实时数据同步库,专为跨平台设备间高效数据同步与协作而设计。它允许开发者在不同设备间实时同步数据,支持离线优先的工作模式,非常适合构建协作应用、多设备同步应用等场景。

主要特性

  • 实时数据同步:设备间数据变更即时同步
  • 离线优先:支持离线操作,网络恢复后自动同步
  • 多平台支持:可在iOS、Android、macOS、Linux、Windows等平台使用
  • 冲突解决:内置智能冲突解决机制
  • 端到端加密:保障数据安全

安装方法

在Cargo.toml中添加依赖:

[dependencies]
ditto = "0.9"  # 请检查最新版本

基本使用方法

1. 初始化Ditto实例

use ditto::Ditto;

async fn init_ditto() -> Result<Ditto, Box<dyn std::error::Error>> {
    let ditto = Ditto::builder()
        .with_identity(|identity| {
            identity.online_with_authentication(|auth| {
                auth.app_id("YOUR_APP_ID".to_string())
                    .token("YOUR_AUTH_TOKEN".to_string())
            })
        })
        .build()?;
    
    ditto.start_sync().await?;
    Ok(ditto)
}

2. 创建和操作集合

use ditto::document::DocumentId;

async fn work_with_collection(ditto: &Ditto) {
    // 获取或创建集合
    let collection = ditto.store().collection("tasks").unwrap();
    
    // 插入文档
    let doc_id = DocumentId::new();
    collection
        .insert(
            doc_id,
            serde_json::json!({
                "title": "Buy milk",
                "completed": false,
                "created_at": chrono::Utc::now().to_rfc3339()
            }),
        )
        .await
        .unwrap();
    
    // 查询文档
    let tasks = collection.find("{}".to_string()).exec().await.unwrap();
    for task in tasks {
        println!("Task: {:?}", task);
    }
}

3. 监听数据变更

use ditto::subscription::Subscription;

async fn subscribe to_changes(ditto: &Ditto) {
    let collection = ditto.store().collection("tasks").unwrap();
    
    // 创建订阅
    let mut subscription = Subscription::new("live_tasks");
    subscription.set_query("{}".to_string());
    
    // 监听变更
    let mut live_query = collection
        .find("{}".to_string())
        .subscribe(&subscription)
        .await
        .unwrap();
    
    while let Some(event) = live_query.next().await {
        match event {
            ditto::live_query::LiveQueryEvent::Update(update) => {
                println!("Document updated: {:?}", update);
            }
            ditto::live_query::LiveQueryEvent::Insert(insert) => {
                println!("Document inserted: {:?}", insert);
            }
            ditto::live_query::LiveQueryEvent::Remove(remove) => {
                println!("Document removed: {:?}", remove);
            }
        }
    }
}

高级功能

1. 冲突解决

async fn handle_conflict(ditto: &Ditto) {
    let collection = ditto.store().collection("tasks").unwrap();
    
    // 设置冲突解决器
    collection
        .register_resolver("tasks", |conflict| {
            let local_doc = conflict.local();
            let remote_doc = conflict.remote();
            
            // 简单的基于时间戳的冲突解决策略
            let local_time = local_doc["updated_at"].as_str().unwrap();
            let remote_time = remote_doc["updated_at"].as_str().unwrap();
            
            if local_time > remote_time {
                conflict.resolve_with_local()
            } else {
                conflict.resolve_with_remote()
            }
        })
        .await;
}

2. 跨设备同步

async fn sync_across_devices(ditto: &Ditto) {
    // 连接到其他对等设备
    if let Err(e) = ditto.connect_peers().await {
        eprintln!("Failed to connect to peers: {}", e);
    }
    
    // 检查连接状态
    let peers = ditto.peers().await;
    println!("Connected peers: {:?}", peers);
}

完整示例代码

use ditto::{Ditto, DocumentId, live_query::LiveQueryEvent};
use serde_json::json;
use std::error::Error;
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // 1. 初始化Ditto实例
    let ditto = Ditto::builder()
        .with_identity(|identity| {
            identity.online_with_authentication(|auth| {
                auth.app_id("my-collab-app".to_string())
                    .token("secure-token-123".to_string())
            })
        })
        .build()?;
    
    // 启动同步
    ditto.start_sync().await?;

    // 2. 创建并操作集合
    let collection = ditto.store().collection("collab_docs").unwrap();
    
    // 插入示例文档
    let doc_id = DocumentId::new();
    collection.insert(
        doc_id,
        json!({
            "content": "Initial document content",
            "version": 1,
            "last_updated": chrono::Utc::now().to_rfc3339()
        }),
    ).await?;

    // 3. 监听数据变更
    let mut live_query = collection
        .find("{}".to_string())
        .subscribe(&Subscription::new("doc_updates"))
        .await?;

    tokio::spawn(async move {
        while let Some(event) = live_query.next().await {
            match event {
                LiveQueryEvent::Update(update) => {
                    println!("文档更新: 版本 {}", update.document["version"]);
                }
                LiveQueryEvent::Insert(insert) => {
                    println!("新文档创建: ID {}", insert.document_id);
                }
                LiveQueryEvent::Remove(remove) => {
                    println!("文档删除: ID {}", remove.document_id);
                }
            }
        }
    });

    // 4. 高级功能示例 - 冲突解决
    collection.register_resolver("collab_docs", |conflict| {
        let local_ver = conflict.local()["version"].as_u64().unwrap_or(0);
        let remote_ver = conflict.remote()["version"].as_u64().unwrap_or(0);
        
        if local_ver > remote_ver {
            conflict.resolve_with_local()
        } else {
            conflict.resolve_with_remote()
        }
    }).await;

    // 5. 跨设备同步
    if let Err(e) = ditto.connect_peers().await {
        eprintln!("Peer连接错误: {}", e);
    }

    // 保持应用运行直到收到Ctrl-C
    signal::ctrl_c().await?;
    println!("正在关闭应用...");
    Ok(())
}

注意事项

  1. 认证信息:生产环境中不要硬编码认证信息,应从安全配置中获取
  2. 错误处理:实际应用中应妥善处理所有可能的错误
  3. 性能考虑:大数据集时应使用适当的查询条件和索引
  4. 网络状态:考虑处理网络连接断开和恢复的情况

dittolive-ditto为Rust开发者提供了强大的实时数据同步能力,可以大大简化跨设备协作应用的开发工作。

回到顶部