golang实现Nanomsg可扩展协议的高性能网络通信插件库mangos的使用

Golang实现Nanomsg可扩展协议的高性能网络通信插件库mangos的使用

Mangos™是一个纯Go实现的SP(Scalability Protocols)消息系统,也就是俗称的nanomsg。

主要特性

  • 实现了Req/Rep、Pub/Sub、Pair、Bus、Push/Pull和Surveyor/Respondent等模式
  • 支持TCP、inproc、IPC、WebSocket、WebSocket/TLS和TLS等传输协议
  • 与nanomsg和NNG基本兼容(除ZeroTier传输和PAIRv1协议外)

安装

import "go.nanomsg.org/mangos/v3"

示例代码

1. REQ/REP模式示例

服务端(REP):

package main

import (
	"fmt"
	"log"
	"time"

	"go.nanomsg.org/mangos/v3"
	"go.nanomsg.org/mangos/v3/protocol/rep"
	_ "go.nanomsg.org/mangos/v3/transport/all"
)

func server(url string) {
	var sock mangos.Socket
	var err error
	var msg []byte

	if sock, err = rep.NewSocket(); err != nil {
		log.Fatalf("can't get new rep socket: %s", err)
	}
	if err = sock.Listen(url); err != nil {
		log.Fatalf("can't listen on rep socket: %s", err.Error())
	}
	for {
		// 接收请求
		if msg, err = sock.Recv(); err != nil {
			log.Fatalf("can't receive on rep socket: %s", err.Error())
		}
		fmt.Printf("SERVER: RECEIVED \"%s\" REQUEST\n", string(msg))
		
		// 发送响应
		time.Sleep(time.Second)
		if err = sock.Send([]byte("World")); err != nil {
			log.Fatalf("can't send reply: %s", err.Error())
		}
	}
}

func main() {
	server("tcp://127.0.0.1:40899")
}

客户端(REQ):

package main

import (
	"fmt"
	"log"
	"time"

	"go.nanomsg.org/mangos/v3"
	"go.nanomsg.org/mangos/v3/protocol/req"
	_ "go.nanomsg.org/mangos/v3/transport/all"
)

func client(url string, name string) {
	var sock mangos.Socket
	var err error
	var msg []byte

	if sock, err = req.NewSocket(); err != nil {
		log.Fatalf("can't get new req socket: %s", err.Error())
	}
	if err = sock.Dial(url); err != nil {
		log.Fatalf("can't dial on req socket: %s", err.Error())
	}

	for i := 0; i < 3; i++ {
		// 发送请求
		fmt.Printf("CLIENT(%s): SENDING \"%s\"\n", name, "Hello")
		if err = sock.Send([]byte("Hello")); err != nil {
			log.Fatalf("can't send message: %s", err.Error())
		}
		
		// 接收响应
		if msg, err = sock.Recv(); err != nil {
			log.Fatalf("can't receive date: %s", err.Error())
		}
		fmt.Printf("CLIENT(%s): RECEIVED \"%s\"\n", name, string(msg))
		time.Sleep(time.Second)
	}
	sock.Close()
}

func main() {
	go client("tcp://127.0.0.1:40899", "CLIENT1")
	client("tcp://127.0.0.1:40899", "CLIENT2")
}

2. PUB/SUB模式示例

发布者(PUB):

package main

import (
	"fmt"
	"log"
	"time"

	"go.nanomsg.org/mangos/v3"
	"go.nanomsg.org/mangos/v3/protocol/pub"
	_ "go.nanomsg.org/mangos/v3/transport/all"
)

func pubServer(url string) {
	var sock mangos.Socket
	var err error

	if sock, err = pub.NewSocket(); err != nil {
		log.Fatalf("can't get new pub socket: %s", err.Error())
	}
	if err = sock.Listen(url); err != nil {
		log.Fatalf("can't listen on pub socket: %s", err.Error())
	}

	for {
		// 发布消息
		msg := fmt.Sprintf("Current time is %s", time.Now().Format(time.RFC3339))
		fmt.Printf("PUBLISHER: PUBLISHING DATE %s\n", msg)
		if err = sock.Send([]byte(msg)); err != nil {
			log.Fatalf("can't publish: %s", err.Error())
		}
		time.Sleep(time.Second)
	}
}

