Golang中如何分块处理HTTP响应数据

Golang中如何分块处理HTTP响应数据 我正在尝试使用传输编码:分块来消费HTTP响应。我原以为可以在数据块出现在正文时逐个读取它们。但实际上,似乎只有在服务器发送完最后一个数据块后,才能读取正文。

我确信在设置客户端时遗漏了某些内容,因为当我使用curl请求服务器时,它工作得很好,并且curl在服务器发送数据块时直接打印它们。

以下是我的设置:

Go:1.10.1 windows/amd64

发送请求

import (
   "net"
   "net/http"
   "time"
)

func Stream(objectSignature interface{}) (StreamInterface, error) {
   roundTripper := &http.Transport{
     Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   60 * time.Second,
			KeepAlive: 60 * time.Second,
		}).DialContext,
   }
   httpClient := &http.Client{
		Transport: roundTripper,
		Timeout:   60 * time.Second,
   }
   
   httpRequest = http.NewRequest("GET", "http://localhost:8080/api/v1/steam", nil)
   
   // set the accept content type
   httpRequest.Header.Set("Accept", "application/json")
   
   // send the request
   resp, err := httpClient.Do(httpRequest)
   
   if err != nil {
		ctx := httpRequest.Context()
		if ctx != nil {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
			}
		}
		return nil, err
	}
   
   if resp.StatusCode != http.StatusOK {
		defer resp.Body.Close()
		return nil, fmt.Errorf("wrong status code %d",resp.StatusCode)
   }
   
   // initialize stream reader
	dec := newDecoder(resp.Body)
	return NewStream(dec), nil
}

流消费者

import (
        "fmt"
        "io"
        "reflect"
        "sync"
)

func newDecoder(body io.ReadCloser) *decoder {
	return &decoder{
		r:      body,
	}
}

type decoder struct {
	r      io.ReadCloser
}

func (d *decoder) close() error {
	return d.r.Close()
}

func (d *decoder) decode() error {
	buffer := make([]byte,256)
	_, err := d.r.Read(buffer)
	if err != nil {
		fmt.Println(fmt.Sprintf("error : %s",err))
		return err
	}

	fmt.Println(buffer)
	return nil
}

type StreamInterface interface {
	Stop()
}

func NewStream(source *decoder) StreamInterface {
	stream := &streamImpl{
		source: source,
		result: make(chan model.Event),
		mutex:  sync.Mutex{},
	}

	go stream.receive()
	return stream
}

type streamImpl struct {
	StreamInterface
	mutex   sync.Mutex
	source  *decoder
	result  chan model.Event
	stopped bool
}

func (s *streamImpl) Stop() {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if !s.stopped {
		s.stopped = true
	}
}

func (s *streamImpl) isStopped() bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.stopped
}

//receive reads result from io, decodes the data and sends it to the result channel
func (s *streamImpl) receive() {
	defer s.source.close()
	for {
		if s.isStopped() {
			return
		}
		
		err := s.source.decode()
		if err != nil {
			return
		}
	}
}

服务器端

我使用了在echo上可以看到的代码:https://echo.labstack.com/cookbook/streaming-response。 在演示中,我将time.sleep增加到10 * time.Second

结果

使用curl:

curl -v http://localhost:8080/api/v1/stream

