Rust一致性哈希库pingora-ketama的使用,高性能分布式系统负载均衡解决方案

cargo add pingora-ketama

pingora-ketama = “0.6.0”

use pingora_ketama::{Ketama, Server};

fn main() {
    // 创建一致性哈希环
    let mut ring = Ketama::new();
    
    // 添加服务器节点,每个节点有相应的权重
    ring.add(Server::new("server1", 10));
    ring.add(Server::new("server2", 20));
    ring.add(Server::new("server3", 30));
    
    // 构建哈希环
    ring.build();
    
    // 根据键查找对应的服务器
    let key = "user_session_12345";
    let server = ring.get(key);
    
    println!("Key '{}' 被路由到服务器: {}", key, server.name());
    
    // 示例:模拟多个键的路由
    let keys = ["item_1", "item_2", "item_3", "item_4", "item_5"];
    for key in keys.iter() {
        let server = ring.get(key);
        println!("Key '{}' -> 服务器: {}", key, server.name());
    }
}
use pingora_ketama::{Ketama, Server};
use std::collections::HashMap;

// 分布式缓存节点结构
struct CacheNode {
    name: String,
    data: HashMap<String, String>,
}

impl CacheNode {
    fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
            data: HashMap::new(),
        }
    }
    
    fn store(&mut self, key: &str, value: &str) {
        self.data.insert(key.to_string(), value.to_string());
    }
    
    fn retrieve(&self, key: &str) -> Option<&String> {
        self.data.get(key)
    }
}

fn main() {
    // 初始化一致性哈希环
    let mut ring = Ketama::new();
    
    // 创建缓存节点
    let mut nodes = vec![
        CacheNode::new("cache-node-1"),
        CacheNode::new("cache-node-2"),
        CacheNode::new("cache-node-3"),
        CacheNode::new("cache-node-4"),
    ];
    
    // 添加服务器到哈希环,设置不同的权重
    for (i, node) in nodes.iter().enumerate() {
        ring.add(Server::new(&node.name, (i + 1) as u32 * 10));
    }
    ring.build();
    
    // 模拟数据存储
    let data_items = [
        ("user:1001", "用户数据A"),
        ("user:1002", "用户数据B"),
        ("product:2001", "产品数据A"),
        ("product:2002", "产品数据B"),
        ("session:3001", "会话数据A"),
    ];
    
    println!("=== 数据分布 ===");
    for (key, value) in data_items.iter() {
        let server_name = ring.get(key).name();
        if let Some(node) = nodes.iter_mut().find(|n| n.name == server_name) {
            node.store(key, value);
            println!("存储: {} -> {} (节点: {})", key, value, server_name);
        }
    }
    
    println!("\n=== 数据检索 ===");
    // 测试数据检索
    let test_keys = ["user:1001", "product:2002", "session:3001"];
    for key in test_keys.iter() {
        let server_name = ring.get(key).name();
        if let Some(node) = nodes.iter().find(|n| n.name == server_name) {
            if let Some(value) = node.retrieve(key) {
                println!("检索: {} -> {} (节点: {})", key, value, server_name);
            } else {
                println!("检索: {} -> 未找到 (节点: {})", key, server_name);
            }
        }
    }
    
    println!("\n=== 节点负载统计 ===");
    for node in nodes.iter() {
        println!("节点 {}: {} 个数据项", node.name, node.data.len());
    }
    
    // 模拟节点故障(移除一个节点)
    println!("\n=== 模拟节点故障 ===");
    ring.remove("cache-node-2");
    ring.build();
    
    // 测试故障后数据重新分布
    let key = "user:1001";
    let new_server = ring.get(key).name();
    println!("节点故障后,键 '{}' 被重新路由到: {}", key, new_server);
}

1 回复

Rust一致性哈希库pingora-ketama使用指南

概述

pingora-ketama是一个基于Rust实现的高性能一致性哈希库,专门为分布式系统负载均衡场景设计。该库采用Ketama算法实现,能够有效解决传统哈希算法在节点增减时导致的大量数据迁移问题。

核心特性

  • 高性能:基于Rust零成本抽象实现
  • 低内存占用:优化的数据结构设计
  • 线程安全:支持多线程并发访问
  • 易于集成:简洁的API接口设计

安装方法

在Cargo.toml中添加依赖:

[dependencies]
pingora-ketama = "0.3"

基本使用方法

1. 创建一致性哈希环

use pingora_ketama::Ketama;

fn main() {
    // 初始化哈希环
    let mut ketama = Ketama::new();
    
    // 添加节点及其权重
    ketama.add_node("server1", 1);
    ketama.add_node("server2", 2);
    ketama.add_node("server3", 1);
    
    // 构建哈希环
    ketama.build();
}

2. 查找节点

let key = "user_session_12345";
if let Some(server) = ketama.get_node(key) {
    println!("Key {} 被路由到服务器: {}", key, server);
}

3. 批量节点操作示例

use std::collections::HashMap;

// 批量添加节点
let nodes = vec![
    ("server1", 3),
    ("server2", 2),
    ("server3", 1),
    ("server4", 4),
];

for (node, weight) in nodes {
    ketama.add_node(node, weight);
}
ketama.build();

// 测试多个键的路由
let test_keys = vec!["key1", "key2", "key3", "key4"];
for key in test_keys {
    if let Some(node) = ketama.get_node(key) {
        println!("{} -> {}", key, node);
    }
}