func main() {
	pubServer("tcp://127.0.0.1:40898")
}

订阅者(SUB):

package main

import (
	"fmt"
	"log"

	"go.nanomsg.org/mangos/v3"
	"go.nanomsg.org/mangos/v3/protocol/sub"
	_ "go.nanomsg.org/mangos/v3/transport/all"
)

func subClient(url string, name string) {
	var sock mangos.Socket
	var err error
	var msg []byte

	if sock, err = sub.NewSocket(); err != nil {
		log.Fatalf("can't get new sub socket: %s", err.Error())
	}
	if err = sock.Dial(url); err != nil {
		log.Fatalf("can't dial on sub socket: %s", err.Error())
	}
	// 空字符串表示订阅所有消息
	err = sock.SetOption(mangos.OptionSubscribe, []byte(""))
	if err != nil {
		log.Fatalf("cannot subscribe: %s", err.Error())
	}
	for {
		if msg, err = sock.Recv(); err != nil {
			log.Fatalf("can't recv: %s", err.Error())
		}
		fmt.Printf("SUBSCRIBER(%s): RECEIVED \"%s\"\n", name, string(msg))
	}
}

func main() {
	go subClient("tcp://127.0.0.1:40898", "SUB1")
	subClient("tcp://127.0.0.1:40898", "SUB2")
}

测试

运行测试:

go test go.nanomsg.org/mangos/v3/...

运行基准测试:

go test -bench=. go.nanomsg.org/mangos/v3/test

注意事项

  1. 导入路径已更改为go.nanomsg.org/mangos/v3
  2. 仅支持Go 1.22及以上版本
  3. 所有传输协议需要显式导入_ "go.nanomsg.org/mangos/v3/transport/all"

Mangos™是Garrett D’Amore的商标。


更多关于golang实现Nanomsg可扩展协议的高性能网络通信插件库mangos的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现Nanomsg可扩展协议的高性能网络通信插件库mangos的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用Mangos实现高性能Nanomsg网络通信

Mangos是Nanomsg协议的纯Go实现,提供了高性能、可扩展的网络通信能力。下面我将详细介绍如何使用Mangos库来实现各种通信模式。

安装Mangos

首先安装Mangos库:

go get -u github.com/nanomsg/mangos/v2

基本通信模式

1. 请求/回复模式(REQ/REP)

服务器端(REP):

package main

import (
	"fmt"
	"log"
	"time"

	"go.nanomsg.org/mangos/v2"
	"go.nanomsg.org/mangos/v2/protocol/rep"
	"go.nanomsg.org/mangos/v2/protocol/req"

	// 注册传输
	_ "go.nanomsg.org/mangos/v2/transport/all"
)

func server(url string) {
	sock, err := rep.NewSocket()
	if err != nil {
		log.Fatalf("can't get new rep socket: %v", err)
	}
	if err = sock.Listen(url); err != nil {
		log.Fatalf("can't listen on rep socket: %v", err)
	}

	for {
		msg, err := sock.Recv()
		if err != nil {
			log.Printf("can't receive: %v", err)
			continue
		}
		fmt.Printf("SERVER: RECEIVED \"%s\"\n", string(msg))
		
		// 发送回复
		reply := []byte(fmt.Sprintf("Reply to: %s", string(msg)))
		if err = sock.Send(reply); err != nil {
			log.Printf("can't send reply: %v", err)
		}
	}
}

客户端(REQ):

func client(url string) {
	sock, err := req.NewSocket()
	if err != nil {
		log.Fatalf("can't get new req socket: %v", err)
	}
	if err = sock.Dial(url); err != nil {
		log.Fatalf("can't dial on req socket: %v", err)
	}

	for i := 0; i < 5; i++ {
		msg := []byte(fmt.Sprintf("Request #%d", i))
		fmt.Printf("CLIENT: SENDING \"%s\"\n", string(msg))
		if err = sock.Send(msg); err != nil {
			log.Printf("can't send message: %v", err)
			continue
		}
		
		reply, err := sock.Recv()
		if err != nil {
			log.Printf("can't receive reply: %v", err)
			continue
		}
		fmt.Printf("CLIENT: RECEIVED \"%s\"\n", string(reply))
		time.Sleep(time.Second)
	}
}

