Python使用阿里雲OSS服務

Python使用阿里雲OSS服務

前言:

在遠程搭建了一個平臺,經過改遠程平臺進行數據的採集,須要將數據內容傳送至本地進行處理;爲了實現該功能,考慮了阿里雲的OSS對象儲存的服務。html

40G包月只需1元:-)python

甚至還有客服致電給你,說有問題可直接經過電話聯繫對方,15星好評緩存

OSS安裝

關於帳號註冊,開通服務等等功能直接去阿里雲的官方進行相應操做便可ide

  • 安裝python-devel函數

    • win:此過程不須要,在安裝Python時已經包含了;
    • Debian/Ubuntu:apt-get install python-dev
  • 安裝OSS:pip3 install oss2 -i https://pypi.tuna.tsinghua.edu.cn/simple/網站

    • 安裝完成後可用python進行驗證:
    In [1]: import oss2
    
    In [2]: oss2.__version__
    Out[2]: '2.8.0'

OSS使用

建立存儲空間:

  • 方式一:直接去網站頁面進行建立便可;阿里雲

  • 方式二:經過代碼建立設計

    import oss2
    auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret')
    # Endpoint以杭州爲例,其它Region請按實際狀況填寫。yourBucketName就是你要建立的Bucket
    bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'yourBucketName')
    bucket.create_bucket(oss2.models.BUCKET_ACL_PRIVATE)

    yourBucketName爲你的Bucket名稱,若已存在則會報錯;code

    yourAccessKeySecret,別告訴別哈:)server

鏈接OSS:

auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret')
bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'yourBucketName')

上傳文件:

# 此處上傳了圖片文件
image_path = './test.jpg'
put_result = bucket.put_object_from_file('test.jpg', image_path)
if put_result.status == 200:
    # 若此時的status狀態爲200,則說明上傳成功;在你選擇的Bucket中已經存在文件名爲'test.jpg'的文件了
    print('put success')

下載文件:

# param1:oss上bucket中的文件名
# param2:保存在當地的文件路徑+文件名
get_result = bucket.get_object_to_file('test01.jpg', './get/test.jpg')
if get_result == 200:
    print("get sucess")

總體代碼:

  • 根據觸發信號獲取攝像頭的圖片;
  • 獲取攝像頭的實時圖片;
  • 將先圖片保存到本地->上傳阿里雲->在本地進行刪除
import oss2
import time
import cv2
import os
import RPi.GPIO as GPIO

class Camera(object):
    """攝像的初始化,配置"""
    def __init__(self, channel):
        self.capture = cv2.VideoCapture(channel)

        self.fps = int(self.capture.get(cv2.CAP_PROP_FPS))
        self.video_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.video_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))

        self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.video_width)
        self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.video_height)
        self.capture.set(cv2.CAP_PROP_FPS, self.fps)
        # 設置攝像頭的緩存圖片爲1,爲了能獲取實時的圖像
        self.capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    def get_pic(self, image_dir=None, image_name=None, is_show=False, is_write=False):
        """
        寫入照片
        """
        if self.capture.isOpened():
            # 讀取緩存中的圖片,起到清空緩存的做用
            abandon_ret, abandon_frame = self.capture.read()
            # 用於處理/上傳的圖片
            ret, frame = self.capture.read()
            # 判斷圖片信息是否讀取成功
            if ret is False:
                print('get picture failed')
                return None
            # 是否須要顯示
            if is_show is True:
                cv2.imshow('image', frame)
                cv2.waitKey(200)
            # 是否須要寫入到文件中
            if is_write is True:
                cv2.imwrite(image_dir + '/' + image_name, frame)
            print('get picture success')
            return frame

    def release_camera(self):
        """
        釋放攝像機資源
        """
        self.capture.release()
        cv2.destroyAllWindows()


class oss(object):
    """對象存儲類,將圖片傳至阿里雲端"""
    def __init__(self):
        self.auth = oss2.Auth('LTAIkA49HkgVYUsW', '**********************')
        self.bucket = oss2.Bucket(self.auth, 'http://oss-cn-hangzhou.aliyuncs.com', '20190731')

    def put_image(self, image_dir, image_name):
        put_result = self.bucket.put_object_from_file(image_name, image_dir + '/' + image_name)
        # 狀態爲200表示成功的上傳
        if put_result.status == 200:
            print('put success')

    def get_image(self):
        pass


class gpio(object):
    """觸發信號的控制"""
    event_flag = False  # 判斷是否有電平觸發信號

    def __init__(self, pin):
        """GPIO初始化"""
        self.pin = pin
        # 設置引腳應用模式
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
        # 設計引腳觸發模式爲:上升沿觸發模式,防抖的時間爲1000ms
        GPIO.add_event_detect(self.pin, GPIO.RISING, callback=gpio.callback, bouncetime=1000)

    @staticmethod
    def callback(flag):
        """回調函數"""
        gpio.event_flag = True

    def stop_event(self):
        GPIO.remove_event_detect(self.pin)

    def start_event(self):
        GPIO.add_event_detect(self.pin, GPIO.RISING, callback=gpio.callback, bouncetime=1000)

    @staticmethod
    def release():
        """GPIO釋放引腳資源"""
        GPIO.cleanup()


if __name__ == "__main__":
    picture = None
    sleep_time = 0
    picture_path = './put'
    picture_name = '.jpg'
    gpio_pin = 18

    # oss配置
    oss_server = oss()
    # 開啓攝像頭
    camera = Camera(0)
    # GPIO初始化
    event = gpio(gpio_pin)

    while True:
        if gpio.event_flag is True:
            gpio.event_flag = False  # 標誌位置位
            event.stop_event()  # 關閉事件觸發
            picture_name = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time())) + '.jpg'
            # 獲取圖片
            picture = camera.get_pic(picture_path, picture_name, is_show=True, is_write=True)

            if picture is not None:
                pass
                # oss_server.put_image(picture_path, picture_name)
            else:
                camera.release_camera()
                camera = Camera(0)
            # 刪除照片,防止內存爆炸
            os.remove(picture_path + '/' + picture_name)

            event.start_event()  # 開啓事件觸發
            # time.sleep(sleep_time)

    gpio.release()

參考:

https://ptorch.com/news/209.html

相關文章
相關標籤/搜索