golang高效网络通信支持双向调用通知广播插件库arpc的使用

Golang高效网络通信支持双向调用通知广播插件库arpc的使用

ARPC是一个高效的Golang网络通信库,支持双向调用和通知广播功能。下面我将详细介绍其使用方法和示例代码。

特性

  • 双向调用(客户端到服务端,服务端到客户端)
  • 双向通知
  • 同步和异步调用
  • 同步和异步响应
  • 批量写入(Writev/net.Buffers)
  • 广播功能
  • 中间件支持
  • 发布/订阅模式
  • OpenTracing支持

安装

  1. 获取并安装arpc:
$ go get -u github.com/lesismal/arpc
  1. 在代码中导入:
import "github.com/lesismal/arpc"

快速开始

服务端示例

package main

import "github.com/lesismal/arpc"

func main() {
    server := arpc.NewServer()

    // 注册路由
    server.Handler.Handle("/echo", func(ctx *arpc.Context) {
        str := ""
        if err := ctx.Bind(&str); err == nil {
            ctx.Write(str)
        }
    })

    server.Run("localhost:8888")
}

客户端示例

package main

import (
    "log"
    "net"
    "time"
    "github.com/lesismal/arpc"
)

func main() {
    client, err := arpc.NewClient(func() (net.Conn, error) {
        return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
    })
    if err != nil {
        panic(err)
    }
    defer client.Stop()

    req := "hello"
    rsp := ""
    err = client.Call("/echo", &req, &rsp, time.Second*5)
    if err != nil {
        log.Fatalf("Call failed: %v", err)
    } else {
        log.Printf("Call Response: \"%v\"", rsp)
    }
}

API示例

注册路由

var handler arpc.Handler

// 包级别
handler = arpc.DefaultHandler
// 服务端
handler = server.Handler
// 客户端
handler = client.Handler

// 消息默认在同一个连接读取goroutine中顺序处理
handler.Handle("/route", func(ctx *arpc.Context) { ... })
handler.Handle("/route2", func(ctx *arpc.Context) { ... })

// 使消息由新的goroutine处理
async := true
handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)

路由中间件

import "github.com/lesismal/arpc/extension/middleware/router"

var handler arpc.Handler

handler.Use(router.Recover())
handler.Use(router.Logger())
handler.Use(func(ctx *arpc.Context) { ... })
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
handler.Use(func(ctx *arpc.Context) { ... })

客户端调用方式

  1. 同步调用(阻塞,带超时/上下文)
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// 或使用上下文
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
  1. 异步调用(非阻塞,带回调和超时/上下文)
request := &Echo{...}

timeout := time.Second*5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) {
    response := &Echo{}
    ctx.Bind(response)
    ...    
}, timeout)
  1. 通知(与CallAsync类似,但没有回调)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// 或使用上下文
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)

广播通知

var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})

func broadcast() {
    var svr *arpc.Server = ... 
    msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i))
    mux.RLock()
    for client := range clientMap {
        client.PushMsg(msg, arpc.TimeZero)
    }
    mux.RUnlock()
}

发布/订阅示例

服务端

import "github.com/lesismal/arpc/extension/pubsub"

var (
    address = "localhost:8888"
    password = "123qwe"
    topicName = "Broadcast"
)

func main() {
    s := pubsub.NewServer()
    s.Password = password

    // 服务端发布消息给所有客户端
    go func() {
        for i := 0; true; i++ {
            time.Sleep(time.Second)
            s.Publish(topicName, fmt.Sprintf("message from server %v", i))
        }
    }()

    s.Run(address)
}

订阅客户端

import "github.com/lesismal/arpc/log"
import "github.com/lesismal/arpc/extension/pubsub"

func onTopic(topic *pubsub.Topic) {
    log.Info("[OnTopic] [%v] \"%v\", [%v]",
        topic.Name,
        string(topic.Data),
        time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000"))
}

func main() {
    client, err := pubsub.NewClient(func() (net.Conn, error) {
        return net.DialTimeout("tcp", address, time.Second*3)
    })
    if err != nil {
        panic(err)
    }
    client.Password = password

    // 认证
    err = client.Authenticate()
    if err != nil {
        panic(err)
    }

    // 订阅主题
    if err := client.Subscribe(topicName, onTopic, time.Second); err != nil {
        panic(err)
    }

    <-make(chan int)
}

发布客户端

import "github.com/lesismal/arpc/extension/pubsub"

func main() {
    client, err := pubsub.NewClient(func() (net.Conn, error) {
        return net.DialTimeout("tcp", address, time.Second*3)
    })
    if err != nil {
        panic(err)
    }
    client.Password = password

    // 认证
    err = client.Authenticate()
    if err != nil {
        panic(err)
    }

    for i := 0; true; i++ {
        if i%5 == 0 {
            // 发布消息给所有客户端
            client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second)
        } else {
            // 发布消息给单个客户端
            client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second)
        }
        time.Sleep(time.Second)
    }
}

