Golang中如何关闭HTTP/2客户端流当服务器关闭请求流时
Golang中如何关闭HTTP/2客户端流当服务器关闭请求流时 使用 net/http Go 库为流式客户端和服务器请求响应创建了 HTTP/2 客户端和服务器程序。参考了#13444 (评论)中的代码。
服务器代码
func handle_stream_request(w http.ResponseWriter, r *http.Request) {
buf := make([]byte, 1024)
i := 0
for {
i = i + 1
if(i == 10){
break
}
len, _ := r.Body.Read(buf)
response_str := "Server ACK for: " + string(buf[:len])
w.Write([]byte(response_str))
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}
}
客户端代码
func send_stream_request(client *http.Client){
pr, pw := io.Pipe()
req, _ := http.NewRequest("PUT", "https://localhost:8000/", ioutil.NopCloser(pr))
var res *http.Response
go func() {
for {
time.Sleep(2 * time.Second)
s := "Client ping@" + get_time()
pw.Write([]byte(s))
if res != nil {
buf := make([]byte, 1024)
len, _ := res.Body.Read(buf)
log.Printf("Response is: %s", string(buf[:len]))
}
}
}()
go func() {
response, _ := client.Do(req)
res = response
log.Printf("\n Got: %#v", res)
}()
select {}
}
期望看到什么?
客户端发送 10 个请求后,客户端程序应该成功退出。
实际看到什么?
服务器完成处理函数后,客户端程序在管道写入 “pw.Write([]byte(s))” 处阻塞,因为管道上没有读取器来读取数据。
我无法理解如何阻止此类客户端程序挂起。服务器向客户端发送什么信号表明请求流已关闭,客户端不应再尝试向管道写入更多内容。
更多关于Golang中如何关闭HTTP/2客户端流当服务器关闭请求流时的实战教程也可以访问 https://www.itying.com/category-94-b0.html
感谢您的回复。但上面的代码是为了简洁易读而简化的版本。在我的实际代码中,确实处理了错误条件。但问题在于当服务器完成请求后,程序就是无法从 pw.Write([]byte(s)) 继续执行。我通过添加打印语句和使用调试器都验证了这一点。
我怀疑这与 net/http 库有关,可能是它没有正确关闭管道的读取端,导致写入端也被阻塞。
pw.Write([]byte(s))
更多关于Golang中如何关闭HTTP/2客户端流当服务器关闭请求流时的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Write 实现了标准的 Write 接口:它将数据写入管道,阻塞直到一个或多个读取器消耗完所有数据或读取端关闭。如果读取端因错误而关闭,则该错误将作为 err 返回;否则 err 为 ErrClosedPipe。
io 包 - io - Go 语言包
Package io provides basic interfaces to I/O primitives.
你没有处理 pr.Write 返回的错误,也没有在发送端中断循环。这可能是原因所在。
在HTTP/2流式通信中,当服务器端关闭请求流时,客户端需要检测到这一状态并停止向管道写入数据。问题在于你的客户端代码没有正确处理服务器关闭连接的情况。
以下是修改后的客户端代码,通过检测管道关闭和响应体EOF来正确处理流关闭:
func send_stream_request(client *http.Client) {
pr, pw := io.Pipe()
req, _ := http.NewRequest("PUT", "https://localhost:8000/", ioutil.NopCloser(pr))
var res *http.Response
var err error
// 用于协调goroutine的通道
done := make(chan bool)
serverClosed := make(chan bool)
// 发送数据的goroutine
go func() {
defer pw.Close()
for i := 0; i < 10; i++ {
select {
case <-serverClosed:
log.Println("Server closed the connection, stopping writes")
return
case <-done:
return
default:
time.Sleep(2 * time.Second)
s := "Client ping@" + get_time()
_, err := pw.Write([]byte(s))
if err != nil {
log.Printf("Write error: %v", err)
return
}
log.Printf("Sent: %s", s)
}
}
log.Println("Finished sending 10 requests")
close(done)
}()
// 接收响应的goroutine
go func() {
defer close(serverClosed)
response, err := client.Do(req)
if err != nil {
log.Printf("Request error: %v", err)
return
}
res = response
log.Printf("Got response: %#v", res)
// 持续读取服务器响应
buf := make([]byte, 1024)
for {
n, err := res.Body.Read(buf)
if n > 0 {
log.Printf("Response is: %s", string(buf[:n]))
}
if err != nil {
if err == io.EOF {
log.Println("Server closed the response stream")
} else {
log.Printf("Read error: %v", err)
}
res.Body.Close()
return
}
}
}()
// 等待完成或超时
select {
case <-done:
log.Println("Client finished successfully")
case <-time.After(30 * time.Second):
log.Println("Timeout reached")
case <-serverClosed:
log.Println("Server closed connection")
}
}
同时,建议修改服务器代码以更明确地处理流结束:
func handle_stream_request(w http.ResponseWriter, r *http.Request) {
buf := make([]byte, 1024)
for i := 0; i < 10; i++ {
n, err := r.Body.Read(buf)
if err != nil {
if err == io.EOF {
log.Println("Client closed the request stream")
break
}
log.Printf("Read error: %v", err)
break
}
if n > 0 {
response_str := "Server ACK for: " + string(buf[:n])
w.Write([]byte(response_str))
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
log.Printf("Processed request %d: %s", i+1, string(buf[:n]))
}
}
log.Println("Server finished processing 10 requests")
}
关键改进点:
- 在客户端发送goroutine中添加了对管道关闭的检测
- 使用通道来协调goroutine之间的状态
- 在读取响应体时检测EOF错误,这表示服务器关闭了流
- 添加了明确的完成条件和超时机制
- 服务器端也添加了对EOF的检测
这样当服务器处理完10个请求后,客户端能够检测到连接关闭并正常退出,而不是在管道写入时阻塞。

