golang分布式批量数据处理框架插件库capillaries的使用

Golang分布式批量数据处理框架插件库Capillaries的使用

Capillaries是一个数据处理框架,它解决了可扩展性问题并管理中间数据存储,让用户能够专注于数据转换和质量控制。

Capillaries的特点

Capillaries填补了分布式、可扩展的数据处理/集成解决方案与在SLA时间限制内生成丰富的、客户就绪的、生产质量的、人工管理的数据之间的空白。

Capillaries: before and after

BEFORE AFTER
云友好 取决于 可在几分钟内部署到云端;Docker就绪
数据聚合 SQL连接 Capillaries在Cassandra中的查找+Go表达式(可扩展性,并行执行)
数据过滤 SQL查询,自定义代码 Go表达式(可扩展性,可维护性)
数据转换 SQL表达式,自定义代码 Go表达式,Python公式(并行执行,可维护性)
中间数据存储 文件,关系数据库 即时创建的Cassandra键空间和表(可扩展性,可维护性)
工作流执行 Shell脚本,自定义代码,工作流框架 RabbitMQ作为调度器,工作流状态存储在Cassandra中(并行执行,容错,增量计算)
工作流监控和交互 自定义解决方案 Capillaries UI,Toolbelt实用程序,API,Web API(透明度,操作员验证支持)
工作流管理 Shell脚本,自定义代码 Capillaries配置:带有DAG的脚本文件,Python公式

快速开始

在Mac、WSL或Linux上,在bash shell中运行:

git clone https://github.com/capillariesio/capillaries.git
cd capillaries
./copy_demo_data.sh
docker compose -p "test_capillaries_containers" up

等待所有容器启动并且Cassandra完全初始化(它将记录类似Created default superuser role 'cassandra'的内容)。现在Capillaries已准备好根据示例演示脚本处理示例演示输入数据(以上由copy_demo_data.sh复制)。

导航到http://localhost:8080查看Capillaries UI。

通过点击"New run"并提供以下参数(不允许使用制表符或空格)启动新的Capillaries数据处理运行:

字段
Keyspace portfolio_quicktest
Script URI /tmp/capi_cfg/portfolio_quicktest/script.json
Script parameters URI /tmp/capi_cfg/portfolio_quicktest/script_params.json
Start nodes 1_read_accounts,1_read_txns,1_read_period_holdings

或者,您可以使用Capillaries toolbelt从Docker主机机器执行以下命令启动新的运行,它应该具有与从UI启动运行相同的效果:

docker exec -it capillaries_webapi /usr/local/bin/capitoolbelt start_run -script_file=/tmp/capi_cfg/portfolio_quicktest/script.json -params_file=/tmp/capi_cfg/portfolio_quicktest/script_params.json -keyspace=portfolio_quicktest -start_nodes=1_read_accounts,1_read_txns,1_read_period_holdings

在Capillaries UI中观察进度。一个新的键空间portfolio_quicktest将出现在键空间列表中。点击它并观察运行完成 - 节点7_file_account_period_sector_perf7_file_account_year_perf应该生成结果文件:

cat /tmp/capi_out/portfolio_quicktest/account_period_sector_perf.csv
cat /tmp/capi_out/portfolio_quicktest/account_year_perf.csv

监控测试运行

除了http://localhost:8080上的Capillaries UI外,您可能还想查看其他工具提供的统计信息。

由以下组件生成的日志消息:

  • Capillaries守护进程
  • Capillaries WebAPI
  • Capillaries UI
  • RabbitMQ
  • 带有Prometheus jmx-exporter的Cassandra
  • Prometheus

由fluentd收集并保存在/tmp/capi_log中。

要查看Cassandra集群状态,运行以下命令(重置JVM_OPTS,以便jmx-exporter不会尝试附加到nodetool JVM进程):

docker exec -e JVM_OPTS= capillaries_cassandra1 nodetool status

示例代码

以下是一个简单的Capillaries脚本配置示例(script.json):

{
  "nodes": {
    "1_read_accounts": {
      "type": "file_reader",
      "file_path": "/tmp/capi_in/portfolio_quicktest/accounts.csv",
      "file_format": "csv",
      "batch_size": 1000,
      "target": "accounts"
    },
    "2_process_data": {
      "type": "processor",
      "source": "accounts",
      "processor": {
        "type": "go_expression",
        "expression": "row['balance'] = row['balance'] * 1.1"
      },
      "target": "processed_accounts"
    },
    "3_write_output": {
      "type": "file_writer",
      "source": "processed_accounts",
      "file_path": "/tmp/capi_out/portfolio_quicktest/processed_accounts.csv",
      "file_format": "csv"
    }
  },
  "edges": [
    ["1_read_accounts", "2_process_data"],
    ["2_process_data", "3_write_output"]
  ]
}

这个简单的脚本定义了一个包含三个节点的工作流:

  1. 从CSV文件读取账户数据
  2. 使用Go表达式处理数据(将余额增加10%)
  3. 将处理后的数据写入新的CSV文件

要运行这个示例,您需要创建相应的输入文件并调整路径以匹配您的环境。

进一步学习

© 2022-2025 KH (kleines.hertz[at]protonmail.com)


更多关于golang分布式批量数据处理框架插件库capillaries的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang分布式批量数据处理框架插件库capillaries的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Capillaries - Golang分布式批量数据处理框架插件库

Capillaries是一个用于构建分布式批量数据处理管道的Golang框架,它提供了插件化的架构,可以方便地扩展各种数据源、转换器和目的地。

核心概念