更多示例

更多使用示例可以参考官方示例代码库。

ARPC提供了丰富的功能和灵活的API,可以满足各种网络通信需求,特别是需要双向通信和广播通知的场景。


更多关于golang高效网络通信支持双向调用通知广播插件库arpc的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效网络通信支持双向调用通知广播插件库arpc的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


arpc - Golang高效网络通信与双向调用插件库

arpc是一个高性能的Go语言RPC框架,支持双向调用、通知和广播功能。下面我将详细介绍arpc的核心特性及使用方法。

核心特性

  1. 高性能:基于TCP协议,优化了网络传输效率
  2. 双向调用:客户端和服务器可以相互调用方法
  3. 通知机制:支持单向通知,无需等待响应
  4. 广播功能:可以向所有连接的客户端广播消息
  5. 插件化设计:易于扩展功能

基本使用示例

1. 安装

go get github.com/lesismal/arpc

2. 服务端实现

package main

import (
	"context"
	"log"

	"github.com/lesismal/arpc"
)

func main() {
	// 创建服务器
	server := arpc.NewServer()

	// 注册处理函数
	server.Handler.Handle("echo", func(ctx *arpc.Context) {
		// 获取客户端发送的数据
		var msg string
		ctx.Bind(&msg)
		
		// 返回响应
		ctx.Write(msg)
	})

	// 启动服务器
	if err := server.Run(":8888"); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

3. 客户端实现

package main

import (
	"context"
	"log"
	"time"

	"github.com/lesismal/arpc"
)

func main() {
	// 创建客户端
	client, err := arpc.NewClient(func() (arpc.Conn, error) {
		return arpc.Dial("localhost:8888")
	})
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}
	defer client.Stop()

	// 调用服务端方法
	var reply string
	err = client.Call("echo", "hello arpc", &reply, time.Second*5)
	if err != nil {
		log.Printf("call failed: %v", err)
		return
	}
	
	log.Printf("Server reply: %s", reply)
}

高级功能

1. 双向调用

服务端也可以调用客户端注册的方法:

// 服务端调用客户端方法
client.Notify("client_method", "data_from_server", arpc.TimeZero)

// 客户端注册方法
client.Handler.Handle("client_method", func(ctx *arpc.Context) {
	var data string
	ctx.Bind(&data)
	log.Printf("Received from server: %s", data)
})

2. 广播功能

// 获取所有客户端连接
clients := server.GetAllClients()

// 向所有客户端广播消息
for _, c := range clients {
	c.Notify("broadcast", "message_to_all", arpc.TimeZero)
}

3. 异步调用

// 异步调用
call := client.AsyncCall("echo", "hello", func(reply string, err error) {
	if err != nil {
		log.Printf("async call error: %v", err)
		return
	}
	log.Printf("async reply: %s", reply)
})

// 可以取消异步调用
// call.Cancel()

性能优化建议

  1. 连接池:复用客户端连接
  2. 编解码优化:使用高效的序列化协议如protobuf
  3. 批量处理:合并小消息为批量请求
  4. 超时控制:合理设置调用超时时间

完整示例:聊天服务

// 聊天服务器
func chatServer() {
	server := arpc.NewServer()
	
	// 存储所有客户端
	clients := make(map[*arpc.Client]struct{})
	var mu sync.Mutex
	
	server.Handler.Handle("join", func(ctx *arpc.Context) {
		mu.Lock()
		clients[ctx.Client] = struct{}{}
		mu.Unlock()
		ctx.Write("welcome!")
	})
	
	server.Handler.Handle("chat", func(ctx *arpc.Context) {
		var msg string
		ctx.Bind(&msg)
		
		// 广播消息给所有客户端
		mu.Lock()
		for client := range clients {
			client.Notify("new_msg", msg, arpc.TimeZero)
		}
		mu.Unlock()
	})
	
	server.Run(":8888")
}

// 聊天客户端
func chatClient(name string) {
	client, _ := arpc.NewClient(func() (arpc.Conn, error) {
		return arpc.Dial("localhost:8888")
	})
	
	// 注册消息处理器
	client.Handler.Handle("new_msg", func(ctx *arpc.Context) {
		var msg string
		ctx.Bind(&msg)
		fmt.Printf("[%s] %s\n", name, msg)
	})
	
	// 加入聊天室
	client.Call("join", nil, nil, time.Second)
	
	// 模拟发送消息
	for i := 0; i < 3; i++ {
		client.Notify("chat", fmt.Sprintf("%s: hello %d", name, i), arpc.TimeZero)
		time.Sleep(time.Second)
	}
	
	client.Stop()
}

arpc框架非常适合需要高效网络通信和复杂交互模式的场景,如游戏服务器、实时聊天系统、分布式计算等。其简洁的API设计和强大的功能使其成为Go语言RPC框架中的优秀选择。

回到顶部