golang实现无主节点P2P自动发现和HTTP服务RPC通信插件库sleuthZeroMQ的使用
Golang实现无主节点P2P自动发现和HTTP服务RPC通信插件库sleuth/ZeroMQ的使用
简介
sleuth
是一个Go语言库,提供无主节点的点对点自动发现和同一网络上HTTP服务之间的RPC通信功能。它需要最少的配置,并提供一种机制来加入本地网络,既可以作为不提供任何服务的客户端,也可以作为任何支持HTTP的服务。主要使用场景是同一网络上的微服务相互调用。
安装
sleuth
依赖于libzmq
,可以从源代码或二进制文件安装。安装libzmq
后,可以像安装其他Go库一样安装sleuth
:
go get -u github.com/ursiform/sleuth
示例代码
示例1:回声服务
这是一个简单的回声服务,它会将HTTP请求体中的内容原样返回:
package main
import (
"io/ioutil"
"net/http"
"github.com/ursiform/sleuth"
)
type echoHandler struct{}
func (h *echoHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
body, _ := ioutil.ReadAll(req.Body)
res.Write(body)
}
func main() {
handler := new(echoHandler)
// 在实际应用中,应该设置sleuth.Config的Interface字段,
// 以确保所有服务都在同一子网上
config := &sleuth.Config{
Handler: handler,
LogLevel: "debug",
Service: "echo-service",
}
server, err := sleuth.New(config)
if err != nil {
panic(err.Error())
}
defer server.Close()
http.ListenAndServe(":9873", handler)
}
示例1:客户端代码
这是一个简单的客户端,它会等待连接到网络并找到echo-service
服务,然后发送请求:
package main
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"github.com/ursiform/sleuth"
)
func main() {
service := "echo-service"
// 在实际应用中,应该设置sleuth.Config的Interface字段,
// 以确保所有服务都在同一子网上
config := &sleuth.Config{LogLevel: "debug"}
client, err := sleuth.New(config)
if err != nil {
panic(err.Error())
}
defer client.Close()
client.WaitFor(service)
input := "This is the value I am inputting."
body := bytes.NewBuffer([]byte(input))
request, _ := http.NewRequest("POST", "sleuth://"+service+"/", body)
response, err := client.Do(request)
if err != nil {
panic(err.Error())
}
output, _ := ioutil.ReadAll(response.Body)
if string(output) == input {
fmt.Println("It works.")
} else {
fmt.Println("It doesn't work.")
}
}
工作原理
sleuth
通过创建一个临时的Gyre
网络(基于ZeroMQ)来实现服务发现和通信:
- 节点作为
SLEUTH-v1
组的成员加入Gyre
网络 - 如果节点提供服务,它会通知网络
- 节点发现网络上的其他节点
- 提供服务时,
sleuth
会在单独的goroutine中监听传入请求 - 当向可用服务发出请求时,
sleuth
会将请求编组并发送到提供该服务的对等节点 - 离开网络时,应调用
Close()
方法通知网络
常见问题
如何处理多个相同服务的实例?
sleuth
会自动将请求轮询发送给所有名称相同的服务。
服务离线会发生什么?
sleuth
网络会在大约1秒内检测到对等节点消失,所有请求将路由到其他提供相同服务的节点。如果没有可用的服务,请求将返回错误(代码919)。
为什么不起作用?
检查防火墙是否允许UDP端口5670的流量,并确保设置了sleuth.Config
的Interface
字段。
许可证
sleuth
使用MIT许可证,它依赖的Gyre
和libzmq
使用LGPL许可证。
更多关于golang实现无主节点P2P自动发现和HTTP服务RPC通信插件库sleuthZeroMQ的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现无主节点P2P自动发现和HTTP服务RPC通信插件库sleuthZeroMQ的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang实现无主节点P2P自动发现和HTTP服务RPC通信
下面我将介绍如何使用Golang实现一个无主节点的P2P网络,包括自动发现节点和通过HTTP服务进行RPC通信。虽然你提到了ZeroMQ和sleuth,但我会提供一个更通用的实现方案,因为sleuth库已经较旧且维护不活跃。
1. P2P节点自动发现实现
我们可以使用Gossip协议或Kademlia DHT来实现P2P节点发现。这里展示一个基于UDP广播的简单发现机制:
package main
import (
"context"
"encoding/json"
"log"
"net"
"time"
)
const (
discoveryPort = 9999
broadcastAddr = "255.255.255.255"
)
type Peer struct {
ID string
Endpoint string // e.g. "192.168.1.100:8080"
}
type DiscoveryService struct {
localPeer Peer
peers map[string]Peer
}
func NewDiscoveryService(localPeer Peer) *DiscoveryService {
return &DiscoveryService{
localPeer: localPeer,
peers: make(map[string]Peer),
}
}
func (ds *DiscoveryService) Start(ctx context.Context) {
go ds.listenForPeers(ctx)
go ds.broadcastPresence(ctx)
}
func (ds *DiscoveryService) listenForPeers(ctx context.Context) {
addr, err := net.ResolveUDPAddr("udp", ":"+discoveryPort)
if err != nil {
log.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
buf := make([]byte, 1024)
for {
select {
case <-ctx.Done():
return
default:
n, addr, err := conn.ReadFromUDP(buf)
if err != nil {
log.Println("Read error:", err)
continue
}
var peer Peer
if err := json.Unmarshal(buf[:n], &peer); err != nil {
log.Println("Decode error:", err)
continue
}
if peer.ID != ds.localPeer.ID {
ds.peers[peer.ID] = peer
log.Printf("Discovered peer: %s at %s", peer.ID, peer.Endpoint)
}
}
}
}
func (ds *DiscoveryService) broadcastPresence(ctx context.Context) {
addr, err := net.ResolveUDPAddr("udp", broadcastAddr+":"+discoveryPort)
if err != nil {
log.Fatal(err)
}
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
data, err := json.Marshal(ds.localPeer)
if err != nil {
log.Println("Marshal error:", err)
continue
}
if _, err := conn.Write(data); err != nil {
log.Println("Broadcast error:", err)
}
}
}
}
2. HTTP RPC服务实现
我们可以使用标准库的net/http
和encoding/json
来实现简单的RPC通信:
package main
import (
"encoding/json"
"errors"
"log"
"net/http"
)
type RPCServer struct {
methods map[string]func(params json.RawMessage) (interface{}, error)
}
func NewRPCServer() *RPCServer {
return &RPCServer{
methods: make(map[string]func(params json.RawMessage) (interface{}, error)),
}
}
func (s *RPCServer) RegisterMethod(name string, handler func(params json.RawMessage) (interface{}, error)) {
s.methods[name] = handler
}
func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var request struct {
Method string `json:"method"`
Params json.RawMessage `json:"params"`
ID interface{} `json:"id"`
}
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
http.Error(w, "Invalid request", http.StatusBadRequest)
return
}
handler, ok := s.methods[request.Method]
if !ok {
http.Error(w, "Method not found", http.StatusNotFound)
return
}
result, err := handler(request.Params)
response := struct {
Result interface{} `json:"result,omitempty"`
Error string `json:"error,omitempty"`
ID interface{} `json:"id"`
}{
ID: request.ID,
}
if err != nil {
response.Error = err.Error()
} else {
response.Result = result
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
3. 整合P2P和RPC
下面是如何将两者整合:
func main() {
// 创建本地peer
localPeer := Peer{
ID: generatePeerID(),
Endpoint: getLocalIP() + ":8080",
}
// 初始化发现服务
discovery := NewDiscoveryService(localPeer)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discovery.Start(ctx)
// 初始化RPC服务器
rpcServer := NewRPCServer()
rpcServer.RegisterMethod("echo", func(params json.RawMessage) (interface{}, error) {
var message string
if err := json.Unmarshal(params, &message); err != nil {
return nil, errors.New("invalid params")
}
return message, nil
})
// 启动HTTP服务器
go func() {
log.Println("Starting RPC server on", localPeer.Endpoint)
if err := http.ListenAndServe(localPeer.Endpoint, rpcServer); err != nil {
log.Fatal("HTTP server error:", err)
}
}()
// 主循环
select {}
}
// 辅助函数
func generatePeerID() string {
// 实际实现应该生成唯一的ID
return "peer-" + time.Now().Format("20060102150405")
}
func getLocalIP() string {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
return conn.LocalAddr().(*net.UDPAddr).IP.String()
}
4. 替代方案:使用libp2p
如果你需要更成熟的P2P解决方案,可以考虑使用libp2p:
package main
import (
"context"
"crypto/rand"
"log"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
)
type discoveryNotifee struct {
host host.Host
}
func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
log.Printf("Discovered new peer %s\n", pi.ID)
err := n.host.Connect(context.Background(), pi)
if err != nil {
log.Printf("Error connecting to peer %s: %v\n", pi.ID, err)
}
}
func main() {
// 生成密钥对
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, rand.Reader)
if err != nil {
panic(err)
}
// 创建libp2p主机
host, err := libp2p.New(
libp2p.Identity(priv),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
)
if err != nil {
panic(err)
}
// 设置mDNS服务发现
service := mdns.NewMdnsService(host, "my-p2p-network", &discoveryNotifee{host: host})
if err := service.Start(); err != nil {
panic(err)
}
log.Printf("Host ID: %s", host.ID())
log.Printf("Connect to me on:")
for _, addr := range host.Addrs() {
log.Printf(" %s/p2p/%s", addr, host.ID())
}
select {}
}
总结
- 对于简单的P2P网络,可以使用UDP广播实现节点发现
- HTTP + JSON-RPC是简单有效的节点间通信方式
- 对于生产环境,建议使用成熟的P2P库如libp2p
- 无主节点架构需要处理好节点加入/离开、消息广播/路由等问题
以上代码提供了基础框架,实际应用中还需要添加错误处理、安全认证、消息序列化等更多功能。