golang实现Nanomsg可扩展协议的高性能网络通信插件库mangos的使用
Golang实现Nanomsg可扩展协议的高性能网络通信插件库mangos的使用
Mangos™是一个纯Go实现的SP(Scalability Protocols)消息系统,也就是俗称的nanomsg。
主要特性
- 实现了Req/Rep、Pub/Sub、Pair、Bus、Push/Pull和Surveyor/Respondent等模式
- 支持TCP、inproc、IPC、WebSocket、WebSocket/TLS和TLS等传输协议
- 与nanomsg和NNG基本兼容(除ZeroTier传输和PAIRv1协议外)
安装
import "go.nanomsg.org/mangos/v3"
示例代码
1. REQ/REP模式示例
服务端(REP):
package main
import (
"fmt"
"log"
"time"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/rep"
_ "go.nanomsg.org/mangos/v3/transport/all"
)
func server(url string) {
var sock mangos.Socket
var err error
var msg []byte
if sock, err = rep.NewSocket(); err != nil {
log.Fatalf("can't get new rep socket: %s", err)
}
if err = sock.Listen(url); err != nil {
log.Fatalf("can't listen on rep socket: %s", err.Error())
}
for {
// 接收请求
if msg, err = sock.Recv(); err != nil {
log.Fatalf("can't receive on rep socket: %s", err.Error())
}
fmt.Printf("SERVER: RECEIVED \"%s\" REQUEST\n", string(msg))
// 发送响应
time.Sleep(time.Second)
if err = sock.Send([]byte("World")); err != nil {
log.Fatalf("can't send reply: %s", err.Error())
}
}
}
func main() {
server("tcp://127.0.0.1:40899")
}
客户端(REQ):
package main
import (
"fmt"
"log"
"time"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/req"
_ "go.nanomsg.org/mangos/v3/transport/all"
)
func client(url string, name string) {
var sock mangos.Socket
var err error
var msg []byte
if sock, err = req.NewSocket(); err != nil {
log.Fatalf("can't get new req socket: %s", err.Error())
}
if err = sock.Dial(url); err != nil {
log.Fatalf("can't dial on req socket: %s", err.Error())
}
for i := 0; i < 3; i++ {
// 发送请求
fmt.Printf("CLIENT(%s): SENDING \"%s\"\n", name, "Hello")
if err = sock.Send([]byte("Hello")); err != nil {
log.Fatalf("can't send message: %s", err.Error())
}
// 接收响应
if msg, err = sock.Recv(); err != nil {
log.Fatalf("can't receive date: %s", err.Error())
}
fmt.Printf("CLIENT(%s): RECEIVED \"%s\"\n", name, string(msg))
time.Sleep(time.Second)
}
sock.Close()
}
func main() {
go client("tcp://127.0.0.1:40899", "CLIENT1")
client("tcp://127.0.0.1:40899", "CLIENT2")
}
2. PUB/SUB模式示例
发布者(PUB):
package main
import (
"fmt"
"log"
"time"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/pub"
_ "go.nanomsg.org/mangos/v3/transport/all"
)
func pubServer(url string) {
var sock mangos.Socket
var err error
if sock, err = pub.NewSocket(); err != nil {
log.Fatalf("can't get new pub socket: %s", err.Error())
}
if err = sock.Listen(url); err != nil {
log.Fatalf("can't listen on pub socket: %s", err.Error())
}
for {
// 发布消息
msg := fmt.Sprintf("Current time is %s", time.Now().Format(time.RFC3339))
fmt.Printf("PUBLISHER: PUBLISHING DATE %s\n", msg)
if err = sock.Send([]byte(msg)); err != nil {
log.Fatalf("can't publish: %s", err.Error())
}
time.Sleep(time.Second)
}
}
func main() {
pubServer("tcp://127.0.0.1:40898")
}
订阅者(SUB):
package main
import (
"fmt"
"log"
"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/sub"
_ "go.nanomsg.org/mangos/v3/transport/all"
)
func subClient(url string, name string) {
var sock mangos.Socket
var err error
var msg []byte
if sock, err = sub.NewSocket(); err != nil {
log.Fatalf("can't get new sub socket: %s", err.Error())
}
if err = sock.Dial(url); err != nil {
log.Fatalf("can't dial on sub socket: %s", err.Error())
}
// 空字符串表示订阅所有消息
err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
if err != nil {
log.Fatalf("cannot subscribe: %s", err.Error())
}
for {
if msg, err = sock.Recv(); err != nil {
log.Fatalf("can't recv: %s", err.Error())
}
fmt.Printf("SUBSCRIBER(%s): RECEIVED \"%s\"\n", name, string(msg))
}
}
func main() {
go subClient("tcp://127.0.0.1:40898", "SUB1")
subClient("tcp://127.0.0.1:40898", "SUB2")
}
测试
运行测试:
go test go.nanomsg.org/mangos/v3/...
运行基准测试:
go test -bench=. go.nanomsg.org/mangos/v3/test
注意事项
- 导入路径已更改为
go.nanomsg.org/mangos/v3
- 仅支持Go 1.22及以上版本
- 所有传输协议需要显式导入
_ "go.nanomsg.org/mangos/v3/transport/all"
Mangos™是Garrett D’Amore的商标。
更多关于golang实现Nanomsg可扩展协议的高性能网络通信插件库mangos的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang实现Nanomsg可扩展协议的高性能网络通信插件库mangos的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用Mangos实现高性能Nanomsg网络通信
Mangos是Nanomsg协议的纯Go实现,提供了高性能、可扩展的网络通信能力。下面我将详细介绍如何使用Mangos库来实现各种通信模式。
安装Mangos
首先安装Mangos库:
go get -u github.com/nanomsg/mangos/v2
基本通信模式
1. 请求/回复模式(REQ/REP)
服务器端(REP):
package main
import (
"fmt"
"log"
"time"
"go.nanomsg.org/mangos/v2"
"go.nanomsg.org/mangos/v2/protocol/rep"
"go.nanomsg.org/mangos/v2/protocol/req"
// 注册传输
_ "go.nanomsg.org/mangos/v2/transport/all"
)
func server(url string) {
sock, err := rep.NewSocket()
if err != nil {
log.Fatalf("can't get new rep socket: %v", err)
}
if err = sock.Listen(url); err != nil {
log.Fatalf("can't listen on rep socket: %v", err)
}
for {
msg, err := sock.Recv()
if err != nil {
log.Printf("can't receive: %v", err)
continue
}
fmt.Printf("SERVER: RECEIVED \"%s\"\n", string(msg))
// 发送回复
reply := []byte(fmt.Sprintf("Reply to: %s", string(msg)))
if err = sock.Send(reply); err != nil {
log.Printf("can't send reply: %v", err)
}
}
}
客户端(REQ):
func client(url string) {
sock, err := req.NewSocket()
if err != nil {
log.Fatalf("can't get new req socket: %v", err)
}
if err = sock.Dial(url); err != nil {
log.Fatalf("can't dial on req socket: %v", err)
}
for i := 0; i < 5; i++ {
msg := []byte(fmt.Sprintf("Request #%d", i))
fmt.Printf("CLIENT: SENDING \"%s\"\n", string(msg))
if err = sock.Send(msg); err != nil {
log.Printf("can't send message: %v", err)
continue
}
reply, err := sock.Recv()
if err != nil {
log.Printf("can't receive reply: %v", err)
continue
}
fmt.Printf("CLIENT: RECEIVED \"%s\"\n", string(reply))
time.Sleep(time.Second)
}
}
func main() {
url := "tcp://127.0.0.1:40899"
go server(url)
time.Sleep(time.Second) // 确保服务器先启动
client(url)
}
2. 发布/订阅模式(PUB/SUB)
发布者(PUB):
func publisher(url string) {
sock, err := pub.NewSocket()
if err != nil {
log.Fatalf("can't get new pub socket: %v", err)
}
if err = sock.Listen(url); err != nil {
log.Fatalf("can't listen on pub socket: %v", err)
}
topics := []string{"sports", "news", "weather"}
for i := 0; ; i++ {
topic := topics[i%len(topics)]
msg := []byte(fmt.Sprintf("%s update #%d", topic, i))
fmt.Printf("PUBLISHER: SENDING \"%s\"\n", string(msg))
if err = sock.Send(msg); err != nil {
log.Printf("can't send message: %v", err)
}
time.Sleep(time.Second)
}
}
订阅者(SUB):
func subscriber(url string, topic string) {
sock, err := sub.NewSocket()
if err != nil {
log.Fatalf("can't get new sub socket: %v", err)
}
if err = sock.Dial(url); err != nil {
log.Fatalf("can't dial on sub socket: %v", err)
}
// 设置订阅主题
err = sock.SetOption(mangos.OptionSubscribe, []byte(topic))
if err != nil {
log.Fatalf("can't subscribe: %v", err)
}
for {
msg, err := sock.Recv()
if err != nil {
log.Printf("can't receive: %v", err)
continue
}
fmt.Printf("SUBSCRIBER(%s): RECEIVED \"%s\"\n", topic, string(msg))
}
}
func main() {
url := "tcp://127.0.0.1:40898"
go publisher(url)
time.Sleep(time.Second) // 确保发布者先启动
go subscriber(url, "sports")
go subscriber(url, "news")
subscriber(url, "weather") // 最后一个在主goroutine运行
}
高级特性
1. 多路复用
func multiplexServer(url string) {
sock, err := rep.NewSocket()
if err != nil {
log.Fatalf("can't get new rep socket: %v", err)
}
// 添加多个传输端点
if err = sock.Listen(url); err != nil {
log.Fatalf("can't listen on rep socket: %v", err)
}
if err = sock.Listen("ipc:///tmp/multiplex.ipc"); err != nil {
log.Fatalf("can't listen on ipc socket: %v", err)
}
// 处理逻辑...
}
2. 设置选项
func setOptions() {
sock, err := req.NewSocket()
if err != nil {
log.Fatal(err)
}
// 设置重试间隔
err = sock.SetOption(mangos.OptionRetryTime, time.Second)
if err != nil {
log.Printf("can't set retry time: %v", err)
}
// 设置接收超时
err = sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second)
if err != nil {
log.Printf("can't set recv deadline: %v", err)
}
// 设置发送缓冲区大小
err = sock.SetOption(mangos.OptionWriteQLen, 100)
if err != nil {
log.Printf("can't set write queue length: %v", err)
}
}
性能优化建议
- 批量处理消息:对于高吞吐量场景,考虑批量发送/接收消息
- 连接池:为REQ套接字实现连接池以避免频繁建立连接
- 异步处理:使用goroutine处理接收到的消息
- 缓冲区调优:根据负载调整读写缓冲区大小
- 协议选择:对于本地通信,考虑使用IPC而不是TCP
Mangos提供了灵活而强大的网络通信能力,通过合理使用各种模式和优化手段,可以构建出高性能的分布式系统。