golang实现ZeroMQ消息队列通信的插件库zmq4的使用

Golang实现ZeroMQ消息队列通信的插件库zmq4的使用

简介

zmq4是ZeroMQ版本4的Go语言接口。ZeroMQ是一个高性能异步消息库,用于构建分布式和并行应用程序。

警告

从Go 1.14开始,在类Unix系统上会出现大量中断信号调用。请查看包文档顶部以获取修复方法。

要求

zmq4只是ZeroMQ库的包装器,不包含库本身。因此您需要安装ZeroMQ及其开发文件。

检查安装

在Linux和Darwin上可以检查:

$ pkg-config --modversion libzmq
4.3.1

Go编译器必须能够编译C代码。您可以检查:

$ go env CGO_ENABLED
1

注意:不能进行交叉编译,这会禁用C。

Windows

在Windows上构建时需要设置CGO_CFLAGSCGO_LDFLAGS环境变量,例如:

$env:CGO_CFLAGS='-ID:/dev/vcpkg/installed/x64-windows/include'
$env:CGO_LDFLAGS='-LD:/dev/vcpkg/installed/x64-windows/lib -l:libzmq-mt-4_3_4.lib'

部署程序时需要带上libzmq-mt-4_3_4.dll

安装

go get github.com/pebbe/zmq4

基本使用示例

简单REQ-REP模式示例

服务端(REP):

package main

import (
	"fmt"
	zmq "github.com/pebbe/zmq4"
)

func main() {
	// 创建一个REP类型的socket
	responder, _ := zmq.NewSocket(zmq.REP)
	defer responder.Close()

	// 绑定到TCP端口5555
	responder.Bind("tcp://*:5555")

	for {
		// 等待接收消息
		msg, _ := responder.Recv(0)
		fmt.Printf("Received: %s\n", msg)

		// 发送回复
		responder.Send("World", 0)
	}
}

客户端(REQ):

package main

import (
	"fmt"
	zmq "github.com/pebbe/zmq4"
	"time"
)

func main() {
	// 创建一个REQ类型的socket
	requester, _ := zmq.NewSocket(zmq.REQ)
	defer requester.Close()

	// 连接到服务端
	requester.Connect("tcp://localhost:5555")

	for i := 0; i < 10; i++ {
		// 发送消息
		requester.Send("Hello", 0)

		// 接收回复
		reply, _ := requester.Recv(0)
		fmt.Printf("Received reply %d [%s]\n", i, reply)

		time.Sleep(time.Second)
	}
}

PUB-SUB模式示例

发布者(PUB):

package main

import (
	zmq "github.com/pebbe/zmq4"
	"time"
)

func main() {
	// 创建一个PUB类型的socket
	publisher, _ := zmq.NewSocket(zmq.PUB)
	defer publisher.Close()

	// 绑定到TCP端口5556
	publisher.Bind("tcp://*:5556")

	for i := 0; ; i++ {
		// 发布消息到主题A和B
		publisher.Send("A "+time.Now().String(), 0)
		publisher.Send("B "+time.Now().String(), 0)
		time.Sleep(time.Second)
	}
}

订阅者(SUB):

package main

import (
	"fmt"
	zmq "github.com/pebbe/zmq4"
)

func main() {
	// 创建一个SUB类型的socket
	subscriber, _ := zmq.NewSocket(zmq.SUB)
	defer subscriber.Close()

	// 连接到发布者
	subscriber.Connect("tcp://localhost:5556")

	// 订阅主题A
	subscriber.SetSubscribe("A")

	for {
		// 接收消息
		msg, _ := subscriber.Recv(0)
		fmt.Printf("Received: %s\n", msg)
	}
}

高级特性

使用DRAFT API

要使用ZeroMQ 4.2 DRAFT功能,可以导入draft版本:

import (
    zmq "github.com/pebbe/zmq4/draft"
)

认证控制

zmq4提供了认证控制功能:

// 允许特定IP地址
zmq.AuthAllow("*", "127.0.0.1", "192.168.1.0/24")

// 拒绝特定IP地址
zmq.AuthDeny("*", "10.0.0.1")

注意事项

  1. 需要ZeroMQ 4.0.1或更高版本
  2. 在4.2之前的版本中使用CURVE安全功能时,ZeroMQ必须安装有libsodium支持
  3. 包含《ØMQ - The Guide》中的所有示例

替代方案

  1. zmq4chan - ZeroMQ sockets的Go原生channel接口
  2. go-zeromq/zmq4 - ZeroMQ版本4的纯Go实现
  3. goczmq - CZMQ的Go接口

希望这些示例能帮助您开始使用zmq4进行ZeroMQ消息队列通信。


更多关于golang实现ZeroMQ消息队列通信的插件库zmq4的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现ZeroMQ消息队列通信的插件库zmq4的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用Go语言实现ZeroMQ通信(zmq4库指南)

