golang实现简单高效的HTTP长轮询Web发布订阅服务插件库golongpoll的使用
golang实现简单高效的HTTP长轮询Web发布订阅服务插件库golongpoll的使用
golongpoll是一个Golang长轮询库,通过HTTP长轮询服务器和客户端实现简单的Web发布订阅功能。
快速开始
创建一个长轮询服务器:
import (
"github.com/jcuga/golongpoll"
)
// 使用默认/空选项。参见自定义选项部分和Options文档
manager, err := golongpoll.StartLongpoll(golongpoll.Options{})
// 暴露发布订阅接口。如果不需要客户端发布,可以省略PublishHandler
if err == nil {
http.HandleFunc("/events", manager.SubscriptionHandler)
http.HandleFunc("/publish", manager.PublishHandler)
http.ListenAndServe("127.0.0.1:8101", nil)
} else {
// 处理创建长轮询管理器时的错误-通常是选项设置错误
}
上面的代码片段创建了一个LongpollManager
,它有一个SubscriptionHandler
和一个PublishHandler
,可以通过http(或https)提供服务。创建时,管理器会启动一个单独的goroutine来处理发布订阅的内部逻辑。
工作原理
长轮询管理器可以看作是一个使用通道处理发布订阅请求的goroutine。管理器有一个按类别存储事件的map[string]eventBuffer
(实际上是map[string]*expiringBuffer
),以及用于订阅请求簿记的数据结构。PublishHandler
和SubscribeHandler
通过通道与管理器goroutine交互。
事件存储在内存缓冲区中,每个类别有配置的最大事件数。可选地,可以基于生存时间设置自动删除事件。由于所有内容都在内存中,有一个可选的附加组件用于自动持久化和从磁盘重新填充数据。
自定义选项
可以通过golongpoll.Options
配置长轮询管理器,包括:
MaxEventBufferSize
- 每个类别的最大事件数,超过后按先进先出截断。默认为250EventTimeToLiveSeconds
- 事件在缓冲区中存在的时间,默认为永久(只要不超过MaxEventBufferSize
)AddOn
- 提供自定义行为的可选方式
在Gin框架中使用
要在Gin HTTP框架应用中添加长轮询,只需用gin.Context
包装golongpoll的管理器发布/订阅函数:
package main
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/jcuga/golongpoll"
)
func main() {
// 使用默认选项创建长轮询管理器
manager, err := golongpoll.StartLongpoll(golongpoll.Options{})
if err != nil {
panic(err)
}
router := gin.Default()
router.POST("/pub", wrapWithContext(manager.PublishHandler))
router.GET("/sub", wrapWithContext(manager.SubscriptionHandler))
router.Run(":8001")
}
func wrapWithContext(lpHandler func(http.ResponseWriter, *http.Request)) func(*gin.Context) {
return func(c *gin.Context) {
lpHandler(c.Writer, c.Request)
}
}
完整示例
下面是一个完整的发布订阅示例:
package main
import (
"fmt"
"net/http"
"time"
"github.com/jcuga/golongpoll"
)
func main() {
// 创建长轮询管理器
manager, err := golongpoll.StartLongpoll(golongpoll.Options{
MaxEventBufferSize: 100, // 每个类别最多100个事件
EventTimeToLiveSeconds: 60, // 事件存活60秒
})
if err != nil {
fmt.Println("Error creating longpoll manager:", err)
return
}
// 设置HTTP处理程序
http.HandleFunc("/events", manager.SubscriptionHandler)
http.HandleFunc("/publish", manager.PublishHandler)
// 启动一个goroutine定期发布事件
go func() {
for {
// 发布到"time"类别
manager.Publish("time", fmt.Sprintf("Current time: %v", time.Now()))
time.Sleep(5 * time.Second)
}
}()
// 启动HTTP服务器
fmt.Println("Server starting on :8080...")
http.ListenAndServe(":8080", nil)
}
这个示例创建了一个长轮询服务器,每5秒发布一次当前时间到"time"类别,客户端可以通过订阅"/events"端点来接收这些事件。
更多关于golang实现简单高效的HTTP长轮询Web发布订阅服务插件库golongpoll的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现简单高效的HTTP长轮询Web发布订阅服务插件库golongpoll的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用golongpoll实现HTTP长轮询发布订阅服务
golongpoll是一个简单高效的Go语言库,用于实现HTTP长轮询模式的发布订阅服务。下面我将详细介绍如何使用这个库。
安装
首先安装golongpoll库:
go get github.com/jcuga/golongpoll
基本使用
1. 创建长轮询管理器
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/jcuga/golongpoll"
)
func main() {
// 创建长轮询管理器
manager, err := golongpoll.StartLongpoll(golongpoll.Options{
LoggingEnabled: true, // 启用日志
// 消息保留1小时
EventTimeToLiveSeconds: 60 * 60,
// 长轮询请求最多等待90秒
MaxLongpollTimeoutSeconds: 90,
})
if err != nil {
log.Fatalf("Failed to create longpoll manager: %v", err)
}
defer manager.Shutdown() // 程序退出时关闭管理器
// 设置HTTP路由
http.HandleFunc("/events", manager.SubscriptionHandler)
http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
// 这里处理发布逻辑
})
// 启动HTTP服务器
fmt.Println("Server starting on :8080...")
log.Fatal(http.ListenAndServe(":8080", nil))
}
2. 发布消息
// 发布消息到特定分类
func publishHandler(w http.ResponseWriter, r *http.Request, manager *golongpoll.LongpollManager) {
category := r.URL.Query().Get("category")
message := r.URL.Query().Get("message")
if category == "" || message == "" {
http.Error(w, "Missing category or message", http.StatusBadRequest)
return
}
// 发布消息
manager.Publish(category, message)
fmt.Fprintf(w, "Message published to category '%s'", category)
}
3. 订阅消息
客户端可以通过向/events
端点发送GET请求来订阅消息:
// 前端JavaScript示例
function subscribe(category) {
fetch(`/events?category=${category}&timeout=30`)
.then(response => response.json())
.then(events => {
// 处理接收到的事件
events.forEach(event => {
console.log("Received:", event.data);
});
// 继续订阅
subscribe(category);
})
.catch(error => {
console.error("Subscription error:", error);
// 出错后重试
setTimeout(() => subscribe(category), 1000);
});
}
// 开始订阅
subscribe("news");
高级功能
1. 多分类订阅
// 客户端可以订阅多个分类
http.HandleFunc("/multi-events", func(w http.ResponseWriter, r *http.Request) {
categories := r.URL.Query()["category"]
if len(categories) == 0 {
http.Error(w, "No categories specified", http.StatusBadRequest)
return
}
// 使用manager.SubscriptionHandlerWithCategories处理多分类订阅
manager.SubscriptionHandlerWithCategories(w, r, categories)
})
2. 自定义事件数据
// 可以发布结构化的自定义数据
type CustomEvent struct {
UserID int `json:"user_id"`
Message string `json:"message"`
Time int64 `json:"timestamp"`
}
// 发布自定义事件
event := CustomEvent{
UserID: 123,
Message: "Hello world",
Time: time.Now().Unix(),
}
manager.Publish("user_updates", event)
3. 安全性考虑
// 添加认证中间件
func authMiddleware(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get("Authorization")
if token != "valid-token" {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
next(w, r)
}
}
// 保护订阅端点
http.HandleFunc("/secure-events", authMiddleware(manager.SubscriptionHandler))
性能优化建议
-
调整超时时间:根据业务需求调整
MaxLongpollTimeoutSeconds
,通常30-90秒是合理范围。 -
分类粒度:合理设计分类系统,避免过于宽泛或过于精细的分类。
-
消息TTL:设置合适的
EventTimeToLiveSeconds
,避免内存无限增长。 -
连接限制:在生产环境中,应该限制单个客户端的连接数。
完整示例
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/jcuga/golongpoll"
)
func main() {
// 初始化长轮询管理器
manager, err := golongpoll.StartLongpoll(golongpoll.Options{
LoggingEnabled: true,
EventTimeToLiveSeconds: 60 * 60,
MaxLongpollTimeoutSeconds: 90,
})
if err != nil {
log.Fatal(err)
}
defer manager.Shutdown()
// 设置路由
http.HandleFunc("/events", manager.SubscriptionHandler)
http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
category := r.URL.Query().Get("category")
message := r.URL.Query().Get("message")
if category == "" || message == "" {
http.Error(w, "Missing category or message", http.StatusBadRequest)
return
}
manager.Publish(category, message)
fmt.Fprintf(w, "Published to %s: %s", category, message)
})
// 模拟后台发布
go func() {
ticker := time.NewTicker(5 * time.Second)
for range ticker.C {
manager.Publish("time_updates", fmt.Sprintf("Current time: %v", time.Now()))
}
}()
// 启动服务器
log.Println("Starting server on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
golongpoll库提供了一种简单高效的方式来实现HTTP长轮询服务,适用于需要实时通知但又不适合WebSocket的场景。它的API设计简洁,性能良好,是构建实时应用的优秀选择。