Rust分布式存储与节点管理库sn_node的使用,实现高效安全的数据存储与网络通信

Rust分布式存储与节点管理库sn_node的使用,实现高效安全的数据存储与网络通信

概述

sn_node 目录提供了 safenode 二进制文件,这是 Safe Network 的节点实现。该目录包含节点操作的核心逻辑,包括 API 定义、错误处理、事件管理和数据验证。

安装

按照主项目的安装指南设置 safenode 二进制文件。

使用

要运行 safenode 二进制文件,请按照主项目的使用指南中的说明操作。

目录结构

  • src/: 源代码文件
    • api.rs: API 定义
    • error.rs: 错误类型和处理
    • event.rs: 事件相关逻辑
    • get_validation.rs: GET 请求的验证
    • put_validation.rs: PUT 请求的验证
    • replication.rs: 数据复制逻辑
    • spends.rs: 与花费代币或资源相关的逻辑
  • tests/: 测试文件
    • common/mod.rs: 测试的通用工具
    • data_with_churn.rs: 与数据流失相关的测试
    • sequential_transfers.rs: 顺序数据传输的测试
    • storage_payments.rs: 与存储支付相关的测试
    • verify_data_location.rs: 验证数据位置的测试

测试

要运行测试,请导航到 sn_node 目录并执行:

cargo test

贡献

请随意克隆和修改此项目。欢迎提交拉取请求。

常规提交

我们遵循常规提交规范进行所有提交。请确保您的提交消息符合此标准。

许可证

此 Safe Network 存储库根据通用公共许可证 (GPL) 第 3 版获得许可。

示例代码

以下是一个使用 sn_node 库的完整示例,演示如何启动一个节点并处理基本操作:

use sn_node::{
    api::NodeApi,
    error::NodeError,
    event::NodeEvent,
    replication::ReplicationManager,
};
use tokio::sync::mpsc;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), NodeError> {
    // 创建事件通道
    let (event_tx, mut event_rx) = mpsc::channel(100);
    
    // 初始化节点 API
    let node_api = NodeApi::new()
        .with_event_sender(event_tx)
        .build()?;
    
    // 启动节点
    let node_handle = node_api.start().await?;
    
    // 启动复制管理器
    let replication_manager = ReplicationManager::new(node_api.clone());
    replication_manager.start().await?;
    
    // 处理节点事件
    tokio::spawn(async move {
        while let Some(event) = event_rx.recv().await {
            match event {
                NodeEvent::DataStored { key, size } => {
                    println!("数据已存储 - 键: {:?}, 大小: {} 字节", key, size);
                }
                NodeEvent::DataRetrieved { key, data } => {
                    println!("数据已检索 - 键: {:?}, 数据长度: {}", key, data.len());
                }
                NodeEvent::ReplicationComplete { key, replicas } => {
                    println!("复制完成 - 键: {:?}, 副本数: {}", key, replicas);
                }
                NodeEvent::Error { error } => {
                    eprintln!("节点错误: {:?}", error);
                }
                _ => {}
            }
        }
    });
    
    // 示例:存储数据
    let test_data = b"Hello, Safe Network!";
    let storage_result = node_api.store_data(test_data.to_vec()).await?;
    println!("存储结果: {:?}", storage_result);
    
    // 示例:检索数据
    let retrieval_result = node_api.retrieve_data(&storage_result.key).await?;
    println!("检索到的数据: {:?}", String::from_utf8_lossy(&retrieval_result));
    
    // 保持节点运行
    node_handle.await??;
    
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    
    #[tokio::test]
    async fn test_basic_operations() {
        let (event_tx, _) = mpsc::channel(100);
        let node_api = NodeApi::new()
            .with_event_sender(event_tx)
            .build()
            .unwrap();
        
        // 测试数据存储
        let test_data = b"Test data";
        let result = node_api.store_data(test_data.to_vec()).await;
        assert!(result.is_ok());
        
        // 测试数据检索
        let key = result.unwrap().key;
        let retrieved = node_api.retrieve_data(&key).await;
        assert!(retrieved.is_ok());
        assert_eq!(retrieved.unwrap(), test_data.to_vec());
    }
}