ZeroMQ(ØMQ)是一个高性能异步消息库,而zmq4是Go语言中最流行的ZeroMQ绑定库。下面我将详细介绍如何使用zmq4实现各种消息模式。

安装zmq4

首先需要安装ZeroMQ系统库,然后在Go项目中引入zmq4:

# Ubuntu/Debian安装ZeroMQ
sudo apt-get install libzmq3-dev

# 安装Go绑定
go get github.com/pebbe/zmq4

基本消息模式实现

1. 请求-应答模式(REQ-REP)

服务端(应答方)

package main

import (
	"fmt"
	"github.com/pebbe/zmq4"
)

func main() {
	// 创建REP套接字
	responder, _ := zmq4.NewSocket(zmq4.REP)
	defer responder.Close()
	responder.Bind("tcp://*:5555")

	for {
		// 等待请求
		msg, _ := responder.Recv(0)
		fmt.Printf("Received request: [%s]\n", msg)

		// 发送响应
		responder.Send("World", 0)
	}
}

客户端(请求方)

package main

import (
	"fmt"
	"github.com/pebbe/zmq4"
	"time"
)

func main() {
	// 创建REQ套接字
	requester, _ := zmq4.NewSocket(zmq4.REQ)
	defer requester.Close()
	requester.Connect("tcp://localhost:5555")

	for i := 0; i < 10; i++ {
		// 发送请求
		requester.Send("Hello", 0)

		// 接收响应
		reply, _ := requester.Recv(0)
		fmt.Printf("Received reply %d [%s]\n", i, reply)
		
		time.Sleep(time.Second)
	}
}

2. 发布-订阅模式(PUB-SUB)

发布者

package main

import (
	"github.com/pebbe/zmq4"
	"time"
)

func main() {
	publisher, _ := zmq4.NewSocket(zmq4.PUB)
	defer publisher.Close()
	publisher.Bind("tcp://*:5556")

	for {
		// 发布两条消息,分别带有A和B前缀
		publisher.Send("A Hello", 0)
		publisher.Send("B World", 0)
		time.Sleep(time.Second)
	}
}

订阅者

package main

import (
	"fmt"
	"github.com/pebbe/zmq4"
)

func main() {
	subscriber, _ := zmq4.NewSocket(zmq4.SUB)
	defer subscriber.Close()
	subscriber.Connect("tcp://localhost:5556")
	
	// 只订阅以"A"开头的消息
	subscriber.SetSubscribe("A")

	for {
		msg, _ := subscriber.Recv(0)
		fmt.Printf("Received: %s\n", msg)
	}
}

3. 管道模式(PUSH-PULL)

任务分发者

package main

import (
	"github.com/pebbe/zmq4"
	"time"
)

func main() {
	sender, _ := zmq4.NewSocket(zmq4.PUSH)
	defer sender.Close()
	sender.Bind("tcp://*:5557")

	// 让所有worker连接
	time.Sleep(time.Second)

	// 发送100个任务
	for i := 0; i < 100; i++ {
		sender.Send("task", 0)
	}
}

工作者

package main

import (
	"fmt"
	"github.com/pebbe/zmq4"
)

func main() {
	receiver, _ := zmq4.NewSocket(zmq4.PULL)
	defer receiver.Close()
	receiver.Connect("tcp://localhost:5557")

	for {
		task, _ := receiver.Recv(0)
		fmt.Printf("Processing task: %s\n", task)
	}
}

高级特性

多部分消息

// 发送多部分消息
socket.Send("header", zmq4.SNDMORE)
socket.Send("body", 0)

// 接收多部分消息
part1, _ := socket.Recv(0)
more, _ := socket.GetRcvmore()
if more {
    part2, _ := socket.Recv(0)
}

设置选项

// 设置超时
socket.SetRcvtimeo(time.Second * 5)

// 设置身份标识
socket.SetIdentity("worker1")

// 设置高水位线
socket.SetRcvhwm(1000)

错误处理最佳实践

func createSocket(socketType zmq4.Type, endpoint string) (*zmq4.Socket, error) {
    socket, err := zmq4.NewSocket(socketType)
    if err != nil {
        return nil, fmt.Errorf("failed to create socket: %v", err)
    }
    
    err = socket.Bind(endpoint)
    if err != nil {
        socket.Close()
        return nil, fmt.Errorf("failed to bind to %s: %v", endpoint, err)
    }
    
    return socket, nil
}

性能优化建议

  1. 复用Socket对象而不是频繁创建销毁
  2. 合理设置高水位线(HWM)防止内存溢出
  3. 考虑使用goroutine处理消息提高并发能力
  4. 对于高吞吐场景,使用PUSH/PULL模式比REQ/REP更高效

zmq4库提供了ZeroMQ的全部功能,以上示例展示了最常见的几种模式。根据实际需求,你可以组合这些模式构建更复杂的分布式系统架构。

回到顶部