使用Golang并发发送TCP数据包的最佳实践

使用Golang并发发送TCP数据包的最佳实践 你好!

我最近编写了一些代码,通过发送大量TCP数据包来对代理进行负载测试。起初我是在一个for循环中编写的,但想了解更多关于并发性的知识,于是添加了goroutine。

我注意到,当我添加大量请求时,代码会失败:

signal SIGSEGV: segmentation violation code=0x1 “Fprintf” 以及 runtime error: invalid memory address or nil pointer dereference

来自net.dial的错误是: connect: cannot assign requested address

看起来,一旦goroutine数量超过大约30,000个,可能就没有可用的套接字了。 以下是代码:

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"os"
	"time"
)

func main() {
	start := time.Now()
	userendpoint := userEndpoint()
	userport := userPort()
	userpoints := userPoints()

	var name string
	var port string
	var points int

	name = *userendpoint
	port = *userport
	points = *userpoints

	c := make(chan string)

	fmt.Printf("You selected %s over %s and will send %d times\n", name, port, points)

	for k := 0; k < points; k++ {
		go sendPoints(name, port, points, c)
	}
	for i := 0; i < points; i++ {
		fmt.Println(<-c)
		fmt.Printf("This is the %v pass", i)
	}
	fmt.Println("took:", time.Since(start))

}

func sendPoints(name, port string, points int, c chan string) {
	// connect to this socket
	conn, err := net.Dial("tcp", name+":"+port)
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < points; i++ {
		reader := "prod.monitoring.test 1 source=mysource"
		fmt.Fprintf(conn, reader+"\n")
		//fmt.Printf("This is the %v pass\n", i)

		c <- ""
		//time.Sleep(100 * time.Millisecond)
	}
}

func userEndpoint() *string {
	fmt.Println("What is the IP or DNS name you wish to send points to?")
	var name string
	scanner := bufio.NewScanner(os.Stdin)
	scanner.Scan()
	name = scanner.Text()
	if name == "" {
		name = "server.server.com"
	}
	if err := scanner.Err(); err != nil {
		fmt.Println("Error reading from input: ", err)
	}
	return &name
}

func userPort() *string {
	fmt.Println("What is the port you wish to send points to?")
	var port string
	scanner := bufio.NewScanner(os.Stdin)
	scanner.Scan()
	port = scanner.Text()
	if port == "" {
		port = "2878"
	}
	if err := scanner.Err(); err != nil {
		fmt.Println("Error reading from input: ", err)
	}
	return &port
}

func userPoints() *int {
	fmt.Println("How many times do you wish to send?")
	var i int
	scanner := bufio.NewReader(os.Stdin)

	for {
		_, err := fmt.Fscan(scanner, &i)
		if err == nil && i >= 0 {
			break
		}
	}
	return &i
}

有什么想法吗? 提前感谢!


更多关于使用Golang并发发送TCP数据包的最佳实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复

大家好!

感谢大家的评论!

我通过限制 make 命令中创建的通道数量解决了这个问题。 看起来在创建了大约 30,000 个 Go 协程后,程序就会失败。

祝大家周末愉快!

更多关于使用Golang并发发送TCP数据包的最佳实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


他们说net.Dial返回了一个错误……不过说实话,从展示的代码来看,确实不清楚机器人是怎么发生的……


在这种情况下,我预计 net.Dial 会返回一个错误。由于 log.Fatal 的存在,这将导致程序终止。除非 connerr 两者都为 nil,否则他不会收到 SIGSEGV 信号。

func main() {
    fmt.Println("hello world")
}

一个进程能使用的文件描述符数量是有限的,你建立的每个连接都会占用1个文件描述符,如果目标主机在同一台机器上,则会占用2个。

也许你达到了这个限制?

你可以使用 ulimit 工具来检查这个限制。

func main() {
    fmt.Println("hello world")
}

不清楚错误是什么。

错误发生时 points 的值是多少? 如果 net.Dial 返回错误,程序会因 log.Fatal 而终止,你不可能收到 SIGSEGV 信号。你怎么可能同时收到错误和 SIGSEGV 信号呢?

conn 为 nil 时可能会发生 SIGSEGV。你检查过当 net.Dial 返回的 err 为 nil 时,conn 是否不为 nil 吗?

请注意,当 sendPoints 完成后,你应该关闭 conn。这将释放资源。

另外需要注意的是,你不需要返回指向 intstring 的指针。直接返回值就足够了。

你的代码存在几个关键问题,导致在高并发场景下失败。主要问题是资源耗尽和goroutine管理不当。

问题分析

  1. 套接字耗尽:每个goroutine创建独立的TCP连接,当超过系统限制时会出现"cannot assign requested address"错误
  2. 内存泄漏:连接没有正确关闭
  3. 并发控制缺失:一次性创建大量goroutine会导致资源竞争
  4. 通道使用错误:每个goroutine向通道发送多个消息,但主程序只接收固定数量

