Golang批量邮件服务器超时 - 保持重试的最佳方法

Golang批量邮件服务器超时 - 保持重试的最佳方法 我有一个Go程序,用于向一长串收件人发送带有密码保护附件的个性化邮件。 SMTP服务器经常抛出错误导致程序崩溃,因此并非所有邮件都能成功发送。 有人能建议最佳的方式来对邮件进行排队,让程序能够持续重试而不是在执行中途失败吗?

9 回复

你目前是如何对邮件进行排队的?

更多关于Golang批量邮件服务器超时 - 保持重试的最佳方法的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


感谢这个建议!我会尝试一下并告知您结果…

目前没有进行队列处理,只是在 DialandSend 后如果抛出错误就会触发 panic

目前没有进行队列处理,只是在 DialandSend 后如果抛出错误就会引发恐慌。已经尝试过加入延迟,并在抛出错误后重试几次。

"panic"是指程序崩溃吗?你应该能够在不崩溃的情况下处理断开的连接,所以只需在一段时间后重试即可。可能是SMTP服务器上设置了速率限制器,所以在邮件之间添加等待时间可能会有所帮助。

处理错误而不是引发恐慌,稍后重试这些邮件?除非是在启动时发生的致命错误,否则代码中不应出现恐慌情况。即便如此,您可能仍希望处理错误以向用户呈现更好的错误信息。

2 个赞

GitHub

hectane/hectane

头像

hectane - 使用Go编写的轻量级SMTP客户端

可以参考指数退避算法——这是我找到的一篇文章:

Go语言中的退避与重试机制

以及一些Go语言包(按GitHub星标数排序——我尚未使用过其中任何包,因此星标数是我能提供的最佳参考指标)。

在Go中处理批量邮件发送并实现重试机制,推荐使用带队列的工作池模式。以下是具体实现方案:

package main

import (
    "crypto/tls"
    "fmt"
    "log"
    "net/smtp"
    "sync"
    "time"
)

type Email struct {
    To      string
    Subject string
    Body    string
    // 添加附件相关字段
}

type EmailWorker struct {
    smtpHost     string
    smtpPort     string
    username     string
    password     string
    maxRetries   int
    retryDelay   time.Duration
}

func NewEmailWorker(host, port, user, pass string, maxRetries int, delay time.Duration) *EmailWorker {
    return &EmailWorker{
        smtpHost:    host,
        smtpPort:    port,
        username:    user,
        password:    pass,
        maxRetries:  maxRetries,
        retryDelay:  delay,
    }
}

func (ew *EmailWorker) SendEmail(email Email) error {
    var lastErr error
    
    for attempt := 0; attempt < ew.maxRetries; attempt++ {
        err := ew.trySend(email)
        if err == nil {
            log.Printf("邮件发送成功: %s", email.To)
            return nil
        }
        
        lastErr = err
        log.Printf("发送失败 (尝试 %d/%d): %s - %v", attempt+1, ew.maxRetries, email.To, err)
        
        if attempt < ew.maxRetries-1 {
            time.Sleep(ew.retryDelay * time.Duration(attempt+1))
        }
    }
    
    return fmt.Errorf("达到最大重试次数 %d: %w", ew.maxRetries, lastErr)
}

func (ew *EmailWorker) trySend(email Email) error {
    auth := smtp.PlainAuth("", ew.username, ew.password, ew.smtpHost)
    
    tlsConfig := &tls.Config{
        ServerName: ew.smtpHost,
    }
    
    conn, err := tls.Dial("tcp", ew.smtpHost+":"+ew.smtpPort, tlsConfig)
    if err != nil {
        return err
    }
    defer conn.Close()
    
    client, err := smtp.NewClient(conn, ew.smtpHost)
    if err != nil {
        return err
    }
    defer client.Close()
    
    if err = client.Auth(auth); err != nil {
        return err
    }
    
    if err = client.Mail(ew.username); err != nil {
        return err
    }
    
    if err = client.Rcpt(email.To); err != nil {
        return err
    }
    
    wc, err := client.Data()
    if err != nil {
        return err
    }
    defer wc.Close()
    
    message := fmt.Sprintf("From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n%s",
        ew.username, email.To, email.Subject, email.Body)
    
    _, err = fmt.Fprint(wc, message)
    return err
}

func ProcessEmailQueue(emails []Email, worker *EmailWorker, numWorkers int) {
    emailChan := make(chan Email, len(emails))
    var wg sync.WaitGroup
    
    // 启动工作池
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for email := range emailChan {
                log.Printf("Worker %d 处理邮件: %s", workerID, email.To)
                if err := worker.SendEmail(email); err != nil {
                    log.Printf("Worker %d 最终失败: %s - %v", workerID, email.To, err)
                    // 这里可以添加失败邮件到另一个队列进行后续处理
                }
            }
        }(i)
    }
    
    // 将邮件推送到队列
    for _, email := range emails {
        emailChan <- email
    }
    close(emailChan)
    
    wg.Wait()
    log.Println("所有邮件处理完成")
}

func main() {
    // 配置邮件工作器
    worker := NewEmailWorker(
        "smtp.example.com",
        "587",
        "your-email@example.com",
        "your-password",
        3,           // 最大重试次数
        time.Second * 5, // 基础重试延迟
    )
    
    // 示例邮件列表
    emails := []Email{
        {To: "user1@example.com", Subject: "测试邮件1", Body: "这是邮件内容1"},
        {To: "user2@example.com", Subject: "测试邮件2", Body: "这是邮件内容2"},
        // 添加更多邮件...
    }
    
    // 使用5个工作协程处理邮件队列
    ProcessEmailQueue(emails, worker, 5)
}

关键特性说明:

  1. 指数退避重试:每次重试延迟时间递增,避免频繁请求
  2. 并发工作池:多个协程并行处理,提高吞吐量
  3. TLS连接:使用加密连接确保安全性
  4. 错误隔离:单封邮件失败不影响其他邮件发送
  5. 详细日志:跟踪每封邮件的发送状态

对于生产环境,建议添加以下增强功能:

// 持久化队列示例
type PersistentQueue struct {
    mu     sync.RWMutex
    emails []Email
}

func (pq *PersistentQueue) Add(email Email) {
    pq.mu.Lock()
    defer pq.mu.Unlock()
    pq.emails = append(pq.emails, email)
}

func (pq *PersistentQueue) GetFailedEmails() []Email {
    pq.mu.RLock()
    defer pq.mu.RUnlock()
    return pq.emails
}

这种架构能够确保即使SMTP服务器暂时不可用,邮件也会在后续重试中成功发送,避免程序崩溃。

回到顶部