golang多协议消息流处理插件库Benthos的使用

Golang多协议消息流处理插件库Benthos的使用

Redpanda Connect简介

Redpanda Connect是一个高性能、弹性的流处理器,能够连接各种[输入源]和[输出目标],并执行[数据水合、丰富、转换和过滤]操作。它带有强大的映射语言,易于部署和监控,可以作为静态二进制文件或Docker镜像直接集成到您的管道中。

核心特性

  • 声明式配置,流管道可以用单个配置文件定义
  • 支持多种输入/输出协议
  • 内置强大的数据处理能力
  • 提供交付保证(至少一次交付)
  • 丰富的监控和追踪功能

快速开始示例

以下是一个完整的配置示例,展示如何从GCP Pub/Sub读取数据,处理后写入Redis Streams:

input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - mapping: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20

安装方式

Linux安装

curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip
unzip rpk-linux-amd64.zip -d ~/.local/bin/

Homebrew安装

brew install redpanda-data/tap/redpanda

Docker安装

docker pull docker.redpanda.com/redpandadata/connect

运行方式

直接运行

rpk connect run ./config.yaml

Docker运行

# 使用配置文件
docker run --rm -v /path/to/your/config.yaml:/connect.yaml docker.redpanda.com/redpandadata/connect run

# 使用参数
docker run --rm -p 4195:4195 docker.redpanda.com/redpandadata/connect run \
  -s "input.type=http_server" \
  -s "output.type=kafka" \
  -s "output.kafka.addresses=kafka-server:9092" \
  -s "output.kafka.topic=redpanda_topic"

监控功能

Redpanda Connect提供了丰富的监控功能:

  1. 健康检查

    • /ping - 用作存活探针,总是返回200
    • /ready - 用作就绪探针,当输入和输出都连接时返回200,否则返回503
  2. 指标监控

    • 支持Statsd、Prometheus、JSON HTTP端点等多种方式
  3. 追踪

    • 支持OpenTelemetry追踪事件

支持的协议

Redpanda Connect支持广泛的输入/输出协议,包括但不限于:

  • AWS (DynamoDB, Kinesis, S3, SQS, SNS)
  • Azure (Blob存储, 队列存储, 表存储)
  • GCP (Pub/Sub, 云存储, BigQuery)
  • Kafka, NATS, NSQ, MQTT
  • AMQP 0.91 (RabbitMQ), AMQP 1
  • Redis (流, 列表, pubsub, 哈希)
  • Cassandra, Elasticsearch, HDFS
  • HTTP (服务器和客户端, 包括websockets)
  • MongoDB, SQL (MySQL, PostgreSQL, Clickhouse, MSSQL)

构建自定义插件

Redpanda Connect允许开发者轻松创建自定义插件。可以参考[示例仓库]了解各种插件实现方式。

高级构建选项

要构建包含额外依赖的版本(如zmq4输入输出),可以使用:

# 使用go
go install -tags "x_benthos_extra" github.com/redpanda-data/connect/v4/cmd/redpanda-connect@latest

# 使用make
make TAGS=x_benthos_extra

贡献指南

欢迎贡献代码!提交PR前请确保:

  • 已通过单元测试 make test
  • 已通过lint检查 make lint
  • 已格式化代码 make fmt

注意:大多数集成测试需要启动Docker容器,因此make test会跳过这些测试。您可以通过以下命令单独运行它们:

go test -run "^Test.*Integration.*$" ./internal/impl/<connector directory>/...

更多关于golang多协议消息流处理插件库Benthos的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang多协议消息流处理插件库Benthos的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Benthos:Golang多协议消息流处理插件库

Benthos 是一个强大的流处理工具,用 Go 编写,支持多种消息协议和处理转换操作。它特别适合构建数据管道、ETL流程和消息路由系统。

核心特性

  1. 多协议支持:Kafka、RabbitMQ、HTTP、AWS SQS/SNS、NATS等
  2. 丰富处理能力:过滤、转换、批处理、缓存等
  3. 插件架构:易于扩展新输入/输出和处理组件
  4. 监控:内置Prometheus指标和健康检查

安装与基本使用

# 安装Benthos
go get github.com/benthosdev/benthos/v4

示例配置

下面是一个简单的YAML配置示例,从STDIN读取,处理后输出到STDOUT:

input:
  stdin: {}

