golang实现无主节点P2P自动发现和HTTP服务RPC通信插件库sleuthZeroMQ的使用

Golang实现无主节点P2P自动发现和HTTP服务RPC通信插件库sleuth/ZeroMQ的使用

简介

sleuth是一个Go语言库,提供无主节点的点对点自动发现和同一网络上HTTP服务之间的RPC通信功能。它需要最少的配置,并提供一种机制来加入本地网络,既可以作为不提供任何服务的客户端,也可以作为任何支持HTTP的服务。主要使用场景是同一网络上的微服务相互调用。

安装

sleuth依赖于libzmq,可以从源代码或二进制文件安装。安装libzmq后,可以像安装其他Go库一样安装sleuth

go get -u github.com/ursiform/sleuth

示例代码

示例1:回声服务

这是一个简单的回声服务,它会将HTTP请求体中的内容原样返回:

package main

import (
  "io/ioutil"
  "net/http"

  "github.com/ursiform/sleuth"
)

type echoHandler struct{}

func (h *echoHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
  body, _ := ioutil.ReadAll(req.Body)
  res.Write(body)
}

func main() {
  handler := new(echoHandler)
  // 在实际应用中,应该设置sleuth.Config的Interface字段,
  // 以确保所有服务都在同一子网上
  config := &sleuth.Config{
    Handler: handler,
    LogLevel: "debug",
    Service: "echo-service",
  }
  server, err := sleuth.New(config)
  if err != nil {
    panic(err.Error())
  }
  defer server.Close()
  http.ListenAndServe(":9873", handler)
}

示例1:客户端代码

这是一个简单的客户端,它会等待连接到网络并找到echo-service服务,然后发送请求:

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"

	"github.com/ursiform/sleuth"
)

func main() {
	service := "echo-service"
	// 在实际应用中,应该设置sleuth.Config的Interface字段,
	// 以确保所有服务都在同一子网上
	config := &sleuth.Config{LogLevel: "debug"}
	client, err := sleuth.New(config)
	if err != nil {
		panic(err.Error())
	}
	defer client.Close()
	client.WaitFor(service)
	input := "This is the value I am inputting."
	body := bytes.NewBuffer([]byte(input))
	request, _ := http.NewRequest("POST", "sleuth://"+service+"/", body)
	response, err := client.Do(request)
	if err != nil {
		panic(err.Error())
	}
	output, _ := ioutil.ReadAll(response.Body)
	if string(output) == input {
		fmt.Println("It works.")
	} else {
		fmt.Println("It doesn't work.")
	}
}

工作原理

sleuth通过创建一个临时的Gyre网络(基于ZeroMQ)来实现服务发现和通信:

  1. 节点作为SLEUTH-v1组的成员加入Gyre网络
  2. 如果节点提供服务,它会通知网络
  3. 节点发现网络上的其他节点
  4. 提供服务时,sleuth会在单独的goroutine中监听传入请求
  5. 当向可用服务发出请求时,sleuth会将请求编组并发送到提供该服务的对等节点
  6. 离开网络时,应调用Close()方法通知网络

常见问题

如何处理多个相同服务的实例?

sleuth会自动将请求轮询发送给所有名称相同的服务。

服务离线会发生什么?

sleuth网络会在大约1秒内检测到对等节点消失,所有请求将路由到其他提供相同服务的节点。如果没有可用的服务,请求将返回错误(代码919)。

为什么不起作用?

检查防火墙是否允许UDP端口5670的流量,并确保设置了sleuth.ConfigInterface字段。

许可证

sleuth使用MIT许可证,它依赖的Gyrelibzmq使用LGPL许可证。


更多关于golang实现无主节点P2P自动发现和HTTP服务RPC通信插件库sleuthZeroMQ的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现无主节点P2P自动发现和HTTP服务RPC通信插件库sleuthZeroMQ的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang实现无主节点P2P自动发现和HTTP服务RPC通信

下面我将介绍如何使用Golang实现一个无主节点的P2P网络,包括自动发现节点和通过HTTP服务进行RPC通信。虽然你提到了ZeroMQ和sleuth,但我会提供一个更通用的实现方案,因为sleuth库已经较旧且维护不活跃。

1. P2P节点自动发现实现

我们可以使用Gossip协议或Kademlia DHT来实现P2P节点发现。这里展示一个基于UDP广播的简单发现机制:

package main

import (
	"context"
	"encoding/json"
	"log"
	"net"
	"time"
)

const (
	discoveryPort = 9999
	broadcastAddr = "255.255.255.255"
)

type Peer struct {
	ID       string
	Endpoint string // e.g. "192.168.1.100:8080"
}

type DiscoveryService struct {
	localPeer Peer
	peers     map[string]Peer
}

func NewDiscoveryService(localPeer Peer) *DiscoveryService {
	return &DiscoveryService{
		localPeer: localPeer,
		peers:     make(map[string]Peer),
	}
}

func (ds *DiscoveryService) Start(ctx context.Context) {
	go ds.listenForPeers(ctx)
	go ds.broadcastPresence(ctx)
}

