django storage 文件上传与存储
2024年6月2日大约 5 分钟约 1412 字
OSS存储标准类
"""
官方文档: https://docs.djangoproject.com/zh-hans/5.0/howto/custom-file-storage/
OSS文档: https://help.aliyun.com/zh/oss/developer-reference/preface
Debian9依赖:apt install python-dev-is-python3
python依赖:pip install oss2 # 代码使用版本为 2.18.5
请在OSS中配置跨域,否则less和woff2等文件无法加载到浏览器。
settings.py文件配置举例:
# --------------------OSS存储----------------
os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'] = 'xxx'
os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'] = 'xxx'
STATICFILES_STORAGE = 'utils.app.storage.OssStaticStorage'
DEFAULT_FILE_STORAGE = 'utils.app.storage.OssMediaStorage'
OSS_ENDPOINT = 'https://oss-cn-hangzhou.aliyuncs.com'
OSS_BUCKET_NAME = 'django-rest-framework-template-media'
OSS_ACCESS_URL = 'https://oa-media.oss-cn-hangzhou.aliyuncs.com' # 仅访问文件使用
STATIC_URL = 'static/' # 访问静态文件的 URL 前缀
STATICFILES_DIRS = (os.path.join(BASE_DIR, 'static'),) # 需要额外收集的静态文件根目录
"""
import shutil
import os
from tempfile import SpooledTemporaryFile
from urllib.parse import urljoin
from datetime import datetime, UTC
from django.core.files import File
from django.core.files.utils import validate_file_name
from django.core.files.storage import Storage
from django.core.exceptions import SuspiciousOperation
from django.conf import settings
from django.utils.crypto import get_random_string
from django.utils.deconstruct import deconstructible
import oss2.utils
import oss2.exceptions
from oss2 import Auth, Service, Bucket, ObjectIterator, BUCKET_ACL_PRIVATE
class OssError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
@deconstructible
class OssStorage(Storage):
"""
Aliyun OSS Storage
"""
location = settings.MEDIA_URL
def __init__(self):
self.access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', getattr(settings, 'ALIBABA_CLOUD_ACCESS_KEY_ID', ''))
self.access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', getattr(settings, 'ALIBABA_CLOUD_ACCESS_KEY_SECRET', ''))
# yourEndpoint填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
self.end_point = settings.OSS_ENDPOINT
self.bucket_name = settings.OSS_BUCKET_NAME
self.expire_time = getattr(settings, 'OSS_EXPIRE_TIME', 60 * 60 * 24 * 30)
self.auth = Auth(self.access_key_id, self.access_key_secret)
self.service = Service(self.auth, self.end_point)
self.bucket = Bucket(self.auth, self.end_point, self.bucket_name, connect_timeout=30)
# 尝试获取桶的acl用来检查桶是否存在
try:
self.bucket_acl = self.bucket.get_bucket_acl().acl
except oss2.exceptions.NoSuchBucket:
raise SuspiciousOperation("桶 '%s' 不存在." % self.bucket_name)
def _open(self, name, mode='rb') -> File:
"""打开一个文件。
return: OssFile实例
"""
target_name = self.path(name)
try:
# Load the key into a temporary file
tmpf = SpooledTemporaryFile(max_size=10 * 1024 * 1024) # 10MB
obj = self.bucket.get_object(target_name)
if obj.content_length is None:
shutil.copyfileobj(obj, tmpf)
else:
oss2.utils.copyfileobj_and_verify(obj, tmpf, obj.content_length, request_id=obj.request_id)
tmpf.seek(0)
return OssFile(tmpf, target_name, self)
except oss2.exceptions.NoSuchKey:
raise OssError("%s does not exist" % name)
except Exception as e:
raise OssError("Failed to open %s" % name)
def _save(self, name: str, content: File) -> str:
"""保存一个文件
Args:
name: 文件相对路径(不带/media/前缀)。例 news/abc.jpg
content: 文件流数据
Returns:
str: 文件实际保存后的相对路径(不带/media/前缀)。例 news/abc.jpg
保存时可能会修改文件名,所以返回值可能与传入的name不同,否则值一样。
"""
target_name = self.path(name)
self.bucket.put_object(target_name, content)
return os.path.normpath(name)
def path(self, name) -> str:
"""在路径加入前缀 static/ 或 media/ 等。
Args:
self.location: /media/
name : css/test.txt
Returns:
str: media/css/test.txt
"""
# return safe_join(self.location, name)
name = name.lstrip('/')
final_path = urljoin(self.location + "/", name)
name = os.path.normpath(final_path.lstrip('/')) # 将url路径标准化,/a/c/..b , /a/b/c/.. 等改为/a/b/
# 将/添加到path的末尾,因为os.path.normpath会删除它
if final_path.endswith('/') and not name.endswith('/'):
name += '/'
return name.replace('\\', '/')
def create_dir(self, dirname):
"""创建一个目录。"""
target_name = self.path(dirname)
if not target_name.endswith('/'):
target_name += '/'
self.bucket.put_object(target_name, '')
def exists(self, name) -> bool:
"""检查文件或目录是否存在(如果该名称可用于新文件则返回True)。
Returns:
bool:
"""
# 如果是上传静态文件,直接返回False,因为是覆盖模式
if self.location == settings.STATIC_URL:
return False
target_name = self.path(name)
# 如果看起来像一个目录。但是OSS没有目录的概念
if name.endswith("/"):
# 需要检查 key 是否以该前缀开头
result = self.bucket.list_objects(prefix=target_name, delimiter='', marker='', max_keys=1)
if len(result.object_list) == 0:
print(f'对象列表: {result.object_list}')
else:
print(f'对象列表: {result.object_list[0].key}')
# 无论如何都会返回
return bool(result.object_list)
# 检查文件是否存在
exist = self.bucket.object_exists(target_name)
# 它不是一个文件,但可能是一个目录,再次检查是否有同名目录存在
if not exist:
name2 = name + "/"
return self.exists(name2)
return exist
def get_available_name(self, name, max_length=None):
"""检查文件路径是否已存在并返回一个可保存的空闲文件路径(会在文件名后面加7位随机数,作为简单的加密)。
save()中会调用此方法,用来识别并返回一个空闲的文件路径。
"""
# 如果是收集上传静态文件,则不修改文件名
if self.location == settings.STATIC_URL:
return name
name = str(name).replace("\\", "/")
# 目录名称,文件名称
dir_name, file_name = os.path.split(name)
validate_file_name(file_name)
# 将文件名和格式名分开
file_root, file_ext = os.path.splitext(file_name)
file_name = "%s_%s%s" % (file_root, get_random_string(7), file_ext) # 将文件名加上随机后缀
name = os.path.join(dir_name, file_name)
return name
def get_file_meta(self, name):
"""获取文件meta信息。"""
name = self.path(name)
return self.bucket.get_object_meta(name)
def size(self, name):
"""获取文件大小。"""
file_meta = self.get_file_meta(name)
return file_meta.content_length
def modified_time(self, name):
file_meta = self.get_file_meta(name)
return datetime.fromtimestamp(file_meta.last_modified)
created_time = accessed_time = modified_time
def get_modified_time(self, name):
"""返回文件最后修改时间的 datetime。"""
file_meta = self.get_file_meta(name)
if settings.USE_TZ:
return datetime.fromtimestamp(file_meta.last_modified, tz=UTC)
else:
return datetime.fromtimestamp(file_meta.last_modified)
get_created_time = get_accessed_time = get_modified_time
def content_type(self, name):
name = self.path(name)
file_info = self.bucket.head_object(name)
return file_info.content_type
def listdir(self, name):
"""列出一个目录中的文件名。"""
if name == ".":
name = ""
name = self.path(name)
if not name.endswith('/'):
name += "/"
files = []
dirs = []
for obj in ObjectIterator(self.bucket, prefix=name, delimiter='/'):
if obj.is_prefix():
dirs.append(obj.key)
else:
files.append(obj.key)
return dirs, files
def url(self, name: str) -> str:
"""通过文件相对路径,获取文件的url(不带文件名)。
这个函数是在前端获取静态文件或媒体文件时使用。
Args:
name: 文件相对路径(不带/media/前缀)。例:img/abc.jpg
Returns:
str: 返回文件完整可访问的URL。例:https://www.example.com/media/img/abc.jpg
"""
# 加上前缀 static/ 或 media/
key = self.path(name)
# 加上https://***.oss-cn-hangzhou.aliyuncs.com/***
full_url_str = self.bucket.sign_url('GET', key, expires=self.expire_time)
# 如果是公共读('public-read')
if self.bucket_acl != BUCKET_ACL_PRIVATE:
idx = full_url_str.find('?')
if idx > 0:
# 将url中'%2F'转换为'/'
full_url_str = full_url_str[:idx].replace('%2F', '/')
return full_url_str
def delete(self, name):
"""删除一个文件"""
name = self.path(name)
result = self.bucket.delete_object(name)
def delete_with_slash(self, dirname):
"""删除一个目录。"""
name = self.path(dirname)
if not name.endswith('/'):
name += '/'
result = self.bucket.delete_object(name)
class OssMediaStorage(OssStorage):
def __init__(self):
self.location = settings.MEDIA_URL
super(OssMediaStorage, self).__init__()
class OssStaticStorage(OssStorage):
def __init__(self):
self.location = settings.STATIC_URL
super(OssStaticStorage, self).__init__()
class OssFile(File):
"""
A file returned from AliCloud OSS
"""
def __init__(self, file, name, storage):
super(OssFile, self).__init__(file, name)
self._storage = storage
def open(self, mode="rb", *args, **kwargs):
if self.closed:
self.file = self._storage.open(self.name, mode).file
return super(OssFile, self).open(mode)
将文件上传到阿里云OSS
#>>> vim views.py
from commons.ossutils import upload_oss
BucketName = 'shareditor-shareditor'
def body_upload(request):
print request.FILES
if 'upload' in request.FILES:
image_name = request.FILES['upload'].name
image_content = request.FILES['upload'].read()
url = upload_oss(BucketName, image_name, image_content)
if url:
return render(request, 'web/body_upload.html', {'url': url})
return HttpResponse('upload fail')
#>>> vim urls.py
url(r'^uploader/body_upload', views.body_upload, name='body_upload'),