Rust Substrate框架插件库polkadot-node-subsystem-types的使用:实现波卡节点子系统类型定义与消息交互
Rust Substrate框架插件库polkadot-node-subsystem-types的使用:实现波卡节点子系统类型定义与消息交互
安装
在项目目录中运行以下Cargo命令:
cargo add polkadot-node-subsystem-types
或者在Cargo.toml中添加以下行:
polkadot-node-subsystem-types = "25.0.0"
使用示例
下面是一个完整的示例代码,展示如何使用polkadot-node-subsystem-types
库实现波卡节点子系统类型定义与消息交互:
use polkadot_node_subsystem_types::{
messages::{AllMessages, AvailabilityDistributionMessage},
overseer::{self, SubsystemSender},
};
use polkadot_primitives::v2::CandidateHash;
use futures::channel::oneshot;
// 定义一个简单的子系统
struct MySubsystem;
#[overseer::subsystem(MySubsystem, error=overseer::SubsystemError)]
impl<Context> overseer::Subsystem<Context> for MySubsystem
where
Context: SubsystemSender<AllMessages> + overseer::SubsystemContext<Message = AllMessages>,
{
fn start(self, ctx: Context) -> futures::never::Never {
// 启动子系统逻辑
let mut sender = ctx.sender().clone();
// 异步处理消息
async move {
// 模拟接收消息
let (tx, rx) = oneshot::channel();
// 发送消息到可用性分发子系统
sender.send_message(
AllMessages::AvailabilityDistribution(
AvailabilityDistributionMessage::ChunkFetchingRequest {
candidate_hash: CandidateHash::default(),
index: 0,
response_sender: tx,
}
)
).await;
// 处理响应
if let Ok(response) = rx.await {
println!("Received chunk response: {:?}", response);
}
}.boxed();
unreachable!()
}
}
// 主函数设置overseer
#[tokio::main]
async fn main() {
use polkadot_node_subsystem_types::OverseerHandler;
// 创建overseer配置
let config = overseer::OverseerConfig {
spawner: Box::new(|task| tokio::spawn(task)),
// 其他配置...
};
// 构建overseer
let (overseer, handler) = overseer::Overseer::new(
config,
vec![Box::new(MySubsystem)],
// 其他子系统...
).unwrap();
// 运行overseer
tokio::spawn(overseer.run());
// 使用handler与子系统交互
let _ = handler.send_message(AllMessages::Dummy).await;
}
完整示例代码
use polkadot_node_subsystem_types::{
messages::{AllMessages, AvailabilityDistributionMessage},
overseer::{self, SubsystemSender},
};
use polkadot_primitives::v2::{CandidateHash, ValidatorIndex};
use futures::channel::{oneshot, mpsc};
use futures::StreamExt;
// 自定义子系统1 - 处理可用性数据
struct AvailabilitySubsystem;
#[overseer::subsystem(AvailabilitySubsystem, error=overseer::SubsystemError)]
impl<Context> overseer::Subsystem<Context> for AvailabilitySubsystem
where
Context: SubsystemSender<AllMessages> + overseer::SubsystemContext<Message = AllMessages>,
{
fn start(self, ctx: Context) -> futures::never::Never {
let mut sender = ctx.sender().clone();
let mut messages = ctx.message_stream();
async move {
while let Some(msg) = messages.next().await {
match msg {
AllMessages::AvailabilityDistribution(
AvailabilityDistributionMessage::ChunkFetchingRequest {
candidate_hash,
index,
response_sender,
}
) => {
println!("Received chunk request for {:?} at index {}", candidate_hash, index);
let _ = response_sender.send(vec![0u8; 256]); // 模拟返回数据
}
_ => {}
}
}
}.boxed();
unreachable!()
}
}
// 自定义子系统2 - 验证逻辑
struct ValidationSubsystem;
#[overseer::subsystem(ValidationSubsystem, error=overseer::SubsystemError)]
impl<Context> overseer::Subsystem<Context> for ValidationSubsystem
where
Context: SubsystemSender<AllMessages> + overseer::SubsystemContext<Message = AllMessages>,
{
fn start(self, ctx: Context) -> futures::never::Never {
let mut sender = ctx.sender().clone();
async move {
// 模拟定期发送验证消息
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let (tx, rx) = oneshot::channel();
sender.send_message(
AllMessages::AvailabilityDistribution(
AvailabilityDistributionMessage::ChunkFetchingRequest {
candidate_hash: CandidateHash::default(),
index: 0,
response_sender: tx,
}
)
).await;
if let Ok(chunk) = rx.await {
println!("Validating chunk of size {} bytes", chunk.len());
}
}
}.boxed();
unreachable!()
}
}
// 主函数
#[tokio::main]
async fn main() {
use polkadot_node_subsystem_types::OverseerHandler;
// 创建overseer配置
let config = overseer::OverseerConfig {
spawner: Box::new(|task| tokio::spawn(task)),
// 其他配置项...
};
// 构建overseer并注册子系统
let (overseer, handler) = overseer::Overseer::new(
config,
vec![
Box::new(AvailabilitySubsystem),
Box::new(ValidationSubsystem),
],
// 可以添加更多子系统...
).expect("Failed to create overseer");
// 启动overseer
tokio::spawn(overseer.run());
// 使用handler与子系统交互
let (tx, rx) = oneshot::channel();
handler.send_message(
AllMessages::AvailabilityDistribution(
AvailabilityDistributionMessage::ChunkFetchingRequest {
candidate_hash: CandidateHash::random(),
index: 1,
response_sender: tx,
}
)
).await;
if let Ok(chunk) = rx.await {
println!("Main received chunk: {} bytes", chunk.len());
}
// 让程序运行一段时间
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
关键组件说明
-
消息类型(AllMessages):
- 定义了所有子系统间交互的消息类型
- 包含多个子系统特定消息的枚举
-
Overseer:
- 管理所有子系统的中央协调器
- 负责消息路由和子系统生命周期管理
-
Subsystem trait:
- 所有子系统必须实现的核心特性
- 定义了子系统的启动和运行方式
-
SubsystemSender:
- 允许子系统发送消息到其他子系统的接口
典型使用场景
-
自定义子系统开发:
- 实现
Subsystem
trait创建新子系统 - 通过
SubsystemSender
与其他子系统交互
- 实现
-
消息处理:
- 定义子系统特定的消息类型
- 处理来自其他子系统的消息
-
跨子系统通信:
- 使用
AllMessages
枚举发送跨子系统消息 - 通过oneshot通道接收异步响应
- 使用
-
测试与模拟:
- 使用
OverseerHandler
在测试中模拟消息发送 - 验证子系统间的交互逻辑
- 使用
这个示例展示了polkadot-node-subsystem-types
库的核心功能,包括子系统定义、消息处理和overseer集成。开发者可以根据实际需求扩展此基础结构。
1 回复