golang实现ZeroMQ消息队列通信的插件库zmq4的使用
Golang实现ZeroMQ消息队列通信的插件库zmq4的使用
zmq4是一个Golang接口,支持ZeroMQ版本4及以上。
警告
从Go 1.14开始,在Unix-like系统上会出现大量中断信号调用。请查看包文档顶部以获取修复方法。
要求
zmq4只是ZeroMQ库的包装器,不包括库本身。所以你需要安装ZeroMQ,包括它的开发文件。在Linux和Darwin上可以检查:
$ pkg-config --modversion libzmq
4.3.1
Go编译器必须能够编译C代码,可以检查:
$ go env CGO_ENABLED
1
不能进行交叉编译,这会禁用C。
Windows
使用CGO_CFLAGS
和CGO_LDFLAGS
环境变量构建,例如:
$env:CGO_CFLAGS='-ID:/dev/vcpkg/installed/x64-windows/include'
$env:CGO_LDFLAGS='-LD:/dev/vcpkg/installed/x64-windows/lib -l:libzmq-mt-4_3_4.lib'
部署结果程序时需要libzmq-mt-4_3_4.dll
。
安装
go get github.com/pebbe/zmq4
示例代码
基本REQ/REP模式示例
服务器端(REP):
package main
import (
"fmt"
zmq "github.com/pebbe/zmq4"
)
func main() {
// 创建REP socket
responder, _ := zmq.NewSocket(zmq.REP)
defer responder.Close()
// 绑定到TCP端口5555
responder.Bind("tcp://*:5555")
for {
// 等待客户端消息
msg, _ := responder.Recv(0)
fmt.Println("收到:", msg)
// 发送回复
responder.Send("World", 0)
}
}
客户端(REQ):
package main
import (
"fmt"
zmq "github.com/pebbe/zmq4"
)
func main() {
// 创建REQ socket
requester, _ := zmq.NewSocket(zmq.REQ)
defer requester.Close()
// 连接到服务器
requester.Connect("tcp://localhost:5555")
for i := 0; i < 10; i++ {
// 发送请求
requester.Send("Hello", 0)
// 等待回复
reply, _ := requester.Recv(0)
fmt.Println("收到回复:", reply)
}
}
PUB/SUB模式示例
发布者:
package main
import (
"time"
zmq "github.com/pebbe/zmq4"
)
func main() {
publisher, _ := zmq.NewSocket(zmq.PUB)
defer publisher.Close()
publisher.Bind("tcp://*:5556")
for {
// 发布两条消息,A和B
publisher.Send("A Hello", 0)
publisher.Send("B World", 0)
time.Sleep(time.Second)
}
}
订阅者:
package main
import (
"fmt"
zmq "github.com/pebbe/zmq4"
)
func main() {
subscriber, _ := zmq.NewSocket(zmq.SUB)
defer subscriber.Close()
subscriber.Connect("tcp://localhost:5556")
// 只订阅A开头的信息
subscriber.SetSubscribe("A")
for {
// 接收消息
msg, _ := subscriber.Recv(0)
fmt.Println("收到:", msg)
}
}
API变更
在2014-06-27的提交中,AuthAllow
和AuthDeny
函数有API变更。
旧版:
func AuthAllow(addresses ...string)
func AuthDeny(addresses ...string)
新版:
func AuthAllow(domain string, addresses ...string)
func AuthDeny(domain string, addresses ...string)
如果domain
可以解析为IP地址,它将被解释为IP地址,并且它和所有剩余地址将被添加到所有域中。
地址可以是单个IP地址,也可以是CIDR格式的IP地址和掩码,例如"123.123.123.0/24"。
更多关于golang实现ZeroMQ消息队列通信的插件库zmq4的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现ZeroMQ消息队列通信的插件库zmq4的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用zmq4实现ZeroMQ消息队列通信
ZeroMQ是一个高性能异步消息库,而zmq4是Go语言的ZeroMQ绑定。下面我将详细介绍如何使用zmq4库在Go中实现各种ZeroMQ模式的消息通信。
安装zmq4
首先需要安装zmq4库和ZeroMQ开发库:
# 安装ZeroMQ开发库 (Ubuntu/Debian)
sudo apt-get install libzmq3-dev
# 安装zmq4
go get github.com/pebbe/zmq4
基本通信模式
1. REQ-REP (请求-响应模式)
服务端(REP):
package main
import (
"fmt"
"github.com/pebbe/zmq4"
)
func main() {
// 创建REP套接字
socket, _ := zmq4.NewSocket(zmq4.REP)
defer socket.Close()
socket.Bind("tcp://*:5555")
for {
// 接收请求
msg, _ := socket.Recv(0)
fmt.Printf("Received request: %s\n", msg)
// 发送响应
reply := fmt.Sprintf("Reply to %s", msg)
socket.Send(reply, 0)
}
}
客户端(REQ):
package main
import (
"fmt"
"github.com/pebbe/zmq4"
"time"
)
func main() {
// 创建REQ套接字
socket, _ := zmq4.NewSocket(zmq4.REQ)
defer socket.Close()
socket.Connect("tcp://localhost:5555")
for i := 0; i < 10; i++ {
// 发送请求
msg := fmt.Sprintf("Hello %d", i)
socket.Send(msg, 0)
fmt.Printf("Sent: %s\n", msg)
// 接收响应
reply, _ := socket.Recv(0)
fmt.Printf("Received reply: %s\n", reply)
time.Sleep(time.Second)
}
}
2. PUB-SUB (发布-订阅模式)
发布者(PUB):
package main
import (
"fmt"
"github.com/pebbe/zmq4"
"time"
)
func main() {
// 创建PUB套接字
socket, _ := zmq4.NewSocket(zmq4.PUB)
defer socket.Close()
socket.Bind("tcp://*:5556")
for i := 0; ; i++ {
// 发布消息
msg := fmt.Sprintf("Update %d", i)
socket.Send(msg, 0)
fmt.Printf("Published: %s\n", msg)
time.Sleep(time.Second)
}
}
订阅者(SUB):
package main
import (
"fmt"
"github.com/pebbe/zmq4"
)
func main() {
// 创建SUB套接字
socket, _ := zmq4.NewSocket(zmq4.SUB)
defer socket.Close()
socket.Connect("tcp://localhost:5556")
socket.SetSubscribe("") // 订阅所有消息
for {
// 接收消息
msg, _ := socket.Recv(0)
fmt.Printf("Received: %s\n", msg)
}
}
3. PUSH-PULL (管道模式)
推送者(PUSH):
package main
import (
"fmt"
"github.com/pebbe/zmq4"
"time"
)
func main() {
// 创建PUSH套接字
socket, _ := zmq4.NewSocket(zmq4.PUSH)
defer socket.Close()
socket.Bind("tcp://*:5557")
for i := 0; ; i++ {
// 推送任务
task := fmt.Sprintf("Task %d", i)
socket.Send(task, 0)
fmt.Printf("Sent task: %s\n", task)
time.Sleep(time.Second)
}
}
拉取者(PULL):
package main
import (
"fmt"
"github.com/pebbe/zmq4"
)
func main() {
// 创建PULL套接字
socket, _ := zmq4.NewSocket(zmq4.PULL)
defer socket.Close()
socket.Connect("tcp://localhost:5557")
for {
// 接收任务
task, _ := socket.Recv(0)
fmt.Printf("Received task: %s\n", task)
}
}
高级特性
多部分消息
ZeroMQ支持发送多部分消息:
// 发送多部分消息
socket.Send("header", zmq4.SNDMORE)
socket.Send("body", 0)
// 接收多部分消息
part1, _ := socket.Recv(0)
more, _ := socket.GetRcvmore()
if more {
part2, _ := socket.Recv(0)
}
设置套接字选项
// 设置接收超时
socket.SetRcvtimeo(time.Second * 5)
// 设置发送高水位线
socket.SetSndhwm(1000)
// 设置身份标识
socket.SetIdentity("worker1")
代理模式
ZeroMQ支持使用代理模式实现更复杂的拓扑结构:
package main
import "github.com/pebbe/zmq4"
func main() {
// 创建前端和后端套接字
frontend, _ := zmq4.NewSocket(zmq4.ROUTER)
backend, _ := zmq4.NewSocket(zmq4.DEALER)
defer frontend.Close()
defer backend.Close()
frontend.Bind("tcp://*:5558")
backend.Bind("tcp://*:5559")
// 启动代理
err := zmq4.Proxy(frontend, backend, nil)
if err != nil {
panic(err)
}
}
错误处理
在实际应用中应该妥善处理错误:
socket, err := zmq4.NewSocket(zmq4.REQ)
if err != nil {
panic(err)
}
defer socket.Close()
err = socket.Connect("tcp://localhost:5555")
if err != nil {
panic(err)
}
_, err = socket.Send("Hello", 0)
if err != nil {
panic(err)
}
msg, err := socket.Recv(0)
if err != nil {
panic(err)
}
总结
zmq4为Go提供了完整的ZeroMQ功能实现,支持所有ZeroMQ模式:
- 请求-响应模式(REQ-REP)
- 发布-订阅模式(PUB-SUB)
- 管道模式(PUSH-PULL)
- 路由-代理模式(ROUTER-DEALER)
- 配对模式(PAIR)
通过合理选择模式,可以构建各种分布式系统架构。zmq4的性能接近原生ZeroMQ,是Go语言中实现高性能消息通信的优秀选择。