Golang实现HTTP流式Comet客户端
Golang实现HTTP流式Comet客户端 我正在尝试实现一个Comet协议的客户端。 在“流式”模式下,服务器使用分块传输编码,通过单一的HTTP通道返回所有结果。
Cache-Control: no-cache
Connection: keep-alive
Pragma: no-cache
Transfer-Encoding: chunked
Content-Type: application/json
使用以下代码,我可以收到来自创建服务器的命令的第一个响应,但除此之外没有其他内容。后续的其他消息都丢失了。
p := make([]byte, 50)
rd := bufio.NewReader(body)
for {
l, err := rd.Read(p)
if err != nil {
log.Fatal("Read Error:", err)
return
}
fmt.Println(l, string(p))
}
一分钟后出现错误信息:
2019/09/24 17:37:13 Read Error:unexpected EOF
exit status 1
看起来通道在第一个响应之后就被关闭了。 有什么想法吗?
更多关于Golang实现HTTP流式Comet客户端的实战教程也可以访问 https://www.itying.com/category-94-b0.html
谢谢马蒂亚斯,但我无法控制服务器。 我正在尝试使用Bayeux协议将我的客户端连接到它。
更多关于Golang实现HTTP流式Comet客户端的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
我不太熟悉 Comet,但看起来您想实现服务器发送事件。大多数现代浏览器都支持 SSE 流,并且有相关的库可以使用。
感谢您的建议。是的,我认为原理是相似的。
我尝试从 https://github.com/r3labs/sse 获取读取器代码,结果是一样的:“Read Error:unexpected EOF”。无论我使用哪种读取器或方法,当发送第二条消息时,我都会收到相同的错误。因此,我认为我的问题出在更底层的层面。
供参考,这里是关于 comet/bayeux 协议的文档:https://docs.cometd.org/current/reference/#_concepts_bayeux_protocol
你好
我使用了以下模板,它功能完全正常…
kljensen/golang-html5-sse-example
HTML5 Server Sent Events with Go. Contribute to kljensen/golang-html5-sse-example development by creating an account on GitHub.
你必须考虑到最大事件通道数是6!
此致
不幸的是,我毫无进展: 我有一个简单的循环来获取消息。
rsp, err := c.http.Do(req)
if err != nil {
return err
}
defer rsp.Body.Close()
dec := json.NewDecoder(rsp.Body)
for {
fmt.Println("Loop")
var messages []message
err = dec.Decode(&messages)
if err != nil {
if err == io.EOF {
fmt.Println("EOF")
}
return err
}
fmt.Println("Handshake")
}
但在收到第一条消息后,我遇到了EOF,可能是因为第一条消息已经完整接收。
Loop
Handshake
Loop
EOF
我该如何重置读取器,以便它在同一连接上(无需发起新请求)等待下一条消息?
问题出在你使用了bufio.Reader的Read方法,这个方法会在遇到EOF时返回错误。对于分块传输编码的流式响应,你应该使用http.Response.Body的流式读取方式。以下是正确的实现方法:
package main
import (
"bufio"
"fmt"
"io"
"log"
"net/http"
)
func main() {
resp, err := http.Get("http://your-server.com/comet-endpoint")
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
for {
// 读取直到换行符,每个chunk以CRLF结束
line, err := reader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
break
}
log.Fatal("Read error:", err)
}
// 处理chunk数据(这里需要根据你的实际协议解析)
fmt.Printf("Received: %s", line)
}
}
或者,如果你需要处理JSON格式的流式数据,可以使用更精细的读取方式:
func streamCometClient() {
resp, err := http.Get("http://your-server.com/comet-endpoint")
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
decoder := json.NewDecoder(resp.Body)
for {
var data map[string]interface{}
if err := decoder.Decode(&data); err != nil {
if err == io.EOF {
break
}
log.Println("Decode error:", err)
continue
}
fmt.Printf("Received: %v\n", data)
}
}
如果你的服务器发送的是以特定分隔符分隔的消息,可以使用Scanner:
func cometClientWithScanner() {
resp, err := http.Get("http://your-server.com/comet-endpoint")
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
message := scanner.Text()
fmt.Printf("Received: %s\n", message)
}
if err := scanner.Err(); err != nil {
log.Fatal("Scanner error:", err)
}
}
关键点:
- 不要使用固定大小的缓冲区读取
- 正确处理chunked编码的边界
- 根据服务器的实际响应格式选择合适的读取方式
- 注意处理EOF错误,它可能表示正常结束而不是错误
对于Comet协议,通常服务器会保持连接开放并持续发送数据,直到显式关闭连接。确保你的代码能够持续读取直到连接被服务器关闭。

