golang实现ZeroMQ消息队列通信的插件库zmq4的使用

Golang实现ZeroMQ消息队列通信的插件库zmq4的使用

zmq4是一个Golang接口,支持ZeroMQ版本4及以上。

警告

从Go 1.14开始,在Unix-like系统上会出现大量中断信号调用。请查看包文档顶部以获取修复方法。

要求

zmq4只是ZeroMQ库的包装器,不包括库本身。所以你需要安装ZeroMQ,包括它的开发文件。在Linux和Darwin上可以检查:

$ pkg-config --modversion libzmq
4.3.1

Go编译器必须能够编译C代码,可以检查:

$ go env CGO_ENABLED
1

不能进行交叉编译,这会禁用C。

Windows

使用CGO_CFLAGSCGO_LDFLAGS环境变量构建,例如:

$env:CGO_CFLAGS='-ID:/dev/vcpkg/installed/x64-windows/include'
$env:CGO_LDFLAGS='-LD:/dev/vcpkg/installed/x64-windows/lib -l:libzmq-mt-4_3_4.lib'

部署结果程序时需要libzmq-mt-4_3_4.dll

安装

go get github.com/pebbe/zmq4

示例代码

基本REQ/REP模式示例

服务器端(REP):

package main

import (
    "fmt"
    zmq "github.com/pebbe/zmq4"
)

func main() {
    // 创建REP socket
    responder, _ := zmq.NewSocket(zmq.REP)
    defer responder.Close()
    
    // 绑定到TCP端口5555
    responder.Bind("tcp://*:5555")
    
    for {
        // 等待客户端消息
        msg, _ := responder.Recv(0)
        fmt.Println("收到:", msg)
        
        // 发送回复
        responder.Send("World", 0)
    }
}

客户端(REQ):

package main

import (
    "fmt"
    zmq "github.com/pebbe/zmq4"
)

func main() {
    // 创建REQ socket
    requester, _ := zmq.NewSocket(zmq.REQ)
    defer requester.Close()
    
    // 连接到服务器
    requester.Connect("tcp://localhost:5555")
    
    for i := 0; i < 10; i++ {
        // 发送请求
        requester.Send("Hello", 0)
        
        // 等待回复
        reply, _ := requester.Recv(0)
        fmt.Println("收到回复:", reply)
    }
}

PUB/SUB模式示例

发布者:

package main

import (
    "time"
    zmq "github.com/pebbe/zmq4"
)

func main() {
    publisher, _ := zmq.NewSocket(zmq.PUB)
    defer publisher.Close()
    publisher.Bind("tcp://*:5556")
    
    for {
        // 发布两条消息,A和B
        publisher.Send("A Hello", 0)
        publisher.Send("B World", 0)
        time.Sleep(time.Second)
    }
}

订阅者:

package main

import (
    "fmt"
    zmq "github.com/pebbe/zmq4"
)

func main() {
    subscriber, _ := zmq.NewSocket(zmq.SUB)
    defer subscriber.Close()
    subscriber.Connect("tcp://localhost:5556")
    
    // 只订阅A开头的信息
    subscriber.SetSubscribe("A")
    
    for {
        // 接收消息
        msg, _ := subscriber.Recv(0)
        fmt.Println("收到:", msg)
    }
}

API变更

在2014-06-27的提交中,AuthAllowAuthDeny函数有API变更。

旧版:

func AuthAllow(addresses ...string)
func AuthDeny(addresses ...string)

新版:

func AuthAllow(domain string, addresses ...string)
func AuthDeny(domain string, addresses ...string)

如果domain可以解析为IP地址,它将被解释为IP地址,并且它和所有剩余地址将被添加到所有域中。

地址可以是单个IP地址,也可以是CIDR格式的IP地址和掩码,例如"123.123.123.0/24"。


更多关于golang实现ZeroMQ消息队列通信的插件库zmq4的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现ZeroMQ消息队列通信的插件库zmq4的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用zmq4实现ZeroMQ消息队列通信

ZeroMQ是一个高性能异步消息库,而zmq4是Go语言的ZeroMQ绑定。下面我将详细介绍如何使用zmq4库在Go中实现各种ZeroMQ模式的消息通信。

安装zmq4

首先需要安装zmq4库和ZeroMQ开发库:

# 安装ZeroMQ开发库 (Ubuntu/Debian)
sudo apt-get install libzmq3-dev

# 安装zmq4
go get github.com/pebbe/zmq4

基本通信模式

1. REQ-REP (请求-响应模式)

服务端(REP):

package main

import (
	"fmt"
	"github.com/pebbe/zmq4"
)

func main() {
	// 创建REP套接字
	socket, _ := zmq4.NewSocket(zmq4.REP)
	defer socket.Close()
	socket.Bind("tcp://*:5555")

	for {
		// 接收请求
		msg, _ := socket.Recv(0)
		fmt.Printf("Received request: %s\n", msg)

		// 发送响应
		reply := fmt.Sprintf("Reply to %s", msg)
		socket.Send(reply, 0)
	}
}

