使用Golang协程实现多文件FTP下载

我正在尝试使用 github.com/jlaffaye/ftp 库并发下载多个文件。

非并发方式(工作正常)

// main is the asker's working, non-concurrent version: it logs in once,
// lists remoteDir and downloads each entry in turn over the single
// connection c.
// NOTE(review): every error is only printed and execution continues, so a
// failed Dial/Login/Retr/Create leaves a nil value that a later statement
// dereferences.
func main() {
	c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
	if err != nil {
		fmt.Println(err)
	}
	defer c.Quit()
	
	err = c.Login(loginID, loginPasw)
	if err != nil {
		fmt.Println(err)
	}
	
	entries, err := c.List(remoteDir)
	if err != nil {
		fmt.Println(err)
	}
	
	for _, entry := range entries {
	
		// Open a data transfer for this entry on the shared connection.
		res, err := c.Retr(remoteDir + entry.Name)
		if err != nil {
			fmt.Println(err)
		}
	
		outFile, err := os.Create(localDir + entry.Name)
		if err != nil {
			fmt.Println(err)
		}
		// NOTE(review): a defer inside the loop only runs when main returns,
		// so every downloaded file stays open until the end.
		defer outFile.Close()
	
		_, err = io.Copy(outFile, res)
		if err != nil {
			fmt.Println(err)
		}
		// Closing the transfer before the next Retr is what lets one
		// connection serve every file: it supports one data transfer at a time.
		res.Close()
	}
}

尝试并发运行下载

// main is the asker's concurrent attempt: it lists remoteDir and starts one
// goroutine per entry, and every goroutine calls methods on the same
// connection c. It crashes with a nil pointer dereference (trace below).
// NOTE(review): the ftp package docs state that a ServerConn supports only
// one in-progress data connection and no concurrent calls, so sharing c
// between goroutines is the root cause.
func main() {
	c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
	if err != nil {
		fmt.Println(err)
	}
	defer c.Quit()

	err = c.Login(loginID, loginPasw)
	if err != nil {
		fmt.Println(err)
	}

	entries, err := c.List(remoteDir)
	if err != nil {
		fmt.Println(err)
	}

	var wg sync.WaitGroup

	for _, entry := range entries {

		wg.Add(1)
		// entry is passed as an argument, so each goroutine receives its own
		// *ftp.Entry value; the loop variable is not shared here.
		go func(entry *ftp.Entry) {
			defer wg.Done()

			// NOTE(review): c is shared by all goroutines. When Retr fails,
			// the error is only printed and res stays nil, so io.Copy and the
			// deferred res.Close below dereference a nil *ftp.Response (the
			// (*Response).Read / (*Response).Close frames in the trace).
			res, err := c.Retr(remoteDir + entry.Name)
			if err != nil {
				fmt.Println(entry.Name)
				fmt.Println(err)
			}
			defer res.Close()

			outFile, err := os.Create(localDir + entry.Name)
			if err != nil {
				fmt.Println(err)
			}
			defer outFile.Close()

			_, err = io.Copy(outFile, res)
			if err != nil {
				fmt.Println(err)
			}

		}(entry)

	}
	// Block until every download goroutine has called wg.Done.
	wg.Wait()
}

运行后出现段错误(nil 指针解引用):

panic: runtime error: invalid memory address or nil pointer dereference
	panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x18 pc=0x5bca98]

goroutine 22 [running]:
github.com/jlaffaye/ftp.(*Response).Close(0xf?)
	/home/jeanluc/golang/src/github.com/jlaffaye/ftp/ftp.go:1134 +0x18
panic({0x5dcdc0, 0x77a340})
	/usr/lib/go-1.19/src/runtime/panic.go:884 +0x212
github.com/jlaffaye/ftp.(*Response).Read(0x10?, {0xc00016c000?, 0xc0000169c0?, 0xc000054c00?})
	/home/jeanluc/golang/src/github.com/jlaffaye/ftp/ftp.go:1128 +0x19
io.copyBuffer({0x668100, 0xc0000169c0}, {0x6679e0, 0x0}, {0x0, 0x0, 0x0})
	/usr/lib/go-1.19/src/io/io.go:427 +0x1b2
io.Copy(...)
	/usr/lib/go-1.19/src/io/io.go:386
os.genericReadFrom(0xc000063e38?, {0x6679e0, 0x0})
	/usr/lib/go-1.19/src/os/file.go:162 +0x67
