How to process HTTP response data in chunks in Golang
I'm trying to consume an HTTP response sent with Transfer-Encoding: chunked. I expected to be able to read each chunk as it appears in the body, but in practice it seems the body can only be read once the server has sent the final chunk.
I'm sure I'm missing something when setting up the client, because when I request the server with curl it works fine, and curl prints the chunks as the server sends them.
Here is my setup:
Go:1.10.1 windows/amd64
Sending the request
import (
    "fmt"
    "net"
    "net/http"
    "time"
)

func Stream(objectSignature interface{}) (StreamInterface, error) {
    roundTripper := &http.Transport{
        Proxy: http.ProxyFromEnvironment,
        DialContext: (&net.Dialer{
            Timeout:   60 * time.Second,
            KeepAlive: 60 * time.Second,
        }).DialContext,
    }
    httpClient := &http.Client{
        Transport: roundTripper,
        Timeout:   60 * time.Second,
    }
    httpRequest, err := http.NewRequest("GET", "http://localhost:8080/api/v1/stream", nil)
    if err != nil {
        return nil, err
    }
    // set the accept content type
    httpRequest.Header.Set("Accept", "application/json")
    // send the request
    resp, err := httpClient.Do(httpRequest)
    if err != nil {
        ctx := httpRequest.Context()
        if ctx != nil {
            select {
            case <-ctx.Done():
                return nil, ctx.Err()
            default:
            }
        }
        return nil, err
    }
    if resp.StatusCode != http.StatusOK {
        defer resp.Body.Close()
        return nil, fmt.Errorf("wrong status code %d", resp.StatusCode)
    }
    // initialize stream reader
    dec := newDecoder(resp.Body)
    return NewStream(dec), nil
}
The stream consumer
import (
    "fmt"
    "io"
    "sync"
)
func newDecoder(body io.ReadCloser) *decoder {
    return &decoder{
        r: body,
    }
}

type decoder struct {
    r io.ReadCloser
}

func (d *decoder) close() error {
    return d.r.Close()
}

func (d *decoder) decode() error {
    buffer := make([]byte, 256)
    _, err := d.r.Read(buffer)
    if err != nil {
        fmt.Printf("error : %s\n", err)
        return err
    }
    fmt.Println(buffer)
    return nil
}

type StreamInterface interface {
    Stop()
}

func NewStream(source *decoder) StreamInterface {
    stream := &streamImpl{
        source: source,
        result: make(chan model.Event),
        mutex:  sync.Mutex{},
    }
    go stream.receive()
    return stream
}

type streamImpl struct {
    StreamInterface
    mutex   sync.Mutex
    source  *decoder
    result  chan model.Event
    stopped bool
}

func (s *streamImpl) Stop() {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    if !s.stopped {
        s.stopped = true
    }
}

func (s *streamImpl) isStopped() bool {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    return s.stopped
}

// receive reads from the response body, decodes the data and sends it to the result channel
func (s *streamImpl) receive() {
    defer s.source.close()
    for {
        if s.isStopped() {
            return
        }
        err := s.source.decode()
        if err != nil {
            return
        }
    }
}
Server side
I used the code from the echo streaming-response cookbook: https://echo.labstack.com/cookbook/streaming-response.
For the demo, I increased the time.Sleep to 10 * time.Second.
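For reference, a rough plain net/http equivalent of that cookbook handler looks like the sketch below (assuming the usual encoding/json, net/http and time imports; the location struct and coordinates are only illustrative, taken from the demo output). Because the handler writes and flushes the body piece by piece without setting Content-Length, Go sends the response with Transfer-Encoding: chunked.

// Sketch of a plain net/http handler equivalent to the echo cookbook example,
// with the delay bumped to 10 seconds between chunks.
func streamHandler(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)

    // illustrative payload, similar to the cookbook's location list
    type location struct {
        Altitude  int
        Latitude  float64
        Longitude float64
    }
    locations := []location{
        {Altitude: -97, Latitude: 37.819929, Longitude: -122.478255},
        {Altitude: 1899, Latitude: 39.096849, Longitude: -120.032351},
    }

    enc := json.NewEncoder(w)
    for _, l := range locations {
        if err := enc.Encode(l); err != nil { // Encode appends a trailing newline
            return
        }
        flusher.Flush() // push this chunk to the client immediately
        time.Sleep(10 * time.Second)
    }
}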
Results
With curl:
curl -v http://localhost:8080/api/v1/stream
curl log:
- STATE: INIT => CONNECT handle 0x8af160; line 1392 (connection #-5000)
- Added connection 0. The cache now contains 1 members
- STATE: CONNECT => WAITRESOLVE handle 0x8af160; line 1428 (connection #0)
- Trying 127.0.0.1…
- TCP_NODELAY set
- STATE: WAITRESOLVE => WAITCONNECT handle 0x8af160; line 1509 (connection #0)
- Connected to localhost (127.0.0.1) port 8080 (#0)
- STATE: WAITCONNECT => SENDPROTOCONNECT handle 0x8af160; line 1561 (connection #0)
- Marked for [keep alive]: HTTP default
- STATE: SENDPROTOCONNECT => DO handle 0x8af160; line 1579 (connection #0)
GET /api/v1/prometheis/stream HTTP/1.1
Host: localhost:8080
User-Agent: curl/7.58.0
Accept: */*
- STATE: DO => DO_DONE handle 0x8af160; line 1658 (connection #0)
- STATE: DO_DONE => WAITPERFORM handle 0x8af160; line 1783 (connection #0)
- STATE: WAITPERFORM => PERFORM handle 0x8af160; line 1799 (connection #0)
- HTTP 1.1 or later with persistent connection, pipelining supported
< HTTP/1.1 200 OK
< Content-Type: application/json
< Vary: Accept-Encoding
< Date: Wed, 31 Oct 2018 09:41:05 GMT
< Transfer-Encoding: chunked
{"Altitude":-97,"Latitude":37.819929,"Longitude":-122.478255}
{"Altitude":1899,"Latitude":39.096849,"Longitude":-120.032351}
{"Altitude":2619,"Latitude":37.865101,"Longitude":-119.538329}
{"Altitude":42,"Latitude":33.812092,"Longitude":-117.918974}
{"Altitude":15,"Latitude":37.77493,"Longitude":-122.419416}
With the Go program:
Get http://localhost:8080/api/v1/stream: net/http: request canceled (Client.Timeout exceeded while awaiting headers)
If I reduce the sleep on the server side, it waits 5 seconds before printing the bytes.
If anyone can see what I'm doing wrong, that would be really helpful!
Thanks in advance for your help.
Regards,
When handling chunked HTTP responses in Go, the problem is usually in how the response body is read. Your code reads into a fixed-size buffer once per call, but it does not handle the streaming nature of the chunked encoding correctly. Go's net/http client supports chunked transfer encoding by default (the chunk framing is decoded transparently), but the response body has to be consumed as a stream.
Here is a modified version of the code that uses bufio.Scanner, or a continuous read loop, to process the chunked data:
import (
    "bufio"
    "fmt"
    "io"
    "net"
    "net/http"
    "sync"
    "time"
)

func Stream() (StreamInterface, error) {
    roundTripper := &http.Transport{
        Proxy: http.ProxyFromEnvironment,
        DialContext: (&net.Dialer{
            Timeout:   60 * time.Second,
            KeepAlive: 60 * time.Second,
        }).DialContext,
    }
    httpClient := &http.Client{
        Transport: roundTripper,
        Timeout:   60 * time.Second,
    }
    httpRequest, err := http.NewRequest("GET", "http://localhost:8080/api/v1/stream", nil)
    if err != nil {
        return nil, err
    }
    httpRequest.Header.Set("Accept", "application/json")
    resp, err := httpClient.Do(httpRequest)
    if err != nil {
        return nil, err
    }
    if resp.StatusCode != http.StatusOK {
        resp.Body.Close()
        return nil, fmt.Errorf("wrong status code %d", resp.StatusCode)
    }
    dec := newDecoder(resp.Body)
    return NewStream(dec), nil
}
type decoder struct {
    r io.ReadCloser
}

func newDecoder(body io.ReadCloser) *decoder {
    return &decoder{
        r: body,
    }
}

func (d *decoder) close() error {
    return d.r.Close()
}

func (d *decoder) decode() error {
    scanner := bufio.NewScanner(d.r)
    // Scan returns as soon as a complete line arrives, so each chunk is
    // printed immediately instead of waiting for the whole body.
    for scanner.Scan() {
        fmt.Printf("Received chunk: %s\n", scanner.Bytes())
    }
    if err := scanner.Err(); err != nil {
        return err
    }
    // The scanner stops cleanly when the server sends the final chunk;
    // report io.EOF so receive() exits instead of spinning on an empty body.
    return io.EOF
}
type StreamInterface interface {
    Stop()
}

type streamImpl struct {
    mutex   sync.Mutex
    source  *decoder
    stopped bool
}

func NewStream(source *decoder) StreamInterface {
    stream := &streamImpl{
        source: source,
    }
    go stream.receive()
    return stream
}

func (s *streamImpl) Stop() {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    s.stopped = true
}

func (s *streamImpl) isStopped() bool {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    return s.stopped
}

func (s *streamImpl) receive() {
    defer s.source.close()
    for {
        if s.isStopped() {
            return
        }
        err := s.source.decode()
        if err != nil {
            if err != io.EOF {
                fmt.Printf("Decode error: %v\n", err)
            }
            return
        }
    }
}
Key changes:
- Use bufio.Scanner to read the chunked data line by line, assuming the server separates the JSON objects with newlines (see the json.Decoder sketch after this list for a variant that doesn't rely on that).
- Replace the single fixed-size buffer read with a continuous read loop, so chunks are processed as they arrive instead of waiting for the rest of the body.
- Keep the connection handling and close the body properly when the stream ends.
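Since each chunk in the demo is a JSON object, another option is to let encoding/json do the framing. The following is only a sketch (the Location struct is hypothetical, mirroring the demo payload, and it assumes an additional encoding/json import): json.Decoder reads exactly one JSON value per Decode call, so each object is handled as soon as its chunk arrives.

// Location mirrors the demo payload; adjust the fields to the real data.
type Location struct {
    Altitude  int
    Latitude  float64
    Longitude float64
}

// decodeJSON is an alternative to decode(): it consumes one JSON object per
// chunk and returns io.EOF once the server has sent the final chunk.
func (d *decoder) decodeJSON() error {
    dec := json.NewDecoder(d.r)
    for {
        var loc Location
        if err := dec.Decode(&loc); err != nil {
            return err
        }
        fmt.Printf("Received location: %+v\n", loc)
    }
}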
If the server sends raw chunked data that is not newline-separated, the following alternative decode method can be used instead:
func (d *decoder) decode() error {
    buffer := make([]byte, 1024)
    for {
        // Read returns as soon as some data is available, not when the buffer is full.
        n, err := d.r.Read(buffer)
        if n > 0 {
            fmt.Printf("Received chunk: %s\n", buffer[:n])
        }
        if err != nil {
            // io.EOF arrives with the final chunk; receive() treats it as a clean stop.
            return err
        }
    }
}
This processes each chunk as soon as it arrives, matching curl's behaviour. Make sure the server really flushes each chunk, and that the client timeout is long enough that the stream is not cut off part-way.
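On the timeout point: Client.Timeout covers the entire exchange, including reading the body, so the 60-second timeout above will eventually cut off a long-lived stream. A common approach, shown here only as a sketch rather than as part of the fix above, is to drop the overall Client.Timeout and bound just the dial and response-header phases on the Transport:

httpClient := &http.Client{
    Transport: &http.Transport{
        Proxy: http.ProxyFromEnvironment,
        DialContext: (&net.Dialer{
            Timeout:   30 * time.Second, // limit on establishing the TCP connection
            KeepAlive: 60 * time.Second,
        }).DialContext,
        ResponseHeaderTimeout: 30 * time.Second, // limit on waiting for the response headers
    },
    // No Client.Timeout: the streamed body may legitimately stay open for a long time.
}

Per-request cancellation can then be handled by attaching a context to the request instead of relying on a global client timeout.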


