golang实现简单高效的HTTP长轮询Web发布订阅服务插件库golongpoll的使用

golang实现简单高效的HTTP长轮询Web发布订阅服务插件库golongpoll的使用

golongpoll是一个Golang长轮询库,通过HTTP长轮询服务器和客户端实现简单的Web发布订阅功能。

快速开始

创建一个长轮询服务器:

import (
  "github.com/jcuga/golongpoll"
)

// 使用默认/空选项。参见自定义选项部分和Options文档
manager, err := golongpoll.StartLongpoll(golongpoll.Options{})

// 暴露发布订阅接口。如果不需要客户端发布,可以省略PublishHandler
if err == nil {
  http.HandleFunc("/events", manager.SubscriptionHandler)
  http.HandleFunc("/publish", manager.PublishHandler)
  http.ListenAndServe("127.0.0.1:8101", nil)
} else {
  // 处理创建长轮询管理器时的错误-通常是选项设置错误
}

上面的代码片段创建了一个LongpollManager,它有一个SubscriptionHandler和一个PublishHandler,可以通过http(或https)提供服务。创建时,管理器会启动一个单独的goroutine来处理发布订阅的内部逻辑。

工作原理

长轮询管理器可以看作是一个使用通道处理发布订阅请求的goroutine。管理器有一个按类别存储事件的map[string]eventBuffer(实际上是map[string]*expiringBuffer),以及用于订阅请求簿记的数据结构。PublishHandlerSubscribeHandler通过通道与管理器goroutine交互。

事件存储在内存缓冲区中,每个类别有配置的最大事件数。可选地,可以基于生存时间设置自动删除事件。由于所有内容都在内存中,有一个可选的附加组件用于自动持久化和从磁盘重新填充数据。

自定义选项

可以通过golongpoll.Options配置长轮询管理器,包括:

  • MaxEventBufferSize - 每个类别的最大事件数,超过后按先进先出截断。默认为250
  • EventTimeToLiveSeconds - 事件在缓冲区中存在的时间,默认为永久(只要不超过MaxEventBufferSize)
  • AddOn - 提供自定义行为的可选方式

在Gin框架中使用

要在Gin HTTP框架应用中添加长轮询,只需用gin.Context包装golongpoll的管理器发布/订阅函数:

package main

import (
	"net/http"

	"github.com/gin-gonic/gin"

	"github.com/jcuga/golongpoll"
)

func main() {
	// 使用默认选项创建长轮询管理器
	manager, err := golongpoll.StartLongpoll(golongpoll.Options{})
	if err != nil {
		panic(err)
	}

	router := gin.Default()
	router.POST("/pub", wrapWithContext(manager.PublishHandler))
	router.GET("/sub", wrapWithContext(manager.SubscriptionHandler))
	router.Run(":8001")
}

func wrapWithContext(lpHandler func(http.ResponseWriter, *http.Request)) func(*gin.Context) {
	return func(c *gin.Context) {
		lpHandler(c.Writer, c.Request)
	}
}

完整示例

下面是一个完整的发布订阅示例:

package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/jcuga/golongpoll"
)

func main() {
	// 创建长轮询管理器
	manager, err := golongpoll.StartLongpoll(golongpoll.Options{
		MaxEventBufferSize:      100,      // 每个类别最多100个事件
		EventTimeToLiveSeconds: 60,       // 事件存活60秒
	})
	
	if err != nil {
		fmt.Println("Error creating longpoll manager:", err)
		return
	}

	// 设置HTTP处理程序
	http.HandleFunc("/events", manager.SubscriptionHandler)
	http.HandleFunc("/publish", manager.PublishHandler)
	
	// 启动一个goroutine定期发布事件
	go func() {
		for {
			// 发布到"time"类别
			manager.Publish("time", fmt.Sprintf("Current time: %v", time.Now()))
			time.Sleep(5 * time.Second)
		}
	}()
	
	// 启动HTTP服务器
	fmt.Println("Server starting on :8080...")
	http.ListenAndServe(":8080", nil)
}

这个示例创建了一个长轮询服务器,每5秒发布一次当前时间到"time"类别,客户端可以通过订阅"/events"端点来接收这些事件。


更多关于golang实现简单高效的HTTP长轮询Web发布订阅服务插件库golongpoll的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现简单高效的HTTP长轮询Web发布订阅服务插件库golongpoll的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用golongpoll实现HTTP长轮询发布订阅服务

golongpoll是一个简单高效的Go语言库,用于实现HTTP长轮询模式的发布订阅服务。下面我将详细介绍如何使用这个库。

安装

首先安装golongpoll库:

go get github.com/jcuga/golongpoll

基本使用

1. 创建长轮询管理器

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/jcuga/golongpoll"
)

