Rust HDFS原生绑定库hdfs-native的使用,支持高性能分布式文件系统HDFS操作与数据读写
Rust原生HDFS客户端
hdfs-native
是一个用Rust原生编写的HDFS客户端。它支持HDFS客户端几乎所有主要功能,以及下面列出的几个关键客户端配置选项。
支持的HDFS功能
以下是当前支持和未来可能支持但尚未支持的功能列表。
HDFS操作
- [x] 列表
- [x] 读取
- [x] 写入
- [x] 重命名
- [x] 删除
- [x] 基本权限和所有权
- [x] ACL
- [x] 内容摘要
- [x] 设置复制因子
- [x] 设置时间戳
HDFS特性
- [x] 名称服务
- [x] 观察者读取
- [x] ViewFS
- [x] 基于路由器的联邦
- [x] 纠删码读写
- 仅支持RS模式,不支持RS-Legacy或XOR
安全特性
- [x] Kerberos认证(GSSAPI SASL支持)(需要libgssapi_krb5,见下文)
- [x] 令牌认证(DIGEST-MD5 SASL支持)
- [x] NameNode SASL连接
- [x] DataNode SASL连接
- [x] DataNode数据传输加密
- [ ] 静态加密(KMS支持)
Kerberos支持
Kerberos(SASL GSSAPI)机制通过运行时动态链接到libgssapi_krb5
来支持。这必须单独安装,但可能已经安装在您的系统上。如果没有,您可以通过以下方式安装:
Debian系统
apt-get install libgssapi-krb5-2
RHEL系统
yum install krb5-libs
MacOS
brew install krb5
Windows
从下载并安装Microsoft Kerberos包
复制<INSTALL FOLDER>\MIT\Kerberos\bin\gssapi64.dll
文件到%PATH%中的文件夹,并将名称更改为gssapi_krb5.dll
支持的HDFS设置
客户端将尝试在$HADOOP_CONF_DIR
目录中读取Hadoop配置core-site.xml
和hdfs-site.xml
,如果该目录不存在,则在$HADOOP_HOME/etc/hadoop
中读取。当前使用的支持的配置有:
fs.defaultFS
- Client::default()支持dfs.ha.namenodes
- 名称服务支持dfs.namenode.rpc-address.*
- 名称服务支持dfs.client.failover.resolve-needed.*
- 基于DNS的NameNode发现dfs.client.failover.resolver.useFQDN.*
- 基于DNS的NameNode发现dfs.client.failover.random.order.*
- 随机化尝试NameNode的顺序dfs.client.failover.proxy.provider.*
- 支持以下代理提供程序的行为。任何其他值将默认回退到ConfiguredFailoverProxyProvider
行为:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider
org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadConfiguredFailoverProxyProvider
dfs.client.block.write.replace-datanode-on-failure.enable
dfs.client.block.write.replace-datanode-on-failure.policy
dfs.client.block.write.replace-datanode-on-failure.best-effort
fs.viewfs.mounttable.*.link.*
- ViewFS链接fs.viewfs.mounttable.*.linkFallback
- ViewFS链接回退
所有其他设置通常假定为当前默认值。例如,安全性被假定为启用,并且始终进行SASL协商,但在不安全的集群上,这将仅进行SIMPLE认证。需要其他自定义Hadoop客户端配置的任何设置可能无法正常工作。
构建
cargo build
对象存储实现
HDFS的对象存储实现在hdfs-native-object-store crate中提供。
运行测试
测试主要是集成测试,利用rust/mindifs/
中的一个小型Java应用程序运行自定义MiniDFSCluster
。要运行测试,您需要安装Java、Maven、Hadoop二进制文件和Kerberos工具,并在您的路径上可用。任何Java版本8到17之间都应该可以工作。
cargo test -p hdfs-native --features intergation-test
Python测试
请参阅Python README
运行基准测试
一些基准测试通过fs-hdfs3 crate通过libhdfs与基于JVM的客户端进行性能比较。因此,运行基准测试需要一些额外的设置:
export HADOOP_CONF_DIR=$(pwd)/rust/target/test
export CLASSPATH=$(hadoop classpath)
然后您可以使用以下命令运行基准测试:
cargo bench -p hdfs-native --features benchmark
benchmark
功能需要暴露minidfs
和内部纠删码功能以进行基准测试。
运行示例
示例使用minidfs
模块创建一个简单的HDFS集群来运行示例。这需要包含integration-test
功能以启用minidfs
模块。或者,如果您想针对现有的HDFS集群运行示例,可以排除integration-test
功能,并确保您的HADOOP_CONF_DIR
指向包含与您的集群通信的HDFS配置的目录。
cargo run --example simple --features integration-test
完整示例代码
use hdfs_native::Client;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建HDFS客户端
let client = Client::default()?;
// 列出根目录下的文件
let entries = client.list_status("/").await?;
for entry in entries {
println!("Name: {}, Type: {:?}", entry.path, entry.file_type);
}
// 创建并写入文件
let path = "/test_file.txt";
let mut writer = client.create(path).await?;
writer.write_all(b"Hello, HDFS from Rust!").await?;
writer.close().await?;
// 读取文件内容
let mut reader = client.read(path).await?;
let mut content = Vec::new();
reader.read_to_end(&mut content).await?;
println!("File content: {}", String::from_utf8_lossy(&content));
// 删除文件
client.delete(path, false).await?;
Ok(())
}
use hdfs_native::Client;
use std::io::Cursor;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 使用特定配置创建客户端
let client = Client::new("hdfs://namenode:8020")?;
// 创建目录
client.mkdirs("/test_directory", 0o755).await?;
// 写入多个文件
for i in 0..5 {
let file_path = format!("/test_directory/file_{}.txt", i);
let content = format!("This is file number {}", i);
let mut writer = client.create(&file_path).await?;
writer.write_all(content.as_bytes()).await?;
writer.close().await?;
}
// 递归列出目录内容
let entries = client.list_status_recursive("/test_directory").await?;
for entry in entries {
println!("Path: {}, Size: {} bytes", entry.path, entry.length);
}
// 重命名目录
client.rename("/test_directory", "/renamed_directory").await?;
// 删除目录
client.delete("/renamed_directory", true).await?;
Ok(())
}
use hdfs_native::Client;
use tokio::io::AsyncWriteExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建支持Kerberos认证的客户端
let client = Client::default()?;
// 检查文件是否存在
let test_file = "/large_file.dat";
if client.exists(test_file).await? {
println!("File already exists, deleting...");
client.delete(test_file, false).await?;
}
// 创建大文件并分块写入
let mut writer = client.create(test_file).await?;
let chunk_size = 1024 * 1024; // 1MB chunks
let total_size = 10 * 1024 * 1024; // 10MB total
for i in 0..(total_size / chunk
Rust HDFS原生绑定库hdfs-native使用指南
概述
hdfs-native是一个Rust语言的HDFS原生绑定库,提供了高性能的分布式文件系统操作接口。该库支持HDFS文件系统的连接管理、文件读写、目录操作等核心功能,为Rust开发者提供了访问Hadoop分布式文件系统的便捷方式。
安装配置
在Cargo.toml中添加依赖:
[dependencies]
hdfs-native = "0.3"
基本使用方法
1. 连接HDFS集群
use hdfs_native::Client;
async fn connect_hdfs() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new("hdfs://namenode:9000")?;
// 使用client进行后续操作
Ok(())
}
2. 文件读写操作
use hdfs_native::{Client, WriteOptions};
async fn file_operations() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new("hdfs://localhost:9000")?;
// 写入文件
let mut file = client
.create("/test/example.txt", WriteOptions::default())
.await?;
file.write(b"Hello, HDFS!").await?;
file.close().await?;
// 读取文件
let mut file = client.open("/test/example.txt").await?;
let mut contents = Vec::new();
file.read_to_end(&mut contents).await?;
println!("File content: {}", String::from_utf8_lossy(&contents));
Ok(())
}
3. 目录操作
async fn directory_operations() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new("hdfs://localhost:9000")?;
// 创建目录
client.mkdir("/test/new_directory", 0o755).await?;
// 列出目录内容
let entries = client.list_status("/test").await?;
for entry in entries {
println!("{}", entry.path());
}
// 删除文件
client.delete("/test/example.txt", false).await?;
Ok(())
}
4. 文件信息查询
async fn file_info() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new("hdfs://localhost:9000")?;
let status = client.get_file_status("/test/example.txt").await?;
println!("File size: {}", status.len());
println!("Is directory: {}", status.is_dir());
println!("Modification time: {:?}", status.modified());
Ok(())
}
高级特性
1. 异步流式读写
use tokio::io::{AsyncReadExt, AsyncWriteExt};
async fn stream_operations() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new("hdfs://localhost:9000")?;
// 大文件流式写入
let mut file = client.create("/large/file.bin", WriteOptions::default()).await?;
for _ in 0..1000 {
file.write(&[0u8; 1024]).await?;
}
file.close().await?;
Ok(())
}
2. 错误处理示例
use hdfs_native::HdfsError;
async fn error_handling() -> Result<(), HdfsError> {
let client = Client::new("hdfs://invalid-host:9000")?;
match client.open("/non-existent-file.txt").await {
Ok(file) => {
// 文件操作
Ok(())
}
Err(HdfsError::FileNotFound(_)) => {
eprintln!("文件不存在");
Ok(())
}
Err(e) => Err(e),
}
}
完整示例demo
use hdfs_native::{Client, WriteOptions, HdfsError};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 连接HDFS集群
let client = Client::new("hdfs://localhost:9000")?;
println!("成功连接到HDFS集群");
// 创建测试目录
client.mkdir("/test", 0o755).await?;
println!("创建目录 /test");
// 文件写入操作
let mut file = client
.create("/test/example.txt", WriteOptions::default())
.await?;
file.write(b"Hello, HDFS! This is a test file.").await?;
file.close().await?;
println!("文件写入完成");
// 文件读取操作
let mut file = client.open("/test/example.txt").await?;
let mut contents = Vec::new();
file.read_to_end(&mut contents).await?;
println!("文件内容: {}", String::from_utf8_lossy(&contents));
// 文件信息查询
let status = client.get_file_status("/test/example.txt").await?;
println!("文件大小: {} bytes", status.len());
println!("是否为目录: {}", status.is_dir());
println!("修改时间: {:?}", status.modified());
// 目录列表操作
println!("\n目录 /test 中的文件:");
let entries = client.list_status("/test").await?;
for entry in entries {
println!("- {}", entry.path());
}
// 错误处理示例
match client.open("/non-existent-file.txt").await {
Ok(_) => println!("文件存在"),
Err(HdfsError::FileNotFound(_)) => {
println!("文件不存在,这是预期的错误处理示例");
}
Err(e) => return Err(Box::new(e)),
}
// 清理操作
client.delete("/test/example.txt", false).await?;
client.delete("/test", false).await?;
println!("清理测试文件和目录完成");
Ok(())
}
性能优化建议
- 使用连接池管理HDFS连接
- 合理设置缓冲区大小
- 批量操作减少RPC调用次数
- 使用异步操作提高并发性能
注意事项
- 确保HDFS集群正常运行且网络可达
- 配置正确的HDFS服务地址和端口
- 处理适当的文件权限和用户认证
- 考虑网络延迟对性能的影响
该库提供了完整的HDFS操作接口,适合需要与Hadoop生态系统集成的Rust应用程序使用。