Rust Google Firestore gRPC客户端库firestore_grpc的使用,支持高效NoSQL数据库操作与实时数据同步

Rust Google Firestore gRPC客户端库firestore_grpc的使用,支持高效NoSQL数据库操作与实时数据同步

环境变量设置

export PROJECT_ID=YOUR_PROJECT_ID
export TOKEN=`gcloud auth print-access-token`

Cargo.toml配置

[dependencies]
firestore_grpc = "0.109"
tokio = {version = "1.0", features = ["full"]}

示例代码

use firestore_grpc::tonic::{
    codegen::InterceptedService,
    metadata::MetadataValue,
    transport::{Channel, ClientTlsConfig},
    Request, Status,
};
use firestore_grpc::v1::{
    firestore_client::FirestoreClient, value::ValueType, CreateDocumentRequest, Document, Value,
};
use std::env;

const URL: &'static str = "https://firestore.googleapis.com";
const DOMAIN: &'static str = "firestore.googleapis.com";

pub type BoxError = Box<dyn std::error::Error + Sync + Send + 'static>;

fn get_token() -> String {
    env::var("TOKEN").unwrap()
}

fn get_project_id() -> String {
    env::var("PROJECT_ID").unwrap()
}

async fn get_client() -> Result<FirestoreClient<InterceptedService<Channel, impl Fn(Request<()>) -> Result<Request<()>, Status>>>, BoxError> {
    let endpoint = Channel::from_static(URL).tls_config(ClientTlsConfig::new().domain_name(DOMAIN));

    let bearer_token = format!("Bearer {}", get_token());
    let header_value = MetadataValue::from_str(&bearer_token)?;

    let channel = endpoint?.connect().await?;

    let service = FirestoreClient::with_interceptor(channel, move |mut req: Request<()>| {
        req.metadata_mut()
            .insert("authorization", header_value.clone());
        Ok(req)
    });
    Ok(service)
}

async fn create_document() -> Result<Document, BoxError> {
    let parent = format!(
        "projects/{}/databases/(default)/documents",
        get_project_id()
    );
    let collection_id = "greetings".into();
    let document_id = "".into();
    let mut fields = std.collections::HashMap::new();
    fields.insert(
        "message".into(),
        Value {
            value_type: Some(ValueType::StringValue("Hello world!".into())),
        },
    );
    let document = Some(Document {
        name: "".into(),
        fields,
        create_time: None,
        update_time: None,
    });
    let res = get_client()
        .await?
        .create_document(CreateDocumentRequest {
            parent,
            collection_id,
            document_id,
            document,
            mask: None,
        })
        .await?;
    Ok(res.into_inner())
}

#[tokio::main]
async fn main() {
    create_document().await.unwrap();
}

完整示例说明

  1. 首先需要设置环境变量PROJECT_IDTOKEN
  2. 在Cargo.toml中添加依赖项
  3. 代码主要功能:
    • get_token()get_project_id()获取环境变量
    • get_client()创建Firestore gRPC客户端
    • create_document()创建新文档示例
    • 使用tokio::main宏运行异步主函数

这个示例展示了如何使用firestore_grpc库创建Firestore文档的基本操作,实际应用中还需要实现更多功能如token更新等。

完整示例代码

// 引入必要的库和模块
use firestore_grpc::tonic::{
    codegen::InterceptedService,
    metadata::MetadataValue,
    transport::{Channel, ClientTlsConfig},
    Request, Status,
};
use firestore_grpc::v1::{
    firestore_client::FirestoreClient, 
    value::ValueType, 
    CreateDocumentRequest, 
    Document, 
    Value,
    GetDocumentRequest,
    UpdateDocumentRequest,
    DeleteDocumentRequest,
    ListDocumentsRequest
};
use std::env;
use std::collections::HashMap;

// 定义常量和类型
const FIRESTORE_URL: &str = "https://firestore.googleapis.com";
const FIRESTORE_DOMAIN: &str = "firestore.googleapis.com";
pub type BoxError = Box<dyn std::error::Error + Sync + Send + 'static>;

