Golang中如何正确关闭net.Listener

Golang中如何正确关闭net.Listener 我正在开发一个简单的服务器,该服务器监听TCP套接字,接受连接,从客户端读取并处理一些数据。在官方的Go文档以及许多帖子中都有这类服务器的优秀示例。但大多数示例都没有提供关于如何关闭监听器并清理现有连接的信息。

据我理解,正确的做法应该是:在关闭时,停止接受新连接;处理现有连接并等待它们关闭;最后,当所有现有连接都关闭后,停止服务器。

我尝试按以下方式实现:

package main

import (
	"fmt"
	"net"
	"os"
	"sync"
	"time"
)

type Server struct {
	listener  net.Listener
	conns     []net.Conn
	mtx       sync.Mutex
	cancelled bool
}

func NewServer() *Server {
	addr, err := net.ResolveTCPAddr("tcp4", ":9999")
	if err != nil {
		fmt.Println("Failed to resolve address", err.Error())
		os.Exit(1)
	}

	listener, err := net.Listen("tcp", addr.String())
	if err != nil {
		fmt.Println("Failed to create listener", err.Error())
		os.Exit(1)
	}

	srv := &Server{
		listener,
		make([]net.Conn, 0, 50),
		sync.Mutex{},
		false,
	}
	go srv.Serve()
	return srv
}

func (srv *Server) Serve() {
	var stop bool
	for {
		srv.mtx.Lock()
		stop = srv.cancelled
		srv.mtx.Unlock()
		if stop {
			break
		}

		conn, err := srv.listener.Accept()
		if err != nil {
			fmt.Println("Failed to accept connection:", err.Error())
			continue
		}
		srv.mtx.Lock()
		srv.conns = append(srv.conns, conn)
		srv.mtx.Unlock()
		go srv.handleConnection(conn, len(srv.conns)-1)
	}
}

func (srv *Server) handleConnection(conn net.Conn, id int) error {
	fmt.Println("Accepted connection from", conn.RemoteAddr())

	defer func() {
		fmt.Println("Closing connection from", conn.RemoteAddr())
		conn.Close()
		srv.mtx.Lock()
		srv.conns[id] = nil
		srv.mtx.Unlock()
	}()

	buf := make([]byte, 1024)
	_, err := conn.Read(buf)
	if err != nil {
		fmt.Println("Read error", err.Error())
		return err
	}
	return nil
}

func (srv *Server) Stop() {
	fmt.Println("Stop requested")
	srv.mtx.Lock()
	srv.cancelled = true
	srv.mtx.Unlock()

	srv.listener.Close()
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("Waiting on connections", len(srv.conns))
		}
		if len(srv.conns) == 0 {
			fmt.Println("Stopped")
			return
		}
	}
}

func main() {
	s := NewServer()
	time.Sleep(2 * time.Second)
	s.Stop()
}

这是正确的、符合Go语言风格的方法吗?还是有其他更好的解决方案?

第二个问题:当服务器停止时,会出现"use of closed network connection"错误,例如:

停止请求 接受连接失败:接受tcp [::]:9999:使用了已关闭的网络连接 等待连接数 0 已停止

是否有可能隐藏这个错误,同时仍然捕获所有其他错误?


更多关于Golang中如何正确关闭net.Listener的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 回复

非常感谢您的帮助!

更多关于Golang中如何正确关闭net.Listener的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


看看这里的评论是否有帮助:https://play.golang.org/p/OqiSY2peFBN

说实话,这种写法不太符合Go语言的风格。更地道的解决方案应该是:

(1) 使用一个goroutine来执行Accept()获取连接对象。只需接受连接并将其传递到通道中 (2) 使用多个工作协程来监听连接通道。它们从通道获取连接,处理连接,然后关闭连接

关闭流程意味着: (1) 通过退出Accept()所在的循环来停止监听 (2) 关闭连接通道。现在所有工作协程在下一轮连接通道循环迭代时都会退出 (3) 每个工作协程退出连接通道循环并发出完成信号(例如使用sync.WaitGroup.Done()) (4) 等待所有工作协程退出,例如使用sync.WaitGroup.Wait()