os.(*File).ReadFrom(0xc000014060, {0x6679e0, 0x0})
	/usr/lib/go-1.19/src/os/file.go:156 +0x1b0
io.copyBuffer({0x667c00, 0xc000014060}, {0x6679e0, 0x0}, {0x0, 0x0, 0x0})
	/usr/lib/go-1.19/src/io/io.go:413 +0x14b
io.Copy(...)
	/usr/lib/go-1.19/src/io/io.go:386
main.main.func1(0xc0000a0320)
	/home/jeanluc/golang/src/jeanluc/myftp2/myftp2.go:60 +0x28a
created by main.main
	/home/jeanluc/golang/src/jeanluc/myftp2/myftp2.go:44 +0x1ff
exit status 2

(堆栈中 myftp2.go 的第44行是 `go func(entry *ftp.Entry) {`,第60行是 `_, err = io.Copy(outFile, res)`。)

当我只保留文件创建部分时,它可以正常工作,并且在 localDir 中创建了空文件。

	// Reduced test case: the goroutines only create local files and make no
	// FTP calls. This version works, which suggests the goroutine/WaitGroup
	// pattern is fine and the problem lies in the FTP calls on the shared c.
	for _, entry := range entries {

		wg.Add(1)
		go func(entry *ftp.Entry) {
			defer wg.Done()

			outFile, err := os.Create(localDir + entry.Name)
			if err != nil {
				fmt.Println(err)
			}
			defer outFile.Close()

		}(entry)

	}
	wg.Wait()
}

我过去使用过 go func() 字面量函数,从未遇到过这样的问题。由于我无法完全理解 Go 通道的概念,我更喜欢像上面尝试的那样使用 Go 协程。

FTP 连接是不可共享的吗?

有什么想法吗?


更多关于使用Golang协程实现多文件FTP下载的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 条回复

不幸的是,我必须使用的协议是 FTPS,即基于 TLS 的 FTP(它与基于 SSH 的 SFTP 是两种不同的协议)。

更多关于使用Golang协程实现多文件FTP下载的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


您也可以看看第三方的 sftp 包(例如 github.com/pkg/sftp,Go 标准库中没有 sftp 包)。它支持以分块方式进行并发读写,但它只适用于 SFTP,不适用于 FTPS。

看起来多个 goroutine 共用同一个连接是不安全的。Dial() 返回的是一个 ServerConn,其文档说明如下:

ServerConn 表示与远程 FTP 服务器的连接。单个连接仅支持一个进行中的数据连接。它不支持并发调用。

ftp package - github.com/jlaffaye/ftp - Go Packages

Package ftp implements a FTP client as described in RFC 959.

确实如此。ftp.Dial 必须在每个 goroutine 内部调用,让每个 goroutine 使用各自的 ServerConn:

// main lists remoteDir over one connection, then downloads every entry
// concurrently. Each goroutine dials and logs in on its own ServerConn,
// because a single ServerConn supports only one data transfer at a time and
// does not support concurrent calls. Every error ends the step that failed,
// so a nil connection or response is never used afterwards.
func main() {

	c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer c.Quit()

	err = c.Login(loginID, loginPasw)
	if err != nil {
		fmt.Println(err)
		return
	}

	entries, err := c.List(remoteDir)
	if err != nil {
		fmt.Println(err)
		return
	}

	var wg sync.WaitGroup
	for _, entry := range entries {
		// Retr cannot download a directory, and LIST may return subdirectories.
		if entry.Type == ftp.EntryTypeFolder {
			continue
		}

		wg.Add(1)
		go func(entry *ftp.Entry) {
			defer wg.Done()

			// One connection per goroutine: a ServerConn is not safe for
			// concurrent use.
			c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
			if err != nil {
				fmt.Println(err)
				return
			}
			defer c.Quit()

			err = c.Login(loginID, loginPasw)
			if err != nil {
				fmt.Println(err)
				return
			}

			res, err := c.Retr(remoteDir + entry.Name)
			if err != nil {
				fmt.Println(entry.Name)
				fmt.Println(err)
				return
			}
			// Report the Close error as well. In jlaffaye/ftp, Response.Close
			// also waits for the server's reply to the finished transfer.
			// NOTE(review): implementation detail; verify for the library
			// version in use.
			defer func() {
				if err := res.Close(); err != nil {
					fmt.Println(entry.Name)
					fmt.Println(err)
				}
			}()

			outFile, err := os.Create(localDir + entry.Name)
			if err != nil {
				fmt.Println(err)
				return
			}
			defer outFile.Close()

			_, err = io.Copy(outFile, res)
			if err != nil {
				fmt.Println(err)
			}
		}(entry)

	}
	wg.Wait()
}

