golang轻量级容错消息流处理插件库liftbridge的使用

Golang轻量级容错消息流处理插件库Liftbridge的使用

Liftbridge简介

Liftbridge Logo

Liftbridge通过实现持久化、可复制和可扩展的消息日志,提供轻量级、容错的消息流。Liftbridge的愿景是提供一个"简化版Kafka"解决方案,优先考虑Go社区的需求。与基于JVM构建的Kafka不同,Liftbridge及其官方客户端go-liftbridge是用Go实现的。Liftbridge的最终目标是提供一个轻量级的消息流解决方案,专注于简单性和可用性。您可以将其作为Kafka和Pulsar等系统的更简单、更轻量级的替代品,或者为现有的NATS部署添加流语义。

使用示例

下面是一个完整的Liftbridge使用示例,展示了如何创建生产者发送消息和消费者接收消息:

生产者示例

package main

import (
	"context"
	"log"
	"time"

	"github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	// 连接到Liftbridge服务器
	client, err := liftbridge.Connect([]string{"localhost:9292"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// 创建消息流(如果不存在)
	ctx := context.Background()
	if err := client.CreateStream(ctx, "example-stream", "example-stream"); err != nil {
		if err != liftbridge.ErrStreamExists {
			log.Fatal(err)
		}
	}

	// 发送消息
	for i := 0; i < 10; i++ {
		_, err := client.Publish(ctx, "example-stream", []byte("Hello, Liftbridge!"), 
			liftbridge.ToPartition(0),
			liftbridge.WithAckPolicyAll(),
		)
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("Sent message %d\n", i)
		time.Sleep(1 * time.Second)
	}
}

消费者示例

package main

import (
	"context"
	"log"

	"github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	// 连接到Liftbridge服务器
	client, err := liftbridge.Connect([]string{"localhost:9292"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// 订阅消息流
	ctx := context.Background()
	err = client.Subscribe(ctx, "example-stream", 
		func(msg *liftbridge.Message, err error) {
			if err != nil {
				log.Printf("Error receiving message: %v", err)
				return
			}
			log.Printf("Received message: %s, offset: %d", string(msg.Value()), msg.Offset())
		},
		liftbridge.StartAtEarliestReceived(),
	)
	if err != nil {
		log.Fatal(err)
	}

	// 保持运行
	select {}
}

主要特性

  1. 轻量级:专为Go设计,资源占用小
  2. 容错:通过复制提供高可用性
  3. 持久化:消息持久存储在日志中
  4. 可扩展:支持水平扩展
  5. 简单易用:API设计简洁直观

使用场景

  • 需要轻量级消息队列/流的应用
  • 需要为现有NATS系统添加持久化功能的场景
  • 作为Kafka的简化替代方案
  • 需要简单流处理功能的Go应用

安装

要使用Liftbridge Go客户端库,可以通过以下命令安装:

go get github.com/liftbridge-io/go-liftbridge/v2

高级功能示例

下面是一个展示Liftbridge更多功能的示例,包括分区、消息确认策略等:

package main

import (
	"context"
	"log"
	"time"

	"github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	// 连接到Liftbridge集群
	client, err := liftbridge.Connect([]string{"localhost:9292", "localhost:9293"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx := context.Background()

	// 创建带分区的流
	streamConfig := &liftbridge.StreamConfig{
		Name:              "advanced-stream",
		Subject:           "advanced-stream",
		ReplicationFactor: 2,
		Partitions:        3,
	}
	if err := client.CreateStream(ctx, streamConfig); err != nil && err != liftbridge.ErrStreamExists {
		log.Fatal(err)
	}

	// 发送带键的消息到特定分区
	for i := 0; i < 10; i++ {
		_, err := client.Publish(ctx, "advanced-stream", []byte("Advanced message"),
			liftbridge.WithKey([]byte("message-key")),
			liftbridge.ToPartition(i%3), // 轮询分区
			liftbridge.WithAckPolicyAll(),
			liftbridge.WithHeader("timestamp", time.Now().Format(time.RFC3339)),
		)
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(500 * time.Millisecond)
	}

	// 从特定分区订阅消息
	err = client.Subscribe(ctx, "advanced-stream",
		func(msg *liftbridge.Message, err error) {
			if err != nil {
				log.Printf("Error receiving message: %v", err)
				return
			}
			log.Printf("Received message from partition %d: %s", msg.Partition(), string(msg.Value()))
		},
		liftbridge.StartAtEarliestReceived(),
		liftbridge.Partition(1), // 只订阅分区1
	)
	if err != nil {
		log.Fatal(err)
	}

	// 保持运行
	select {}
}

Liftbridge为Go开发者提供了一个简单而强大的消息流处理解决方案,特别适合需要轻量级但功能完整的流处理系统的场景。


更多关于golang轻量级容错消息流处理插件库liftbridge的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang轻量级容错消息流处理插件库liftbridge的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Liftbridge Golang 客户端使用指南

Liftbridge 是一个轻量级的容错消息流处理系统,构建在 NATS 之上,提供了类似 Kafka 的日志流功能。下面我将介绍如何在 Golang 中使用 Liftbridge 客户端库。

安装

首先安装 Liftbridge Go 客户端:

go get github.com/liftbridge-io/go-liftbridge/v2

基本使用

1. 连接到 Liftbridge 服务器

package main

import (
	"context"
	"log"
	"time"

	"github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	// 创建客户端连接
	client, err := liftbridge.Connect([]string{"localhost:9292"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// 其他操作...
}

2. 创建流

// 创建流
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := client.CreateStream(ctx, "example-stream", "example-subject")
if err != nil && err != liftbridge.ErrStreamExists {
	log.Fatal(err)
}

3. 发布消息

// 发布消息
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_, err = client.Publish(ctx, "example-stream", []byte("Hello, Liftbridge!"))
if err != nil {
	log.Fatal(err)
}

4. 订阅消息

// 创建订阅
ctx, cancel = context.WithCancel(context.Background())
defer cancel()

err = client.Subscribe(ctx, "example-stream", func(msg *liftbridge.Message, err error) {
	if err != nil {
		log.Printf("Subscription error: %v", err)
		return
	}
	log.Printf("Received message: %s", string(msg.Value()))
}, liftbridge.StartAtEarliestReceived())
if err != nil {
	log.Fatal(err)
}

// 保持程序运行
select {}

高级特性

1. 分区支持

// 创建分区流
err := client.CreateStream(ctx, "partitioned-stream", "partitioned-subject",
	liftbridge.Partitions(3))
if err != nil && err != liftbridge.ErrStreamExists {
	log.Fatal(err)
}

// 发布到特定分区
_, err = client.Publish(ctx, "partitioned-stream", []byte("Partitioned message"),
	liftbridge.Partition(1))
if err != nil {
	log.Fatal(err)
}

2. 消息确认

// 发布需要确认的消息
ack, err := client.Publish(ctx, "example-stream", []byte("Message requiring ack"),
	liftbridge.AckPolicyAll())
if err != nil {
	log.Fatal(err)
}

// 等待确认
select {
case <-ack:
	log.Println("Message was acked by all replicas")
case <-time.After(5 * time.Second):
	log.Println("Timed out waiting for ack")
}

3. 消费者组

// 使用消费者组订阅
err = client.Subscribe(ctx, "example-stream", func(msg *liftbridge.Message, err error) {
	// 处理消息
}, 
	liftbridge.StartAtLatestReceived(),
	liftbridge.ConsumerGroup("my-group"))
if err != nil {
	log.Fatal(err)
}

容错处理

// 带重试的连接
client, err := liftbridge.Connect([]string{"localhost:9292"},
	liftbridge.MaxReconnects(5),
	liftbridge.ReconnectDelay(2 * time.Second))
if err != nil {
	log.Fatal(err)
}

// 订阅时处理错误
err = client.Subscribe(ctx, "example-stream", func(msg *liftbridge.Message, err error) {
	if err != nil {
		if err == liftbridge.ErrUnavailable {
			log.Println("Server unavailable, will retry...")
			return
		}
		log.Printf("Fatal error: %v", err)
		return
	}
	// 处理消息
})

完整示例

package main

import (
	"context"
	"log"
	"time"

	"github.com/liftbridge-io/go-liftbridge/v2"
)

func main() {
	// 连接
	client, err := liftbridge.Connect([]string{"localhost:9292"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// 创建流
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	err = client.CreateStream(ctx, "demo-stream", "demo.subject")
	if err != nil && err != liftbridge.ErrStreamExists {
		log.Fatal(err)
	}

	// 订阅
	ctx, cancel = context.WithCancel(context.Background())
	defer cancel()
	
	go func() {
		err := client.Subscribe(ctx, "demo-stream", func(msg *liftbridge.Message, err error) {
			if err != nil {
				log.Printf("Error: %v", err)
				return
			}
			log.Printf("Received: %s", string(msg.Value()))
		}, liftbridge.StartAtEarliestReceived())
		
		if err != nil {
			log.Fatal(err)
		}
	}()

	// 发布消息
	for i := 0; i < 10; i++ {
		_, err = client.Publish(ctx, "demo-stream", []byte("Message "+string(i)))
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(1 * time.Second)
	}

	// 等待
	time.Sleep(10 * time.Second)
}

总结

Liftbridge 提供了轻量级的消息流处理能力,特别适合需要 Kafka 类似功能但希望更简单部署的场景。Go 客户端库提供了简洁的 API 来实现流创建、消息发布和订阅等核心功能,并内置了容错机制。

关键优势:

  • 轻量级,易于部署
  • 与 NATS 生态集成
  • 支持分区和消费者组
  • 提供消息确认机制
  • 简单的 Go API

对于需要更复杂功能的场景,可以考虑结合 NATS JetStream 或其他消息系统使用。

回到顶部