golang高效网络通信支持双向调用通知广播插件库arpc的使用
Golang高效网络通信支持双向调用通知广播插件库arpc的使用
ARPC是一个高效的Golang网络通信库,支持双向调用和通知广播功能。下面我将详细介绍其使用方法和示例代码。
特性
- 双向调用(客户端到服务端,服务端到客户端)
- 双向通知
- 同步和异步调用
- 同步和异步响应
- 批量写入(Writev/net.Buffers)
- 广播功能
- 中间件支持
- 发布/订阅模式
- OpenTracing支持
安装
- 获取并安装arpc:
$ go get -u github.com/lesismal/arpc
- 在代码中导入:
import "github.com/lesismal/arpc"
快速开始
服务端示例
package main
import "github.com/lesismal/arpc"
func main() {
server := arpc.NewServer()
// 注册路由
server.Handler.Handle("/echo", func(ctx *arpc.Context) {
str := ""
if err := ctx.Bind(&str); err == nil {
ctx.Write(str)
}
})
server.Run("localhost:8888")
}
客户端示例
package main
import (
"log"
"net"
"time"
"github.com/lesismal/arpc"
)
func main() {
client, err := arpc.NewClient(func() (net.Conn, error) {
return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
})
if err != nil {
panic(err)
}
defer client.Stop()
req := "hello"
rsp := ""
err = client.Call("/echo", &req, &rsp, time.Second*5)
if err != nil {
log.Fatalf("Call failed: %v", err)
} else {
log.Printf("Call Response: \"%v\"", rsp)
}
}
API示例
注册路由
var handler arpc.Handler
// 包级别
handler = arpc.DefaultHandler
// 服务端
handler = server.Handler
// 客户端
handler = client.Handler
// 消息默认在同一个连接读取goroutine中顺序处理
handler.Handle("/route", func(ctx *arpc.Context) { ... })
handler.Handle("/route2", func(ctx *arpc.Context) { ... })
// 使消息由新的goroutine处理
async := true
handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)
路由中间件
import "github.com/lesismal/arpc/extension/middleware/router"
var handler arpc.Handler
handler.Use(router.Recover())
handler.Use(router.Logger())
handler.Use(func(ctx *arpc.Context) { ... })
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
handler.Use(func(ctx *arpc.Context) { ... })
客户端调用方式
- 同步调用(阻塞,带超时/上下文)
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// 或使用上下文
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
- 异步调用(非阻塞,带回调和超时/上下文)
request := &Echo{...}
timeout := time.Second*5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) {
response := &Echo{}
ctx.Bind(response)
...
}, timeout)
- 通知(与CallAsync类似,但没有回调)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// 或使用上下文
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)
广播通知
var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})
func broadcast() {
var svr *arpc.Server = ...
msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i))
mux.RLock()
for client := range clientMap {
client.PushMsg(msg, arpc.TimeZero)
}
mux.RUnlock()
}
发布/订阅示例
服务端
import "github.com/lesismal/arpc/extension/pubsub"
var (
address = "localhost:8888"
password = "123qwe"
topicName = "Broadcast"
)
func main() {
s := pubsub.NewServer()
s.Password = password
// 服务端发布消息给所有客户端
go func() {
for i := 0; true; i++ {
time.Sleep(time.Second)
s.Publish(topicName, fmt.Sprintf("message from server %v", i))
}
}()
s.Run(address)
}
订阅客户端
import "github.com/lesismal/arpc/log"
import "github.com/lesismal/arpc/extension/pubsub"
func onTopic(topic *pubsub.Topic) {
log.Info("[OnTopic] [%v] \"%v\", [%v]",
topic.Name,
string(topic.Data),
time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000"))
}
func main() {
client, err := pubsub.NewClient(func() (net.Conn, error) {
return net.DialTimeout("tcp", address, time.Second*3)
})
if err != nil {
panic(err)
}
client.Password = password
// 认证
err = client.Authenticate()
if err != nil {
panic(err)
}
// 订阅主题
if err := client.Subscribe(topicName, onTopic, time.Second); err != nil {
panic(err)
}
<-make(chan int)
}
发布客户端
import "github.com/lesismal/arpc/extension/pubsub"
func main() {
client, err := pubsub.NewClient(func() (net.Conn, error) {
return net.DialTimeout("tcp", address, time.Second*3)
})
if err != nil {
panic(err)
}
client.Password = password
// 认证
err = client.Authenticate()
if err != nil {
panic(err)
}
for i := 0; true; i++ {
if i%5 == 0 {
// 发布消息给所有客户端
client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second)
} else {
// 发布消息给单个客户端
client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second)
}
time.Sleep(time.Second)
}
}
更多示例
更多使用示例可以参考官方示例代码库。
ARPC提供了丰富的功能和灵活的API,可以满足各种网络通信需求,特别是需要双向通信和广播通知的场景。
更多关于golang高效网络通信支持双向调用通知广播插件库arpc的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang高效网络通信支持双向调用通知广播插件库arpc的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
arpc - Golang高效网络通信与双向调用插件库
arpc是一个高性能的Go语言RPC框架,支持双向调用、通知和广播功能。下面我将详细介绍arpc的核心特性及使用方法。
核心特性
- 高性能:基于TCP协议,优化了网络传输效率
- 双向调用:客户端和服务器可以相互调用方法
- 通知机制:支持单向通知,无需等待响应
- 广播功能:可以向所有连接的客户端广播消息
- 插件化设计:易于扩展功能
基本使用示例
1. 安装
go get github.com/lesismal/arpc
2. 服务端实现
package main
import (
"context"
"log"
"github.com/lesismal/arpc"
)
func main() {
// 创建服务器
server := arpc.NewServer()
// 注册处理函数
server.Handler.Handle("echo", func(ctx *arpc.Context) {
// 获取客户端发送的数据
var msg string
ctx.Bind(&msg)
// 返回响应
ctx.Write(msg)
})
// 启动服务器
if err := server.Run(":8888"); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
3. 客户端实现
package main
import (
"context"
"log"
"time"
"github.com/lesismal/arpc"
)
func main() {
// 创建客户端
client, err := arpc.NewClient(func() (arpc.Conn, error) {
return arpc.Dial("localhost:8888")
})
if err != nil {
log.Fatalf("failed to create client: %v", err)
}
defer client.Stop()
// 调用服务端方法
var reply string
err = client.Call("echo", "hello arpc", &reply, time.Second*5)
if err != nil {
log.Printf("call failed: %v", err)
return
}
log.Printf("Server reply: %s", reply)
}
高级功能
1. 双向调用
服务端也可以调用客户端注册的方法:
// 服务端调用客户端方法
client.Notify("client_method", "data_from_server", arpc.TimeZero)
// 客户端注册方法
client.Handler.Handle("client_method", func(ctx *arpc.Context) {
var data string
ctx.Bind(&data)
log.Printf("Received from server: %s", data)
})
2. 广播功能
// 获取所有客户端连接
clients := server.GetAllClients()
// 向所有客户端广播消息
for _, c := range clients {
c.Notify("broadcast", "message_to_all", arpc.TimeZero)
}
3. 异步调用
// 异步调用
call := client.AsyncCall("echo", "hello", func(reply string, err error) {
if err != nil {
log.Printf("async call error: %v", err)
return
}
log.Printf("async reply: %s", reply)
})
// 可以取消异步调用
// call.Cancel()
性能优化建议
- 连接池:复用客户端连接
- 编解码优化:使用高效的序列化协议如protobuf
- 批量处理:合并小消息为批量请求
- 超时控制:合理设置调用超时时间
完整示例:聊天服务
// 聊天服务器
func chatServer() {
server := arpc.NewServer()
// 存储所有客户端
clients := make(map[*arpc.Client]struct{})
var mu sync.Mutex
server.Handler.Handle("join", func(ctx *arpc.Context) {
mu.Lock()
clients[ctx.Client] = struct{}{}
mu.Unlock()
ctx.Write("welcome!")
})
server.Handler.Handle("chat", func(ctx *arpc.Context) {
var msg string
ctx.Bind(&msg)
// 广播消息给所有客户端
mu.Lock()
for client := range clients {
client.Notify("new_msg", msg, arpc.TimeZero)
}
mu.Unlock()
})
server.Run(":8888")
}
// 聊天客户端
func chatClient(name string) {
client, _ := arpc.NewClient(func() (arpc.Conn, error) {
return arpc.Dial("localhost:8888")
})
// 注册消息处理器
client.Handler.Handle("new_msg", func(ctx *arpc.Context) {
var msg string
ctx.Bind(&msg)
fmt.Printf("[%s] %s\n", name, msg)
})
// 加入聊天室
client.Call("join", nil, nil, time.Second)
// 模拟发送消息
for i := 0; i < 3; i++ {
client.Notify("chat", fmt.Sprintf("%s: hello %d", name, i), arpc.TimeZero)
time.Sleep(time.Second)
}
client.Stop()
}
arpc框架非常适合需要高效网络通信和复杂交互模式的场景,如游戏服务器、实时聊天系统、分布式计算等。其简洁的API设计和强大的功能使其成为Go语言RPC框架中的优秀选择。