如果需要更多说明,请告诉我!

感谢您的帮助!

我尝试实现了类似您建议的工作流程,但在将连接放入/从通道接收以及在关闭时处理这些连接方面遇到了困难。能否就以下代码提供一些反馈或提示?

package main

import (
	"fmt"
	"net"
	"os"
	"time"
)

type Server struct {
	listener net.Listener
	quit     chan bool
	conns    chan net.Conn
}

func NewServer() *Server {
	addr, err := net.ResolveTCPAddr("tcp4", ":9999")
	if err != nil {
		fmt.Println("Failed to resolve address", err.Error())
		os.Exit(1)
	}

	listener, err := net.Listen("tcp", addr.String())
	if err != nil {
		fmt.Println("Failed to create listener", err.Error())
		os.Exit(1)
	}

	srv := &Server{
		listener,
		make(chan bool),
		make(chan net.Conn, 10),
	}
	go srv.Serve()
	return srv
}

func (srv *Server) Serve() {
	for {
		select {
		case <-srv.quit:
			fmt.Println("Shutting down...")
			srv.listener.Close()
			// FIXME: check that all existing connections were processed
			// and closed correctly
			srv.quit <- true
			return
		default:
			//fmt.Println("Listening for clients")
			srv.listener.SetDeadline(time.Now().Add(1e9))
			conn, err := srv.listener.Accept()
			if err != nil {
				if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
					continue
				}
				fmt.Println("Failed to accept connection:", err.Error())
			}
			srv.conns <- conn
			// FIXME: handle connections from the channel
		}
	}
}

func (srv *Server) handleConnection(conn net.Conn, id int) error {
	fmt.Println("Accepted connection from", conn.RemoteAddr())

	defer func() {
		fmt.Println("Closing connection from", conn.RemoteAddr())
		conn.Close()
	}()

	buf := make([]byte, 1024)
	_, err := conn.Read(buf)
	if err != nil {
		fmt.Println("Read error", err.Error())
		return err
	}
	return nil
}

func (srv *Server) Stop() {
	fmt.Println("Stop requested")
	srv.quit <- true
	<-srv.quit
	fmt.Println("Stopped successfully")
}

func main() {
	srv := NewServer()
	time.Sleep(2 * time.Second)
	srv.Stop()
}

在Go中正确关闭net.Listener需要仔细处理并发和连接管理。您的实现基本正确,但有几个改进点:

改进的连接管理

type Server struct {
    listener  net.Listener
    conns     map[int]net.Conn
    mtx       sync.RWMutex
    cancelled bool
    nextID    int
    wg        sync.WaitGroup
}

func NewServer() *Server {
    listener, err := net.Listen("tcp", ":9999")
    if err != nil {
        fmt.Println("Failed to create listener", err.Error())
        os.Exit(1)
    }

    srv := &Server{
        listener:  listener,
        conns:     make(map[int]net.Conn),
        mtx:       sync.RWMutex{},
        cancelled: false,
        nextID:    0,
        wg:        sync.WaitGroup{},
    }
    go srv.Serve()
    return srv
}

func (srv *Server) Serve() {
    for {
        conn, err := srv.listener.Accept()
        if err != nil {
            // 检查是否是监听器关闭导致的错误
            if srv.isCancelled() {
                return
            }
            fmt.Println("Failed to accept connection:", err.Error())
            continue
        }
        
        srv.mtx.Lock()
        id := srv.nextID
        srv.conns[id] = conn
        srv.nextID++
        srv.mtx.Unlock()
        
        srv.wg.Add(1)
        go srv.handleConnection(conn, id)
    }
}

