1.優酷上傳git
1)調用優酷的sdk完成優酷視頻的上傳首先須要將實例化YoukuUpload類實例化,傳入的參數爲(client_id,access_token,文件地址)json
實例化時執行__init__()api
2)實例化完成後獲得類的對象,經過對象調用upload方法,傳入參數爲字典(dict),字典內的必傳參數爲title,其他可爲默認,其中的一些參數是爲了控制視頻一些信息的,具體參見代碼的註釋服務器
3)在upload方法中less
(1)會先判斷upload_token 這個參數是否存在,該參數爲優酷返回,存在就繼續以前的上傳,若是不存在的話就判斷爲新上傳。socket
(2)在新上傳中限制性create方法來在服務端建立上傳,在此以前會對傳進來的參數和默認的參數會放入新的字典中ide
def prepare_video_params(self, title=None, tags='Others', description='', copyright_type='original', public_type='all', category=None, watch_password=None, latitude=None, longitude=None, shoot_time=None ): # 準備視頻因此須要的一些參數 params = {} if title is None: # 若是沒有傳title的話,title默認等於文件名 title = self.file_name elif len(title) > 50: # 若是title過長,長度大於50的話,就截取前50個字符做爲title title = title[:50] params['title'] = title params['tags'] = tags # tags就等於你傳進來的那個tags,默認爲others params['description'] = description # 描述默認爲空,能夠傳進來 params['copyright_type'] = copyright_type # 版權全部 original: 原創 reproduced: 轉載 params['public_type'] = public_type # 公開類型 all: 公開 friend: 僅好友觀看 password: 輸入密碼觀看 private: 私有 if category: params['category'] = category # 視頻分類 默認爲空,可傳 if watch_password: params['watch_password'] = watch_password # 觀看密碼,默認爲空,可傳 """ latitude/longitude用戶記錄地理位置信息 shoot_time用來標記視頻中正片的開始時間 """ if latitude: params['latitude'] = latitude # 緯度 if longitude: params['longitude'] = longitude # 經度 if shoot_time: params['shoot_time'] = shoot_time log.debug("prepare_video_params:%s" % params) return params
(3)在create方法中,會用get方法訪問url = 'https://api.youku.com/uploads/create.json',並將參數傳過去,建立oss客戶端,優酷會返回一些字段post
def create(self, params): # prepare file info params['file_name'] = self.file_name params['file_size'] = self.file_size # 在__init__中獲取到了,也就是在類的實例化時就已經根據文件的地址獲取了文件的大小 params['file_md5'] = self.file_md5 = self.checksum_md5file(self.file) # 將文件信息作md5 校驗,根據文件名打開文件,而後每次讀取8192B,進行md5更新後再轉爲十六進制返回 self.logger.info('upload file %s, size: %d bytes' % (self.file_name, self.file_size)) self.logger.info('md5 of %s: %s' % (self.file_name, self.file_md5)) params['client_id'] = self.client_id # client_id params['access_token'] = self.access_token # access_token url = 'https://api.youku.com/uploads/create.json' r = requests.get(url, params=params)# 以get方法將文件信息發送到'https://api.youku.com/uploads/create.json' check_error(r, 201) result = r.json() log.debug("file--->vid:%s,return_result:%s" % (self.v_vid, result)) self.upload_token = result['upload_token'] self.logger.info('upload token of %s: %s' % (self.file_name, self.upload_token)) self.upload_server_ip = socket.gethostbyname( result['upload_server_uri']) self.logger.info('upload_server_ip of %s: %s' % (self.file_name, self.upload_server_ip)) log.debug("file_vid:%s ip:%s" % (self.v_vid, self.upload_server_ip))
(4) 調用create_file方法,將文件的大小、token、每次上傳切片大小傳到上一步返回的ip所指向的服務器url
(5) 調用new_slice方法 告訴服務器才準備上傳文件切片,目的在於檢查服務器狀態和返回服務器中這個新建切片的信息,沒有報錯就執行_save_slice_state方法spa
(6)在_save_slice_state方法中,將切片的信息進行更新或者保存,以後判斷返回的task_id是否爲0,若是爲零就直接提交commit完成整個上傳,不爲0就 調用upload_slice方法上傳文件切片
(7)在upload_slice方法中,每次打開文件移動指針位置到上一次上傳的地方,而後讀取2048kb的數據,以後調用url進行上傳。
def upload_slice(self): # 上傳文件切片 seek():移動文件讀取指針到指定位置 data = None with open(self.file, 'rb') as f: f.seek(self.slice_offset) data = f.read(self.slice_length) # 2048 params = { 'upload_token': self.upload_token, 'slice_task_id': self.slice_task_id, 'offset': self.slice_offset, 'length': self.slice_length, # Byte 'hash': self.checksum_md5data(data)# hash這個字段是爲了在服務端進行校驗,來驗證上傳的文件有沒有出現丟失或錯誤 } url = 'http://%s/gupload/upload_slice' % self.upload_server_ip r = requests.post(url, params=params, data=data) check_error(r, 201) self._save_slice_state(r.json())
(8) 循環判斷task_id,若task_id爲0則結束上傳(即調用commit())
總體流程:應該是先本地將文件的信息上傳到服務器,在服務器建立同名文件,而後根據文件的size和每次對文件進行切片的大小控制上傳,在上傳過程當中服務端會先返回此次要上傳的切片的task_id,若是在服務端的文件大小等於你上傳的參數中提交的size,就會返回task_id爲0,不然就不爲0。本地會根據這個參數來判斷是否結束上傳。
2. 完整優酷上傳sdk代碼
"""Youku cloud Python Client doc: http://cloud.youku.com/docs/doc?id=110 """ import os import requests import json import time import hashlib import socket import logging from time import sleep from util import check_error, YoukuError if not os.path.exists('/var/log/youku/'): os.makedirs('/var/log/youku') log = logging.getLogger() log.setLevel(logging.DEBUG) fmt = logging.Formatter("%(asctime)s %(pathname)s %(filename)s %(funcName)s %(lineno)s %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S") stream_handler = logging.FileHandler( '/var/log/youku/debug-%s.log' % (time.strftime('%Y-%m-%d', time.localtime(time.time())))) stream_handler.setLevel(logging.DEBUG) stream_handler.setFormatter(fmt) log.addHandler(stream_handler) class YoukuUpload(object): """Youku Upload Client. Upload file to Youku Video. Support resume upload if interrupted. Should use one instance of YoukuUpload for one upload file in one thread, since it has internal state of upload process. doc: http://cloud.youku.com/docs/doc?id=110 """ def __init__(self, client_id, access_token, file, v_vid = None, logger=None): """ Args: file: string, include path and filename for open(). filename must contain video file extension. """ super(YoukuUpload, self).__init__() self.client_id = client_id self.access_token = access_token self.v_vid = v_vid # 上傳的視頻id號 self.logger = logger or logging.getLogger(__name__) # file info self.file = file self.file_size = os.path.getsize(self.file) # int 獲取文件的大小 self.file_dir, self.file_name = os.path.split(self.file) # string 分割路徑來獲取文件名 if self.file_dir == '': self.file_dir = '.' self.file_ext = self.file_name.rsplit('.', 1)[1], # file extension self.file_md5 = None # string, do later # upload state self.upload_token = None # string self.upload_server_ip = None # string self.slice_task_id = None # int self.slice_offset = None # int self.slice_length = None # int self.transferred = None # int for bytes has uploaded self.finished = False # boolean # resume upload state self._read_upload_state_from_file() def prepare_video_params(self, title=None, tags='Others', description='', copyright_type='original', public_type='all', category=None, watch_password=None, latitude=None, longitude=None, shoot_time=None ): # 準備視頻因此須要的一些參數 """ util method for create video params to upload. Only need to provide a minimum of two essential parameters: title and tags, other video params are optional. All params spec see: http://cloud.youku.com/docs?id=110#create . Args: title: string, 2-50 characters. tags: string, 1-10 tags joind with comma. description: string, less than 2000 characters. copyright_type: string, 'original' or 'reproduced' public_type: string, 'all' or 'friend' or 'password' watch_password: string, if public_type is password. latitude: double. longitude: double. shoot_time: datetime. Returns: dict params that upload/create method need. """ params = {} if title is None: # 若是沒有傳title的話,title默認等於文件名 title = self.file_name elif len(title) > 50: # 若是title過長,長度大於50的話,就截取前50個字符做爲title title = title[:50] params['title'] = title params['tags'] = tags # tags就等於你傳進來的那個tags,默認爲others params['description'] = description # 描述默認爲空,能夠傳進來 params['copyright_type'] = copyright_type # 版權全部 original: 原創 reproduced: 轉載 params['public_type'] = public_type # 公開類型 all: 公開 friend: 僅好友觀看 password: 輸入密碼觀看 private: 私有 if category: params['category'] = category # 視頻分類 默認爲空,可傳 if watch_password: params['watch_password'] = watch_password # 觀看密碼,默認爲空,可傳 """ latitude/longitude用戶記錄地理位置信息 shoot_time用來標記視頻中正片的開始時間 """ if latitude: params['latitude'] = latitude # 緯度 if longitude: params['longitude'] = longitude # 經度 if shoot_time: params['shoot_time'] = shoot_time log.debug("prepare_video_params:%s" % params) return params def create(self, params): # prepare file info params['file_name'] = self.file_name params['file_size'] = self.file_size # 在__init__中獲取到了,也就是在類的實例化時就已經根據文件的地址獲取了文件的大小 params['file_md5'] = self.file_md5 = self.checksum_md5file(self.file) # 將文件信息作md5 校驗,根據文件名打開文件,而後每次讀取8192B,進行md5更新後再轉爲十六進制返回 self.logger.info('upload file %s, size: %d bytes' % (self.file_name, self.file_size)) self.logger.info('md5 of %s: %s' % (self.file_name, self.file_md5)) params['client_id'] = self.client_id # client_id params['access_token'] = self.access_token # access_token url = 'https://api.youku.com/uploads/create.json' r = requests.get(url, params=params)# 以get方法將文件信息發送到'https://api.youku.com/uploads/create.json' check_error(r, 201) result = r.json() log.debug("file--->vid:%s,return_result:%s" % (self.v_vid, result)) self.upload_token = result['upload_token'] self.logger.info('upload token of %s: %s' % (self.file_name, self.upload_token)) self.upload_server_ip = socket.gethostbyname( result['upload_server_uri']) self.logger.info('upload_server_ip of %s: %s' % (self.file_name, self.upload_server_ip)) log.debug("file_vid:%s ip:%s" % (self.v_vid, self.upload_server_ip)) def _save_upload_state_to_file(self): """if create and create_file has execute, save upload state to file for next resume upload if current upload process is interrupted. """ # 保存文件的上傳信息,先判斷文件是否可寫、可讀、可執行 # 保存的信息包括文件的上傳upload_token.上傳到的服務器ip # 保存的文件會在上傳完畢以後刪除 if os.access(self.file_dir, os.W_OK | os.R_OK | os.X_OK): save_file = '/tmp' + 'youku.upload' data = { 'upload_token': self.upload_token, 'upload_server_ip': self.upload_server_ip } with open(save_file, 'w') as f: json.dump(data, f) def _read_upload_state_from_file(self): save_file = '/tmp' + 'youku.upload' try: with open(save_file) as f: data = json.load(f) self.upload_token = data['upload_token'] self.upload_server_ip = data['upload_server_ip'] # check upload_token expired try: self.check() except YoukuError, e: if e.code == 120010223: # Expired upload token self.upload_token = None self.upload_server_ip = None self._delete_upload_state_file() except: pass def _delete_upload_state_file(self): try: os.remove('/tmp' + 'youku.upload') except: pass def checksum_md5file(self, filename): md5 = hashlib.md5() with open(filename, 'rb') as f: for chunk in iter(lambda: f.read(8192), b''): md5.update(chunk) return md5.hexdigest() def checksum_md5data(self, data): md5 = hashlib.md5() md5.update(data) return md5.hexdigest() def create_file(self): params = { 'upload_token': self.upload_token, 'file_size': self.file_size, # Byte 'ext': self.file_ext, 'slice_length': 2048 # KB } # 上傳文件每次傳2048KB url = 'http://%s/gupload/create_file' % self.upload_server_ip r = requests.post(url, data=params) check_error(r, 201) # save upload state to resume upload self._save_upload_state_to_file() def new_slice(self): params = { 'upload_token': self.upload_token } url = 'http://%s/gupload/new_slice' % self.upload_server_ip r = requests.get(url, params=params) check_error(r, 201) self._save_slice_state(r.json()) def _save_slice_state(self, result): # 更新切片狀態 self.slice_task_id = result['slice_task_id'] self.slice_offset = result['offset'] self.slice_length = result['length'] self.transferred = result['transferred'] self.finished = result['finished'] def upload_slice(self): # 上傳文件切片 seek():移動文件讀取指針到指定位置 data = None with open(self.file, 'rb') as f: f.seek(self.slice_offset) data = f.read(self.slice_length) # 2048 params = { 'upload_token': self.upload_token, 'slice_task_id': self.slice_task_id, 'offset': self.slice_offset, 'length': self.slice_length, # Byte 'hash': self.checksum_md5data(data) } url = 'http://%s/gupload/upload_slice' % self.upload_server_ip r = requests.post(url, params=params, data=data) check_error(r, 201) self._save_slice_state(r.json()) def check(self): params = { 'upload_token': self.upload_token } url = 'http://%s/gupload/check' % self.upload_server_ip r = requests.get(url, params=params) check_error(r, 200) return r.json() def commit(self): status = self.check()# 檢查上傳狀態 if status['status'] == 4: raise ValueError('upload has not complete, should not commit') while status['status'] != 1: # status is 2 or 3 sleep(10) status = self.check() params = { 'access_token': self.access_token, 'client_id': self.client_id, 'upload_token': self.upload_token, 'upload_server_ip': status['upload_server_ip'] } url = 'https://api.youku.com/uploads/commit.json' r = requests.post(url, data=params) check_error(r, 200) self.finished = True self._delete_upload_state_file()# 刪除記錄視頻上傳信息的文件 log.debug("sdk---->vid:%s youku video_id:%s" % (self.v_vid, r.json()['video_id'])) return r.json()['video_id'] def cancel(self): status = self.check() params = { 'access_token': self.access_token, 'client_id': self.client_id, 'upload_token': self.upload_token, 'upload_server_ip': status['upload_server_ip'] } url = 'https://api.youku.com/uploads/cancel.json' r = requests.get(url, params=params) check_error(r, 200) self._delete_upload_state_file() return r.json()['upload_token'] def spec(self): url = 'https://api.youku.com/schemas/upload/spec.json' r = requests.get(url) check_error(r, 200) return r.json() def transferred_percent(self): """return current transferred percent """ return int(self.transferred / self.file_size) def upload(self, params={}): """start uploading the file until upload is complete or error. This is the main method to used, If you do not care about state of process. Args: params: a dict object describe video info, eg title, tags, description, category. all video params see the doc of prepare_video_params. Returns: return video_id if upload successfully """ if self.upload_token is not None: # resume upload status = self.check() if status['status'] != 4: return self.commit() else: self.new_slice() while self.slice_task_id != 0: self.upload_slice() return self.commit() else: # new upload try: log.debug('youku upload params:%s' % params) # 記錄上傳參數 except Exception as e: pass self.create(self.prepare_video_params(**params)) # 建立上傳 self.create_file() self.new_slice() while self.slice_task_id != 0: self.upload_slice() return self.commit()