golang轻量级容错消息流处理插件库liftbridge的使用
Golang轻量级容错消息流处理插件库Liftbridge的使用
Liftbridge简介
Liftbridge通过实现持久化、可复制和可扩展的消息日志,提供轻量级、容错的消息流。Liftbridge的愿景是提供一个"简化版Kafka"解决方案,优先考虑Go社区的需求。与基于JVM构建的Kafka不同,Liftbridge及其官方客户端go-liftbridge是用Go实现的。Liftbridge的最终目标是提供一个轻量级的消息流解决方案,专注于简单性和可用性。您可以将其作为Kafka和Pulsar等系统的更简单、更轻量级的替代品,或者为现有的NATS部署添加流语义。
使用示例
下面是一个完整的Liftbridge使用示例,展示了如何创建生产者发送消息和消费者接收消息:
生产者示例
package main
import (
"context"
"log"
"time"
"github.com/liftbridge-io/go-liftbridge/v2"
)
func main() {
// 连接到Liftbridge服务器
client, err := liftbridge.Connect([]string{"localhost:9292"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 创建消息流(如果不存在)
ctx := context.Background()
if err := client.CreateStream(ctx, "example-stream", "example-stream"); err != nil {
if err != liftbridge.ErrStreamExists {
log.Fatal(err)
}
}
// 发送消息
for i := 0; i < 10; i++ {
_, err := client.Publish(ctx, "example-stream", []byte("Hello, Liftbridge!"),
liftbridge.ToPartition(0),
liftbridge.WithAckPolicyAll(),
)
if err != nil {
log.Fatal(err)
}
log.Printf("Sent message %d\n", i)
time.Sleep(1 * time.Second)
}
}
消费者示例
package main
import (
"context"
"log"
"github.com/liftbridge-io/go-liftbridge/v2"
)
func main() {
// 连接到Liftbridge服务器
client, err := liftbridge.Connect([]string{"localhost:9292"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 订阅消息流
ctx := context.Background()
err = client.Subscribe(ctx, "example-stream",
func(msg *liftbridge.Message, err error) {
if err != nil {
log.Printf("Error receiving message: %v", err)
return
}
log.Printf("Received message: %s, offset: %d", string(msg.Value()), msg.Offset())
},
liftbridge.StartAtEarliestReceived(),
)
if err != nil {
log.Fatal(err)
}
// 保持运行
select {}
}
主要特性
- 轻量级:专为Go设计,资源占用小
- 容错:通过复制提供高可用性
- 持久化:消息持久存储在日志中
- 可扩展:支持水平扩展
- 简单易用:API设计简洁直观
使用场景
- 需要轻量级消息队列/流的应用
- 需要为现有NATS系统添加持久化功能的场景
- 作为Kafka的简化替代方案
- 需要简单流处理功能的Go应用
安装
要使用Liftbridge Go客户端库,可以通过以下命令安装:
go get github.com/liftbridge-io/go-liftbridge/v2
高级功能示例
下面是一个展示Liftbridge更多功能的示例,包括分区、消息确认策略等:
package main
import (
"context"
"log"
"time"
"github.com/liftbridge-io/go-liftbridge/v2"
)
func main() {
// 连接到Liftbridge集群
client, err := liftbridge.Connect([]string{"localhost:9292", "localhost:9293"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
ctx := context.Background()
// 创建带分区的流
streamConfig := &liftbridge.StreamConfig{
Name: "advanced-stream",
Subject: "advanced-stream",
ReplicationFactor: 2,
Partitions: 3,
}
if err := client.CreateStream(ctx, streamConfig); err != nil && err != liftbridge.ErrStreamExists {
log.Fatal(err)
}
// 发送带键的消息到特定分区
for i := 0; i < 10; i++ {
_, err := client.Publish(ctx, "advanced-stream", []byte("Advanced message"),
liftbridge.WithKey([]byte("message-key")),
liftbridge.ToPartition(i%3), // 轮询分区
liftbridge.WithAckPolicyAll(),
liftbridge.WithHeader("timestamp", time.Now().Format(time.RFC3339)),
)
if err != nil {
log.Fatal(err)
}
time.Sleep(500 * time.Millisecond)
}
// 从特定分区订阅消息
err = client.Subscribe(ctx, "advanced-stream",
func(msg *liftbridge.Message, err error) {
if err != nil {
log.Printf("Error receiving message: %v", err)
return
}
log.Printf("Received message from partition %d: %s", msg.Partition(), string(msg.Value()))
},
liftbridge.StartAtEarliestReceived(),
liftbridge.Partition(1), // 只订阅分区1
)
if err != nil {
log.Fatal(err)
}
// 保持运行
select {}
}
Liftbridge为Go开发者提供了一个简单而强大的消息流处理解决方案,特别适合需要轻量级但功能完整的流处理系统的场景。
更多关于golang轻量级容错消息流处理插件库liftbridge的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang轻量级容错消息流处理插件库liftbridge的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Liftbridge Golang 客户端使用指南
Liftbridge 是一个轻量级的容错消息流处理系统,构建在 NATS 之上,提供了类似 Kafka 的日志流功能。下面我将介绍如何在 Golang 中使用 Liftbridge 客户端库。
安装
首先安装 Liftbridge Go 客户端:
go get github.com/liftbridge-io/go-liftbridge/v2
基本使用
1. 连接到 Liftbridge 服务器
package main
import (
"context"
"log"
"time"
"github.com/liftbridge-io/go-liftbridge/v2"
)
func main() {
// 创建客户端连接
client, err := liftbridge.Connect([]string{"localhost:9292"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 其他操作...
}
2. 创建流
// 创建流
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := client.CreateStream(ctx, "example-stream", "example-subject")
if err != nil && err != liftbridge.ErrStreamExists {
log.Fatal(err)
}
3. 发布消息
// 发布消息
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = client.Publish(ctx, "example-stream", []byte("Hello, Liftbridge!"))
if err != nil {
log.Fatal(err)
}
4. 订阅消息
// 创建订阅
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
err = client.Subscribe(ctx, "example-stream", func(msg *liftbridge.Message, err error) {
if err != nil {
log.Printf("Subscription error: %v", err)
return
}
log.Printf("Received message: %s", string(msg.Value()))
}, liftbridge.StartAtEarliestReceived())
if err != nil {
log.Fatal(err)
}
// 保持程序运行
select {}
高级特性
1. 分区支持
// 创建分区流
err := client.CreateStream(ctx, "partitioned-stream", "partitioned-subject",
liftbridge.Partitions(3))
if err != nil && err != liftbridge.ErrStreamExists {
log.Fatal(err)
}
// 发布到特定分区
_, err = client.Publish(ctx, "partitioned-stream", []byte("Partitioned message"),
liftbridge.Partition(1))
if err != nil {
log.Fatal(err)
}
2. 消息确认
// 发布需要确认的消息
ack, err := client.Publish(ctx, "example-stream", []byte("Message requiring ack"),
liftbridge.AckPolicyAll())
if err != nil {
log.Fatal(err)
}
// 等待确认
select {
case <-ack:
log.Println("Message was acked by all replicas")
case <-time.After(5 * time.Second):
log.Println("Timed out waiting for ack")
}
3. 消费者组
// 使用消费者组订阅
err = client.Subscribe(ctx, "example-stream", func(msg *liftbridge.Message, err error) {
// 处理消息
},
liftbridge.StartAtLatestReceived(),
liftbridge.ConsumerGroup("my-group"))
if err != nil {
log.Fatal(err)
}
容错处理
// 带重试的连接
client, err := liftbridge.Connect([]string{"localhost:9292"},
liftbridge.MaxReconnects(5),
liftbridge.ReconnectDelay(2 * time.Second))
if err != nil {
log.Fatal(err)
}
// 订阅时处理错误
err = client.Subscribe(ctx, "example-stream", func(msg *liftbridge.Message, err error) {
if err != nil {
if err == liftbridge.ErrUnavailable {
log.Println("Server unavailable, will retry...")
return
}
log.Printf("Fatal error: %v", err)
return
}
// 处理消息
})
完整示例
package main
import (
"context"
"log"
"time"
"github.com/liftbridge-io/go-liftbridge/v2"
)
func main() {
// 连接
client, err := liftbridge.Connect([]string{"localhost:9292"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 创建流
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = client.CreateStream(ctx, "demo-stream", "demo.subject")
if err != nil && err != liftbridge.ErrStreamExists {
log.Fatal(err)
}
// 订阅
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
go func() {
err := client.Subscribe(ctx, "demo-stream", func(msg *liftbridge.Message, err error) {
if err != nil {
log.Printf("Error: %v", err)
return
}
log.Printf("Received: %s", string(msg.Value()))
}, liftbridge.StartAtEarliestReceived())
if err != nil {
log.Fatal(err)
}
}()
// 发布消息
for i := 0; i < 10; i++ {
_, err = client.Publish(ctx, "demo-stream", []byte("Message "+string(i)))
if err != nil {
log.Fatal(err)
}
time.Sleep(1 * time.Second)
}
// 等待
time.Sleep(10 * time.Second)
}
总结
Liftbridge 提供了轻量级的消息流处理能力,特别适合需要 Kafka 类似功能但希望更简单部署的场景。Go 客户端库提供了简洁的 API 来实现流创建、消息发布和订阅等核心功能,并内置了容错机制。
关键优势:
- 轻量级,易于部署
- 与 NATS 生态集成
- 支持分区和消费者组
- 提供消息确认机制
- 简单的 Go API
对于需要更复杂功能的场景,可以考虑结合 NATS JetStream 或其他消息系统使用。