# -*- coding: utf-8 -*- import os import oss2 import configparser from Config import * class AliOss: def __init__(self): # 讀取配置文件 self._cf = configparser.ConfigParser() self._cf.read(ConfigPath) # 讀取環境變量或定義常量 self._access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', self._cf.get("oss","OSS_TEST_ACCESS_KEY_ID")) self._access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', self._cf.get("oss","OSS_TEST_ACCESS_KEY_SECRET")) self._bucket_name = os.getenv('OSS_TEST_BUCKET', self._cf.get("oss","OSS_TEST_BUCKET")) self._endpoint = os.getenv('OSS_TEST_ENDPOINT', self._cf.get("oss","OSS_TEST_ENDPOINT")) # self._filepath = self._cf.get("oss","FILEPATH") # OSS認證信息 auth = oss2.Auth(self._access_key_id, self._access_key_secret) # 建立Bucket對象,全部Object相關的接口均可以經過Bucket對象來進行 self._bucket = oss2.Bucket(auth, self._endpoint, self._bucket_name) # 上傳文件至oss # # [Params] # from_file: 上傳對象文件 # to_oss: 上傳至oss的文件名 # remove_from_file: 是否刪除原文件 # async def upload(self, from_file, to_oss=None, remove_from_file=False): # 上傳oss的文件名未指定,則默認爲當前文件名 oss_file_name = os.path.basename(from_file) if to_oss is None else to_oss self._bucket.put_object_from_file(oss_file_name, from_file) # 刪除原文件 if remove_from_file: os.remove(from_file) return True # 下載文件 # # [Params] # from_oss: 上傳對象文件 # out_path: 上傳至oss的文件名 # def download(self, from_oss, out_path): # 文件不存在返回False if not self.exists(from_oss): return False # 未指定輸出路徑,默認爲當前路徑下保存 out = from_oss if out_path is None else out_path self._bucket.get_object_to_file(from_oss, out) return True # Stream # # [Params] # oss_key: 上傳對象Key # oss_value: 上傳內容 # def put(self, oss_key, oss_value): self._bucket.put_object(oss_key, oss_value) return True # Stream # # [Params] # oss_key: 上傳對象Key # def stream(self, oss_key): # oss對象不存在則返回None if not self.exists(oss_key): return None # 返回流式對象 return self._bucket.get_object(oss_key) # 文件是否存在 # # [Params] # oss_file_name: oss的文件名 # def exists(self, oss_file_name): return self._bucket.object_exists(oss_file_name) # 刪除文件 # # [Params] # oss_file_name: oss的文件名 # async def delete(self, oss_file_name): self._bucket.delete_object(oss_file_name) return True def signedUrl(self, oss_file_name, HTTP_METHOD='GET', expiredTime=60): result = self._bucket.sign_url(HTTP_METHOD, oss_file_name, expiredTime) result.replace("-internal","") return result
對上面的函數進行解釋和說明python
self._bucket.put_object_from_file(oss_file_name, from_file) 接受的是本地的文件名稱
能夠這樣使用
bucket.put_object_from_file('./example.jpg','./example.jpg')
out = from_oss if out_path is None else out_path
self._bucket.get_object_to_file(from_oss, out)
第一個參數是fromOss,第二個參數是本地的輸出路徑json
bucket.get_object_to_file('example.jpg', 'example2.jpg')
self._bucket.put_object(oss_key, oss_value)
須要打開的操做
with open(oss2.to_unicode('本地座右銘.txt'), 'rb') as f: bucket.put_object('雲上座右銘.txt', f)
def stream(self, oss_key):
# oss對象不存在則返回None
if not self.exists(oss_key):
return None
# 返回流式對象
return self._bucket.get_object(oss_key)
直接去指定oss文件上的文件名稱
result=bucket.get_object('5a055c56705deb3b31d3bcab.json') print(result.read())
def exists(self, oss_file_name): return self._bucket.object_exists(oss_file_name)
async def delete(self, oss_file_name): self._bucket.delete_object(oss_file_name)
bucket.sign_url('請求的方法get或post', 'oss上的文件名', '過時的時間')
def signedUrl(self, oss_file_name, HTTP_METHOD='GET', expiredTime=60): result = self._bucket.sign_url(HTTP_METHOD, oss_file_name, expiredTime) result.replace("-internal","") return result