pipeline:
  processors:
    - bloblang: |
        root = this
        root.message = this.content.uppercase()
        root.metadata.link_count = this.links.length()

output:
  stdout: {}

Go代码集成示例

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/benthosdev/benthos/v4/public/service"

	// 导入所有标准组件
	_ "github.com/benthosdev/benthos/v4/public/components/all"
)

func main() {
	// 定义自定义Bloblang处理器
	processorConfig := `
bloblang: |
  root = this
  root.timestamp = now().format_timestamp("2006-01-02T15:04:05Z07:00")
  root.processed = true
`

	// 创建Benthos流构建器
	builder := service.NewStreamBuilder()

	// 设置输入
	err := builder.AddInputYAML(`
kafka:
  addresses: [ "localhost:9092" ]
  topics: [ "input-topic" ]
  consumer_group: "benthos-group"
`)
	if err != nil {
		fmt.Printf("Failed to add input: %v\n", err)
		os.Exit(1)
	}

	// 添加处理器
	err = builder.AddProcessorYAML(processorConfig)
	if err != nil {
		fmt.Printf("Failed to add processor: %v\n", err)
		os.Exit(1)
	}

	// 设置输出
	err = builder.AddOutputYAML(`
http_client:
  url: http://localhost:8080/ingest
  verb: POST
  headers:
    Content-Type: application/json
`)
	if err != nil {
		fmt.Printf("Failed to add output: %v\n", err)
		os.Exit(1)
	}

	// 构建流
	stream, err := builder.Build()
	if err != nil {
		fmt.Printf("Failed to build stream: %v\n", err)
		os.Exit(1)
	}

	// 运行流
	ctx := context.Background()
	if err := stream.Run(ctx); err != nil {
		fmt.Printf("Stream error: %v\n", err)
		os.Exit(1)
	}
}

高级功能示例

条件路由

input:
  kafka:
    addresses: [ "localhost:9092" ]
    topics: [ "input-topic" ]
    consumer_group: "benthos-group"

pipeline:
  processors:
    - switch:
      - check: this.type == "A"
        processors:
          - bloblang: 'root = this.message.uppercase()'
      - check: this.type == "B"
        processors:
          - bloblang: 'root = this.message.lowercase()'

output:
  switch:
    cases:
      - check: this.type == "A"
        output:
          kafka:
            addresses: [ "localhost:9092" ]
            topic: "output-topic-a"
      - check: this.type == "B"
        output:
          kafka:
            addresses: [ "localhost:9092" ]
            topic: "output-topic-b"

批处理

input:
  http_client:
    url: http://example.com/api/data
    verb: GET

pipeline:
  processors:
    - batch:
        count: 10
        processors:
          - archive:
              format: lines
          - compress:
              algorithm: gzip

output:
  s3:
    bucket: my-bucket
    path: "data/${!timestamp("2006-01-02")}/${!uuid_v4()}.json.gz"

自定义插件开发

Benthos允许开发自定义组件。以下是自定义处理器的示例:

package main

import (
	"context"
	"fmt"

	"github.com/benthosdev/benthos/v4/public/service"
)

func init() {
	configSpec := service.NewConfigSpec().
		Field(service.NewStringField("prefix"))

	err := service.RegisterProcessor(
		"prefix_adder",
		configSpec,
		func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
			prefix, err := conf.FieldString("prefix")
			if err != nil {
				return nil, err
			}
			return &prefixAdder{prefix: prefix}, nil
		})
	if err != nil {
		panic(err)
	}
}

type prefixAdder struct {
	prefix string
}

func (p *prefixAdder) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
	content, err := msg.AsBytes()
	if err != nil {
		return nil, err
	}

	newContent := fmt.Sprintf("%s%s", p.prefix, content)
	msg.SetBytes(newContent)

	return service.MessageBatch{msg}, nil
}

func (p *prefixAdder) Close(ctx context.Context) error {
	return nil
}

func main() {
	service.RunCLI()
}

监控与管理

Benthos提供内置的HTTP API用于监控和管理:

http:
  address: 0.0.0.0:4195
  enabled: true
  debug_endpoints: false

可以通过/metrics端点获取Prometheus指标,/ready用于健康检查。

Benthos是一个功能强大且灵活的工具,特别适合构建复杂的数据处理流水线。它的插件架构和丰富的内置组件使其能够轻松集成到各种系统中。

回到顶部