// 获取认证token
fn get_token() -> String {
    env::var("TOKEN").expect("TOKEN环境变量未设置")
}

// 获取项目ID
fn get_project_id() -> String {
    env::var("PROJECT_ID").expect("PROJECT_ID环境变量未设置")
}

// 构建Firestore客户端
async fn get_client() -> Result<FirestoreClient<InterceptedService<Channel, impl Fn(Request<()>) -> Result<Request<()>, Status>>>, BoxError> {
    let endpoint = Channel::from_static(FIRESTORE_URL)
        .tls_config(ClientTlsConfig::new().domain_name(FIRESTORE_DOMAIN));

    let bearer_token = format!("Bearer {}", get_token());
    let header_value = MetadataValue::from_str(&bearer_token)?;

    let channel = endpoint?.connect().await?;

    let client = FirestoreClient::with_interceptor(channel, move |mut req: Request<()>| {
        req.metadata_mut()
            .insert("authorization", header_value.clone());
        Ok(req)
    });
    Ok(client)
}

// 创建文档
async fn create_document(collection: &str, data: HashMap<String, Value>) -> Result<Document, BoxError> {
    let parent = format!("projects/{}/databases/(default)/documents", get_project_id());
    
    let document = Some(Document {
        name: "".to_string(),
        fields: data,
        create_time: None,
        update_time: None,
    });
    
    let response = get_client()
        .await?
        .create_document(CreateDocumentRequest {
            parent,
            collection_id: collection.to_string(),
            document_id: "".to_string(), // 空字符串让Firestore自动生成ID
            document,
            mask: None,
        })
        .await?;
    
    Ok(response.into_inner())
}

// 获取文档
async fn get_document(document_path: &str) -> Result<Document, BoxError> {
    let response = get_client()
        .await?
        .get_document(GetDocumentRequest {
            name: document_path.to_string(),
            mask: None,
            consistency_selector: None,
        })
        .await?;
    
    Ok(response.into_inner())
}

// 更新文档
async fn update_document(document_path: &str, data: HashMap<String, Value>) -> Result<Document, BoxError> {
    let document = Document {
        name: document_path.to_string(),
        fields: data,
        create_time: None,
        update_time: None,
    };
    
    let response = get_client()
        .await?
        .update_document(UpdateDocumentRequest {
            document: Some(document),
            update_mask: None,
            mask: None,
            current_document: None,
        })
        .await?;
    
    Ok(response.into_inner())
}

// 删除文档
async fn delete_document(document_path: &str) -> Result<(), BoxError> {
    get_client()
        .await?
        .delete_document(DeleteDocumentRequest {
            name: document_path.to_string(),
            current_document: None,
        })
        .await?;
    
    Ok(())
}

// 列出集合中的文档
async fn list_documents(collection_path: &str) -> Result<Vec<Document>, BoxError> {
    let response = get_client()
        .await?
        .list_documents(ListDocumentsRequest {
            parent: collection_path.to_string(),
            collection_id: "".to_string(), // 从parent中解析
            page_size: 0,
            page_token: "".to_string(),
            order_by: "".to_string(),
            mask: None,
            show_missing: false,
            consistency_selector: None,
        })
        .await?;
    
    Ok(response.into_inner().documents)
}

#[tokio::main]
async fn main() {
    // 示例1: 创建文档
    let mut data = HashMap::new();
    data.insert("message".to_string(), Value {
        value_type: Some(ValueType::StringValue("Hello Firestore!".to_string())),
    });
    
    match create_document("test_collection", data).await {
        Ok(doc) => println!("创建文档成功: {:?}", doc),
        Err(e) => eprintln!("创建文档失败: {}", e),
    }
    
    // 示例2: 获取文档
    let doc_path = "projects/YOUR_PROJECT_ID/databases/(default)/documents/test_collection/auto_generated_id";
    match get_document(doc_path).await {
        Ok(doc) => println!("获取文档成功: {:?}", doc),
        Err(e) => eprintln!("获取文档失败: {}", e),
    }
}