你的并发实现存在几个关键问题。主要问题是FTP连接不是并发安全的,多个goroutine同时使用同一个连接会导致竞争条件和段错误。

问题分析

  1. FTP连接不是并发安全的:github.com/jlaffaye/ftp 的 ServerConn 类型不支持多个goroutine同时调用其方法,单个连接同一时间只能有一个进行中的数据传输。
  2. 变量捕获问题(原代码中并不存在):原代码已把 entry 作为参数传给 goroutine,每个 goroutine 拿到的都是自己的副本。但如果在闭包中直接引用循环变量,在 Go 1.22 之前所有 goroutine 可能共享同一个 entry。
  3. 错误处理不完善:c.Retr 出错后只打印了错误而没有 return,随后对 nil 的 res 调用 io.Copy 和 res.Close,这正是堆栈中 nil 指针解引用的直接原因。

解决方案

方案1:为每个goroutine创建独立的FTP连接(推荐)

package main

import (
    "fmt"
    "io"
    "os"
    "sync"
    "time"
    
    "github.com/jlaffaye/ftp"
)

// downloadFile opens a dedicated FTP connection to host, logs in as
// user/pass and copies remotePath into a newly created localPath.
//
// Every call uses its own ServerConn because a ServerConn does not support
// concurrent calls, so several goroutines may call downloadFile at once.
// A failure in any step returns an error wrapped with that step's name. If
// the copy or either close fails, the partially written localPath is removed.
func downloadFile(host, user, pass, remotePath, localPath string) error {
    c, err := ftp.Dial(host, ftp.DialWithTimeout(5*time.Second))
    if err != nil {
        return fmt.Errorf("dial failed: %w", err)
    }
    defer c.Quit()

    if err := c.Login(user, pass); err != nil {
        return fmt.Errorf("login failed: %w", err)
    }

    res, err := c.Retr(remotePath)
    if err != nil {
        return fmt.Errorf("retr failed: %w", err)
    }

    outFile, err := os.Create(localPath)
    if err != nil {
        res.Close()
        return fmt.Errorf("create file failed: %w", err)
    }

    _, copyErr := io.Copy(outFile, res)
    // Both closes are checked instead of deferred and ignored. In
    // jlaffaye/ftp, Response.Close also waits for the server's reply to the
    // finished transfer, so it can report a failure after a clean EOF.
    // A failed file Close may mean written data did not reach disk.
    // NOTE(review): verify the Response.Close behavior for the library version in use.
    resErr := res.Close()
    fileErr := outFile.Close()

    switch {
    case copyErr != nil:
        err = fmt.Errorf("copy failed: %w", copyErr)
    case resErr != nil:
        err = fmt.Errorf("transfer failed: %w", resErr)
    case fileErr != nil:
        err = fmt.Errorf("close file failed: %w", fileErr)
    default:
        return nil
    }
    // Best effort: don't leave a truncated file that looks like a download.
    os.Remove(localPath)
    return err
}