Cargo.toml 配置

[package]
name = "sn_node_example"
version = "0.1.0"
edition = "2021"

[dependencies]
sn_node = "0.112.6"
tokio = { version = "1.0", features = ["full"] }

[dev-dependencies]
tokio = { version = "1.0", features = ["full"] }

这个示例展示了如何使用 sn_node 库的基本功能,包括节点启动、数据存储、数据检索和事件处理。代码包含了错误处理和基本的测试用例。

完整示例代码

以下是一个更完整的 sn_node 使用示例,包含更多高级功能和错误处理:

use sn_node::{
    api::NodeApi,
    error::NodeError,
    event::NodeEvent,
    replication::ReplicationManager,
    get_validation::GetValidator,
    put_validation::PutValidator,
};
use tokio::sync::mpsc;
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), NodeError> {
    // 创建事件通道用于接收节点事件
    let (event_tx, mut event_rx) = mpsc::channel(100);
    
    // 初始化节点 API 配置
    let node_api = NodeApi::new()
        .with_event_sender(event_tx.clone())
        .with_network_timeout(Duration::from_secs(30))
        .build()?;
    
    println!("正在启动 Safe Network 节点...");
    
    // 启动节点并获取处理句柄
    let node_handle = node_api.start().await?;
    
    // 初始化数据验证器
    let get_validator = GetValidator::new();
    let put_validator = PutValidator::new();
    
    // 启动复制管理器
    let replication_manager = ReplicationManager::new(node_api.clone());
    replication_manager
        .with_replication_factor(3) // 设置副本数为3
        .start()
        .await?;
    
    // 异步处理节点事件
    let event_handler = tokio::spawn(async move {
        while let Some(event) = event_rx.recv().await {
            handle_node_event(event).await;
        }
    });
    
    // 示例:存储多个数据块
    let data_sets = vec![
        b"第一个测试数据块".to_vec(),
        b"第二个测试数据块,包含更多内容".to_vec(),
        b"第三个数据块用于验证复制功能".to_vec(),
    ];
    
    let mut storage_keys = Vec::new();
    
    for (index, data) in data_sets.into_iter().enumerate() {
        // 验证数据有效性
        if let Err(e) = put_validator.validate(&data) {
            eprintln!("数据验证失败 (索引 {}): {:?}", index, e);
            continue;
        }
        
        match node_api.store_data(data).await {
            Ok(result) => {
                println!("成功存储数据块 {} - 键: {:?}", index + 1, result.key);
                storage_keys.push(result.key);
            }
            Err(e) => {
                eprintln!("存储数据块 {} 失败: {:?}", index + 1, e);
            }
        }
        
        // 添加短暂延迟以避免请求过载
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    
    // 示例:检索所有存储的数据
    println!("\n开始检索存储的数据...");
    
    for key in &storage_keys {
        match node_api.retrieve_data(key).await {
            Ok(data) => {
                // 验证检索到的数据
                if let Err(e) = get_validator.validate(&data) {
                    eprintln!("数据验证失败 (键 {:?}): {:?}", key, e);
                } else {
                    println!(
                        "成功检索数据 - 键: {:?}, 长度: {} 字节", 
                        key, 
                        data.len()
                    );
                }
            }
            Err(e) => {
                eprintln!("检索数据失败 (键 {:?}): {:?}", key, e);
            }
        }
    }
    
    // 保持节点运行,等待用户中断
    println!("\n节点运行中...按 Ctrl+C 退出");
    
    // 等待节点关闭或用户中断
    tokio::select! {
        _ = node_handle => {
            println!("节点正常关闭");
        }
        _ = tokio::signal::ctrl_c() => {
            println!("接收到中断信号,正在关闭节点...");
        }
    }
    
    // 等待事件处理程序完成
    event_handler.abort();
    
    Ok(())
}

/// 处理节点事件的辅助函数
async fn handle_node_event(event: NodeEvent) {
    match event {
        NodeEvent::DataStored { key, size } => {
            println!("📦 数据存储成功 - 键: {:?}, 大小: {} 字节", key, size);
        }
        NodeEvent::DataRetrieved { key, data } => {
            println!("🔍 数据检索成功 - 键: {:?}, 数据长度: {}", key, data.len());
        }
        NodeEvent::ReplicationComplete { key, replicas } => {
            println!("✅ 复制完成 - 键: {:?}, 副本数: {}", key, replicas);
        }
        NodeEvent::NetworkConnected { peer_count } => {
            println!("🌐 网络连接建立 - 对等节点数: {}", peer_count);
        }
        NodeEvent::Error { error } => {
            eprintln!("❌ 节点错误: {:?}", error);
        }
        NodeEvent::Warning { message } => {
            println!("⚠️  节点警告: {}", message);
        }
        _ => {
            // 忽略其他事件类型
        }
    }
}

/// 自定义错误处理模块
mod error_handling {
    use super::NodeError;
    use sn_node::error::NodeErrorKind;
    
    pub fn handle_storage_error(error: NodeError) {
        match error.kind() {
            NodeErrorKind::StorageFull => {
                eprintln!("存储空间不足,请清理空间或增加存储容量");
            }
            NodeErrorKind::NetworkTimeout => {
                eprintln!("网络超时,请检查网络连接");
            }
            NodeErrorKind::ValidationFailed => {
                eprintln!("数据验证失败,请检查数据完整性");
            }
            _ => {
                eprintln!("未知错误: {:?}", error);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::timeout;
    use std::time::Duration;
    
    #[tokio::test]
    async fn test_comprehensive_operations() {
        let (event_tx, _) = mpsc::channel(100);
        let node_api = NodeApi::new()
            .with_event_sender(event_tx)
            .build()
            .expect("Failed to build node API");
        
        // 测试超时功能
        let result = timeout(
            Duration::from_secs(10),
            node_api.store_data(b"Timeout test data".to_vec())
        ).await;
        
        assert!(result.is_ok(), "操作不应超时");
        
        // 测试数据检索
        let test_data = b"Comprehensive test data for validation";
        let storage_result = node_api.store_data(test_data.to_vec()).await;
        assert!(storage_result.is_ok(), "数据存储失败");
        
        let key = storage_result.unwrap().key;
        let retrieved = node_api.retrieve_data(&key).await;
        assert!(retrieved.is_ok(), "数据检索失败");
        assert_eq!(retrieved.unwrap(), test_data.to_vec(), "检索数据不匹配");
    }
    
    #[tokio::test]
    async fn test_error_handling() {
        let (event_tx, _) = mpsc::channel(100);
        let node_api = NodeApi::new()
            .with_event_sender(event_tx)
            .build()
            .unwrap();
        
        // 测试空数据存储(应该失败)
        let empty_result = node_api.store_data(vec![]).await;
        assert!(empty_result.is_err(), "空数据存储应该失败");
        
        // 测试无效键检索
        let invalid_key_result = node_api.retrieve_data(&"invalid_key".to_string()).await;
        assert!(invalid_key_result.is_err(), "无效键检索应该失败");
    }
}

// Cargo.toml 配置建议
/*
[package]
name = "sn_node_advanced_example"
version = "0.2.0"
edition = "2021"

[dependencies]
sn_node = "0.112.6"
tokio = { version = "1.0", features = ["full", "signal"] }
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"

[dev-dependencies]
tokio = { version = "1.0", features = ["full", "test-util"] }

[features]
default = ["logging", "metrics"]
logging = ["sn_node/logging"]
metrics = ["sn_node/metrics"]
*/

这个完整的示例展示了 sn_node 库的高级用法,包括:

  1. 完整的节点生命周期管理 - 从启动到关闭的完整流程
  2. 高级错误处理 - 自定义错误处理模块和详细的错误信息
  3. 数据验证 - 使用内置的验证器确保数据完整性
  4. 事件处理 - 完善的事件监听和处理机制
  5. 复制管理 - 配置和管理数据复制功能
  6. 超时控制 - 网络操作超时设置和处理
  7. 测试用例 - 包含完整的单元测试和集成测试
  8. 配置建议 - 提供更完整的 Cargo.toml 配置建议

示例代码包含了详细的注释,帮助理解每个功能模块的作用和实现方式。


1 回复

Rust分布式存储与节点管理库sn_node的使用指南

概述

sn_node是一个基于Rust语言开发的分布式存储与节点管理库,专注于提供高效安全的数据存储和网络通信解决方案。该库采用先进的加密算法和分布式架构,确保数据在传输和存储过程中的安全性与可靠性。

核心特性

  • 分布式节点管理:自动发现、连接和管理网络中的节点
  • 安全通信:使用TLS加密和身份验证机制
  • 数据分片存储:支持数据自动分片和冗余备份
  • 高性能:采用异步I/O和零拷贝技术优化性能

安装方法

在Cargo.toml中添加依赖:

[dependencies]
sn_node = "0.3.0"

基本使用方法

1. 初始化节点

use sn_node::{Node, NodeConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = NodeConfig::default()
        .with_listen_addr("0.0.0.0:8080")
        .with_bootstrap_nodes(vec!["node1.example.com:8080".to_string()]);
    
    let mut node = Node::new(config).await?;
    node.start().await?;
    
    Ok(())
}

2. 存储数据

use sn_node::Storage;

async fn store_data() -> Result<(), Box<dyn std::error::Error>> {
    let storage = Storage::new();
    let data = b"Hello, Distributed World!";
    let key = storage.put(data).await?;
    
    println!("Data stored with key: {:?}", key);
    Ok(())
}

3. 检索数据

async fn retrieve_data(key: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
    let storage = Storage::new();
    let retrieved_data = storage.get(key).await?;
    
    println!("Retrieved data: {:?}", retrieved_data);
    Ok(())
}

4. 节点发现与管理

use sn_node::NodeManager;

async fn manage_nodes() -> Result<(), Box<dyn std::error::Error>> {
    let manager = NodeManager::new();
    
    // 发现网络中的节点
    let discovered_nodes = manager.discover_nodes().await?;
    println!("Discovered nodes: {:?}", discovered_nodes);
    
    // 获取节点状态
    for node in discovered_nodes {
        let status = manager.get_node_status(&node).await?;
        println!("Node {} status: {:?}", node, status);
    }
    
    Ok(())
}

高级功能示例

安全通信配置

use sn_node::{SecurityConfig, TlsConfig};

async fn secure_communication() -> Result<(), Box<dyn std::error::Error>> {
    let tls_config = TlsConfig::new()
        .with_certificate_file("cert.pem")?
        .with_private_key_file("key.pem")?;
    
    let security_config = SecurityConfig::new()
        .with_tls(tls_config)
        .with_authentication_required(true);
    
    let config = NodeConfig::default()
        .with_security(security_config);
    
    let node = Node::new(config).await?;
    Ok(())
}

数据复制策略

use sn_node::ReplicationStrategy;

async fn configure_replication() -> Result<(), Box<dyn std::error::Error>> {
    let strategy = ReplicationStrategy::new()
        .with_replication_factor(3)
        .with_consistency_level(Quorum);
    
    let storage = Storage::new()
        .with_replication_strategy(strategy);
    
    Ok(())
}

错误处理

use sn_node::Error;

async fn handle_errors() -> Result<(), Error> {
    match storage.get(invalid_key).await {
        Ok(data) => println!("Data: {:?}", data),
        Err(Error::KeyNotFound) => println!("Key not found"),
        Err(Error::NetworkError(e)) => println!("Network error: {}", e),
        Err(e) => println!("Other error: {}", e),
    }
    Ok(())
}

性能优化建议

  1. 使用连接池管理节点间连接
  2. 配置适当的数据分片大小
  3. 启用数据压缩减少网络传输
  4. 使用批处理操作减少网络往返

注意事项

  • 确保网络端口开放和防火墙配置正确
  • 定期备份节点配置和数据
  • 监控节点健康状况和网络状态
  • 使用强密码和定期更换证书

这个库为构建可靠的分布式存储系统提供了强大的基础功能,适合需要高性能、安全通信的分布式应用场景。

完整示例demo

//! sn_node完整使用示例
//! 展示如何初始化节点、存储数据、检索数据和节点管理

use sn_node::{Node, NodeConfig, Storage, NodeManager, Error};
use tokio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 初始化节点配置
    let config = NodeConfig::default()
        .with_listen_addr("0.0.0.0:8080") // 设置监听地址
        .with_bootstrap_nodes(vec![
            "node1.example.com:8080".to_string(), // 引导节点1
            "node2.example.com:8080".to_string()  // 引导节点2
        ]);
    
    // 2. 创建并启动节点
    println!("正在初始化节点...");
    let mut node = Node::new(config).await?;
    node.start().await?;
    println!("节点启动成功,监听端口: 8080");
    
    // 3. 存储示例数据
    println!("\n存储数据示例:");
    let storage = Storage::new();
    let sample_data = b"Hello, Distributed Storage System!";
    let key = storage.put(sample_data).await?;
    println!("数据存储成功,键: {:?}", key);
    
    // 4. 检索数据
    println!("\n检索数据示例:");
    match storage.get(&key).await {
        Ok(retrieved_data) => {
            println!("检索到的数据: {:?}", retrieved_data);
            if let Ok(data_str) = std::str::from_utf8(&retrieved_data) {
                println!("数据内容: {}", data_str);
            }
        }
        Err(Error::KeyNotFound) => {
            println!("错误: 未找到对应的键");
        }
        Err(e) => {
            println!("检索数据时发生错误: {}", e);
        }
    }
    
    // 5. 节点发现与管理
    println!("\n节点发现示例:");
    let manager = NodeManager::new();
    match manager.discover_nodes().await {
        Ok(nodes) => {
            println!("发现的节点数量: {}", nodes.len());
            for node in nodes {
                match manager.get_node_status(&node).await {
                    Ok(status) => {
                        println!("节点 {} 状态: {:?}", node, status);
                    }
                    Err(e) => {
                        println!("获取节点 {} 状态失败: {}", node, e);
                    }
                }
            }
        }
        Err(e) => {
            println!("节点发现失败: {}", e);
        }
    }
    
    // 6. 优雅关闭节点
    println!("\n正在关闭节点...");
    node.shutdown().await?;
    println!("节点已关闭");
    
    Ok(())
}

// 错误处理示例函数
async fn demonstrate_error_handling() -> Result<(), Error> {
    let storage = Storage::new();
    let invalid_key = b"non_existent_key";
    
    match storage.get(invalid_key).await {
        Ok(data) => {
            println!("意外找到数据: {:?}", data);
        }
        Err(Error::KeyNotFound) => {
            println!("预期错误: 键不存在");
        }
        Err(Error::NetworkError(e)) => {
            println!("网络错误: {}", e);
        }
        Err(e) => {
            println!("其他错误: {}", e);
        }
    }
    
    Ok(())
}

// 安全配置示例函数
async fn demonstrate_security_config() -> Result<(), Box<dyn std::error::Error>> {
    use sn_node::{SecurityConfig, TlsConfig};
    
    let tls_config = TlsConfig::new()
        .with_certificate_file("cert.pem")?
        .with_private_key_file("key.pem")?;
    
    let security_config = SecurityConfig::new()
        .with_tls(tls_config)
        .with_authentication_required(true);
    
    let secure_config = NodeConfig::default()
        .with_listen_addr("0.0.0.0:8443")
        .with_security(security_config);
    
    let secure_node = Node::new(secure_config).await?;
    println!("安全节点配置完成");
    
    Ok(())
}
回到顶部