使用Golang并发发送TCP数据包的最佳实践
使用Golang并发发送TCP数据包的最佳实践 你好!
我最近编写了一些代码,通过发送大量TCP数据包来对代理进行负载测试。起初我是在一个for循环中编写的,但想了解更多关于并发性的知识,于是添加了goroutine。
我注意到,当我添加大量请求时,代码会失败:
signal SIGSEGV: segmentation violation code=0x1 “Fprintf” 以及 runtime error: invalid memory address or nil pointer dereference
来自net.dial的错误是: connect: cannot assign requested address
看起来,一旦goroutine数量超过大约30,000个,可能就没有可用的套接字了。 以下是代码:
package main
import (
"bufio"
"fmt"
"log"
"net"
"os"
"time"
)
func main() {
start := time.Now()
userendpoint := userEndpoint()
userport := userPort()
userpoints := userPoints()
var name string
var port string
var points int
name = *userendpoint
port = *userport
points = *userpoints
c := make(chan string)
fmt.Printf("You selected %s over %s and will send %d times\n", name, port, points)
for k := 0; k < points; k++ {
go sendPoints(name, port, points, c)
}
for i := 0; i < points; i++ {
fmt.Println(<-c)
fmt.Printf("This is the %v pass", i)
}
fmt.Println("took:", time.Since(start))
}
func sendPoints(name, port string, points int, c chan string) {
// connect to this socket
conn, err := net.Dial("tcp", name+":"+port)
if err != nil {
log.Fatal(err)
}
for i := 0; i < points; i++ {
reader := "prod.monitoring.test 1 source=mysource"
fmt.Fprintf(conn, reader+"\n")
//fmt.Printf("This is the %v pass\n", i)
c <- ""
//time.Sleep(100 * time.Millisecond)
}
}
func userEndpoint() *string {
fmt.Println("What is the IP or DNS name you wish to send points to?")
var name string
scanner := bufio.NewScanner(os.Stdin)
scanner.Scan()
name = scanner.Text()
if name == "" {
name = "server.server.com"
}
if err := scanner.Err(); err != nil {
fmt.Println("Error reading from input: ", err)
}
return &name
}
func userPort() *string {
fmt.Println("What is the port you wish to send points to?")
var port string
scanner := bufio.NewScanner(os.Stdin)
scanner.Scan()
port = scanner.Text()
if port == "" {
port = "2878"
}
if err := scanner.Err(); err != nil {
fmt.Println("Error reading from input: ", err)
}
return &port
}
func userPoints() *int {
fmt.Println("How many times do you wish to send?")
var i int
scanner := bufio.NewReader(os.Stdin)
for {
_, err := fmt.Fscan(scanner, &i)
if err == nil && i >= 0 {
break
}
}
return &i
}
有什么想法吗? 提前感谢!
更多关于使用Golang并发发送TCP数据包的最佳实践的实战教程也可以访问 https://www.itying.com/category-94-b0.html
大家好!
感谢大家的评论!
我通过限制 make 命令中创建的通道数量解决了这个问题。
看起来在创建了大约 30,000 个 Go 协程后,程序就会失败。
祝大家周末愉快!
更多关于使用Golang并发发送TCP数据包的最佳实践的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
他们说net.Dial返回了一个错误……不过说实话,从展示的代码来看,确实不清楚机器人是怎么发生的……
在这种情况下,我预计 net.Dial 会返回一个错误。由于 log.Fatal 的存在,这将导致程序终止。除非 conn 和 err 两者都为 nil,否则他不会收到 SIGSEGV 信号。
func main() {
fmt.Println("hello world")
}
一个进程能使用的文件描述符数量是有限的,你建立的每个连接都会占用1个文件描述符,如果目标主机在同一台机器上,则会占用2个。
也许你达到了这个限制?
你可以使用 ulimit 工具来检查这个限制。
func main() {
fmt.Println("hello world")
}
不清楚错误是什么。
错误发生时 points 的值是多少?
如果 net.Dial 返回错误,程序会因 log.Fatal 而终止,你不可能收到 SIGSEGV 信号。你怎么可能同时收到错误和 SIGSEGV 信号呢?
当 conn 为 nil 时可能会发生 SIGSEGV。你检查过当 net.Dial 返回的 err 为 nil 时,conn 是否不为 nil 吗?
请注意,当 sendPoints 完成后,你应该关闭 conn。这将释放资源。
另外需要注意的是,你不需要返回指向 int 和 string 的指针。直接返回值就足够了。
你的代码存在几个关键问题,导致在高并发场景下失败。主要问题是资源耗尽和goroutine管理不当。
问题分析
- 套接字耗尽:每个goroutine创建独立的TCP连接,当超过系统限制时会出现"cannot assign requested address"错误
- 内存泄漏:连接没有正确关闭
- 并发控制缺失:一次性创建大量goroutine会导致资源竞争
- 通道使用错误:每个goroutine向通道发送多个消息,但主程序只接收固定数量
改进方案
package main
import (
"bufio"
"fmt"
"log"
"net"
"os"
"sync"
"time"
)
type Config struct {
Endpoint string
Port string
Points int
Workers int
}
func main() {
config := getConfig()
start := time.Now()
fmt.Printf("Sending %d points to %s:%s using %d workers\n",
config.Points, config.Endpoint, config.Port, config.Workers)
// 使用工作池模式控制并发
results := make(chan string, config.Points)
var wg sync.WaitGroup
// 创建worker池
for w := 0; w < config.Workers; w++ {
wg.Add(1)
go worker(w, config, results, &wg)
}
// 等待所有worker完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
count := 0
for result := range results {
count++
if count%1000 == 0 {
fmt.Printf("Processed %d/%d points\n", count, config.Points)
}
_ = result // 处理结果,这里简单丢弃
}
fmt.Printf("Total time: %v\n", time.Since(start))
}
func worker(id int, config Config, results chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
// 每个worker复用连接
conn, err := net.Dial("tcp", config.Endpoint+":"+config.Port)
if err != nil {
log.Printf("Worker %d: failed to connect: %v", id, err)
return
}
defer conn.Close()
// 设置连接超时
conn.SetDeadline(time.Now().Add(30 * time.Second))
// 每个worker处理一部分任务
batchSize := config.Points / config.Workers
start := id * batchSize
end := start + batchSize
if id == config.Workers-1 {
end = config.Points // 最后一个worker处理剩余部分
}
for i := start; i < end; i++ {
message := fmt.Sprintf("prod.monitoring.test 1 source=mysource %d\n", i)
_, err := conn.Write([]byte(message))
if err != nil {
log.Printf("Worker %d: write failed: %v", id, err)
// 尝试重新连接
conn.Close()
conn, err = net.Dial("tcp", config.Endpoint+":"+config.Port)
if err != nil {
log.Printf("Worker %d: reconnection failed: %v", id, err)
return
}
conn.SetDeadline(time.Now().Add(30 * time.Second))
continue
}
// 可选:读取响应(如果需要)
// buf := make([]byte, 1024)
// n, _ := conn.Read(buf)
results <- fmt.Sprintf("Worker %d sent point %d", id, i)
}
}
func getConfig() Config {
reader := bufio.NewReader(os.Stdin)
fmt.Print("Endpoint (default: server.server.com): ")
endpoint, _ := reader.ReadString('\n')
if endpoint == "\n" {
endpoint = "server.server.com"
} else {
endpoint = endpoint[:len(endpoint)-1] // 去除换行符
}
fmt.Print("Port (default: 2878): ")
port, _ := reader.ReadString('\n')
if port == "\n" {
port = "2878"
} else {
port = port[:len(port)-1]
}
fmt.Print("Total points to send: ")
var points int
fmt.Scanf("%d", &points)
fmt.Print("Number of workers (default: 1000): ")
var workers int
_, err := fmt.Scanf("%d", &workers)
if err != nil || workers <= 0 {
workers = 1000
}
// 限制最大worker数量,避免资源耗尽
if workers > 5000 {
workers = 5000
fmt.Printf("Warning: Limiting workers to %d to avoid resource exhaustion\n", workers)
}
return Config{
Endpoint: endpoint,
Port: port,
Points: points,
Workers: workers,
}
}
关键改进点
- 工作池模式:限制并发goroutine数量,避免系统资源耗尽
- 连接复用:每个worker复用TCP连接,减少套接字创建
- 连接管理:正确关闭连接,设置超时时间
- 错误处理:添加重连机制,避免单点失败影响整个测试
- 资源限制:自动限制最大worker数量
- 进度显示:定期显示处理进度
性能优化建议
如果需要更高性能,可以考虑以下额外优化:
// 使用连接池
type ConnectionPool struct {
connections chan net.Conn
factory func() (net.Conn, error)
}
// 批量发送数据
func batchSend(conn net.Conn, messages []string) error {
var buf []byte
for _, msg := range messages {
buf = append(buf, []byte(msg+"\n")...)
}
_, err := conn.Write(buf)
return err
}
// 使用sync.Pool减少内存分配
var bufPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}
这个改进版本可以处理数百万个请求而不会导致系统资源耗尽。通过控制并发数量和合理复用资源,避免了原始代码中的套接字耗尽问题。