// main lists remoteDir once, then downloads every non-directory entry
// concurrently, one goroutine per file. Each goroutine calls downloadFile,
// which opens its own FTP connection. Failures are collected and printed at
// the end, and the process exits with status 1 if any download failed.
//
// NOTE(review): concurrency is unbounded, with one simultaneous connection
// per file. Servers commonly cap connections per user, so bound it (for
// example with a semaphore or a worker pool) for large directories.
func main() {
    const (
        remoteHost = "ftp.example.com"
        loginID    = "username"
        loginPasw  = "password"
        remoteDir  = "/remote/path/"
        localDir   = "./downloads/"
    )

    // Fetch the file list over a connection of its own.
    c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
    if err != nil {
        fmt.Printf("Dial error: %v\n", err)
        return
    }
    if err := c.Login(loginID, loginPasw); err != nil {
        c.Quit()
        fmt.Printf("Login error: %v\n", err)
        return
    }
    entries, err := c.List(remoteDir)
    // Release the listing connection before the downloads start, so it does
    // not hold one of the server's connection slots while they run.
    c.Quit()
    if err != nil {
        fmt.Printf("List error: %v\n", err)
        return
    }

    if err := os.MkdirAll(localDir, 0755); err != nil {
        fmt.Printf("Mkdir error: %v\n", err)
        return
    }

    var wg sync.WaitGroup
    // Buffered for every entry, so no goroutine blocks when reporting.
    errChan := make(chan error, len(entries))

    for _, entry := range entries {
        // Retr cannot download a directory. LIST may return subdirectories
        // (and, on some servers, "." and "..").
        if entry.Type == ftp.EntryTypeFolder {
            continue
        }

        // Declared inside the loop body, so each goroutine gets its own copy.
        remotePath := remoteDir + entry.Name
        localPath := localDir + entry.Name
        fileName := entry.Name

        wg.Add(1)
        go func() {
            defer wg.Done()

            if err := downloadFile(remoteHost, loginID, loginPasw, remotePath, localPath); err != nil {
                errChan <- fmt.Errorf("failed to download %s: %w", fileName, err)
                return
            }
            fmt.Printf("Successfully downloaded: %s\n", fileName)
        }()
    }

    // Wait for all downloads, then drain the collected errors.
    wg.Wait()
    close(errChan)

    failed := 0
    for err := range errChan {
        failed++
        fmt.Printf("Error: %v\n", err)
    }
    if failed > 0 {
        fmt.Printf("%d download(s) failed\n", failed)
        os.Exit(1)
    }

    fmt.Println("All downloads completed")
}

方案2:使用连接池(更高效)

package main

import (
    "fmt"
    "io"
    "os"
    "sync"
    "time"
    
    "github.com/jlaffaye/ftp"
)

// FTPPool hands out logged-in *ftp.ServerConn values and keeps up to
// cap(pool) idle ones for reuse. A ServerConn must be used by only one
// goroutine at a time: take it with Get and hand it back with Put.
type FTPPool struct {
    // Credentials used whenever the pool has to open a fresh connection.
    host string
    user string
    pass string

    // Idle, logged-in connections ready for reuse.
    pool chan *ftp.ServerConn

    // mu guards closed and serializes Put's sends against Close.
    mu     sync.Mutex
    closed bool
}

// NewFTPPool opens poolSize logged-in connections to host and returns a pool
// holding them as idle connections. If any connection fails, the ones
// already opened are closed and the error is returned with the failing
// connection's index. A poolSize of 0 yields a pool that dials on every Get
// and keeps nothing on Put. A negative poolSize is rejected with an error
// rather than panicking.
func NewFTPPool(host, user, pass string, poolSize int) (*FTPPool, error) {
    if poolSize < 0 {
        // make(chan T, n) panics for negative n.
        return nil, fmt.Errorf("ftp pool: invalid size %d", poolSize)
    }

    pool := &FTPPool{
        host: host,
        user: user,
        pass: pass,
        pool: make(chan *ftp.ServerConn, poolSize),
    }

    // Fill the pool up front so the first poolSize Gets need no dialing.
    for i := 0; i < poolSize; i++ {
        conn, err := createConnection(host, user, pass)
        if err != nil {
            pool.Close()
            return nil, fmt.Errorf("ftp pool: opening connection %d of %d: %w", i+1, poolSize, err)
        }
        pool.pool <- conn
    }

    return pool, nil
}

// createConnection dials host with a 5-second timeout and logs in as
// user/pass. If the login is rejected, the connection is shut down before
// the error is returned, so callers never receive a connection that is not
// logged in.
func createConnection(host, user, pass string) (*ftp.ServerConn, error) {
    conn, dialErr := ftp.Dial(host, ftp.DialWithTimeout(5*time.Second))
    if dialErr != nil {
        return nil, dialErr
    }

    if loginErr := conn.Login(user, pass); loginErr != nil {
        conn.Quit()
        return nil, loginErr
    }
    return conn, nil
}

// Get returns an idle pooled connection if one is available; otherwise it
// dials and logs in a new one. The caller owns the connection until it is
// handed back with Put. After Close, Get returns an error instead of a nil
// connection.
func (p *FTPPool) Get() (*ftp.ServerConn, error) {
    select {
    case conn, ok := <-p.pool:
        if !ok {
            // Receiving from the closed channel yields a nil connection;
            // handing that out would make the caller's next call panic.
            return nil, fmt.Errorf("ftp pool: closed")
        }
        return conn, nil
    default:
        return createConnection(p.host, p.user, p.pass)
    }
}

