Python中如何实现阿里云OSS增量上传脚本

aliyun-oss-sync

此脚本是用来发布我个人博客Poison而编写的,因为工作中常用语言为 Java ,而 Python 仅是副业,代码如有不当之处,敬请指出。

逻辑很简单,递归遍历本地目录,然后判断每个文件在 OSS 里是否存在,如果不存在则直接上传,如果存在则检查 Content-Md5 是否相等,如果不相等则表明该文件内容已经发生变化,则上传该文件, OSS 会自动覆盖同名文件。

值得注意的是检查 Content-Md5 的值是用的 HTTP 的 HEAD 方法,因为我们只需要 header 中的 Content-Md5 字段的值,所以并不需要使用 GET 方法拿到响应体,这样既加快了速度也节省了 OSS 流量。

关于 oss_public_domain 变量的值,你如果在同地域内网的 ECS 上使用该脚本,建议使用内网域名,速度快并且节省了流量费用,否则使用外网域名。


Python中如何实现阿里云OSS增量上传脚本

7 回复

问题好多:

1. requirements.txt 没有,依赖也没有说明
2. oss2 本身就有 Bucket.get_object_meta(key),不需要自己 request 去请求
3. oss2 支持 Python 2.6 , 2.7 , 3.3 , 3.4 , 3.5 ,代码只兼容 python2 语法,很浪费
4. 单线程上传
5. 没有 main
6. 没有参数解析或者配置文件,硬编码路径

其它的想到了再说


我来写一个阿里云OSS增量上传的脚本。核心思路是用MD5校验文件是否变化,只上传修改过的文件。

import os
import hashlib
import json
from datetime import datetime
from oss2 import Auth, Bucket

class OSSIncrementalUploader:
    def __init__(self, access_key_id, access_key_secret, endpoint, bucket_name):
        """初始化OSS客户端"""
        auth = Auth(access_key_id, access_key_secret)
        self.bucket = Bucket(auth, endpoint, bucket_name)
        self.state_file = 'upload_state.json'  # 记录文件状态
        self.load_state()
    
    def load_state(self):
        """加载上次上传的状态记录"""
        if os.path.exists(self.state_file):
            with open(self.state_file, 'r') as f:
                self.state = json.load(f)
        else:
            self.state = {}
    
    def save_state(self):
        """保存当前状态到文件"""
        with open(self.state_file, 'w') as f:
            json.dump(self.state, f, indent=2)
    
    def calculate_md5(self, file_path):
        """计算文件的MD5值"""
        hash_md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    
    def should_upload(self, local_path, oss_key):
        """判断是否需要上传文件"""
        # 如果本地文件不存在,跳过
        if not os.path.exists(local_path):
            return False
        
        # 获取文件信息
        file_mtime = os.path.getmtime(local_path)
        file_size = os.path.getsize(local_path)
        current_md5 = self.calculate_md5(local_path)
        
        # 检查状态记录
        if oss_key in self.state:
            old_state = self.state[oss_key]
            # 如果文件大小、修改时间或MD5有变化,需要上传
            if (old_state['size'] != file_size or 
                old_state['mtime'] != file_mtime or
                old_state['md5'] != current_md5):
                return True
            return False
        else:
            # 新文件,需要上传
            return True
    
    def upload_file(self, local_path, oss_key):
        """上传单个文件到OSS"""
        try:
            # 检查是否需要上传
            if not self.should_upload(local_path, oss_key):
                print(f"跳过未修改文件: {local_path}")
                return False
            
            print(f"上传文件: {local_path} -> {oss_key}")
            
            # 上传文件
            result = self.bucket.put_object_from_file(oss_key, local_path)
            
            if result.status == 200:
                # 更新状态记录
                self.state[oss_key] = {
                    'size': os.path.getsize(local_path),
                    'mtime': os.path.getmtime(local_path),
                    'md5': self.calculate_md5(local_path),
                    'last_upload': datetime.now().isoformat()
                }
                self.save_state()
                return True
            else:
                print(f"上传失败: {local_path}")
                return False
                
        except Exception as e:
            print(f"上传出错 {local_path}: {str(e)}")
            return False
    
    def upload_directory(self, local_dir, oss_prefix=''):
        """上传整个目录(递归)"""
        for root, dirs, files in os.walk(local_dir):
            for file in files:
                local_path = os.path.join(root, file)
                
                # 计算OSS上的路径
                relative_path = os.path.relpath(local_path, local_dir)
                oss_key = os.path.join(oss_prefix, relative_path).replace('\\', '/')
                
                self.upload_file(local_path, oss_key)

# 使用示例
if __name__ == "__main__":
    # 配置信息
    ACCESS_KEY_ID = 'your_access_key_id'
    ACCESS_KEY_SECRET = 'your_access_key_secret'
    ENDPOINT = 'https://oss-cn-hangzhou.aliyuncs.com'  # 根据你的区域修改
    BUCKET_NAME = 'your_bucket_name'
    
    # 创建上传器
    uploader = OSSIncrementalUploader(
        ACCESS_KEY_ID, 
        ACCESS_KEY_SECRET, 
        ENDPOINT, 
        BUCKET_NAME
    )
    
    # 上传目录
    local_directory = '/path/to/your/local/directory'
    uploader.upload_directory(local_directory, 'backup/')
    
    print("增量上传完成!")

这个脚本的核心机制:

  1. 状态记录:用JSON文件保存已上传文件的信息(大小、修改时间、MD5)
  2. 变化检测:通过比较文件大小、修改时间和MD5值来判断文件是否变化
  3. 增量上传:只上传有变化的文件,跳过未修改的文件

使用前需要安装OSS SDK:

pip install oss2

记得替换配置信息为你的实际值。脚本会自动创建upload_state.json文件来跟踪上传状态。

一句话总结:用MD5校验和状态记录实现增量检测。

  1. oss 里面 key 有限制,在 key 是无效的情况下怎么处理
    8. 使用普通的 Bucket.put_object ,有单文件上传大小限制,需要用分块上传或者 Bucket.append_object
    9. oss 的 md5 与程序里的 md5 能否保证是一致的,是否需要自己另外用统一的 hash ,使用额外的 meta 来处理

大兄弟,感觉你的代码有点 low

这样的文件判断也不是原子性的啊

谢谢指出,我也是接触 Python 不久,不是很熟悉相关代码规范,比如你说的参数解析这个问题,在 Java 里就可以用 jcommander ,因为不是很熟悉 Python 的生态,所以写得比较 low ,这个也是我那天下午有空草草看了下 OSS 的文档就编写出的第一个版本,你说的问题我下班了有空时都会一一修改,逐步迭代至正常的水准,额,最后能给个你的联系方式吗?到时 Python 不清楚的地方再请教下,谢谢

关于上面第二点,经测试,对同一个文件,通过 Python SDK 中的 get_object_meta(key)方法拿到的信息比 HTTP HEAD 方法拿到的头信息要少几个字段,相比 HTTP HEAD 方法,正好少了 Content-Md5 这个字段。

回到顶部