完整功能说明

  1. 环境配置

    • 需要设置PROJECT_IDTOKEN环境变量
    • 在Cargo.toml中添加firestore_grpctokio依赖
  2. 核心功能

    • get_client(): 创建经过身份验证的Firestore gRPC客户端
    • create_document(): 在指定集合中创建新文档
    • get_document(): 根据路径获取特定文档
    • update_document(): 更新现有文档
    • delete_document(): 删除指定文档
    • list_documents(): 列出集合中的所有文档
  3. 使用说明

    • 所有操作都是异步的,需要使用tokio::main
    • 文档数据使用HashMap<String, Value>表示
    • 错误处理使用BoxError类型
  4. 扩展建议

    • 实现token自动刷新机制
    • 添加文档字段验证
    • 支持更复杂的查询操作
    • 实现实时数据监听功能

这个完整示例提供了Firestore的基本CRUD操作实现,可以作为开发更复杂应用的起点。


1 回复

以下是基于您提供的内容整理的完整示例demo,包含Firestore gRPC客户端库的主要使用场景:

完整示例demo

use firestore_grpc::v1::{
    firestore_client::FirestoreClient, 
    Document, DocumentMask, GetDocumentRequest, 
    CreateDocumentRequest, Value, MapValue,
    ListenRequest, ListenResponse,
    BatchWriteRequest, Write,
    BeginTransactionRequest, CommitRequest
};
use tonic::{transport::Channel, metadata::MetadataValue};
use futures::StreamExt;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. 初始化认证客户端
    let mut client = create_authenticated_client().await?;
    let project_id = "your-project-id";
    let database_id = "(default)";
    let collection_id = "users";
    let document_id = "user123";

    // 2. 创建文档
    let mut fields = HashMap::new();
    fields.insert("name".to_string(), Value { value_type: Some(value::ValueType::StringValue("John".to_string())) });
    fields.insert("age".to_string(), Value { value_type: Some(value::ValueType::IntegerValue(30)) });
    
    let data = MapValue { fields };
    create_document(&mut client, project_id, database_id, collection_id, document_id, data).await?;

    // 3. 获取文档
    let doc_path = format!("{}/{}", collection_id, document_id);
    get_document(&mut client, project_id, database_id, &doc_path).await?;

    // 4. 批量操作
    let writes = vec![
        Write {
            operation: Some(write::Operation::Update(Document {
                name: format!("projects/{}/databases/{}/documents/{}/{}", project_id, database_id, collection_id, document_id),
                fields: HashMap::new(), // 实际字段
                create_time: None,
                update_time: None,
            })),
            update_mask: None,
            current_document: None,
        }
    ];
    batch_write(&mut client, project_id, database_id, writes).await?;

    // 5. 事务操作
    run_transaction(&mut client, project_id, database_id).await?;

    // 6. 实时监听 (在单独任务中运行)
    tokio::spawn(async move {
        listen_to_changes(&mut client, project_id, database_id, &doc_path).await.unwrap();
    });

    Ok(())
}

// 认证客户端创建
async fn create_authenticated_client() -> Result<FirestoreClient<Channel>, Box<dyn std::error::Error>> {
    let token = std::env::var("GOOGLE_APPLICATION_CREDENTIALS")?; // 从环境变量获取
    
    let channel = Channel::from_static("https://firestore.googleapis.com")
        .connect()
        .await?;
    
    let client = FirestoreClient::with_interceptor(channel, move |mut req| {
        let token: MetadataValue<_> = format!("Bearer {}", token).parse().unwrap();
        req.metadata_mut().insert("authorization", token);
        Ok(req)
    });
    
    Ok(client)
}

// 获取文档
async fn get_document(
    client: &mut FirestoreClient<Channel>,
    project_id: &str,
    database_id: &str,
    document_path: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let request = tonic::Request::new(GetDocumentRequest {
        name: format!(
            "projects/{}/databases/{}/documents/{}",
            project_id, database_id, document_path
        ),
        mask: Some(DocumentMask {
            field_paths: vec!["name".to_string()], // 只获取name字段
        }),
        consistency_selector: None,
    });
    
    let response = client.get_document(request).await?;
    println!("获取的文档: {:?}", response.into_inner());
    
    Ok(())
}

