golang多协议消息流处理插件库Benthos的使用
Golang多协议消息流处理插件库Benthos的使用
Redpanda Connect简介
Redpanda Connect是一个高性能、弹性的流处理器,能够连接各种[输入源]和[输出目标],并执行[数据水合、丰富、转换和过滤]操作。它带有强大的映射语言,易于部署和监控,可以作为静态二进制文件或Docker镜像直接集成到您的管道中。
核心特性
- 声明式配置,流管道可以用单个配置文件定义
- 支持多种输入/输出协议
- 内置强大的数据处理能力
- 提供交付保证(至少一次交付)
- 丰富的监控和追踪功能
快速开始示例
以下是一个完整的配置示例,展示如何从GCP Pub/Sub读取数据,处理后写入Redis Streams:
input:
gcp_pubsub:
project: foo
subscription: bar
pipeline:
processors:
- mapping: |
root.message = this
root.meta.link_count = this.links.length()
root.user.age = this.user.age.number()
output:
redis_streams:
url: tcp://TODO:6379
stream: baz
max_in_flight: 20
安装方式
Linux安装
curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip
unzip rpk-linux-amd64.zip -d ~/.local/bin/
Homebrew安装
brew install redpanda-data/tap/redpanda
Docker安装
docker pull docker.redpanda.com/redpandadata/connect
运行方式
直接运行
rpk connect run ./config.yaml
Docker运行
# 使用配置文件
docker run --rm -v /path/to/your/config.yaml:/connect.yaml docker.redpanda.com/redpandadata/connect run
# 使用参数
docker run --rm -p 4195:4195 docker.redpanda.com/redpandadata/connect run \
-s "input.type=http_server" \
-s "output.type=kafka" \
-s "output.kafka.addresses=kafka-server:9092" \
-s "output.kafka.topic=redpanda_topic"
监控功能
Redpanda Connect提供了丰富的监控功能:
-
健康检查:
/ping
- 用作存活探针,总是返回200/ready
- 用作就绪探针,当输入和输出都连接时返回200,否则返回503
-
指标监控:
- 支持Statsd、Prometheus、JSON HTTP端点等多种方式
-
追踪:
- 支持OpenTelemetry追踪事件
支持的协议
Redpanda Connect支持广泛的输入/输出协议,包括但不限于:
- AWS (DynamoDB, Kinesis, S3, SQS, SNS)
- Azure (Blob存储, 队列存储, 表存储)
- GCP (Pub/Sub, 云存储, BigQuery)
- Kafka, NATS, NSQ, MQTT
- AMQP 0.91 (RabbitMQ), AMQP 1
- Redis (流, 列表, pubsub, 哈希)
- Cassandra, Elasticsearch, HDFS
- HTTP (服务器和客户端, 包括websockets)
- MongoDB, SQL (MySQL, PostgreSQL, Clickhouse, MSSQL)
构建自定义插件
Redpanda Connect允许开发者轻松创建自定义插件。可以参考[示例仓库]了解各种插件实现方式。
高级构建选项
要构建包含额外依赖的版本(如zmq4输入输出),可以使用:
# 使用go
go install -tags "x_benthos_extra" github.com/redpanda-data/connect/v4/cmd/redpanda-connect@latest
# 使用make
make TAGS=x_benthos_extra
贡献指南
欢迎贡献代码!提交PR前请确保:
- 已通过单元测试
make test
- 已通过lint检查
make lint
- 已格式化代码
make fmt
注意:大多数集成测试需要启动Docker容器,因此make test
会跳过这些测试。您可以通过以下命令单独运行它们:
go test -run "^Test.*Integration.*$" ./internal/impl/<connector directory>/...
更多关于golang多协议消息流处理插件库Benthos的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang多协议消息流处理插件库Benthos的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Benthos:Golang多协议消息流处理插件库
Benthos 是一个强大的流处理工具,用 Go 编写,支持多种消息协议和处理转换操作。它特别适合构建数据管道、ETL流程和消息路由系统。
核心特性
- 多协议支持:Kafka、RabbitMQ、HTTP、AWS SQS/SNS、NATS等
- 丰富处理能力:过滤、转换、批处理、缓存等
- 插件架构:易于扩展新输入/输出和处理组件
- 监控:内置Prometheus指标和健康检查
安装与基本使用
# 安装Benthos
go get github.com/benthosdev/benthos/v4
示例配置
下面是一个简单的YAML配置示例,从STDIN读取,处理后输出到STDOUT:
input:
stdin: {}
pipeline:
processors:
- bloblang: |
root = this
root.message = this.content.uppercase()
root.metadata.link_count = this.links.length()
output:
stdout: {}
Go代码集成示例
package main
import (
"context"
"fmt"
"os"
"github.com/benthosdev/benthos/v4/public/service"
// 导入所有标准组件
_ "github.com/benthosdev/benthos/v4/public/components/all"
)
func main() {
// 定义自定义Bloblang处理器
processorConfig := `
bloblang: |
root = this
root.timestamp = now().format_timestamp("2006-01-02T15:04:05Z07:00")
root.processed = true
`
// 创建Benthos流构建器
builder := service.NewStreamBuilder()
// 设置输入
err := builder.AddInputYAML(`
kafka:
addresses: [ "localhost:9092" ]
topics: [ "input-topic" ]
consumer_group: "benthos-group"
`)
if err != nil {
fmt.Printf("Failed to add input: %v\n", err)
os.Exit(1)
}
// 添加处理器
err = builder.AddProcessorYAML(processorConfig)
if err != nil {
fmt.Printf("Failed to add processor: %v\n", err)
os.Exit(1)
}
// 设置输出
err = builder.AddOutputYAML(`
http_client:
url: http://localhost:8080/ingest
verb: POST
headers:
Content-Type: application/json
`)
if err != nil {
fmt.Printf("Failed to add output: %v\n", err)
os.Exit(1)
}
// 构建流
stream, err := builder.Build()
if err != nil {
fmt.Printf("Failed to build stream: %v\n", err)
os.Exit(1)
}
// 运行流
ctx := context.Background()
if err := stream.Run(ctx); err != nil {
fmt.Printf("Stream error: %v\n", err)
os.Exit(1)
}
}
高级功能示例
条件路由
input:
kafka:
addresses: [ "localhost:9092" ]
topics: [ "input-topic" ]
consumer_group: "benthos-group"
pipeline:
processors:
- switch:
- check: this.type == "A"
processors:
- bloblang: 'root = this.message.uppercase()'
- check: this.type == "B"
processors:
- bloblang: 'root = this.message.lowercase()'
output:
switch:
cases:
- check: this.type == "A"
output:
kafka:
addresses: [ "localhost:9092" ]
topic: "output-topic-a"
- check: this.type == "B"
output:
kafka:
addresses: [ "localhost:9092" ]
topic: "output-topic-b"
批处理
input:
http_client:
url: http://example.com/api/data
verb: GET
pipeline:
processors:
- batch:
count: 10
processors:
- archive:
format: lines
- compress:
algorithm: gzip
output:
s3:
bucket: my-bucket
path: "data/${!timestamp("2006-01-02")}/${!uuid_v4()}.json.gz"
自定义插件开发
Benthos允许开发自定义组件。以下是自定义处理器的示例:
package main
import (
"context"
"fmt"
"github.com/benthosdev/benthos/v4/public/service"
)
func init() {
configSpec := service.NewConfigSpec().
Field(service.NewStringField("prefix"))
err := service.RegisterProcessor(
"prefix_adder",
configSpec,
func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
prefix, err := conf.FieldString("prefix")
if err != nil {
return nil, err
}
return &prefixAdder{prefix: prefix}, nil
})
if err != nil {
panic(err)
}
}
type prefixAdder struct {
prefix string
}
func (p *prefixAdder) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
content, err := msg.AsBytes()
if err != nil {
return nil, err
}
newContent := fmt.Sprintf("%s%s", p.prefix, content)
msg.SetBytes(newContent)
return service.MessageBatch{msg}, nil
}
func (p *prefixAdder) Close(ctx context.Context) error {
return nil
}
func main() {
service.RunCLI()
}
监控与管理
Benthos提供内置的HTTP API用于监控和管理:
http:
address: 0.0.0.0:4195
enabled: true
debug_endpoints: false
可以通过/metrics
端点获取Prometheus指标,/ready
用于健康检查。
Benthos是一个功能强大且灵活的工具,特别适合构建复杂的数据处理流水线。它的插件架构和丰富的内置组件使其能够轻松集成到各种系统中。