// Put hands conn back to the pool for reuse. A nil conn is ignored. If the
// pool has been closed, or already holds as many idle connections as its
// channel can buffer, conn is shut down with Quit instead.
func (p *FTPPool) Put(conn *ftp.ServerConn) {
    if conn == nil {
        return
    }

    // Holding mu keeps Close from closing p.pool between the closed check
    // and the send below; a send on a closed channel would panic.
    p.mu.Lock()
    defer p.mu.Unlock()

    if !p.closed {
        select {
        case p.pool <- conn:
            return
        default:
        }
    }
    conn.Quit()
}

// Close marks the pool closed and quits every idle connection it still
// holds. Calling it again does nothing. Connections that are checked out
// at the time are quit later, when they are handed back with Put.
func (p *FTPPool) Close() {
    p.mu.Lock()
    defer p.mu.Unlock()

    if !p.closed {
        p.closed = true
        close(p.pool)
        // Ranging over the closed channel drains whatever is still buffered,
        // then stops.
        for idle := range p.pool {
            idle.Quit()
        }
    }
}

func main() {
    const (
        remoteHost = "ftp.example.com"
        loginID    = "username"
        loginPasw  = "password"
        remoteDir  = "/remote/path/"
        localDir   = "./downloads/"
        poolSize   = 5 // 并发连接数
    )
    
    // 创建连接池
    pool, err := NewFTPPool(remoteHost, loginID, loginPasw, poolSize)
    if err != nil {
        fmt.Printf("Failed to create pool: %v\n", err)
        return
    }
    defer pool.Close()
    
    // 获取文件列表
    conn, err := pool.Get()
    if err != nil {
        fmt.Printf("Failed to get connection: %v\n", err)
        return
    }
    
    entries, err := conn.List(remoteDir)
    pool.Put(conn)
    
    if err != nil {
        fmt.Printf("List error: %v\n", err)
        return
    }
    
    // 创建本地目录
    os.MkdirAll(localDir, 0755)
    
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, poolSize) // 控制并发数
    
    for _, entry := range entries {
        wg.Add(1)
        
        // 复制变量
        fileName := entry.Name
        remotePath := remoteDir + fileName
        localPath := localDir + fileName
        
        go func() {
            defer wg.Done()
            
            // 获取信号量
            semaphore <- struct{}{}
            defer func() { <-semaphore }()
            
            // 从连接池获取连接
            conn, err := pool.Get()
            if err != nil {
                fmt.Printf("Failed to get connection for %s: %v\n", fileName, err)
                return
            }
            defer pool.Put(conn)
            
            // 下载文件
            res, err := conn.Retr(remotePath)
            if err != nil {
                fmt.Printf("Failed to retrieve %s: %v\n", fileName, err)
                return
            }
            defer res.Close()
            
            outFile, err := os.Create(localPath)
            if err != nil {
                fmt.Printf("Failed to create file %s: %v\n", fileName, err)
                return
            }
            defer outFile.Close()
            
            _, err = io.Copy(outFile, res)
            if err != nil {
                fmt.Printf("Failed to copy %s: %v\n", fileName, err)
                return
            }
            
            fmt.Printf("Successfully downloaded: %s\n", fileName)
        }()
    }
    
    wg.Wait()
    fmt.Println("All downloads completed")
}

方案3:使用工作池模式

package main

import (
    "fmt"
    "io"
    "os"
    "sync"
    "time"
    
    "github.com/jlaffaye/ftp"
)

// DownloadTask describes one file for a worker to fetch: its path on the
// server, the local destination, and the bare name used in log messages.
type DownloadTask struct {
    RemotePath, LocalPath string // full remote path and local destination
    FileName              string // entry name as listed on the server
}