4. 动态节点管理

// 移除节点
ketama.remove_node("server1");
ketama.build(); // 需要重新构建

// 更新节点权重
ketama.update_node("server2", 5);
ketama.build();

高级用法

自定义哈希函数

use pingora_ketama::{Ketama, HashAlgorithm};
use std::hash::{Hash, Hasher};
use twox_hash::XxHash64;

let mut ketama = Ketama::with_hasher(|| {
    XxHash64::with_seed(0)
});

性能优化配置

let mut ketama = Ketama::new()
    .with_virtual_nodes(160)  // 设置虚拟节点数
    .with_hash_algorithm(HashAlgorithm::MD5); // 设置哈希算法

实际应用场景

Web服务器负载均衡

use pingora_ketama::Ketama;

struct LoadBalancer {
    ketama: Ketama<String>,
    servers: HashMap<String, ServerInfo>,
}

impl LoadBalancer {
    fn route_request(&self, session_id: &str) -> Option<&ServerInfo> {
        self.ketama.get_node(session_id)
            .and_then(|server_name| self.servers.get(server_name))
    }
}

分布式缓存系统

fn get_cached_data(key: &str, ketama: &Ketama) -> Option<Vec<u8>> {
    if let Some(cache_node) = ketama.get_node(key) {
        // 连接到对应的缓存节点获取数据
        connect_to_cache_node(cache_node)
            .get(key)
            .ok()
    } else {
        None
    }
}

完整示例demo

use pingora_ketama::Ketama;
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

// 服务器信息结构
#[derive(Debug, Clone)]
struct ServerInfo {
    address: String,
    port: u16,
    healthy: bool,
}

fn main() {
    // 创建一致性哈希环实例
    let mut ketama = Ketama::new();
    
    // 添加服务器节点及其权重
    let servers = vec![
        ("server1", 3),  // 权重为3
        ("server2", 2),  // 权重为2  
        ("server3", 1),  // 权重为1
        ("server4", 4),  // 权重为4
    ];
    
    for (server_name, weight) in servers {
        ketama.add_node(server_name, weight);
    }
    
    // 构建哈希环
    ketama.build();
    
    // 创建服务器信息映射
    let mut server_info_map = HashMap::new();
    server_info_map.insert("server1", ServerInfo {
        address: "192.168.1.101".to_string(),
        port: 8080,
        healthy: true,
    });
    server_info_map.insert("server2", ServerInfo {
        address: "192.168.1.102".to_string(),
        port: 8080,
        healthy: true,
    });
    server_info_map.insert("server3", ServerInfo {
        address: "192.168.1.103".to_string(),
        port: 8080,
        healthy: true,
    });
    server_info_map.insert("server4", ServerInfo {
        address: "192.168.1.104".to_string(),
        port: 8080,
        healthy: true,
    });
    
    // 测试键的路由
    let test_keys = vec![
        "user_session_12345",
        "user_session_67890", 
        "cache_key_abc",
        "cache_key_def",
        "request_id_001",
        "request_id_002",
    ];
    
    println!("=== 初始路由测试 ===");
    for key in &test_keys {
        if let Some(server_name) = ketama.get_node(key) {
            if let Some(server_info) = server_info_map.get(server_name) {
                println!("键 '{}' -> 服务器: {}:{}", key, server_info.address, server_info.port);
            }
        }
    }
    
    // 模拟动态节点管理
    println!("\n=== 移除server1后的路由测试 ===");
    ketama.remove_node("server1");
    ketama.build(); // 重新构建哈希环
    
    for key in &test_keys {
        if let Some(server_name) = ketama.get_node(key) {
            if let Some(server_info) = server_info_map.get(server_name) {
                println!("键 '{}' -> 服务器: {}:{}", key, server_info.address, server_info.port);
            }
        }
    }
    
    // 多线程安全测试
    println!("\n=== 多线程并发访问测试 ===");
    let arc_ketama = Arc::new(ketama);
    
    let handles: Vec<_> = (0..4).map(|i| {
        let ketama_clone = Arc::clone(&arc_ketama);
        thread::spawn(move || {
            let key = format!("thread_{}_key", i);
            if let Some(server_name) = ketama_clone.get_node(&key) {
                println!("线程 {}: 键 '{}' -> 服务器: {}", i, key, server_name);
            }
        })
    }).collect();
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    // 性能优化配置示例
    println!("\n=== 性能优化配置示例 ===");
    let optimized_ketama = Ketama::new()
        .with_virtual_nodes(200)  // 设置200个虚拟节点
        .with_hash_algorithm(pingora_ketama::HashAlgorithm::MD5);
    
    println!("已创建优化配置的哈希环实例");
}

性能建议

  1. 虚拟节点数建议设置为100-200之间以获得最佳分布效果
  2. 在节点变更后务必调用build()方法重新构建哈希环
  3. 对于读多写少的场景,可以考虑使用Arc包装实现多线程共享

注意事项

  • 确保所有节点的权重值为正整数
  • 节点名称需要保证唯一性
  • 哈希环构建后,节点的增删改需要重新构建才能生效

这个库特别适合需要高性能负载均衡的分布式系统,如微服务架构、缓存集群、分布式存储等场景。

回到顶部