在遠程搭建了一個平臺,經過改遠程平臺進行數據的採集,須要將數據內容傳送至本地進行處理;爲了實現該功能,考慮了阿里雲的OSS對象儲存的服務。html
40G包月只需1元:-)python
甚至還有客服致電給你,說有問題可直接經過電話聯繫對方,15星好評緩存
關於帳號註冊,開通服務等等功能直接去阿里雲的官方進行相應操做便可ide
安裝python-devel
函數
apt-get install python-dev
安裝OSS:pip3 install oss2 -i https://pypi.tuna.tsinghua.edu.cn/simple/
網站
In [1]: import oss2 In [2]: oss2.__version__ Out[2]: '2.8.0'
方式一:直接去網站頁面進行建立便可;阿里雲
方式二:經過代碼建立設計
import oss2 auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret') # Endpoint以杭州爲例,其它Region請按實際狀況填寫。yourBucketName就是你要建立的Bucket bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'yourBucketName') bucket.create_bucket(oss2.models.BUCKET_ACL_PRIVATE)
yourBucketName
爲你的Bucket名稱,若已存在則會報錯;code
yourAccessKeySecret
,別告訴別哈:)server
auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret') bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'yourBucketName')
# 此處上傳了圖片文件 image_path = './test.jpg' put_result = bucket.put_object_from_file('test.jpg', image_path) if put_result.status == 200: # 若此時的status狀態爲200,則說明上傳成功;在你選擇的Bucket中已經存在文件名爲'test.jpg'的文件了 print('put success')
# param1:oss上bucket中的文件名 # param2:保存在當地的文件路徑+文件名 get_result = bucket.get_object_to_file('test01.jpg', './get/test.jpg') if get_result == 200: print("get sucess")
import oss2 import time import cv2 import os import RPi.GPIO as GPIO class Camera(object): """攝像的初始化,配置""" def __init__(self, channel): self.capture = cv2.VideoCapture(channel) self.fps = int(self.capture.get(cv2.CAP_PROP_FPS)) self.video_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.video_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH)) self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.video_width) self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.video_height) self.capture.set(cv2.CAP_PROP_FPS, self.fps) # 設置攝像頭的緩存圖片爲1,爲了能獲取實時的圖像 self.capture.set(cv2.CAP_PROP_BUFFERSIZE, 1) def get_pic(self, image_dir=None, image_name=None, is_show=False, is_write=False): """ 寫入照片 """ if self.capture.isOpened(): # 讀取緩存中的圖片,起到清空緩存的做用 abandon_ret, abandon_frame = self.capture.read() # 用於處理/上傳的圖片 ret, frame = self.capture.read() # 判斷圖片信息是否讀取成功 if ret is False: print('get picture failed') return None # 是否須要顯示 if is_show is True: cv2.imshow('image', frame) cv2.waitKey(200) # 是否須要寫入到文件中 if is_write is True: cv2.imwrite(image_dir + '/' + image_name, frame) print('get picture success') return frame def release_camera(self): """ 釋放攝像機資源 """ self.capture.release() cv2.destroyAllWindows() class oss(object): """對象存儲類,將圖片傳至阿里雲端""" def __init__(self): self.auth = oss2.Auth('LTAIkA49HkgVYUsW', '**********************') self.bucket = oss2.Bucket(self.auth, 'http://oss-cn-hangzhou.aliyuncs.com', '20190731') def put_image(self, image_dir, image_name): put_result = self.bucket.put_object_from_file(image_name, image_dir + '/' + image_name) # 狀態爲200表示成功的上傳 if put_result.status == 200: print('put success') def get_image(self): pass class gpio(object): """觸發信號的控制""" event_flag = False # 判斷是否有電平觸發信號 def __init__(self, pin): """GPIO初始化""" self.pin = pin # 設置引腳應用模式 GPIO.setmode(GPIO.BCM) GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # 設計引腳觸發模式爲:上升沿觸發模式,防抖的時間爲1000ms GPIO.add_event_detect(self.pin, GPIO.RISING, callback=gpio.callback, bouncetime=1000) @staticmethod def callback(flag): """回調函數""" gpio.event_flag = True def stop_event(self): GPIO.remove_event_detect(self.pin) def start_event(self): GPIO.add_event_detect(self.pin, GPIO.RISING, callback=gpio.callback, bouncetime=1000) @staticmethod def release(): """GPIO釋放引腳資源""" GPIO.cleanup() if __name__ == "__main__": picture = None sleep_time = 0 picture_path = './put' picture_name = '.jpg' gpio_pin = 18 # oss配置 oss_server = oss() # 開啓攝像頭 camera = Camera(0) # GPIO初始化 event = gpio(gpio_pin) while True: if gpio.event_flag is True: gpio.event_flag = False # 標誌位置位 event.stop_event() # 關閉事件觸發 picture_name = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time())) + '.jpg' # 獲取圖片 picture = camera.get_pic(picture_path, picture_name, is_show=True, is_write=True) if picture is not None: pass # oss_server.put_image(picture_path, picture_name) else: camera.release_camera() camera = Camera(0) # 刪除照片,防止內存爆炸 os.remove(picture_path + '/' + picture_name) event.start_event() # 開啓事件觸發 # time.sleep(sleep_time) gpio.release()
https://ptorch.com/news/209.html