Rust实时数据同步库dittolive-ditto的使用,实现跨平台设备间高效数据同步与协作
Rust实时数据同步库dittolive-ditto的使用,实现跨平台设备间高效数据同步与协作
什么是Ditto?
Ditto是一个跨平台的对等网络(P2P)数据库,允许应用程序在有或没有互联网连接的情况下进行数据同步。
安装Ditto到您的应用程序中,然后使用API读写数据到其存储系统,它会自动将任何更改同步到其他设备。与其他同步解决方案不同,Ditto专为"对等网络"场景设计,可以在没有互联网连接时直接与其他设备通信。
此外,Ditto自动管理使用多种网络传输(如蓝牙、P2P Wi-Fi和局域网)的复杂性,来查找和连接其他设备并同步任何更改。
快速入门示例
以下是使用Ditto Playground模式的快速入门示例:
use std::sync::Arc;
use dittolive_ditto::prelude::*;
fn main() -> anyhow::Result<()> {
let app_id = AppId::from_env("DITTO_APP_ID")?;
let playground_token = std::env::var("DITTO_PLAYGROUND_TOKEN")?;
let cloud_sync = true;
let custom auth_url = None;
// 初始化Ditto
let ditto = Ditto::builder()
.with_root(Arc::new(PersistentRoot::from_current_exe()?))
.with_identity(|ditto_root| {
identity::OnlinePlayground::new(
ditto_root,
app_id,
playground_token,
cloud_sync,
custom_auth_url,
)
})?
.build()?;
// 开始与对等设备同步
ditto.start_sync()?;
Ok(())
}
使用DQL(Ditto查询语言)写入数据
使用DQL写入数据的首选方法如下:
use dittolive_ditto::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct Car {
color: String,
make: String,
}
async fn dql_insert_car(ditto: &Ditto, car: &Car) -> anyhow::Result<()> {
let store = ditto.store();
let query_result = store.execute(
// `cars`是集合名称
"INSERT INTO cars DOCUMENTS (:newCar)",
Some(serde_json::json!({
"newCar": car
}).into())
).await?;
// 可选:查看插入的项目数量
let item_count = query_result.item_count();
// 可选:检查每个插入的项目
for query_item in query_result.iter() {
println!("Inserted: {}", query_item.json_string());
}
Ok(())
}
// 调用方法:
async fn call_dql_insert(ditto: Ditto) -> anyhow::Result<()> {
let my_car = Car {
color: "blue".to_string(),
make: "ford".to_string(),
};
dql_insert_car(&ditto, &my_car).await?;
Ok(())
}
使用DQL读取数据
use dittolive_ditto::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct Car {
color: String,
make: String,
}
async fn dql_select_cars(ditto: &Ditto, color: &str) -> anyhow::Result<Vec<Car>> {
let store = ditto.store();
let query_result = store.execute(
"SELECT * FROM cars WHERE color = :myColor",
Some(serde_json::json!({
"myColor": color
}).into())
).await?;
let cars = query_result.iter()
.map(|query_item| query_item.deserialize_value::<Car>())
.collect::<Result<Vec<Car>, _>>()?;
Ok(cars)
}
// 调用方法:
async fn call_dql_select(ditto: Ditto) -> anyhow::Result<()> {
let cars: Vec<Car> = dql_select_cars(&ditto, "blue").await?;
Ok(())
}
完整示例
以下是一个完整的跨平台数据同步示例:
use std::sync::Arc;
use dittolive_ditto::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::time::{sleep, Duration};
#[derive(Serialize, Deserialize, Debug)]
struct Message {
text: String,
timestamp: i64,
sender: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 初始化Ditto
let app_id = AppId::from_env("DITTO_APP_ID")?;
let playground_token = std::env::var("DITTO_PLAYGROUND_TOKEN")?;
let ditto = Ditto::builder()
.with_root(Arc::new(PPersistentRoot::from_current_exe()?))
.with_identity(|ditto_root| {
identity::OnlinePlayground::new(
ditto_root,
app_id,
playground_token,
true, // 启用云同步
None, // 不使用自定义认证URL
)
})?
.build()?;
// 开始同步
ditto.start_sync()?;
// 创建消息集合的实时订阅
let mut live_query = ditto.store()
.execute("SELECT * FROM messages ORDER BY timestamp DESC", None)
.await?
.live_query();
// 在单独的任务中处理接收到的消息
tokio::spawn(async move {
while let Some(event) = live_query.next().await {
for item in event.iter() {
if let Ok(msg) = item.deserialize_value::<Message>() {
println!("[{}] {}: {}",
msg.timestamp,
msg.sender,
msg.text);
}
}
}
});
// 模拟发送消息
for i in 0..5 {
let message = Message {
text: format!("Hello from device {}", i),
timestamp: chrono::Utc::now().timestamp(),
sender: std::env::var("USER").unwrap_or("unknown".to_string()),
};
ditto.store()
.execute(
"INSERT INTO messages DOCUMENTS (:msg)",
Some(serde_json::json!({ "msg": message }).into()),
)
.await?;
sleep(Duration::from_secs(1)).await;
}
// 保持程序运行以继续接收消息
loop {
sleep(Duration::from_secs(10)).await;
}
}
依赖配置
在Cargo.toml中添加以下依赖:
[dependencies]
dittolive-ditto = "4.11.2"
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.120"
tokio = { version = "1.0", features = ["full"] }
chrono = "0.4"
注意事项
- 需要设置环境变量
DITTO_APP_ID
和DITTO_PLAYGROUND_TOKEN
- 此示例展示了跨设备实时消息同步功能
- 数据会自动通过蓝牙、Wi-Fi或互联网在所有连接的设备间同步
- 即使在离线状态下,数据也会在设备重新连接时自动同步
Ditto特别适合需要实时数据同步和协作的跨平台应用场景,如聊天应用、多人协作编辑、物联网设备数据同步等。
1 回复
Rust实时数据同步库dittolive-ditto的使用指南
概述
dittolive-ditto是一个强大的实时数据同步库,专为跨平台设备间高效数据同步与协作而设计。它允许开发者在不同设备间实时同步数据,支持离线优先的工作模式,非常适合构建协作应用、多设备同步应用等场景。
主要特性
- 实时数据同步:设备间数据变更即时同步
- 离线优先:支持离线操作,网络恢复后自动同步
- 多平台支持:可在iOS、Android、macOS、Linux、Windows等平台使用
- 冲突解决:内置智能冲突解决机制
- 端到端加密:保障数据安全
安装方法
在Cargo.toml中添加依赖:
[dependencies]
ditto = "0.9" # 请检查最新版本
基本使用方法
1. 初始化Ditto实例
use ditto::Ditto;
async fn init_ditto() -> Result<Ditto, Box<dyn std::error::Error>> {
let ditto = Ditto::builder()
.with_identity(|identity| {
identity.online_with_authentication(|auth| {
auth.app_id("YOUR_APP_ID".to_string())
.token("YOUR_AUTH_TOKEN".to_string())
})
})
.build()?;
ditto.start_sync().await?;
Ok(ditto)
}
2. 创建和操作集合
use ditto::document::DocumentId;
async fn work_with_collection(ditto: &Ditto) {
// 获取或创建集合
let collection = ditto.store().collection("tasks").unwrap();
// 插入文档
let doc_id = DocumentId::new();
collection
.insert(
doc_id,
serde_json::json!({
"title": "Buy milk",
"completed": false,
"created_at": chrono::Utc::now().to_rfc3339()
}),
)
.await
.unwrap();
// 查询文档
let tasks = collection.find("{}".to_string()).exec().await.unwrap();
for task in tasks {
println!("Task: {:?}", task);
}
}
3. 监听数据变更
use ditto::subscription::Subscription;
async fn subscribe to_changes(ditto: &Ditto) {
let collection = ditto.store().collection("tasks").unwrap();
// 创建订阅
let mut subscription = Subscription::new("live_tasks");
subscription.set_query("{}".to_string());
// 监听变更
let mut live_query = collection
.find("{}".to_string())
.subscribe(&subscription)
.await
.unwrap();
while let Some(event) = live_query.next().await {
match event {
ditto::live_query::LiveQueryEvent::Update(update) => {
println!("Document updated: {:?}", update);
}
ditto::live_query::LiveQueryEvent::Insert(insert) => {
println!("Document inserted: {:?}", insert);
}
ditto::live_query::LiveQueryEvent::Remove(remove) => {
println!("Document removed: {:?}", remove);
}
}
}
}
高级功能
1. 冲突解决
async fn handle_conflict(ditto: &Ditto) {
let collection = ditto.store().collection("tasks").unwrap();
// 设置冲突解决器
collection
.register_resolver("tasks", |conflict| {
let local_doc = conflict.local();
let remote_doc = conflict.remote();
// 简单的基于时间戳的冲突解决策略
let local_time = local_doc["updated_at"].as_str().unwrap();
let remote_time = remote_doc["updated_at"].as_str().unwrap();
if local_time > remote_time {
conflict.resolve_with_local()
} else {
conflict.resolve_with_remote()
}
})
.await;
}
2. 跨设备同步
async fn sync_across_devices(ditto: &Ditto) {
// 连接到其他对等设备
if let Err(e) = ditto.connect_peers().await {
eprintln!("Failed to connect to peers: {}", e);
}
// 检查连接状态
let peers = ditto.peers().await;
println!("Connected peers: {:?}", peers);
}
完整示例代码
use ditto::{Ditto, DocumentId, live_query::LiveQueryEvent};
use serde_json::json;
use std::error::Error;
use tokio::signal;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// 1. 初始化Ditto实例
let ditto = Ditto::builder()
.with_identity(|identity| {
identity.online_with_authentication(|auth| {
auth.app_id("my-collab-app".to_string())
.token("secure-token-123".to_string())
})
})
.build()?;
// 启动同步
ditto.start_sync().await?;
// 2. 创建并操作集合
let collection = ditto.store().collection("collab_docs").unwrap();
// 插入示例文档
let doc_id = DocumentId::new();
collection.insert(
doc_id,
json!({
"content": "Initial document content",
"version": 1,
"last_updated": chrono::Utc::now().to_rfc3339()
}),
).await?;
// 3. 监听数据变更
let mut live_query = collection
.find("{}".to_string())
.subscribe(&Subscription::new("doc_updates"))
.await?;
tokio::spawn(async move {
while let Some(event) = live_query.next().await {
match event {
LiveQueryEvent::Update(update) => {
println!("文档更新: 版本 {}", update.document["version"]);
}
LiveQueryEvent::Insert(insert) => {
println!("新文档创建: ID {}", insert.document_id);
}
LiveQueryEvent::Remove(remove) => {
println!("文档删除: ID {}", remove.document_id);
}
}
}
});
// 4. 高级功能示例 - 冲突解决
collection.register_resolver("collab_docs", |conflict| {
let local_ver = conflict.local()["version"].as_u64().unwrap_or(0);
let remote_ver = conflict.remote()["version"].as_u64().unwrap_or(0);
if local_ver > remote_ver {
conflict.resolve_with_local()
} else {
conflict.resolve_with_remote()
}
}).await;
// 5. 跨设备同步
if let Err(e) = ditto.connect_peers().await {
eprintln!("Peer连接错误: {}", e);
}
// 保持应用运行直到收到Ctrl-C
signal::ctrl_c().await?;
println!("正在关闭应用...");
Ok(())
}
注意事项
- 认证信息:生产环境中不要硬编码认证信息,应从安全配置中获取
- 错误处理:实际应用中应妥善处理所有可能的错误
- 性能考虑:大数据集时应使用适当的查询条件和索引
- 网络状态:考虑处理网络连接断开和恢复的情况
dittolive-ditto为Rust开发者提供了强大的实时数据同步能力,可以大大简化跨设备协作应用的开发工作。