func (ds *DiscoveryService) listenForPeers(ctx context.Context) {
	addr, err := net.ResolveUDPAddr("udp", ":"+discoveryPort)
	if err != nil {
		log.Fatal(err)
	}

	conn, err := net.ListenUDP("udp", addr)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	buf := make([]byte, 1024)
	
	for {
		select {
		case <-ctx.Done():
			return
		default:
			n, addr, err := conn.ReadFromUDP(buf)
			if err != nil {
				log.Println("Read error:", err)
				continue
			}

			var peer Peer
			if err := json.Unmarshal(buf[:n], &peer); err != nil {
				log.Println("Decode error:", err)
				continue
			}

			if peer.ID != ds.localPeer.ID {
				ds.peers[peer.ID] = peer
				log.Printf("Discovered peer: %s at %s", peer.ID, peer.Endpoint)
			}
		}
	}
}

func (ds *DiscoveryService) broadcastPresence(ctx context.Context) {
	addr, err := net.ResolveUDPAddr("udp", broadcastAddr+":"+discoveryPort)
	if err != nil {
		log.Fatal(err)
	}

	conn, err := net.DialUDP("udp", nil, addr)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			data, err := json.Marshal(ds.localPeer)
			if err != nil {
				log.Println("Marshal error:", err)
				continue
			}

			if _, err := conn.Write(data); err != nil {
				log.Println("Broadcast error:", err)
			}
		}
	}
}

2. HTTP RPC服务实现

我们可以使用标准库的net/httpencoding/json来实现简单的RPC通信:

package main

import (
	"encoding/json"
	"errors"
	"log"
	"net/http"
)

type RPCServer struct {
	methods map[string]func(params json.RawMessage) (interface{}, error)
}

func NewRPCServer() *RPCServer {
	return &RPCServer{
		methods: make(map[string]func(params json.RawMessage) (interface{}, error)),
	}
}

func (s *RPCServer) RegisterMethod(name string, handler func(params json.RawMessage) (interface{}, error)) {
	s.methods[name] = handler
}

func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var request struct {
		Method string          `json:"method"`
		Params json.RawMessage `json:"params"`
		ID     interface{}     `json:"id"`
	}

	if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
		http.Error(w, "Invalid request", http.StatusBadRequest)
		return
	}

	handler, ok := s.methods[request.Method]
	if !ok {
		http.Error(w, "Method not found", http.StatusNotFound)
		return
	}

	result, err := handler(request.Params)
	response := struct {
		Result interface{} `json:"result,omitempty"`
		Error  string      `json:"error,omitempty"`
		ID     interface{} `json:"id"`
	}{
		ID: request.ID,
	}

	if err != nil {
		response.Error = err.Error()
	} else {
		response.Result = result
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

3. 整合P2P和RPC

下面是如何将两者整合:

func main() {
	// 创建本地peer
	localPeer := Peer{
		ID:       generatePeerID(),
		Endpoint: getLocalIP() + ":8080",
	}

	// 初始化发现服务
	discovery := NewDiscoveryService(localPeer)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	discovery.Start(ctx)

	// 初始化RPC服务器
	rpcServer := NewRPCServer()
	rpcServer.RegisterMethod("echo", func(params json.RawMessage) (interface{}, error) {
		var message string
		if err := json.Unmarshal(params, &message); err != nil {
			return nil, errors.New("invalid params")
		}
		return message, nil
	})

	// 启动HTTP服务器
	go func() {
		log.Println("Starting RPC server on", localPeer.Endpoint)
		if err := http.ListenAndServe(localPeer.Endpoint, rpcServer); err != nil {
			log.Fatal("HTTP server error:", err)
		}
	}()

	// 主循环
	select {}
}

// 辅助函数
func generatePeerID() string {
	// 实际实现应该生成唯一的ID
	return "peer-" + time.Now().Format("20060102150405")
}

func getLocalIP() string {
	conn, err := net.Dial("udp", "8.8.8.8:80")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	return conn.LocalAddr().(*net.UDPAddr).IP.String()
}

4. 替代方案:使用libp2p

如果你需要更成熟的P2P解决方案,可以考虑使用libp2p:

package main

import (
	"context"
	"crypto/rand"
	"log"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p-core/crypto"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
)

type discoveryNotifee struct {
	host host.Host
}

func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
	log.Printf("Discovered new peer %s\n", pi.ID)
	err := n.host.Connect(context.Background(), pi)
	if err != nil {
		log.Printf("Error connecting to peer %s: %v\n", pi.ID, err)
	}
}

func main() {
	// 生成密钥对
	priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, rand.Reader)
	if err != nil {
		panic(err)
	}

	// 创建libp2p主机
	host, err := libp2p.New(
		libp2p.Identity(priv),
		libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
	)
	if err != nil {
		panic(err)
	}

	// 设置mDNS服务发现
	service := mdns.NewMdnsService(host, "my-p2p-network", &discoveryNotifee{host: host})
	if err := service.Start(); err != nil {
		panic(err)
	}

	log.Printf("Host ID: %s", host.ID())
	log.Printf("Connect to me on:")
	for _, addr := range host.Addrs() {
		log.Printf("  %s/p2p/%s", addr, host.ID())
	}

	select {}
}

总结

  1. 对于简单的P2P网络,可以使用UDP广播实现节点发现
  2. HTTP + JSON-RPC是简单有效的节点间通信方式
  3. 对于生产环境,建议使用成熟的P2P库如libp2p
  4. 无主节点架构需要处理好节点加入/离开、消息广播/路由等问题

以上代码提供了基础框架,实际应用中还需要添加错误处理、安全认证、消息序列化等更多功能。

回到顶部