使用Golang协程实现多文件FTP下载
我正在尝试使用 github.com/jlaffaye/ftp 库并发下载多个文件。
非并发方式(工作正常)
// main lists remoteDir over a single FTP control connection and downloads
// every entry into localDir, one file at a time.
//
// Fatal setup errors (dial, login, list) stop the program instead of letting
// it continue with a nil connection. A failure on one entry is printed and
// the loop moves on to the next entry.
func main() {
	c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
	if err != nil {
		fmt.Println(err)
		return // c is nil here; continuing would dereference it
	}
	defer c.Quit()

	if err = c.Login(loginID, loginPasw); err != nil {
		fmt.Println(err)
		return
	}

	entries, err := c.List(remoteDir)
	if err != nil {
		fmt.Println(err)
		return
	}

	for _, entry := range entries {
		res, err := c.Retr(remoteDir + entry.Name)
		if err != nil {
			fmt.Println(err)
			continue // res is nil when Retr fails
		}
		outFile, err := os.Create(localDir + entry.Name)
		if err != nil {
			fmt.Println(err)
			_ = res.Close() // abandon this transfer; the Create error is already reported
			continue
		}
		if _, err = io.Copy(outFile, res); err != nil {
			fmt.Println(err)
		}
		// Close both right away instead of deferring: a defer inside the loop
		// would keep every file open until main returns, and the transfer must
		// be closed before the next Retr on the same connection.
		if err = res.Close(); err != nil {
			fmt.Println(err)
		}
		if err = outFile.Close(); err != nil {
			fmt.Println(err)
		}
	}
}
尝试并发运行下载
// main is the first concurrent attempt from the question. It is kept as
// written because the panic trace that follows was produced by it. It is NOT
// a correct program.
//
// Defect 1: every goroutine calls methods on the single ServerConn c. The
// jlaffaye/ftp documentation states that a ServerConn supports only one
// in-flight data connection and no concurrent calls, so concurrent Retr
// calls fail.
// Defect 2: when c.Retr fails, the error is only printed. Execution continues
// with res == nil, and io.Copy(outFile, res) calls Read on a nil
// *ftp.Response. That is the nil-pointer panic in the trace
// ((*Response).Read, then (*Response).Close from the deferred call).
func main() {
c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
if err != nil {
fmt.Println(err)
}
defer c.Quit()
err = c.Login(loginID, loginPasw)
if err != nil {
fmt.Println(err)
}
entries, err := c.List(remoteDir)
if err != nil {
fmt.Println(err)
}
var wg sync.WaitGroup
for _, entry := range entries {
wg.Add(1)
go func(entry *ftp.Entry) {
defer wg.Done()
// BUG: c is shared by all goroutines; concurrent Retr calls on one
// ServerConn are not supported.
res, err := c.Retr(remoteDir + entry.Name)
if err != nil {
fmt.Println(entry.Name)
fmt.Println(err)
// BUG: no return here, so res may be nil in the lines below.
}
// With res == nil this deferred Close also panics while unwinding.
defer res.Close()
outFile, err := os.Create(localDir + entry.Name)
if err != nil {
fmt.Println(err)
}
defer outFile.Close()
// Nil-pointer dereference happens here when res is nil.
_, err = io.Copy(outFile, res)
if err != nil {
fmt.Println(err)
}
}(entry)
}
wg.Wait()
}
段错误
panic: runtime error: invalid memory address or nil pointer dereference
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x18 pc=0x5bca98]
goroutine 22 [running]:
github.com/jlaffaye/ftp.(*Response).Close(0xf?)
/home/jeanluc/golang/src/github.com/jlaffaye/ftp/ftp.go:1134 +0x18
panic({0x5dcdc0, 0x77a340})
/usr/lib/go-1.19/src/runtime/panic.go:884 +0x212
github.com/jlaffaye/ftp.(*Response).Read(0x10?, {0xc00016c000?, 0xc0000169c0?, 0xc000054c00?})
/home/jeanluc/golang/src/github.com/jlaffaye/ftp/ftp.go:1128 +0x19
io.copyBuffer({0x668100, 0xc0000169c0}, {0x6679e0, 0x0}, {0x0, 0x0, 0x0})
/usr/lib/go-1.19/src/io/io.go:427 +0x1b2
io.Copy(...)
/usr/lib/go-1.19/src/io/io.go:386
os.genericReadFrom(0xc000063e38?, {0x6679e0, 0x0})
/usr/lib/go-1.19/src/os/file.go:162 +0x67
os.(*File).ReadFrom(0xc000014060, {0x6679e0, 0x0})
/usr/lib/go-1.19/src/os/file.go:156 +0x1b0
io.copyBuffer({0x667c00, 0xc000014060}, {0x6679e0, 0x0}, {0x0, 0x0, 0x0})
/usr/lib/go-1.19/src/io/io.go:413 +0x14b
io.Copy(...)
/usr/lib/go-1.19/src/io/io.go:386
main.main.func1(0xc0000a0320)
/home/jeanluc/golang/src/jeanluc/myftp2/myftp2.go:60 +0x28a
created by main.main
/home/jeanluc/golang/src/jeanluc/myftp2/myftp2.go:44 +0x1ff
exit status 2
第44行:go func(entry *ftp.Entry) {
第60行:_, err = io.Copy(outFile, res)
当我只保留文件创建部分时,它可以正常工作,并且在 localDir 中创建了空文件。
// Reduced experiment: the goroutines only create empty local files.
for _, entry := range entries {
wg.Add(1)
// No FTP method is called from inside the goroutines, so the shared
// connection is never used concurrently; this variant runs without panicking.
go func(entry *ftp.Entry) {
defer wg.Done()
outFile, err := os.Create(localDir + entry.Name)
if err != nil {
fmt.Println(err)
}
defer outFile.Close()
}(entry)
}
wg.Wait()
}
我过去使用过 go func() 字面量函数,从未遇到过这样的问题。由于我无法完全理解 Go 通道的概念,我更喜欢像上面尝试的那样使用 Go 协程。
FTP 连接是不可共享的吗?
有什么想法吗?
更多关于使用Golang协程实现多文件FTP下载的实战教程也可以访问 https://www.itying.com/category-94-b0.html
不幸的是,我必须使用的协议是FTPS,即基于TLS的FTP(它与基于SSH的SFTP是两种不同的协议)。
更多关于使用Golang协程实现多文件FTP下载的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
看起来使用同一个连接是不安全的。 Dial() 返回一个 ServerConn
ServerConn 表示与远程 FTP 服务器的连接。单个连接仅支持一个进行中的数据连接。它不支持并发调用。
ftp package - github.com/jlaffaye/ftp - Go Packages
Package ftp implements a FTP client as described in RFC 959.
确实。需要在每个 goroutine 内部调用 ftp.Dial,让每个 goroutine 都使用自己独立的 ServerConn。
// main lists remoteDir on one connection, then downloads every entry in its
// own goroutine. Each goroutine dials and logs in on a private ServerConn,
// because a jlaffaye/ftp ServerConn does not support concurrent calls.
//
// Every failure is printed and the affected code path returns immediately,
// so a nil connection or nil response is never used. One connection is
// opened per file with no upper bound on how many run at once.
func main() {
	c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer c.Quit()
	if err = c.Login(loginID, loginPasw); err != nil {
		fmt.Println(err)
		return
	}
	entries, err := c.List(remoteDir)
	if err != nil {
		fmt.Println(err)
		return
	}

	var wg sync.WaitGroup
	for _, entry := range entries {
		wg.Add(1)
		go func(entry *ftp.Entry) {
			defer wg.Done()

			// Private connection for this goroutine; the outer c is not touched here.
			conn, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
			if err != nil {
				fmt.Println(entry.Name, err)
				return
			}
			defer conn.Quit()
			if err = conn.Login(loginID, loginPasw); err != nil {
				fmt.Println(entry.Name, err)
				return
			}

			res, err := conn.Retr(remoteDir + entry.Name)
			if err != nil {
				fmt.Println(entry.Name, err)
				return // res is nil; using it would panic
			}
			outFile, err := os.Create(localDir + entry.Name)
			if err != nil {
				fmt.Println(entry.Name, err)
				_ = res.Close() // abandon the transfer; the Create error is already reported
				return
			}
			if _, err = io.Copy(outFile, res); err != nil {
				fmt.Println(entry.Name, err)
			}
			// Response.Close also reads the server's reply for this transfer,
			// so its error is checked rather than discarded by a bare defer.
			if err = res.Close(); err != nil {
				fmt.Println(entry.Name, err)
			}
			if err = outFile.Close(); err != nil {
				fmt.Println(entry.Name, err)
			}
		}(entry)
	}
	wg.Wait()
}
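你的并发实现存在几个关键问题。主要问题是FTP连接不是并发安全的:多个goroutine同时使用同一个连接时,c.Retr 会返回错误,而代码在打印错误后没有 return,于是在 res 为 nil 的情况下继续执行 io.Copy(outFile, res),触发了空指针解引用(即堆栈中的 (*Response).Read)。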
问题分析
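- FTP连接不是并发安全的:github.com/jlaffaye/ftp 的 ServerConn 类型不支持多个goroutine同时调用其方法。
- 变量捕获问题:原代码已经把 entry 作为参数传入 goroutine,因此这里并不存在捕获问题;但如果在闭包中直接引用循环变量 entry,在 Go 1.22 之前可能导致所有goroutine使用同一个entry。
- 错误处理不完善:发生错误时只打印而没有 return,goroutine 会继续使用值为 nil 的 res,这正是段错误的直接原因。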
解决方案
方案1:为每个goroutine创建独立的FTP连接(推荐)
package main
import (
"fmt"
"io"
"os"
"sync"
"time"
"github.com/jlaffaye/ftp"
)
// downloadFile opens a dedicated FTP connection to host, logs in as user, and
// copies remotePath into a newly created (or truncated) localPath.
//
// It returns the first error encountered, wrapped with the step that failed.
// Errors from closing the transfer and from closing the local file are
// reported as well; previously they were dropped, so an incomplete copy could
// be reported as a success.
func downloadFile(host, user, pass, remotePath, localPath string) (err error) {
	// One connection per call: a ServerConn must not be shared between goroutines.
	c, err := ftp.Dial(host, ftp.DialWithTimeout(5*time.Second))
	if err != nil {
		return fmt.Errorf("dial failed: %w", err)
	}
	defer c.Quit()

	if err = c.Login(user, pass); err != nil {
		return fmt.Errorf("login failed: %w", err)
	}

	res, err := c.Retr(remotePath)
	if err != nil {
		return fmt.Errorf("retr failed: %w", err)
	}
	// Response.Close also reads the server's reply for the transfer, so a
	// failure here means the download did not complete cleanly.
	defer func() {
		if cerr := res.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("close transfer failed: %w", cerr)
		}
	}()

	outFile, err := os.Create(localPath)
	if err != nil {
		return fmt.Errorf("create file failed: %w", err)
	}
	// Close can surface delayed write errors, so it is not ignored.
	defer func() {
		if cerr := outFile.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("close file failed: %w", cerr)
		}
	}()

	if _, err = io.Copy(outFile, res); err != nil {
		return fmt.Errorf("copy failed: %w", err)
	}
	return nil
}
// main lists remoteDir once, then downloads every entry that is not a
// directory. Each entry gets its own goroutine and, via downloadFile, its own
// FTP connection. The number of simultaneous connections is not bounded.
// Failures are collected on errChan and printed after all goroutines finish.
func main() {
	const (
		remoteHost = "ftp.example.com"
		loginID    = "username"
		loginPasw  = "password"
		remoteDir  = "/remote/path/"
		localDir   = "./downloads/"
	)

	// Fetch the file list over a short-lived connection.
	c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
	if err != nil {
		fmt.Printf("Dial error: %v\n", err)
		return
	}
	err = c.Login(loginID, loginPasw)
	if err != nil {
		_ = c.Quit() // best effort; the login error is what gets reported
		fmt.Printf("Login error: %v\n", err)
		return
	}
	entries, err := c.List(remoteDir)
	// Every download dials its own connection, so release this one now rather
	// than keeping it open and idle until main returns.
	_ = c.Quit()
	if err != nil {
		fmt.Printf("List error: %v\n", err)
		return
	}

	if err := os.MkdirAll(localDir, 0755); err != nil {
		fmt.Printf("Mkdir error: %v\n", err)
		return
	}

	var wg sync.WaitGroup
	errChan := make(chan error, len(entries))
	for _, entry := range entries {
		// Directories cannot be downloaded with Retr; skip them.
		if entry.Type == ftp.EntryTypeFolder {
			continue
		}
		wg.Add(1)
		// Per-iteration copies used by the goroutine below.
		fileName := entry.Name
		remotePath := remoteDir + fileName
		localPath := localDir + fileName
		go func() {
			defer wg.Done()
			if err := downloadFile(remoteHost, loginID, loginPasw, remotePath, localPath); err != nil {
				errChan <- fmt.Errorf("failed to download %s: %w", fileName, err)
				return
			}
			fmt.Printf("Successfully downloaded: %s\n", fileName)
		}()
	}

	// Wait for every download, then report the collected failures.
	wg.Wait()
	close(errChan)
	for err := range errChan {
		fmt.Printf("Error: %v\n", err)
	}
	fmt.Println("All downloads completed")
}
方案2:使用连接池(更高效)
package main
import (
"fmt"
"io"
"os"
"sync"
"time"
"github.com/jlaffaye/ftp"
)
// FTPPool keeps idle, logged-in connections to one FTP server. Goroutines
// borrow a ServerConn with Get and hand it back with Put, instead of dialing
// for every transfer.
type FTPPool struct {
	// Credentials used whenever a fresh connection has to be dialed.
	host, user, pass string

	// pool holds idle connections; its capacity is the most that are kept.
	pool chan *ftp.ServerConn

	// mu guards closed and serializes Put against Close, so nothing is ever
	// sent on pool after it has been closed.
	mu     sync.Mutex
	closed bool
}
// NewFTPPool dials and logs in poolSize connections up front and returns a
// pool holding them.
//
// If any connection fails, the ones already opened are closed and the error
// is returned, wrapped with which connection failed. A poolSize of 0 yields a
// pool that keeps no idle connections (every Get dials, every Put quits). A
// negative poolSize is rejected with an error instead of panicking in make.
func NewFTPPool(host, user, pass string, poolSize int) (*FTPPool, error) {
	if poolSize < 0 {
		return nil, fmt.Errorf("invalid pool size %d", poolSize)
	}
	pool := &FTPPool{
		host: host,
		user: user,
		pass: pass,
		pool: make(chan *ftp.ServerConn, poolSize),
	}
	// Fill the pool eagerly so configuration problems surface immediately.
	for i := 0; i < poolSize; i++ {
		conn, err := createConnection(host, user, pass)
		if err != nil {
			pool.Close()
			return nil, fmt.Errorf("open connection %d/%d: %w", i+1, poolSize, err)
		}
		pool.pool <- conn
	}
	return pool, nil
}
// createConnection dials host with a 5-second timeout and logs in with
// user/pass. If the login is rejected, the half-open connection is shut down
// before the error is returned, so callers never receive a connection they
// would have to clean up.
func createConnection(host, user, pass string) (*ftp.ServerConn, error) {
	conn, dialErr := ftp.Dial(host, ftp.DialWithTimeout(5*time.Second))
	if dialErr != nil {
		return nil, dialErr
	}
	if loginErr := conn.Login(user, pass); loginErr != nil {
		conn.Quit()
		return nil, loginErr
	}
	return conn, nil
}
// Get returns an idle pooled connection if one is available; otherwise it
// dials and logs in a new one.
//
// After Close, the idle channel is closed and drained, and a receive from it
// succeeds immediately with a nil value. Get detects that case and returns an
// error instead of handing out a nil connection with a nil error.
func (p *FTPPool) Get() (*ftp.ServerConn, error) {
	select {
	case conn, ok := <-p.pool:
		if !ok {
			return nil, fmt.Errorf("ftp pool is closed")
		}
		return conn, nil
	default:
		return createConnection(p.host, p.user, p.pass)
	}
}
// Put hands conn back to the pool. The connection is quit instead when the
// pool has been closed or already holds as many idle connections as it can.
// A nil conn is ignored.
func (p *FTPPool) Put(conn *ftp.ServerConn) {
	if conn == nil {
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()

	// Checking closed while holding mu guarantees we never send on a closed channel.
	if !p.closed {
		select {
		case p.pool <- conn:
			return
		default:
			// Pool is full: fall through and discard this connection.
		}
	}
	conn.Quit()
}
// Close marks the pool closed, closes the idle channel and quits every idle
// connection still in it. Later calls are no-ops. Connections that are
// checked out at that moment are quit when they are Put back.
func (p *FTPPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if !p.closed {
		p.closed = true
		close(p.pool)
		for idle := range p.pool {
			idle.Quit()
		}
	}
}
// main builds a pool of poolSize logged-in connections, lists remoteDir with
// one of them, and downloads every non-directory entry concurrently. A
// semaphore caps simultaneous downloads at poolSize.
//
// A connection whose FTP transfer failed is quit instead of being returned to
// the pool, so a connection in an unknown state is not reused for the next
// file. Transfer-close and file-close errors are reported, not ignored.
func main() {
	const (
		remoteHost = "ftp.example.com"
		loginID    = "username"
		loginPasw  = "password"
		remoteDir  = "/remote/path/"
		localDir   = "./downloads/"
		poolSize   = 5 // number of concurrent connections
	)

	pool, err := NewFTPPool(remoteHost, loginID, loginPasw, poolSize)
	if err != nil {
		fmt.Printf("Failed to create pool: %v\n", err)
		return
	}
	defer pool.Close()

	// List the remote directory with a pooled connection.
	conn, err := pool.Get()
	if err != nil {
		fmt.Printf("Failed to get connection: %v\n", err)
		return
	}
	entries, err := conn.List(remoteDir)
	if err != nil {
		_ = conn.Quit() // do not return a connection in an unknown state to the pool
		fmt.Printf("List error: %v\n", err)
		return
	}
	pool.Put(conn)

	if err := os.MkdirAll(localDir, 0755); err != nil {
		fmt.Printf("Mkdir error: %v\n", err)
		return
	}

	var wg sync.WaitGroup
	semaphore := make(chan struct{}, poolSize) // caps concurrent downloads
	for _, entry := range entries {
		// Directories cannot be downloaded with Retr; skip them.
		if entry.Type == ftp.EntryTypeFolder {
			continue
		}
		wg.Add(1)
		// Per-iteration copies used by the goroutine below.
		fileName := entry.Name
		remotePath := remoteDir + fileName
		localPath := localDir + fileName
		go func() {
			defer wg.Done()
			semaphore <- struct{}{}
			defer func() { <-semaphore }()

			conn, err := pool.Get()
			if err != nil {
				fmt.Printf("Failed to get connection for %s: %v\n", fileName, err)
				return
			}
			// reusable is set only when the FTP side finished cleanly; otherwise
			// the connection is quit rather than handed to the next download.
			reusable := false
			defer func() {
				if reusable {
					pool.Put(conn)
				} else {
					_ = conn.Quit()
				}
			}()

			res, err := conn.Retr(remotePath)
			if err != nil {
				fmt.Printf("Failed to retrieve %s: %v\n", fileName, err)
				return
			}
			outFile, err := os.Create(localPath)
			if err != nil {
				// A local failure does not affect the FTP session, so the
				// connection stays reusable if the transfer closes cleanly.
				reusable = res.Close() == nil
				fmt.Printf("Failed to create file %s: %v\n", fileName, err)
				return
			}

			_, copyErr := io.Copy(outFile, res)
			// Response.Close also reads the server's reply for this transfer.
			transferErr := res.Close()
			fileErr := outFile.Close()
			reusable = copyErr == nil && transferErr == nil

			switch {
			case copyErr != nil:
				fmt.Printf("Failed to copy %s: %v\n", fileName, copyErr)
			case transferErr != nil:
				fmt.Printf("Failed to finish transfer of %s: %v\n", fileName, transferErr)
			case fileErr != nil:
				fmt.Printf("Failed to close file %s: %v\n", fileName, fileErr)
			default:
				fmt.Printf("Successfully downloaded: %s\n", fileName)
			}
		}()
	}
	wg.Wait()
	fmt.Println("All downloads completed")
}
方案3:使用工作池模式
package main
import (
"fmt"
"io"
"os"
"sync"
"time"
"github.com/jlaffaye/ftp"
)
// DownloadTask describes one file for a worker to fetch: where it lives on
// the server, where to write it locally, and the bare name used in messages.
type DownloadTask struct {
	RemotePath string // path handed to Retr
	LocalPath  string // destination passed to os.Create
	FileName   string // entry name, used in progress and error output
}
// worker opens its own FTP connection, logs in, and then downloads tasks from
// the channel one at a time until the channel is closed. It calls wg.Done
// when it returns.
//
// If the worker cannot dial or log in, it exits immediately. Other workers
// keep draining the channel; if every worker fails, the tasks remain
// unprocessed.
func worker(id int, tasks <-chan DownloadTask, host, user, pass string, wg *sync.WaitGroup) {
	defer wg.Done()

	// Each worker owns its connection: a ServerConn must not be shared between goroutines.
	c, err := ftp.Dial(host, ftp.DialWithTimeout(5*time.Second))
	if err != nil {
		fmt.Printf("Worker %d: dial failed: %v\n", id, err)
		return
	}
	defer c.Quit()

	err = c.Login(user, pass)
	if err != nil {
		fmt.Printf("Worker %d: login failed: %v\n", id, err)
		return
	}

	for task := range tasks {
		fmt.Printf("Worker %d: downloading %s\n", id, task.FileName)
		if err := fetchTask(c, task); err != nil {
			fmt.Printf("Worker %d: %v\n", id, err)
			continue
		}
		fmt.Printf("Worker %d: successfully downloaded %s\n", id, task.FileName)
	}
}

// fetchTask downloads task.RemotePath over c into task.LocalPath.
//
// It checks the copy, the transfer close (Response.Close also reads the
// server's reply for the transfer), and the local file close. On any failure
// after the local file was created, the partial file is removed.
func fetchTask(c *ftp.ServerConn, task DownloadTask) error {
	res, err := c.Retr(task.RemotePath)
	if err != nil {
		return fmt.Errorf("failed to retrieve %s: %w", task.FileName, err)
	}

	outFile, err := os.Create(task.LocalPath)
	if err != nil {
		_ = res.Close() // abandon the transfer; the Create error is what gets reported
		return fmt.Errorf("failed to create file %s: %w", task.FileName, err)
	}

	_, copyErr := io.Copy(outFile, res)
	// Both are closed before the next Retr can run on the same connection.
	transferErr := res.Close()
	fileErr := outFile.Close()

	var failure error
	switch {
	case copyErr != nil:
		failure = fmt.Errorf("failed to copy %s: %w", task.FileName, copyErr)
	case transferErr != nil:
		failure = fmt.Errorf("failed to finish transfer of %s: %w", task.FileName, transferErr)
	case fileErr != nil:
		failure = fmt.Errorf("failed to close file %s: %w", task.FileName, fileErr)
	default:
		return nil
	}
	_ = os.Remove(task.LocalPath) // best effort: do not leave a truncated file behind
	return failure
}
// main lists remoteDir, starts numWorkers workers (each with its own FTP
// connection), and feeds them one DownloadTask per non-directory entry
// through a buffered channel. It closes the channel once every task has been
// queued and waits for all workers to finish.
func main() {
	const (
		remoteHost = "ftp.example.com"
		loginID    = "username"
		loginPasw  = "password"
		remoteDir  = "/remote/path/"
		localDir   = "./downloads/"
		numWorkers = 5
	)

	// Fetch the file list over a short-lived connection.
	c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
	if err != nil {
		fmt.Printf("Dial error: %v\n", err)
		return
	}
	err = c.Login(loginID, loginPasw)
	if err != nil {
		_ = c.Quit() // best effort; the login error is what gets reported
		fmt.Printf("Login error: %v\n", err)
		return
	}
	entries, err := c.List(remoteDir)
	// Workers dial their own connections, so release this one now rather than
	// keeping it open and idle until main returns.
	_ = c.Quit()
	if err != nil {
		fmt.Printf("List error: %v\n", err)
		return
	}

	if err := os.MkdirAll(localDir, 0755); err != nil {
		fmt.Printf("Mkdir error: %v\n", err)
		return
	}

	// Buffered to len(entries) so queueing never blocks, even if workers fail to connect.
	tasks := make(chan DownloadTask, len(entries))

	var wg sync.WaitGroup
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, tasks, remoteHost, loginID, loginPasw, &wg)
	}

	for _, entry := range entries {
		// Directories cannot be downloaded with Retr; skip them.
		if entry.Type == ftp.EntryTypeFolder {
			continue
		}
		tasks <- DownloadTask{
			RemotePath: remoteDir + entry.Name,
			LocalPath:  localDir + entry.Name,
			FileName:   entry.Name,
		}
	}
	close(tasks) // lets each worker's range loop terminate

	wg.Wait()
	fmt.Println("All downloads completed")
}
关键改进点
- 每个goroutine使用独立连接:避免并发访问同一个FTP连接
- 正确的变量捕获:在循环内复制变量值(或把值作为参数传给 goroutine),避免所有goroutine共享同一个变量;从 Go 1.22 起每次迭代都有独立的循环变量,此步骤仅对更早的版本必需
- 连接池管理:控制并发连接数,避免服务器过载
- 完善的错误处理:确保goroutine正确退出,资源正确释放
- 并发控制:使用信号量或工作池控制最大并发数
方案1最简单直接,适合大多数场景。方案2和方案3更适合需要大量并发下载的生产环境。