// worker downloads tasks until the channel is closed, using one FTP
// connection of its own because a ServerConn cannot be shared between
// goroutines. A failed task is reported, its partial local file removed, and
// the worker moves on to the next task. If the worker cannot connect or log
// in, it returns immediately and leaves the remaining tasks to the other
// workers. wg.Done is called when the worker returns.
func worker(id int, tasks <-chan DownloadTask, host, user, pass string, wg *sync.WaitGroup) {
    defer wg.Done()

    c, err := ftp.Dial(host, ftp.DialWithTimeout(5*time.Second))
    if err != nil {
        fmt.Printf("Worker %d: dial failed: %v\n", id, err)
        return
    }
    defer c.Quit()

    err = c.Login(user, pass)
    if err != nil {
        fmt.Printf("Worker %d: login failed: %v\n", id, err)
        return
    }

    for task := range tasks {
        fmt.Printf("Worker %d: downloading %s\n", id, task.FileName)

        res, err := c.Retr(task.RemotePath)
        if err != nil {
            fmt.Printf("Worker %d: failed to retrieve %s: %v\n", id, task.FileName, err)
            continue
        }

        outFile, err := os.Create(task.LocalPath)
        if err != nil {
            res.Close()
            fmt.Printf("Worker %d: failed to create file %s: %v\n", id, task.FileName, err)
            continue
        }

        _, err = io.Copy(outFile, res)
        // Both closes must run even if the copy failed, because the
        // connection cannot start the next Retr while this transfer is open.
        // Their errors also count as failures. In jlaffaye/ftp,
        // Response.Close waits for the server's reply to the transfer
        // (NOTE(review): verify for the library version in use), and a failed
        // file Close may mean written data did not reach disk.
        if closeErr := res.Close(); err == nil {
            err = closeErr
        }
        if closeErr := outFile.Close(); err == nil {
            err = closeErr
        }

        if err != nil {
            fmt.Printf("Worker %d: failed to download %s: %v\n", id, task.FileName, err)
            os.Remove(task.LocalPath) // best effort cleanup of the partial file
            continue
        }
        fmt.Printf("Worker %d: successfully downloaded %s\n", id, task.FileName)
    }
}

// main lists remoteDir, starts numWorkers workers (each with its own FTP
// connection) and feeds them one DownloadTask per non-directory entry.
// It exits with status 1 if tasks were left over because no worker could
// connect.
func main() {
    const (
        remoteHost = "ftp.example.com"
        loginID    = "username"
        loginPasw  = "password"
        remoteDir  = "/remote/path/"
        localDir   = "./downloads/"
        numWorkers = 5
    )

    // Fetch the file list over a connection of its own.
    c, err := ftp.Dial(remoteHost, ftp.DialWithTimeout(5*time.Second))
    if err != nil {
        fmt.Printf("Dial error: %v\n", err)
        return
    }
    if err := c.Login(loginID, loginPasw); err != nil {
        c.Quit()
        fmt.Printf("Login error: %v\n", err)
        return
    }
    entries, err := c.List(remoteDir)
    // Quit before the workers start, so the listing connection does not sit
    // idle next to the numWorkers worker connections.
    c.Quit()
    if err != nil {
        fmt.Printf("List error: %v\n", err)
        return
    }

    if err := os.MkdirAll(localDir, 0755); err != nil {
        fmt.Printf("Mkdir error: %v\n", err)
        return
    }

    // Buffered for every entry, so the sends below never block, even if
    // every worker has already exited.
    tasks := make(chan DownloadTask, len(entries))

    var wg sync.WaitGroup
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, tasks, remoteHost, loginID, loginPasw, &wg)
    }

    for _, entry := range entries {
        // Retr cannot download a directory, and LIST may return subdirectories.
        if entry.Type == ftp.EntryTypeFolder {
            continue
        }
        tasks <- DownloadTask{
            RemotePath: remoteDir + entry.Name,
            LocalPath:  localDir + entry.Name,
            FileName:   entry.Name,
        }
    }

    close(tasks)
    wg.Wait()

    // A worker that connected keeps receiving until the channel is empty, so
    // tasks can only be left over if every worker failed to connect.
    if left := len(tasks); left > 0 {
        fmt.Printf("%d file(s) not downloaded: no worker could connect\n", left)
        os.Exit(1)
    }

    fmt.Println("All downloads completed")
}

关键改进点

  1. 每个goroutine使用独立连接:避免并发访问同一个FTP连接
  2. 正确的变量捕获:在循环体内复制变量值,避免所有goroutine共享同一个循环变量(Go 1.22 起循环变量已按迭代创建,但旧版本仍需如此)
  3. 连接池管理:控制并发连接数,避免服务器过载
  4. 完善的错误处理:确保goroutine正确退出,资源正确释放
  5. 并发控制:使用信号量或工作池控制最大并发数

方案1最简单直接,适合大多数场景。方案2和方案3更适合需要大量并发下载的生产环境。

回到顶部