func main() {
	// 创建长轮询管理器
	manager, err := golongpoll.StartLongpoll(golongpoll.Options{
		LoggingEnabled: true, // 启用日志
		// 消息保留1小时
		EventTimeToLiveSeconds: 60 * 60,
		// 长轮询请求最多等待90秒
		MaxLongpollTimeoutSeconds: 90,
	})
	if err != nil {
		log.Fatalf("Failed to create longpoll manager: %v", err)
	}
	defer manager.Shutdown() // 程序退出时关闭管理器

	// 设置HTTP路由
	http.HandleFunc("/events", manager.SubscriptionHandler)
	http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
		// 这里处理发布逻辑
	})

	// 启动HTTP服务器
	fmt.Println("Server starting on :8080...")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

2. 发布消息

// 发布消息到特定分类
func publishHandler(w http.ResponseWriter, r *http.Request, manager *golongpoll.LongpollManager) {
	category := r.URL.Query().Get("category")
	message := r.URL.Query().Get("message")
	
	if category == "" || message == "" {
		http.Error(w, "Missing category or message", http.StatusBadRequest)
		return
	}
	
	// 发布消息
	manager.Publish(category, message)
	fmt.Fprintf(w, "Message published to category '%s'", category)
}

3. 订阅消息

客户端可以通过向/events端点发送GET请求来订阅消息:

// 前端JavaScript示例
function subscribe(category) {
    fetch(`/events?category=${category}&timeout=30`)
        .then(response => response.json())
        .then(events => {
            // 处理接收到的事件
            events.forEach(event => {
                console.log("Received:", event.data);
            });
            // 继续订阅
            subscribe(category);
        })
        .catch(error => {
            console.error("Subscription error:", error);
            // 出错后重试
            setTimeout(() => subscribe(category), 1000);
        });
}

// 开始订阅
subscribe("news");

高级功能

1. 多分类订阅

// 客户端可以订阅多个分类
http.HandleFunc("/multi-events", func(w http.ResponseWriter, r *http.Request) {
    categories := r.URL.Query()["category"]
    if len(categories) == 0 {
        http.Error(w, "No categories specified", http.StatusBadRequest)
        return
    }
    
    // 使用manager.SubscriptionHandlerWithCategories处理多分类订阅
    manager.SubscriptionHandlerWithCategories(w, r, categories)
})

2. 自定义事件数据

// 可以发布结构化的自定义数据
type CustomEvent struct {
    UserID  int    `json:"user_id"`
    Message string `json:"message"`
    Time    int64  `json:"timestamp"`
}

// 发布自定义事件
event := CustomEvent{
    UserID:  123,
    Message: "Hello world",
    Time:    time.Now().Unix(),
}
manager.Publish("user_updates", event)

3. 安全性考虑

// 添加认证中间件
func authMiddleware(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        token := r.Header.Get("Authorization")
        if token != "valid-token" {
            http.Error(w, "Unauthorized", http.StatusUnauthorized)
            return
        }
        next(w, r)
    }
}

// 保护订阅端点
http.HandleFunc("/secure-events", authMiddleware(manager.SubscriptionHandler))

性能优化建议

  1. 调整超时时间:根据业务需求调整MaxLongpollTimeoutSeconds,通常30-90秒是合理范围。

  2. 分类粒度:合理设计分类系统,避免过于宽泛或过于精细的分类。

  3. 消息TTL:设置合适的EventTimeToLiveSeconds,避免内存无限增长。

  4. 连接限制:在生产环境中,应该限制单个客户端的连接数。

完整示例

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/jcuga/golongpoll"
)

func main() {
	// 初始化长轮询管理器
	manager, err := golongpoll.StartLongpoll(golongpoll.Options{
		LoggingEnabled:          true,
		EventTimeToLiveSeconds:  60 * 60,
		MaxLongpollTimeoutSeconds: 90,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer manager.Shutdown()

	// 设置路由
	http.HandleFunc("/events", manager.SubscriptionHandler)
	http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
		category := r.URL.Query().Get("category")
		message := r.URL.Query().Get("message")
		
		if category == "" || message == "" {
			http.Error(w, "Missing category or message", http.StatusBadRequest)
			return
		}
		
		manager.Publish(category, message)
		fmt.Fprintf(w, "Published to %s: %s", category, message)
	})

	// 模拟后台发布
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		for range ticker.C {
			manager.Publish("time_updates", fmt.Sprintf("Current time: %v", time.Now()))
		}
	}()

	// 启动服务器
	log.Println("Starting server on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

golongpoll库提供了一种简单高效的方式来实现HTTP长轮询服务,适用于需要实时通知但又不适合WebSocket的场景。它的API设计简洁,性能良好,是构建实时应用的优秀选择。

回到顶部