Curl日志
  • STATE: INIT => CONNECT handle 0x8af160; line 1392 (connection #-5000)
  • Added connection 0. The cache now contains 1 members
  • STATE: CONNECT => WAITRESOLVE handle 0x8af160; line 1428 (connection #0)
  • Trying 127.0.0.1…
  • TCP_NODELAY set
  • STATE: WAITRESOLVE => WAITCONNECT handle 0x8af160; line 1509 (connection #0)
  • Connected to localhost (127.0.0.1) port 8080 (#0)
  • STATE: WAITCONNECT => SENDPROTOCONNECT handle 0x8af160; line 1561 (connection #0)
  • Marked for [keep alive]: HTTP default
  • STATE: SENDPROTOCONNECT => DO handle 0x8af160; line 1579 (connection #0)

GET /api/v1/prometheis/stream HTTP/1.1 Host: localhost:8080 User-Agent: curl/7.58.0 Accept: /

  • STATE: DO => DO_DONE handle 0x8af160; line 1658 (connection #0)
  • STATE: DO_DONE => WAITPERFORM handle 0x8af160; line 1783 (connection #0)
  • STATE: WAITPERFORM => PERFORM handle 0x8af160; line 1799 (connection #0)
  • HTTP 1.1 or later with persistent connection, pipelining supported < HTTP/1.1 200 OK < Content-Type: application/json < Vary: Accept-Encoding < Date: Wed, 31 Oct 2018 09:41:05 GMT < Transfer-Encoding: chunked

{“Altitude”:-97,“Latitude”:37.819929,“Longitude”:-122.478255} {“Altitude”:1899,“Latitude”:39.096849,“Longitude”:-120.032351} {“Altitude”:2619,“Latitude”:37.865101,“Longitude”:-119.538329} {“Altitude”:42,“Latitude”:33.812092,“Longitude”:-117.918974} {“Altitude”:15,“Latitude”:37.77493,“Longitude”:-122.419416}

使用go程序:

Get http://localhost:8080/api/v1/stream: net/http: request canceled (Client.Timeout exceeded while awaiting headers)

如果我减少服务器端的时间,它会等待5秒后才打印字节

如果有人知道我哪里做错了,那将非常有帮助!

提前感谢大家的帮助。

此致,


更多关于Golang中如何分块处理HTTP响应数据的实战教程也可以访问 https://www.itying.com/category-94-b0.html

2 回复

这似乎是因为我在API级别启用了gzip压缩。看起来我需要手动对每个数据块内容进行gzip压缩

更多关于Golang中如何分块处理HTTP响应数据的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中处理分块HTTP响应时,问题通常出现在读取响应体的方式上。你的代码使用了固定大小的缓冲区读取,但未正确处理分块编码的流式特性。Go的net/http客户端默认支持分块传输编码,但需要正确使用响应体的读取方式。

以下是修改后的代码示例,使用bufio.Scanner或持续读取来处理分块数据:

import (
    "bufio"
    "fmt"
    "io"
    "net/http"
    "net"
    "time"
    "sync"
)

func Stream() (StreamInterface, error) {
    roundTripper := &http.Transport{
        Proxy: http.ProxyFromEnvironment,
        DialContext: (&net.Dialer{
            Timeout:   60 * time.Second,
            KeepAlive: 60 * time.Second,
        }).DialContext,
    }
    httpClient := &http.Client{
        Transport: roundTripper,
        Timeout:   60 * time.Second,
    }

    httpRequest, err := http.NewRequest("GET", "http://localhost:8080/api/v1/stream", nil)
    if err != nil {
        return nil, err
    }
    httpRequest.Header.Set("Accept", "application/json")

    resp, err := httpClient.Do(httpRequest)
    if err != nil {
        return nil, err
    }

    if resp.StatusCode != http.StatusOK {
        resp.Body.Close()
        return nil, fmt.Errorf("wrong status code %d", resp.StatusCode)
    }

    dec := newDecoder(resp.Body)
    return NewStream(dec), nil
}

type decoder struct {
    r io.ReadCloser
}

func newDecoder(body io.ReadCloser) *decoder {
    return &decoder{
        r: body,
    }
}

func (d *decoder) close() error {
    return d.r.Close()
}

func (d *decoder) decode() error {
    scanner := bufio.NewScanner(d.r)
    for scanner.Scan() {
        data := scanner.Bytes()
        fmt.Printf("Received chunk: %s\n", string(data))
    }
    if err := scanner.Err(); err != nil {
        return err
    }
    return nil
}

type StreamInterface interface {
    Stop()
}

type streamImpl struct {
    mutex   sync.Mutex
    source  *decoder
    stopped bool
}

func NewStream(source *decoder) StreamInterface {
    stream := &streamImpl{
        source: source,
    }
    go stream.receive()
    return stream
}

func (s *streamImpl) Stop() {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    s.stopped = true
}

func (s *streamImpl) isStopped() bool {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    return s.stopped
}

func (s *streamImpl) receive() {
    defer s.source.close()
    for {
        if s.isStopped() {
            return
        }
        err := s.source.decode()
        if err != nil {
            if err != io.EOF {
                fmt.Printf("Decode error: %v\n", err)
            }
            return
        }
    }
}

关键修改点:

  1. 使用bufio.Scanner逐行读取分块数据,假设服务器以换行符分隔JSON对象。
  2. 移除固定缓冲区读取,避免阻塞直到所有数据到达。
  3. 保持连接处理,确保在流结束时正确关闭。

如果服务器发送的是纯分块数据(不带换行符),可以使用以下替代decode方法:

func (d *decoder) decode() error {
    buffer := make([]byte, 1024)
    for {
        n, err := d.r.Read(buffer)
        if n > 0 {
            fmt.Printf("Received chunk: %s\n", string(buffer[:n]))
        }
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
    }
}

这允许在数据块到达时立即处理它们,模拟curl的行为。确保服务器正确实现分块传输,并且客户端超时设置足够长以避免中断。

回到顶部