Golang中如何正确关闭监听套接字服务器

Golang中如何正确关闭监听套接字服务器 在论坛成员的帮助下,我创建了这个支持优雅停止的简单"回声"服务器:当请求停止时,已打开的连接会在退出前得到正确处理。

我想知道这个实现是否足够好,或者是否存在一些潜在问题?以下是代码:

package main

import (
	"fmt"
	"net"
	"os"
	"sync"
	"time"
)

type Server struct {
	listener  *net.TCPListener
	quit      chan bool
	waitGroup *sync.WaitGroup
}

func NewService() *Server {
	addr, err := net.ResolveTCPAddr("tcp4", ":9999")
	if err != nil {
		fmt.Println("Failed to resolve address", err.Error())
		os.Exit(1)
	}

	listener, err := net.ListenTCP("tcp", addr)
	if err != nil {
		fmt.Println("Failed to create listener", err.Error())
		os.Exit(1)
	}

	srv := &Server{
		listener:  listener,
		quit:      make(chan bool),
		waitGroup: &sync.WaitGroup{},
	}
	go srv.serve()
	return srv
}

func (srv *Server) serve() {
	for {
		select {
		case <-srv.quit:
			fmt.Println("Stop listening for new clients")
			srv.listener.Close()
			return
		default:
		}
        srv.listener.SetDeadline(time.Now().Add(1e9))
        conn, err := srv.listener.AcceptTCP()
        if err != nil {
            if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
                continue
            }
            fmt.Println("Failed to accept connection:", err.Error())
        }
        srv.waitGroup.Add(1)
        go srv.handleConnection(conn)
	}
}

func (srv *Server) handleConnection(conn net.Conn) error {
	fmt.Println("Accepted connection from", conn.RemoteAddr())

	defer func() {
		fmt.Println("Closing connection from", conn.RemoteAddr())
		conn.Close()
        srv.waitGroup.Done()
	}()

	buf := make([]byte, 1024)
	_, err := conn.Read(buf)
	if err != nil {
		fmt.Println("Read error", err.Error())
		conn.Close()
	}

	_, err = conn.Write(buf)
	if err != nil {
		fmt.Println("Write error", err.Error())
		conn.Close()
	}

	return nil
}

func (srv *Server) Stop() {
	fmt.Println("Stop requested. Waiting for existing connections...")
	close(srv.quit)
	srv.waitGroup.Wait()
	fmt.Println("Stopped successfully")
}

func main() {
	srv := NewService()
	time.Sleep(100 * time.Second)
	srv.Stop()
}

另外,我还尝试使用从通道读取的工作线程来实现稍微不同的方法。据我理解,这是处理多个连接的更惯用的方式。这个实现是否正确呢?

package main

import (
	"fmt"
	"net"
	"os"
	"sync"
	"time"
)

var conns chan net.Conn

type Server struct {
	listener  *net.TCPListener
	quit      chan bool
	waitGroup *sync.WaitGroup
}

func NewService() *Server {
	addr, err := net.ResolveTCPAddr("tcp4", ":9999")
	if err != nil {
		fmt.Println("Failed to resolve address", err.Error())
		os.Exit(1)
	}

	listener, err := net.ListenTCP("tcp", addr)
	if err != nil {
		fmt.Println("Failed to create listener", err.Error())
		os.Exit(1)
	}

	srv := &Server{
		listener:  listener,
		quit:      make(chan bool),
		waitGroup: &sync.WaitGroup{},
	}

    conns = make(chan net.Conn, 50)

    for i := 0; i < 5; i ++ {
        fmt.Println("Start worker", i)
        srv.waitGroup.Add(1)
        go handleConnection(srv.waitGroup)
    }

	go srv.serve()
	return srv
}

func (srv *Server) serve() {
	for {
		select {
		case <-srv.quit:
			fmt.Println("Stop listening for new clients")
			srv.listener.Close()
            close(conns)
			return
		default:
		}
        srv.listener.SetDeadline(time.Now().Add(1e9))
        conn, err := srv.listener.AcceptTCP()
        if err != nil {
            if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
                continue
            }
            fmt.Println("Failed to accept connection:", err.Error())
        }
        conns <- conn
	}
}

func handleConnection(wg *sync.WaitGroup) {
    defer wg.Done()

    buf := make([]byte, 1024)

    for conn := range conns {
    	fmt.Println("Accepted connection from", conn.RemoteAddr())

        _, err := conn.Read(buf)
        if err != nil {
            fmt.Println("Read error", err.Error())
            conn.Close()
        }

        _, err = conn.Write(buf)
        if err != nil {
            fmt.Println("Write error", err.Error())
            conn.Close()
        }

        fmt.Println("Closing connection from", conn.RemoteAddr())
        conn.Close()
    }
}

func (srv *Server) Stop() {
	fmt.Println("Stop requested. Waiting for existing connections...")
	close(srv.quit)
	srv.waitGroup.Wait()
	fmt.Println("Stopped successfully")
}

func main() {
	srv := NewService()
	time.Sleep(100 * time.Second)
	srv.Stop()
}

更多关于Golang中如何正确关闭监听套接字服务器的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中如何正确关闭监听套接字服务器的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你的两个实现都展示了在Go中优雅关闭监听套接字服务器的不同方法,但都存在一些潜在问题。以下是详细分析:

第一个实现的问题

主要问题在于serve()方法中的竞态条件:

func (srv *Server) serve() {
	for {
		select {
		case <-srv.quit:
			fmt.Println("Stop listening for new clients")
			srv.listener.Close()
			return
		default:
		}
        // 这里可能在检查quit后、AcceptTCP前收到停止信号
        srv.listener.SetDeadline(time.Now().Add(1e9))
        conn, err := srv.listener.AcceptTCP()
        // ...
	}
}

改进版本:

func (srv *Server) serve() {
    for {
        srv.listener.SetDeadline(time.Now().Add(time.Second))
        conn, err := srv.listener.AcceptTCP()
        if err != nil {
            if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() {
                select {
                case <-srv.quit:
                    fmt.Println("Stop listening for new clients")
                    srv.listener.Close()
                    return
                default:
                    continue
                }
            }
            fmt.Println("Failed to accept connection:", err.Error())
            continue
        }
        srv.waitGroup.Add(1)
        go srv.handleConnection(conn)
    }
}

第二个实现的问题

第二个实现使用了连接池模式,但存在以下问题:

  1. 全局变量问题conns是全局变量,这限制了多个服务器实例
  2. 资源泄漏风险:在Stop()中关闭conns通道时,可能还有goroutine阻塞在通道读取上

改进版本:

type Server struct {
	listener  *net.TCPListener
	quit      chan bool
	waitGroup *sync.WaitGroup
	conns     chan net.Conn
}

func NewService() *Server {
	addr, err := net.ResolveTCPAddr("tcp4", ":9999")
	if err != nil {
		fmt.Println("Failed to resolve address", err.Error())
		os.Exit(1)
	}

	listener, err := net.ListenTCP("tcp", addr)
	if err != nil {
		fmt.Println("Failed to create listener", err.Error())
		os.Exit(1)
	}

	srv := &Server{
		listener:  listener,
		quit:      make(chan bool),
		waitGroup: &sync.WaitGroup{},
		conns:     make(chan net.Conn, 50),
	}

    for i := 0; i < 5; i++ {
        fmt.Println("Start worker", i)
        srv.waitGroup.Add(1)
        go srv.handleConnection()
    }

	go srv.serve()
	return srv
}

func (srv *Server) handleConnection() {
    defer srv.waitGroup.Done()

    buf := make([]byte, 1024)

    for {
        select {
        case conn, ok := <-srv.conns:
            if !ok {
                return
            }
            fmt.Println("Accepted connection from", conn.RemoteAddr())

            _, err := conn.Read(buf)
            if err != nil {
                fmt.Println("Read error", err.Error())
                conn.Close()
                continue
            }

            _, err = conn.Write(buf)
            if err != nil {
                fmt.Println("Write error", err.Error())
                conn.Close()
                continue
            }

            fmt.Println("Closing connection from", conn.RemoteAddr())
            conn.Close()
        case <-srv.quit:
            return
        }
    }
}

func (srv *Server) Stop() {
	fmt.Println("Stop requested. Waiting for existing connections...")
	close(srv.quit)
    close(srv.conns)
	srv.waitGroup.Wait()
	fmt.Println("Stopped successfully")
}

推荐的完整实现

结合两种方法的优点:

package main

import (
	"context"
	"fmt"
	"net"
	"os"
	"sync"
	"time"
)

type Server struct {
	listener  net.Listener
	waitGroup sync.WaitGroup
	ctx       context.Context
	cancel    context.CancelFunc
}

func NewServer(addr string) (*Server, error) {
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("failed to create listener: %w", err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	srv := &Server{
		listener: listener,
		ctx:      ctx,
		cancel:   cancel,
	}

	srv.waitGroup.Add(1)
	go srv.serve()

	return srv, nil
}

func (srv *Server) serve() {
	defer srv.waitGroup.Done()

	for {
		select {
		case <-srv.ctx.Done():
			return
		default:
		}

		srv.listener.(*net.TCPListener).SetDeadline(time.Now().Add(time.Second))
		conn, err := srv.listener.Accept()
		if err != nil {
			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
				continue
			}
			if srv.ctx.Err() != nil {
				return
			}
			fmt.Printf("Accept error: %v\n", err)
			continue
		}

		srv.waitGroup.Add(1)
		go srv.handleConnection(conn)
	}
}

func (srv *Server) handleConnection(conn net.Conn) {
	defer srv.waitGroup.Done()
	defer conn.Close()

	fmt.Printf("Accepted connection from %s\n", conn.RemoteAddr())

	buf := make([]byte, 1024)
	for {
		select {
		case <-srv.ctx.Done():
			return
		default:
		}

		conn.SetReadDeadline(time.Now().Add(time.Second))
		n, err := conn.Read(buf)
		if err != nil {
			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
				continue
			}
			fmt.Printf("Read error from %s: %v\n", conn.RemoteAddr(), err)
			return
		}

		if n == 0 {
			return
		}

		_, err = conn.Write(buf[:n])
		if err != nil {
			fmt.Printf("Write error to %s: %v\n", conn.RemoteAddr(), err)
			return
		}
	}
}

func (srv *Server) Stop() {
	fmt.Println("Stopping server...")
	srv.cancel()
	srv.listener.Close()
	srv.waitGroup.Wait()
	fmt.Println("Server stopped successfully")
}

func main() {
	srv, err := NewServer(":9999")
	if err != nil {
		fmt.Printf("Failed to start server: %v\n", err)
		os.Exit(1)
	}

	time.Sleep(10 * time.Second)
	srv.Stop()
}

这个实现使用了context来处理取消信号,避免了竞态条件,并提供了更好的错误处理。

回到顶部