golang分布式批量数据处理框架插件库capillaries的使用
Golang分布式批量数据处理框架插件库Capillaries的使用
Capillaries是一个数据处理框架,它解决了可扩展性问题并管理中间数据存储,让用户能够专注于数据转换和质量控制。
Capillaries的特点
Capillaries填补了分布式、可扩展的数据处理/集成解决方案与在SLA时间限制内生成丰富的、客户就绪的、生产质量的、人工管理的数据之间的空白。
BEFORE | AFTER | |
---|---|---|
云友好 | 取决于 | 可在几分钟内部署到云端;Docker就绪 |
数据聚合 | SQL连接 | Capillaries在Cassandra中的查找+Go表达式(可扩展性,并行执行) |
数据过滤 | SQL查询,自定义代码 | Go表达式(可扩展性,可维护性) |
数据转换 | SQL表达式,自定义代码 | Go表达式,Python公式(并行执行,可维护性) |
中间数据存储 | 文件,关系数据库 | 即时创建的Cassandra键空间和表(可扩展性,可维护性) |
工作流执行 | Shell脚本,自定义代码,工作流框架 | RabbitMQ作为调度器,工作流状态存储在Cassandra中(并行执行,容错,增量计算) |
工作流监控和交互 | 自定义解决方案 | Capillaries UI,Toolbelt实用程序,API,Web API(透明度,操作员验证支持) |
工作流管理 | Shell脚本,自定义代码 | Capillaries配置:带有DAG的脚本文件,Python公式 |
快速开始
在Mac、WSL或Linux上,在bash shell中运行:
git clone https://github.com/capillariesio/capillaries.git
cd capillaries
./copy_demo_data.sh
docker compose -p "test_capillaries_containers" up
等待所有容器启动并且Cassandra完全初始化(它将记录类似Created default superuser role 'cassandra'
的内容)。现在Capillaries已准备好根据示例演示脚本处理示例演示输入数据(以上由copy_demo_data.sh复制)。
导航到http://localhost:8080
查看Capillaries UI。
通过点击"New run"并提供以下参数(不允许使用制表符或空格)启动新的Capillaries数据处理运行:
字段 | 值 |
---|---|
Keyspace | portfolio_quicktest |
Script URI | /tmp/capi_cfg/portfolio_quicktest/script.json |
Script parameters URI | /tmp/capi_cfg/portfolio_quicktest/script_params.json |
Start nodes | 1_read_accounts,1_read_txns,1_read_period_holdings |
或者,您可以使用Capillaries toolbelt从Docker主机机器执行以下命令启动新的运行,它应该具有与从UI启动运行相同的效果:
docker exec -it capillaries_webapi /usr/local/bin/capitoolbelt start_run -script_file=/tmp/capi_cfg/portfolio_quicktest/script.json -params_file=/tmp/capi_cfg/portfolio_quicktest/script_params.json -keyspace=portfolio_quicktest -start_nodes=1_read_accounts,1_read_txns,1_read_period_holdings
在Capillaries UI中观察进度。一个新的键空间portfolio_quicktest
将出现在键空间列表中。点击它并观察运行完成 - 节点7_file_account_period_sector_perf
和7_file_account_year_perf
应该生成结果文件:
cat /tmp/capi_out/portfolio_quicktest/account_period_sector_perf.csv
cat /tmp/capi_out/portfolio_quicktest/account_year_perf.csv
监控测试运行
除了http://localhost:8080
上的Capillaries UI外,您可能还想查看其他工具提供的统计信息。
由以下组件生成的日志消息:
- Capillaries守护进程
- Capillaries WebAPI
- Capillaries UI
- RabbitMQ
- 带有Prometheus jmx-exporter的Cassandra
- Prometheus
由fluentd收集并保存在/tmp/capi_log中。
要查看Cassandra集群状态,运行以下命令(重置JVM_OPTS,以便jmx-exporter不会尝试附加到nodetool JVM进程):
docker exec -e JVM_OPTS= capillaries_cassandra1 nodetool status
示例代码
以下是一个简单的Capillaries脚本配置示例(script.json):
{
"nodes": {
"1_read_accounts": {
"type": "file_reader",
"file_path": "/tmp/capi_in/portfolio_quicktest/accounts.csv",
"file_format": "csv",
"batch_size": 1000,
"target": "accounts"
},
"2_process_data": {
"type": "processor",
"source": "accounts",
"processor": {
"type": "go_expression",
"expression": "row['balance'] = row['balance'] * 1.1"
},
"target": "processed_accounts"
},
"3_write_output": {
"type": "file_writer",
"source": "processed_accounts",
"file_path": "/tmp/capi_out/portfolio_quicktest/processed_accounts.csv",
"file_format": "csv"
}
},
"edges": [
["1_read_accounts", "2_process_data"],
["2_process_data", "3_write_output"]
]
}
这个简单的脚本定义了一个包含三个节点的工作流:
- 从CSV文件读取账户数据
- 使用Go表达式处理数据(将余额增加10%)
- 将处理后的数据写入新的CSV文件
要运行这个示例,您需要创建相应的输入文件并调整路径以匹配您的环境。
进一步学习
- Capillaries是什么不是什么(用例讨论和图表)
- 入门指南(运行一个快速的基于Docker的演示,无需编译一行代码)
- 测试指南
- Toolbelt、Daemon和Webapi配置
- 脚本配置
- Capillaries UI
- Capillaries API
- 术语表
- 问答
© 2022-2025 KH (kleines.hertz[at]protonmail.com)
更多关于golang分布式批量数据处理框架插件库capillaries的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang分布式批量数据处理框架插件库capillaries的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Capillaries - Golang分布式批量数据处理框架插件库
Capillaries是一个用于构建分布式批量数据处理管道的Golang框架,它提供了插件化的架构,可以方便地扩展各种数据源、转换器和目的地。
核心概念
Capillaries基于以下几个核心概念构建:
- 工作流(Workflow):定义数据处理流程
- 节点(Node):工作流中的处理单元
- 插件(Plugin):可扩展的组件,用于实现特定功能
基本使用
安装
go get github.com/capillariesio/capillaries
简单示例
package main
import (
"context"
"fmt"
"log"
"github.com/capillariesio/capillaries/pkg/capillaries"
"github.com/capillariesio/capillaries/pkg/env"
"github.com/capillariesio/capillaries/pkg/wfmodel"
)
func main() {
// 初始化环境
envConfig := env.Config{
RunnerID: "local_runner",
}
env.Init(&envConfig)
// 创建工作流定义
workflowDef := wfmodel.WorkflowDef{
Name: "demo_workflow",
Nodes: []*wfmodel.NodeDef{
{
Name: "source_node",
Type: "file_reader",
Params: map[string]interface{}{
"file_path": "input.csv",
"format": "csv",
},
},
{
Name: "transform_node",
Type: "sql_transformer",
Params: map[string]interface{}{
"query": "SELECT col1, col2 FROM source_node WHERE col1 > 100",
},
DependsOn: []string{"source_node"},
},
{
Name: "target_node",
Type: "file_writer",
Params: map[string]interface{}{
"file_path": "output.csv",
"format": "csv",
},
DependsOn: []string{"transform_node"},
},
},
}
// 创建工作流运行器
runner, err := capillaries.NewRunner(context.Background(), &workflowDef)
if err != nil {
log.Fatal(err)
}
// 执行工作流
status, err := runner.Run()
if err != nil {
log.Fatal(err)
}
fmt.Printf("Workflow completed with status: %s\n", status)
}
主要功能
1. 数据源插件
Capillaries支持多种数据源:
// 文件读取器示例
fileReaderNode := &wfmodel.NodeDef{
Name: "csv_reader",
Type: "file_reader",
Params: map[string]interface{}{
"file_path": "data/input.csv",
"format": "csv",
"has_header": true,
},
}
// 数据库读取器示例
dbReaderNode := &wfmodel.NodeDef{
Name: "db_reader",
Type: "db_reader",
Params: map[string]interface{}{
"dsn": "user:password@tcp(localhost:3306)/dbname",
"query": "SELECT * FROM table",
"db_type": "mysql",
},
}
2. 数据转换插件
// SQL转换器示例
sqlTransformerNode := &wfmodel.NodeDef{
Name: "transformer",
Type: "sql_transformer",
Params: map[string]interface{}{
"query": "SELECT col1, UPPER(col2) as col2_upper FROM source_node",
},
DependsOn: []string{"source_node"},
}
// 自定义转换器示例
customTransformerNode := &wfmodel.NodeDef{
Name: "custom_transformer",
Type: "custom_processor",
Params: map[string]interface{}{
"processor_name": "my_processor",
},
DependsOn: []string{"previous_node"},
}
3. 数据目的地插件
// 文件写入器示例
fileWriterNode := &wfmodel.NodeDef{
Name: "csv_writer",
Type: "file_writer",
Params: map[string]interface{}{
"file_path": "data/output.csv",
"format": "csv",
"has_header": true,
},
DependsOn: []string{"transformer"},
}
// 数据库写入器示例
dbWriterNode := &wfmodel.NodeDef{
Name: "db_writer",
Type: "db_writer",
Params: map[string]interface{}{
"dsn": "user:password@tcp(localhost:3306)/dbname",
"table": "target_table",
"db_type": "mysql",
"batch_size": 1000,
},
DependsOn: []string{"transformer"},
}
高级特性
1. 分布式执行
Capillaries支持分布式执行模式:
// 分布式配置
envConfig := env.Config{
RunnerID: "cluster_runner_1",
CoordinatorHost: "coordinator.example.com",
CoordinatorPort: 8080,
WorkerCount: 4,
}
env.Init(&envConfig)
2. 自定义插件开发
创建自定义处理器:
package myprocessor
import (
"context"
"github.com/capillariesio/capillaries/pkg/l"
"github.com/capillariesio/capillaries/pkg/processor"
"github.com/capillariesio/capillaries/pkg/sc"
)
type MyProcessor struct {
processor.DefaultProcessor
}
func (p *MyProcessor) Run(
ctx context.Context,
script *sc.ScriptContext,
logger *l.Logger,
params map[string]interface{},
recordsBatch *sc.ProcessorRecordsBatch) error {
// 处理逻辑
for i := range recordsBatch.Records {
// 对每条记录进行处理
recordsBatch.Records[i]["processed"] = true
}
return nil
}
func init() {
processor.RegisterProcessor("my_processor", func() processor.Processor {
return &MyProcessor{}
})
}
3. 监控和日志
// 配置日志
logger := l.NewLogger(l.VerboseLevel)
logger.Info("Starting workflow execution")
// 获取工作流状态
status, err := runner.GetStatus()
if err != nil {
logger.Error("Failed to get workflow status: %s", err.Error())
}
logger.Info("Workflow status: %s", status)
最佳实践
- 合理设计工作流:将复杂处理分解为多个节点
- 使用批处理:对于大数据量,设置适当的批处理大小
- 错误处理:实现健壮的错误处理和重试机制
- 资源管理:监控资源使用情况,避免OOM
- 测试:为自定义插件编写单元测试
Capillaries提供了灵活且强大的框架来构建分布式数据处理管道,通过其插件化架构,可以轻松扩展以满足各种数据处理需求。