客户端(REQ):

package main

import (
	"fmt"
	"github.com/pebbe/zmq4"
	"time"
)

func main() {
	// 创建REQ套接字
	socket, _ := zmq4.NewSocket(zmq4.REQ)
	defer socket.Close()
	socket.Connect("tcp://localhost:5555")

	for i := 0; i < 10; i++ {
		// 发送请求
		msg := fmt.Sprintf("Hello %d", i)
		socket.Send(msg, 0)
		fmt.Printf("Sent: %s\n", msg)

		// 接收响应
		reply, _ := socket.Recv(0)
		fmt.Printf("Received reply: %s\n", reply)
		
		time.Sleep(time.Second)
	}
}

2. PUB-SUB (发布-订阅模式)

发布者(PUB):

package main

import (
	"fmt"
	"github.com/pebbe/zmq4"
	"time"
)

func main() {
	// 创建PUB套接字
	socket, _ := zmq4.NewSocket(zmq4.PUB)
	defer socket.Close()
	socket.Bind("tcp://*:5556")

	for i := 0; ; i++ {
		// 发布消息
		msg := fmt.Sprintf("Update %d", i)
		socket.Send(msg, 0)
		fmt.Printf("Published: %s\n", msg)
		time.Sleep(time.Second)
	}
}

订阅者(SUB):

package main

import (
	"fmt"
	"github.com/pebbe/zmq4"
)

func main() {
	// 创建SUB套接字
	socket, _ := zmq4.NewSocket(zmq4.SUB)
	defer socket.Close()
	socket.Connect("tcp://localhost:5556")
	socket.SetSubscribe("") // 订阅所有消息

	for {
		// 接收消息
		msg, _ := socket.Recv(0)
		fmt.Printf("Received: %s\n", msg)
	}
}

3. PUSH-PULL (管道模式)

推送者(PUSH):

package main

import (
	"fmt"
	"github.com/pebbe/zmq4"
	"time"
)

func main() {
	// 创建PUSH套接字
	socket, _ := zmq4.NewSocket(zmq4.PUSH)
	defer socket.Close()
	socket.Bind("tcp://*:5557")

	for i := 0; ; i++ {
		// 推送任务
		task := fmt.Sprintf("Task %d", i)
		socket.Send(task, 0)
		fmt.Printf("Sent task: %s\n", task)
		time.Sleep(time.Second)
	}
}

拉取者(PULL):

package main

import (
	"fmt"
	"github.com/pebbe/zmq4"
)

func main() {
	// 创建PULL套接字
	socket, _ := zmq4.NewSocket(zmq4.PULL)
	defer socket.Close()
	socket.Connect("tcp://localhost:5557")

	for {
		// 接收任务
		task, _ := socket.Recv(0)
		fmt.Printf("Received task: %s\n", task)
	}
}

高级特性

多部分消息

ZeroMQ支持发送多部分消息:

// 发送多部分消息
socket.Send("header", zmq4.SNDMORE)
socket.Send("body", 0)

// 接收多部分消息
part1, _ := socket.Recv(0)
more, _ := socket.GetRcvmore()
if more {
    part2, _ := socket.Recv(0)
}

设置套接字选项

// 设置接收超时
socket.SetRcvtimeo(time.Second * 5)

// 设置发送高水位线
socket.SetSndhwm(1000)

// 设置身份标识
socket.SetIdentity("worker1")

代理模式

ZeroMQ支持使用代理模式实现更复杂的拓扑结构:

package main

import "github.com/pebbe/zmq4"

func main() {
	// 创建前端和后端套接字
	frontend, _ := zmq4.NewSocket(zmq4.ROUTER)
	backend, _ := zmq4.NewSocket(zmq4.DEALER)
	defer frontend.Close()
	defer backend.Close()
	
	frontend.Bind("tcp://*:5558")
	backend.Bind("tcp://*:5559")
	
	// 启动代理
	err := zmq4.Proxy(frontend, backend, nil)
	if err != nil {
		panic(err)
	}
}

错误处理

在实际应用中应该妥善处理错误:

socket, err := zmq4.NewSocket(zmq4.REQ)
if err != nil {
    panic(err)
}
defer socket.Close()

err = socket.Connect("tcp://localhost:5555")
if err != nil {
    panic(err)
}

_, err = socket.Send("Hello", 0)
if err != nil {
    panic(err)
}

msg, err := socket.Recv(0)
if err != nil {
    panic(err)
}

总结

zmq4为Go提供了完整的ZeroMQ功能实现,支持所有ZeroMQ模式:

  • 请求-响应模式(REQ-REP)
  • 发布-订阅模式(PUB-SUB)
  • 管道模式(PUSH-PULL)
  • 路由-代理模式(ROUTER-DEALER)
  • 配对模式(PAIR)

通过合理选择模式,可以构建各种分布式系统架构。zmq4的性能接近原生ZeroMQ,是Go语言中实现高性能消息通信的优秀选择。

回到顶部