改进方案

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"os"
	"sync"
	"time"
)

type Config struct {
	Endpoint string
	Port     string
	Points   int
	Workers  int
}

func main() {
	config := getConfig()
	start := time.Now()

	fmt.Printf("Sending %d points to %s:%s using %d workers\n", 
		config.Points, config.Endpoint, config.Port, config.Workers)

	// 使用工作池模式控制并发
	results := make(chan string, config.Points)
	var wg sync.WaitGroup

	// 创建worker池
	for w := 0; w < config.Workers; w++ {
		wg.Add(1)
		go worker(w, config, results, &wg)
	}

	// 等待所有worker完成
	go func() {
		wg.Wait()
		close(results)
	}()

	// 收集结果
	count := 0
	for result := range results {
		count++
		if count%1000 == 0 {
			fmt.Printf("Processed %d/%d points\n", count, config.Points)
		}
		_ = result // 处理结果,这里简单丢弃
	}

	fmt.Printf("Total time: %v\n", time.Since(start))
}

func worker(id int, config Config, results chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()

	// 每个worker复用连接
	conn, err := net.Dial("tcp", config.Endpoint+":"+config.Port)
	if err != nil {
		log.Printf("Worker %d: failed to connect: %v", id, err)
		return
	}
	defer conn.Close()

	// 设置连接超时
	conn.SetDeadline(time.Now().Add(30 * time.Second))

	// 每个worker处理一部分任务
	batchSize := config.Points / config.Workers
	start := id * batchSize
	end := start + batchSize
	if id == config.Workers-1 {
		end = config.Points // 最后一个worker处理剩余部分
	}

	for i := start; i < end; i++ {
		message := fmt.Sprintf("prod.monitoring.test 1 source=mysource %d\n", i)
		
		_, err := conn.Write([]byte(message))
		if err != nil {
			log.Printf("Worker %d: write failed: %v", id, err)
			// 尝试重新连接
			conn.Close()
			conn, err = net.Dial("tcp", config.Endpoint+":"+config.Port)
			if err != nil {
				log.Printf("Worker %d: reconnection failed: %v", id, err)
				return
			}
			conn.SetDeadline(time.Now().Add(30 * time.Second))
			continue
		}

		// 可选:读取响应(如果需要)
		// buf := make([]byte, 1024)
		// n, _ := conn.Read(buf)

		results <- fmt.Sprintf("Worker %d sent point %d", id, i)
	}
}

func getConfig() Config {
	reader := bufio.NewReader(os.Stdin)
	
	fmt.Print("Endpoint (default: server.server.com): ")
	endpoint, _ := reader.ReadString('\n')
	if endpoint == "\n" {
		endpoint = "server.server.com"
	} else {
		endpoint = endpoint[:len(endpoint)-1] // 去除换行符
	}

	fmt.Print("Port (default: 2878): ")
	port, _ := reader.ReadString('\n')
	if port == "\n" {
		port = "2878"
	} else {
		port = port[:len(port)-1]
	}

	fmt.Print("Total points to send: ")
	var points int
	fmt.Scanf("%d", &points)

	fmt.Print("Number of workers (default: 1000): ")
	var workers int
	_, err := fmt.Scanf("%d", &workers)
	if err != nil || workers <= 0 {
		workers = 1000
	}

	// 限制最大worker数量,避免资源耗尽
	if workers > 5000 {
		workers = 5000
		fmt.Printf("Warning: Limiting workers to %d to avoid resource exhaustion\n", workers)
	}

	return Config{
		Endpoint: endpoint,
		Port:     port,
		Points:   points,
		Workers:  workers,
	}
}

关键改进点

  1. 工作池模式:限制并发goroutine数量,避免系统资源耗尽
  2. 连接复用:每个worker复用TCP连接,减少套接字创建
  3. 连接管理:正确关闭连接,设置超时时间
  4. 错误处理:添加重连机制,避免单点失败影响整个测试
  5. 资源限制:自动限制最大worker数量
  6. 进度显示:定期显示处理进度

性能优化建议

如果需要更高性能,可以考虑以下额外优化:

// 使用连接池
type ConnectionPool struct {
	connections chan net.Conn
	factory     func() (net.Conn, error)
}

// 批量发送数据
func batchSend(conn net.Conn, messages []string) error {
	var buf []byte
	for _, msg := range messages {
		buf = append(buf, []byte(msg+"\n")...)
	}
	_, err := conn.Write(buf)
	return err
}

// 使用sync.Pool减少内存分配
var bufPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 1024)
	},
}

这个改进版本可以处理数百万个请求而不会导致系统资源耗尽。通过控制并发数量和合理复用资源,避免了原始代码中的套接字耗尽问题。

回到顶部