Golang中如何关闭HTTP/2客户端流当服务器关闭请求流时

Golang中如何关闭HTTP/2客户端流当服务器关闭请求流时 使用 net/http Go 库为流式客户端和服务器请求响应创建了 HTTP/2 客户端和服务器程序。参考了#13444 (评论)中的代码。

服务器代码

func handle_stream_request(w http.ResponseWriter, r *http.Request) {

buf := make([]byte, 1024)
i := 0
for {
	i = i + 1
	if(i == 10){
		break
	}
	len, _ := r.Body.Read(buf)
	response_str := "Server ACK for: " + string(buf[:len])
        w.Write([]byte(response_str))
        if f, ok := w.(http.Flusher); ok {
	   f.Flush()
        }
}


}

客户端代码

func send_stream_request(client *http.Client){
pr, pw := io.Pipe()
req, _ := http.NewRequest("PUT", "https://localhost:8000/", ioutil.NopCloser(pr))
var res *http.Response

go func() {
	for {
		time.Sleep(2 * time.Second)
		s := "Client ping@" + get_time()
		pw.Write([]byte(s))
		if res != nil {
			buf := make([]byte, 1024)
			len, _ := res.Body.Read(buf)
			log.Printf("Response is: %s", string(buf[:len]))
		}
	}
}()

go func() {
	response, _ := client.Do(req)
	res = response
	log.Printf("\n Got: %#v", res)
}()
select {}

}

期望看到什么?

客户端发送 10 个请求后,客户端程序应该成功退出。

实际看到什么?

服务器完成处理函数后,客户端程序在管道写入 “pw.Write([]byte(s))” 处阻塞,因为管道上没有读取器来读取数据。

我无法理解如何阻止此类客户端程序挂起。服务器向客户端发送什么信号表明请求流已关闭,客户端不应再尝试向管道写入更多内容。


更多关于Golang中如何关闭HTTP/2客户端流当服务器关闭请求流时的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

感谢您的回复。但上面的代码是为了简洁易读而简化的版本。在我的实际代码中,确实处理了错误条件。但问题在于当服务器完成请求后,程序就是无法从 pw.Write([]byte(s)) 继续执行。我通过添加打印语句和使用调试器都验证了这一点。

我怀疑这与 net/http 库有关,可能是它没有正确关闭管道的读取端,导致写入端也被阻塞。

pw.Write([]byte(s))

更多关于Golang中如何关闭HTTP/2客户端流当服务器关闭请求流时的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Write 实现了标准的 Write 接口:它将数据写入管道,阻塞直到一个或多个读取器消耗完所有数据或读取端关闭。如果读取端因错误而关闭,则该错误将作为 err 返回;否则 err 为 ErrClosedPipe。

pkg.go.dev

io 包 - io - Go 语言包

Package io provides basic interfaces to I/O primitives.

你没有处理 pr.Write 返回的错误,也没有在发送端中断循环。这可能是原因所在。

在HTTP/2流式通信中,当服务器端关闭请求流时,客户端需要检测到这一状态并停止向管道写入数据。问题在于你的客户端代码没有正确处理服务器关闭连接的情况。

以下是修改后的客户端代码,通过检测管道关闭和响应体EOF来正确处理流关闭:

func send_stream_request(client *http.Client) {
    pr, pw := io.Pipe()
    req, _ := http.NewRequest("PUT", "https://localhost:8000/", ioutil.NopCloser(pr))
    var res *http.Response
    var err error

    // 用于协调goroutine的通道
    done := make(chan bool)
    serverClosed := make(chan bool)

    // 发送数据的goroutine
    go func() {
        defer pw.Close()
        
        for i := 0; i < 10; i++ {
            select {
            case <-serverClosed:
                log.Println("Server closed the connection, stopping writes")
                return
            case <-done:
                return
            default:
                time.Sleep(2 * time.Second)
                s := "Client ping@" + get_time()
                _, err := pw.Write([]byte(s))
                if err != nil {
                    log.Printf("Write error: %v", err)
                    return
                }
                log.Printf("Sent: %s", s)
            }
        }
        log.Println("Finished sending 10 requests")
        close(done)
    }()

    // 接收响应的goroutine
    go func() {
        defer close(serverClosed)
        
        response, err := client.Do(req)
        if err != nil {
            log.Printf("Request error: %v", err)
            return
        }
        res = response
        log.Printf("Got response: %#v", res)

        // 持续读取服务器响应
        buf := make([]byte, 1024)
        for {
            n, err := res.Body.Read(buf)
            if n > 0 {
                log.Printf("Response is: %s", string(buf[:n]))
            }
            if err != nil {
                if err == io.EOF {
                    log.Println("Server closed the response stream")
                } else {
                    log.Printf("Read error: %v", err)
                }
                res.Body.Close()
                return
            }
        }
    }()

    // 等待完成或超时
    select {
    case <-done:
        log.Println("Client finished successfully")
    case <-time.After(30 * time.Second):
        log.Println("Timeout reached")
    case <-serverClosed:
        log.Println("Server closed connection")
    }
}

同时,建议修改服务器代码以更明确地处理流结束:

func handle_stream_request(w http.ResponseWriter, r *http.Request) {
    buf := make([]byte, 1024)
    
    for i := 0; i < 10; i++ {
        n, err := r.Body.Read(buf)
        if err != nil {
            if err == io.EOF {
                log.Println("Client closed the request stream")
                break
            }
            log.Printf("Read error: %v", err)
            break
        }
        
        if n > 0 {
            response_str := "Server ACK for: " + string(buf[:n])
            w.Write([]byte(response_str))
            if f, ok := w.(http.Flusher); ok {
                f.Flush()
            }
            log.Printf("Processed request %d: %s", i+1, string(buf[:n]))
        }
    }
    
    log.Println("Server finished processing 10 requests")
}

关键改进点:

  1. 在客户端发送goroutine中添加了对管道关闭的检测
  2. 使用通道来协调goroutine之间的状态
  3. 在读取响应体时检测EOF错误,这表示服务器关闭了流
  4. 添加了明确的完成条件和超时机制
  5. 服务器端也添加了对EOF的检测

这样当服务器处理完10个请求后,客户端能够检测到连接关闭并正常退出,而不是在管道写入时阻塞。

回到顶部