func (srv *Server) handleConnection(conn net.Conn, id int) {
    defer srv.wg.Done()
    defer func() {
        conn.Close()
        srv.mtx.Lock()
        delete(srv.conns, id)
        srv.mtx.Unlock()
    }()

    fmt.Println("Accepted connection from", conn.RemoteAddr())
    
    buf := make([]byte, 1024)
    _, err := conn.Read(buf)
    if err != nil {
        if !srv.isCancelled() {
            fmt.Println("Read error", err.Error())
        }
        return
    }
}

func (srv *Server) isCancelled() bool {
    srv.mtx.RLock()
    defer srv.mtx.RUnlock()
    return srv.cancelled
}

func (srv *Server) Stop() {
    fmt.Println("Stop requested")
    
    srv.mtx.Lock()
    srv.cancelled = true
    srv.mtx.Unlock()

    // 关闭监听器,这会使得Accept()立即返回错误
    srv.listener.Close()
    
    // 关闭所有活跃连接
    srv.mtx.Lock()
    for id, conn := range srv.conns {
        conn.Close()
        delete(srv.conns, id)
    }
    srv.mtx.Unlock()
    
    // 等待所有处理协程完成
    srv.wg.Wait()
    fmt.Println("Stopped")
}

处理"use of closed network connection"错误

这个错误是预期的行为,可以通过检查错误类型来过滤:

func (srv *Server) Serve() {
    for {
        conn, err := srv.listener.Accept()
        if err != nil {
            if srv.isCancelled() {
                return
            }
            // 检查是否是监听器关闭导致的错误
            if opErr, ok := err.(*net.OpError); ok {
                if opErr.Err.Error() == "use of closed network connection" {
                    return
                }
            }
            // 或者使用errors包检查
            if errors.Is(err, net.ErrClosed) {
                return
            }
            fmt.Println("Failed to accept connection:", err.Error())
            continue
        }
        
        srv.mtx.Lock()
        id := srv.nextID
        srv.conns[id] = conn
        srv.nextID++
        srv.mtx.Unlock()
        
        srv.wg.Add(1)
        go srv.handleConnection(conn, id)
    }
}

使用context的替代方案

type Server struct {
    listener net.Listener
    conns    map[int]net.Conn
    mtx      sync.RWMutex
    nextID   int
    wg       sync.WaitGroup
    ctx      context.Context
    cancel   context.CancelFunc
}

func NewServer() *Server {
    listener, err := net.Listen("tcp", ":9999")
    if err != nil {
        fmt.Println("Failed to create listener", err.Error())
        os.Exit(1)
    }

    ctx, cancel := context.WithCancel(context.Background())
    
    srv := &Server{
        listener: listener,
        conns:    make(map[int]net.Conn),
        ctx:      ctx,
        cancel:   cancel,
    }
    go srv.Serve()
    return srv
}

func (srv *Server) Serve() {
    for {
        select {
        case <-srv.ctx.Done():
            return
        default:
        }
        
        conn, err := srv.listener.Accept()
        if err != nil {
            if errors.Is(err, net.ErrClosed) {
                return
            }
            fmt.Println("Failed to accept connection:", err.Error())
            continue
        }
        
        srv.mtx.Lock()
        id := srv.nextID
        srv.conns[id] = conn
        srv.nextID++
        srv.mtx.Unlock()
        
        srv.wg.Add(1)
        go srv.handleConnection(conn, id)
    }
}

func (srv *Server) Stop() {
    fmt.Println("Stop requested")
    srv.cancel()
    srv.listener.Close()
    
    srv.mtx.Lock()
    for id, conn := range srv.conns {
        conn.Close()
        delete(srv.conns, id)
    }
    srv.mtx.Unlock()
    
    srv.wg.Wait()
    fmt.Println("Stopped")
}

主要改进:

  1. 使用sync.WaitGroup来等待所有连接处理完成
  2. 使用map而不是slice来管理连接,避免nil值问题
  3. 使用context进行更清晰的取消机制
  4. 通过错误类型检查来过滤预期的关闭错误

这种方法提供了更清晰、更健壮的服务器关闭机制。

回到顶部