Python中如何处理AWS S3 bucket中的视频文件
想用 aws 的 rekognition 模块处理视频,看了文档,
创建 Amazon SNS 主题。
创建 Amazon SQS 队列。
为 Amazon Rekognition Video 提供将视频分析操作的完成状态发布到 Amazon SNS 主题的权限。
为 Amazon SQS 队列订阅 Amazon SNS 主题。必须走这个流程吗?
想请教一下代码该如何写
Python中如何处理AWS S3 bucket中的视频文件
2 回复
处理AWS S3中的视频文件,核心是使用boto3库。这里给你一个完整的操作示例,包括上传、下载、获取信息和生成预签名URL。
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class S3VideoHandler:
def __init__(self, bucket_name, region='us-east-1'):
"""
初始化S3客户端
:param bucket_name: S3存储桶名称
:param region: AWS区域
"""
self.s3_client = boto3.client('s3', region_name=region)
self.bucket_name = bucket_name
def upload_video(self, local_file_path, s3_key):
"""
上传视频文件到S3
:param local_file_path: 本地视频文件路径
:param s3_key: S3中的对象键(路径)
"""
try:
self.s3_client.upload_file(
local_file_path,
self.bucket_name,
s3_key,
ExtraArgs={'ContentType': 'video/mp4'} # 根据实际视频格式调整
)
logger.info(f"视频上传成功: {s3_key}")
return True
except FileNotFoundError:
logger.error(f"本地文件不存在: {local_file_path}")
return False
except NoCredentialsError:
logger.error("AWS凭证未配置")
return False
def download_video(self, s3_key, local_file_path):
"""
从S3下载视频文件
:param s3_key: S3中的对象键
:param local_file_path: 本地保存路径
"""
try:
self.s3_client.download_file(
self.bucket_name,
s3_key,
local_file_path
)
logger.info(f"视频下载成功: {local_file_path}")
return True
except ClientError as e:
logger.error(f"下载失败: {e}")
return False
def get_video_info(self, s3_key):
"""
获取视频文件信息
:param s3_key: S3中的对象键
:return: 文件元数据字典
"""
try:
response = self.s3_client.head_object(
Bucket=self.bucket_name,
Key=s3_key
)
info = {
'size': response['ContentLength'],
'content_type': response['ContentType'],
'last_modified': response['LastModified'],
'etag': response['ETag']
}
logger.info(f"视频信息: {info}")
return info
except ClientError as e:
logger.error(f"获取信息失败: {e}")
return None
def generate_presigned_url(self, s3_key, expiration=3600):
"""
生成预签名URL(用于临时访问)
:param s3_key: S3中的对象键
:param expiration: URL有效期(秒)
:return: 预签名URL
"""
try:
url = self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': s3_key},
ExpiresIn=expiration
)
logger.info(f"生成预签名URL: {url}")
return url
except ClientError as e:
logger.error(f"生成URL失败: {e}")
return None
# 使用示例
if __name__ == "__main__":
# 初始化处理器
handler = S3VideoHandler(bucket_name='your-video-bucket')
# 上传视频
handler.upload_video('local_video.mp4', 'videos/my_video.mp4')
# 获取视频信息
info = handler.get_video_info('videos/my_video.mp4')
print(f"视频大小: {info['size']} bytes")
# 生成分享链接(1小时有效)
url = handler.generate_presigned_url('videos/my_video.mp4', expiration=3600)
print(f"分享链接: {url}")
# 下载视频
handler.download_video('videos/my_video.mp4', 'downloaded_video.mp4')
关键点说明:
- 使用
boto3库与S3交互 upload_file/download_file处理文件传输head_object获取文件元数据generate_presigned_url生成临时访问链接- 注意异常处理和日志记录
依赖安装:
pip install boto3
AWS凭证配置:
- 通过环境变量:
AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY - 或使用
~/.aws/credentials文件
一句话建议: 根据视频大小考虑使用分段上传(upload_fileobj)或异步处理。
看文档介绍,不是必须这个流程吧