Capillaries基于以下几个核心概念构建:

  1. 工作流(Workflow):定义数据处理流程
  2. 节点(Node):工作流中的处理单元
  3. 插件(Plugin):可扩展的组件,用于实现特定功能

基本使用

安装

go get github.com/capillariesio/capillaries

简单示例

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/capillariesio/capillaries/pkg/capillaries"
	"github.com/capillariesio/capillaries/pkg/env"
	"github.com/capillariesio/capillaries/pkg/wfmodel"
)

func main() {
	// 初始化环境
	envConfig := env.Config{
		RunnerID: "local_runner",
	}
	env.Init(&envConfig)

	// 创建工作流定义
	workflowDef := wfmodel.WorkflowDef{
		Name: "demo_workflow",
		Nodes: []*wfmodel.NodeDef{
			{
				Name: "source_node",
				Type: "file_reader",
				Params: map[string]interface{}{
					"file_path": "input.csv",
					"format":    "csv",
				},
			},
			{
				Name: "transform_node",
				Type: "sql_transformer",
				Params: map[string]interface{}{
					"query": "SELECT col1, col2 FROM source_node WHERE col1 > 100",
				},
				DependsOn: []string{"source_node"},
			},
			{
				Name: "target_node",
				Type: "file_writer",
				Params: map[string]interface{}{
					"file_path": "output.csv",
					"format":    "csv",
				},
				DependsOn: []string{"transform_node"},
			},
		},
	}

	// 创建工作流运行器
	runner, err := capillaries.NewRunner(context.Background(), &workflowDef)
	if err != nil {
		log.Fatal(err)
	}

	// 执行工作流
	status, err := runner.Run()
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Workflow completed with status: %s\n", status)
}

主要功能

1. 数据源插件

Capillaries支持多种数据源:

// 文件读取器示例
fileReaderNode := &wfmodel.NodeDef{
	Name: "csv_reader",
	Type: "file_reader",
	Params: map[string]interface{}{
		"file_path": "data/input.csv",
		"format":    "csv",
		"has_header": true,
	},
}

// 数据库读取器示例
dbReaderNode := &wfmodel.NodeDef{
	Name: "db_reader",
	Type: "db_reader",
	Params: map[string]interface{}{
		"dsn":      "user:password@tcp(localhost:3306)/dbname",
		"query":    "SELECT * FROM table",
		"db_type":  "mysql",
	},
}

2. 数据转换插件

// SQL转换器示例
sqlTransformerNode := &wfmodel.NodeDef{
	Name: "transformer",
	Type: "sql_transformer",
	Params: map[string]interface{}{
		"query": "SELECT col1, UPPER(col2) as col2_upper FROM source_node",
	},
	DependsOn: []string{"source_node"},
}

// 自定义转换器示例
customTransformerNode := &wfmodel.NodeDef{
	Name: "custom_transformer",
	Type: "custom_processor",
	Params: map[string]interface{}{
		"processor_name": "my_processor",
	},
	DependsOn: []string{"previous_node"},
}

3. 数据目的地插件

// 文件写入器示例
fileWriterNode := &wfmodel.NodeDef{
	Name: "csv_writer",
	Type: "file_writer",
	Params: map[string]interface{}{
		"file_path": "data/output.csv",
		"format":    "csv",
		"has_header": true,
	},
	DependsOn: []string{"transformer"},
}

// 数据库写入器示例
dbWriterNode := &wfmodel.NodeDef{
	Name: "db_writer",
	Type: "db_writer",
	Params: map[string]interface{}{
		"dsn":      "user:password@tcp(localhost:3306)/dbname",
		"table":    "target_table",
		"db_type":  "mysql",
		"batch_size": 1000,
	},
	DependsOn: []string{"transformer"},
}

高级特性

1. 分布式执行

Capillaries支持分布式执行模式:

// 分布式配置
envConfig := env.Config{
	RunnerID:        "cluster_runner_1",
	CoordinatorHost: "coordinator.example.com",
	CoordinatorPort: 8080,
	WorkerCount:     4,
}
env.Init(&envConfig)

2. 自定义插件开发

创建自定义处理器:

package myprocessor

import (
	"context"

	"github.com/capillariesio/capillaries/pkg/l"
	"github.com/capillariesio/capillaries/pkg/processor"
	"github.com/capillariesio/capillaries/pkg/sc"
)

type MyProcessor struct {
	processor.DefaultProcessor
}

func (p *MyProcessor) Run(
	ctx context.Context,
	script *sc.ScriptContext,
	logger *l.Logger,
	params map[string]interface{},
	recordsBatch *sc.ProcessorRecordsBatch) error {
	
	// 处理逻辑
	for i := range recordsBatch.Records {
		// 对每条记录进行处理
		recordsBatch.Records[i]["processed"] = true
	}
	
	return nil
}

func init() {
	processor.RegisterProcessor("my_processor", func() processor.Processor {
		return &MyProcessor{}
	})
}

3. 监控和日志

// 配置日志
logger := l.NewLogger(l.VerboseLevel)
logger.Info("Starting workflow execution")

// 获取工作流状态
status, err := runner.GetStatus()
if err != nil {
	logger.Error("Failed to get workflow status: %s", err.Error())
}
logger.Info("Workflow status: %s", status)

最佳实践

  1. 合理设计工作流:将复杂处理分解为多个节点
  2. 使用批处理:对于大数据量,设置适当的批处理大小
  3. 错误处理:实现健壮的错误处理和重试机制
  4. 资源管理:监控资源使用情况,避免OOM
  5. 测试:为自定义插件编写单元测试

Capillaries提供了灵活且强大的框架来构建分布式数据处理管道,通过其插件化架构,可以轻松扩展以满足各种数据处理需求。

回到顶部