// 创建文档
async fn create_document(
    client: &mut FirestoreClient<Channel>,
    project_id: &str,
    database_id: &str,
    collection_id: &str,
    document_id: &str,
    data: MapValue,
) -> Result<(), Box<dyn std::error::Error>> {
    let request = tonic::Request::new(CreateDocumentRequest {
        parent: format!("projects/{}/databases/{}/documents", project_id, database_id),
        collection_id: collection_id.to_string(),
        document_id: document_id.to_string(),
        document: Some(Document {
            name: "".to_string(),
            fields: data.fields,
            create_time: None,
            update_time: None,
        }),
        mask: None,
    });
    
    let response = client.create_document(request).await?;
    println!("创建的文档: {:?}", response.into_inner());
    
    Ok(())
}

// 实时监听
async fn listen_to_changes(
    client: &mut FirestoreClient<Channel>,
    project_id: &str,
    database_id: &str,
    document_path: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let request = tonic::Request::new(tonic::Streaming::new(futures::stream::iter(vec![
        ListenRequest {
            database: format!("projects/{}/databases/{}", project_id, database_id),
            target_change: Some(listen_request::TargetChange::AddTarget(
                Target {
                    target_id: 1234,
                    target_type: Some(target::TargetType::Documents(
                        Target::DocumentsTarget {
                            documents: vec![document_path.to_string()],
                        },
                    )),
                    resume_token: vec![],
                },
            )),
            labels: HashMap::new(),
        },
    ])));
    
    let mut stream = client.listen(request).await?.into_inner();
    
    while let Some(response) = stream.next().await {
        match response? {
            ListenResponse { response_type: Some(response) } => {
                match response {
                    listen_response::ResponseType::DocumentChange(doc_change) => {
                        println!("文档变更: {:?}", doc_change);
                    }
                    listen_response::ResponseType::DocumentDelete(doc_delete) => {
                        println!("文档删除: {:?}", doc_delete);
                    }
                    _ => {}
                }
            }
            _ => {}
        }
    }
    
    Ok(())
}

// 批量写入
async fn batch_write(
    client: &mut FirestoreClient<Channel>,
    project_id: &str,
    database_id: &str,
    writes: Vec<Write>,
) -> Result<(), Box<dyn std::error::Error>> {
    let request = tonic::Request::new(BatchWriteRequest {
        database: format!("projects/{}/databases/{}", project_id, database_id),
        writes,
        labels: HashMap::new(),
    });
    
    let response = client.batch_write(request).await?;
    println!("批量写入结果: {:?}", response.into_inner());
    
    Ok(())
}

// 事务操作
async fn run_transaction(
    client: &mut FirestoreClient<Channel>,
    project_id: &str,
    database_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // 开始事务
    let begin_request = tonic::Request::new(BeginTransactionRequest {
        database: format!("projects/{}/databases/{}", project_id, database_id),
        options: None,
    });
    
    let begin_response = client.begin_transaction(begin_request).await?;
    let transaction_id = begin_response.into_inner().transaction;
    
    // 提交事务
    let commit_request = tonic::Request::new(CommitRequest {
        database: format!("projects/{}/databases/{}", project_id, database_id),
        writes: vec![], // 实际写入操作
        transaction: transaction_id,
    });
    
    let commit_response = client.commit(commit_request).await?;
    println!("事务提交结果: {:?}", commit_response.into_inner());
    
    Ok(())
}

关键点说明

  1. 环境设置

    • 需要设置GOOGLE_APPLICATION_CREDENTIALS环境变量指向服务账号密钥文件
    • 需要安装firestore_grpctokio等依赖
  2. 功能覆盖

    • 包含认证客户端创建
    • 文档CRUD操作
    • 批量写入
    • 事务支持
    • 实时监听
  3. 最佳实践

    • 客户端复用
    • 错误处理
    • 字段掩码优化
  4. 注意事项

    • 生产环境应使用更安全的认证方式
    • 实时监听会保持长连接
    • 注意Firestore的配额限制

这个完整示例展示了如何使用firestore_grpc库进行常见的Firestore操作,您可以根据实际需求调整和扩展。

回到顶部