Golang实现HTTP流式Comet客户端

Golang实现HTTP流式Comet客户端 我正在尝试实现一个Comet协议的客户端。 在“流式”模式下,服务器使用分块传输编码,通过单一的HTTP通道返回所有结果。

Cache-Control: no-cache
Connection: keep-alive
Pragma: no-cache
Transfer-Encoding: chunked
Content-Type: application/json

使用以下代码,我可以收到来自创建服务器的命令的第一个响应,但除此之外没有其他内容。后续的其他消息都丢失了。

	p := make([]byte, 50)
	rd := bufio.NewReader(body)
	for {
		l, err := rd.Read(p)
		if err != nil {
			log.Fatal("Read Error:", err)
			return
		}
		fmt.Println(l, string(p))
	}

一分钟后出现错误信息:

2019/09/24 17:37:13 Read Error:unexpected EOF
exit status 1

看起来通道在第一个响应之后就被关闭了。 有什么想法吗?


更多关于Golang实现HTTP流式Comet客户端的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复

谢谢马蒂亚斯,但我无法控制服务器。 我正在尝试使用Bayeux协议将我的客户端连接到它。

更多关于Golang实现HTTP流式Comet客户端的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我不太熟悉 Comet,但看起来您想实现服务器发送事件。大多数现代浏览器都支持 SSE 流,并且有相关的库可以使用。

感谢您的建议。是的,我认为原理是相似的。

我尝试从 https://github.com/r3labs/sse 获取读取器代码,结果是一样的:“Read Error:unexpected EOF”。无论我使用哪种读取器或方法,当发送第二条消息时,我都会收到相同的错误。因此,我认为我的问题出在更底层的层面。

供参考,这里是关于 comet/bayeux 协议的文档:https://docs.cometd.org/current/reference/#_concepts_bayeux_protocol

你好

我使用了以下模板,它功能完全正常…

GitHub

kljensen/golang-html5-sse-example

Avatar

HTML5 Server Sent Events with Go. Contribute to kljensen/golang-html5-sse-example development by creating an account on GitHub.

你必须考虑到最大事件通道数是6!

此致

不幸的是,我毫无进展: 我有一个简单的循环来获取消息。

rsp, err := c.http.Do(req)
if err != nil {
	return err
}

defer rsp.Body.Close()
dec := json.NewDecoder(rsp.Body)

for {
	fmt.Println("Loop")
	var messages []message
	err = dec.Decode(&messages)
	if err != nil {
		if err == io.EOF {
			fmt.Println("EOF")
		}
		return err
	}
    fmt.Println("Handshake")
}

但在收到第一条消息后,我遇到了EOF,可能是因为第一条消息已经完整接收。

Loop
Handshake
Loop
EOF

我该如何重置读取器,以便它在同一连接上(无需发起新请求)等待下一条消息?

问题出在你使用了bufio.ReaderRead方法,这个方法会在遇到EOF时返回错误。对于分块传输编码的流式响应,你应该使用http.Response.Body的流式读取方式。以下是正确的实现方法:

package main

import (
    "bufio"
    "fmt"
    "io"
    "log"
    "net/http"
)

func main() {
    resp, err := http.Get("http://your-server.com/comet-endpoint")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)
    
    for {
        // 读取直到换行符,每个chunk以CRLF结束
        line, err := reader.ReadBytes('\n')
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Fatal("Read error:", err)
        }
        
        // 处理chunk数据(这里需要根据你的实际协议解析)
        fmt.Printf("Received: %s", line)
    }
}

或者,如果你需要处理JSON格式的流式数据,可以使用更精细的读取方式:

func streamCometClient() {
    resp, err := http.Get("http://your-server.com/comet-endpoint")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    decoder := json.NewDecoder(resp.Body)
    
    for {
        var data map[string]interface{}
        if err := decoder.Decode(&data); err != nil {
            if err == io.EOF {
                break
            }
            log.Println("Decode error:", err)
            continue
        }
        
        fmt.Printf("Received: %v\n", data)
    }
}

如果你的服务器发送的是以特定分隔符分隔的消息,可以使用Scanner:

func cometClientWithScanner() {
    resp, err := http.Get("http://your-server.com/comet-endpoint")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    scanner := bufio.NewScanner(resp.Body)
    for scanner.Scan() {
        message := scanner.Text()
        fmt.Printf("Received: %s\n", message)
    }
    
    if err := scanner.Err(); err != nil {
        log.Fatal("Scanner error:", err)
    }
}

关键点:

  1. 不要使用固定大小的缓冲区读取
  2. 正确处理chunked编码的边界
  3. 根据服务器的实际响应格式选择合适的读取方式
  4. 注意处理EOF错误,它可能表示正常结束而不是错误

对于Comet协议,通常服务器会保持连接开放并持续发送数据,直到显式关闭连接。确保你的代码能够持续读取直到连接被服务器关闭。

回到顶部