Oss文件存儲

 

包含文件的上傳下載和生成臨時的url

# -*- coding: utf-8 -*-

import os
import oss2
import configparser
from Config import *

class AliOss:
    def __init__(self):
        # 讀取配置文件
        self._cf = configparser.ConfigParser()
        self._cf.read(ConfigPath)

        # 讀取環境變量或定義常量
        self._access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', self._cf.get("oss","OSS_TEST_ACCESS_KEY_ID"))
        self._access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', self._cf.get("oss","OSS_TEST_ACCESS_KEY_SECRET"))
        self._bucket_name = os.getenv('OSS_TEST_BUCKET', self._cf.get("oss","OSS_TEST_BUCKET"))
        self._endpoint = os.getenv('OSS_TEST_ENDPOINT', self._cf.get("oss","OSS_TEST_ENDPOINT"))
        # self._filepath = self._cf.get("oss","FILEPATH")
        
        # OSS認證信息
        auth = oss2.Auth(self._access_key_id, self._access_key_secret)

        # 建立Bucket對象,全部Object相關的接口均可以經過Bucket對象來進行
        self._bucket = oss2.Bucket(auth, self._endpoint, self._bucket_name)

    # 上傳文件至oss
    #
    # [Params]
    # from_file: 上傳對象文件
    # to_oss: 上傳至oss的文件名
    # remove_from_file: 是否刪除原文件
    #
    async def upload(self, from_file, to_oss=None, remove_from_file=False):
        # 上傳oss的文件名未指定,則默認爲當前文件名
        oss_file_name = os.path.basename(from_file) if to_oss is None else to_oss
        self._bucket.put_object_from_file(oss_file_name, from_file)
        
        # 刪除原文件
        if remove_from_file:
            os.remove(from_file)
        
        return True

    # 下載文件
    #
    # [Params]
    # from_oss: 上傳對象文件
    # out_path: 上傳至oss的文件名
    #
    def download(self, from_oss, out_path):
        # 文件不存在返回False
        if not self.exists(from_oss):
            return False

        # 未指定輸出路徑,默認爲當前路徑下保存
        out = from_oss if out_path is None else out_path
        self._bucket.get_object_to_file(from_oss, out)
        
        return True

    # Stream
    #
    # [Params]
    # oss_key: 上傳對象Key
    # oss_value: 上傳內容
    #
    def put(self, oss_key, oss_value):
        self._bucket.put_object(oss_key, oss_value)
        return True

    # Stream
    #
    # [Params]
    # oss_key: 上傳對象Key
    #
    def stream(self, oss_key):
        # oss對象不存在則返回None
        if not self.exists(oss_key):
            return None

        # 返回流式對象
        return self._bucket.get_object(oss_key)


    # 文件是否存在
    #
    # [Params]
    # oss_file_name: oss的文件名
    #
    def exists(self, oss_file_name):
        return self._bucket.object_exists(oss_file_name)
        

    # 刪除文件
    #
    # [Params]
    # oss_file_name: oss的文件名
    #
    async def delete(self, oss_file_name):
        self._bucket.delete_object(oss_file_name)
        
        return True

    def signedUrl(self, oss_file_name, HTTP_METHOD='GET', expiredTime=60):
        result = self._bucket.sign_url(HTTP_METHOD, oss_file_name, expiredTime)
        result.replace("-internal","")
        return result

對上面的函數進行解釋和說明python

上傳文件

self._bucket.put_object_from_file(oss_file_name, from_file) 接受的是本地的文件名稱
能夠這樣使用
bucket.put_object_from_file('./example.jpg','./example.jpg')

  

下載文件

out = from_oss if out_path is None else out_path
self._bucket.get_object_to_file(from_oss, out)

第一個參數是fromOss,第二個參數是本地的輸出路徑json

bucket.get_object_to_file('example.jpg', 'example2.jpg')

  

上傳文件(以流的形式)

self._bucket.put_object(oss_key, oss_value)
須要打開的操做
with open(oss2.to_unicode('本地座右銘.txt'), 'rb') as f:
    bucket.put_object('雲上座右銘.txt', f)

 

下載文件(以流的形式)

def stream(self, oss_key):
# oss對象不存在則返回None
if not self.exists(oss_key):
return None

# 返回流式對象
return self._bucket.get_object(oss_key)
直接去指定oss文件上的文件名稱
result=bucket.get_object('5a055c56705deb3b31d3bcab.json')
print(result.read())

 

判斷oss上是否存在這個文件 

直接把須要判斷的文件名當參數進行判斷便可 

def exists(self, oss_file_name):
    return self._bucket.object_exists(oss_file_name)

刪除oss上的文件

async def delete(self, oss_file_name):
    self._bucket.delete_object(oss_file_name)

 

生成一個臨時的簽名url供客戶端下載

bucket.sign_url('請求的方法get或post', 'oss上的文件名', '過時的時間')
    def signedUrl(self, oss_file_name, HTTP_METHOD='GET', expiredTime=60):
        result = self._bucket.sign_url(HTTP_METHOD, oss_file_name, expiredTime)
        result.replace("-internal","")
        return result
相關文章
相關標籤/搜索