func main() {
	url := "tcp://127.0.0.1:40899"
	go server(url)
	time.Sleep(time.Second) // 确保服务器先启动
	client(url)
}

2. 发布/订阅模式(PUB/SUB)

发布者(PUB):

func publisher(url string) {
	sock, err := pub.NewSocket()
	if err != nil {
		log.Fatalf("can't get new pub socket: %v", err)
	}
	if err = sock.Listen(url); err != nil {
		log.Fatalf("can't listen on pub socket: %v", err)
	}

	topics := []string{"sports", "news", "weather"}
	for i := 0; ; i++ {
		topic := topics[i%len(topics)]
		msg := []byte(fmt.Sprintf("%s update #%d", topic, i))
		fmt.Printf("PUBLISHER: SENDING \"%s\"\n", string(msg))
		if err = sock.Send(msg); err != nil {
			log.Printf("can't send message: %v", err)
		}
		time.Sleep(time.Second)
	}
}

订阅者(SUB):

func subscriber(url string, topic string) {
	sock, err := sub.NewSocket()
	if err != nil {
		log.Fatalf("can't get new sub socket: %v", err)
	}
	if err = sock.Dial(url); err != nil {
		log.Fatalf("can't dial on sub socket: %v", err)
	}
	
	// 设置订阅主题
	err = sock.SetOption(mangos.OptionSubscribe, []byte(topic))
	if err != nil {
		log.Fatalf("can't subscribe: %v", err)
	}

	for {
		msg, err := sock.Recv()
		if err != nil {
			log.Printf("can't receive: %v", err)
			continue
		}
		fmt.Printf("SUBSCRIBER(%s): RECEIVED \"%s\"\n", topic, string(msg))
	}
}

func main() {
	url := "tcp://127.0.0.1:40898"
	go publisher(url)
	
	time.Sleep(time.Second) // 确保发布者先启动
	go subscriber(url, "sports")
	go subscriber(url, "news")
	subscriber(url, "weather") // 最后一个在主goroutine运行
}

高级特性

1. 多路复用

func multiplexServer(url string) {
	sock, err := rep.NewSocket()
	if err != nil {
		log.Fatalf("can't get new rep socket: %v", err)
	}
	
	// 添加多个传输端点
	if err = sock.Listen(url); err != nil {
		log.Fatalf("can't listen on rep socket: %v", err)
	}
	if err = sock.Listen("ipc:///tmp/multiplex.ipc"); err != nil {
		log.Fatalf("can't listen on ipc socket: %v", err)
	}
	
	// 处理逻辑...
}

2. 设置选项

func setOptions() {
	sock, err := req.NewSocket()
	if err != nil {
		log.Fatal(err)
	}
	
	// 设置重试间隔
	err = sock.SetOption(mangos.OptionRetryTime, time.Second)
	if err != nil {
		log.Printf("can't set retry time: %v", err)
	}
	
	// 设置接收超时
	err = sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second)
	if err != nil {
		log.Printf("can't set recv deadline: %v", err)
	}
	
	// 设置发送缓冲区大小
	err = sock.SetOption(mangos.OptionWriteQLen, 100)
	if err != nil {
		log.Printf("can't set write queue length: %v", err)
	}
}

性能优化建议

  1. 批量处理消息:对于高吞吐量场景,考虑批量发送/接收消息
  2. 连接池:为REQ套接字实现连接池以避免频繁建立连接
  3. 异步处理:使用goroutine处理接收到的消息
  4. 缓冲区调优:根据负载调整读写缓冲区大小
  5. 协议选择:对于本地通信,考虑使用IPC而不是TCP

Mangos提供了灵活而强大的网络通信能力,通过合理使用各种模式和优化手段,可以构建出高性能的分布式系统。

回到顶部