Cross-cloud hot backup of stored files (automatically syncing AWS S3 bucket files to Alibaba Cloud OSS and Azure Blob)

What it does: automatic cross-cloud hot backup of storage resources; files written to an AWS S3 bucket are backed up to Alibaba Cloud OSS and Azure Blob Storage. The pipeline: S3 pushes an event notification to an SQS queue for each new object; a worker polls the queue, downloads the object from S3, and uploads it to both OSS and Azure Blob.

Setup:

  1. Configure the AWS SQS queue's access policy to allow the designated S3 bucket to send messages to it.
  2. Configure an "event" on the S3 bucket so that "all object create events" send a notification message to the SQS queue; the S3 bucket must be reachable over the public network. (A scripted sketch of steps 1 and 2 follows this list.)
  3. Create an AWS access key with read (download) permission on the S3 bucket and permission to receive and delete messages on the SQS queue.
  4. Create an Alibaba Cloud OSS bucket and configure an access key with upload permission.
  5. Create an Azure storage account with a Blob container, and configure an upload key.
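Steps 1 and 2 can also be scripted. The sketch below is a minimal illustration using boto3; the region, queue URL/ARN, and bucket name are hypothetical placeholders, not values from this post:

# Sketch: grant the S3 bucket permission to send to the SQS queue (step 1)
# and wire up the "all object create events" notification (step 2).
# All names, ARNs, and the region below are hypothetical placeholders.
import json
import boto3

sqs = boto3.client('sqs', 'us-east-1')
s3 = boto3.client('s3', 'us-east-1')

queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/backup-queue'
queue_arn = 'arn:aws:sqs:us-east-1:123456789012:backup-queue'
bucket = 'my-source-bucket'

# Queue policy letting the S3 service deliver events from this bucket.
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "s3.amazonaws.com"},
        "Action": "SQS:SendMessage",
        "Resource": queue_arn,
        "Condition": {"ArnLike": {"aws:SourceArn": "arn:aws:s3:::" + bucket}}
    }]
}
sqs.set_queue_attributes(QueueUrl=queue_url,
                         Attributes={'Policy': json.dumps(policy)})

# Notify the queue on every object-created event.
s3.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration={
        'QueueConfigurations': [{
            'QueueArn': queue_arn,
            'Events': ['s3:ObjectCreated:*'],  # "all object create events"
        }]
    })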

 

Configuration file application.yaml:

aws:
  sqs:
    queue.sqsurl: XXXXXXXXXXXXXXXXXXXXXXXXXX
    accessKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    securityKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    region: xxxxxxx
  s3:
    bucket: XXXXXXXXXXXXXXXXXXXXXXXXXX
    accessKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    securityKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    region: xxxxxxx
azure:
  account: XXXXXXXXXXXXXXXXXXXXXXXXXX
  blobContainer: XXXXXXXXXXXXXXXXXXXXXXXXXX
  accountKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
  endpointSuffix: XXXXXXXXXXXXXXXXXXXXXXXXXX
aliyun:
  ossdomain: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ossbucket: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ramaccessKeyID: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ramaccessKeySecret: XXXXXXXXXXXXXXXXXXXXXXXXXX
logging:
  config: config/log4j.yaml
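Before wiring everything together, it can be worth smoke-testing the three credential sets separately. A minimal sketch, assuming application.yaml above has been filled in (the calls are standard boto3 / oss2 / legacy azure-storage APIs):

# Sketch: verify each cloud's credentials independently.
import yaml
import boto3
import oss2
from azure.storage.blob import BlockBlobService

with open('config/application.yaml') as f:
    cfg = yaml.safe_load(f)

s3 = boto3.client('s3', cfg['aws']['s3']['region'],
                  aws_access_key_id=cfg['aws']['s3']['accessKey'],
                  aws_secret_access_key=cfg['aws']['s3']['securityKey'])
s3.head_bucket(Bucket=cfg['aws']['s3']['bucket'])  # raises if the bucket is unreachable

auth = oss2.Auth(cfg['aliyun']['ramaccessKeyID'], cfg['aliyun']['ramaccessKeySecret'])
oss_bucket = oss2.Bucket(auth, cfg['aliyun']['ossdomain'], cfg['aliyun']['ossbucket'])
oss_bucket.get_bucket_info()  # raises on a bad endpoint or key

blob = BlockBlobService(account_name=cfg['azure']['account'],
                        account_key=cfg['azure']['accountKey'],
                        endpoint_suffix=cfg['azure']['endpointSuffix'])
print(blob.exists(cfg['azure']['blobContainer']))  # True if the container is reachable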

 

Approach 1 (single process):

# encoding=utf-8
# Python 2 script; uses the legacy azure-storage SDK (BlockBlobService) and oss2.
import os
import json
import logging
import urllib
import datetime

import boto3
import oss2
import yaml
from azure.storage.blob import BlockBlobService


def parse_config_file(config_file):
    # Load the YAML config and re-serialize it as a JSON string.
    print("LOCAL PATH: " + config_file)
    with open(config_file) as stream:
        config = json.dumps(yaml.load(stream, Loader=yaml.FullLoader))
    return config


class Objhandle(object):
    def __init__(self, config_str):
        config = json.loads(config_str)
        self.sqs_accessKey = config['aws']['sqs']['accessKey']
        self.sqs_securityKey = config['aws']['sqs']['securityKey']
        self.sqs_queue = config['aws']['sqs']['queue.sqsurl']
        self.sqs_region = config['aws']['sqs']['region']
        self.s3_bucket = config['aws']['s3']['bucket']
        self.s3_accessKey = config['aws']['s3']['accessKey']
        self.s3_securityKey = config['aws']['s3']['securityKey']
        self.s3_region = config['aws']['s3']['region']
        self.oss_domain = config['aliyun']['ossdomain']
        self.oss_bucket = config['aliyun']['ossbucket']
        self.oss_accessKey = config['aliyun']['ramaccessKeyID']
        self.oss_accessSecret = config['aliyun']['ramaccessKeySecret']
        self.azure_account = config['azure']['account']
        self.azure_container = config['azure']['blobContainer']
        self.azure_accountKey = config['azure']['accountKey']
        self.azure_endpointSuffix = config['azure']['endpointSuffix']
        self.log_config = config['logging']['config']
        # One client per cloud.
        self.sqs_client = boto3.client('sqs', self.sqs_region,
                                       aws_access_key_id=self.sqs_accessKey,
                                       aws_secret_access_key=self.sqs_securityKey)
        self.s3_client = boto3.client('s3', self.s3_region,
                                      aws_access_key_id=self.s3_accessKey,
                                      aws_secret_access_key=self.s3_securityKey)
        self.oss_auth = oss2.Auth(self.oss_accessKey, self.oss_accessSecret)
        self.oss_service = oss2.Bucket(self.oss_auth, self.oss_domain, self.oss_bucket)
        self.blob_service = BlockBlobService(account_name=self.azure_account,
                                             account_key=self.azure_accountKey,
                                             endpoint_suffix=self.azure_endpointSuffix)

    def _queue_handle(self):
        # Poll SQS for S3 event notifications and download each new object locally.
        msg = []
        sqs_response = self.sqs_client.receive_message(QueueUrl=self.sqs_queue,
                                                       AttributeNames=['SentTimestamp'],
                                                       MaxNumberOfMessages=10,
                                                       MessageAttributeNames=['All'],
                                                       VisibilityTimeout=15,
                                                       WaitTimeSeconds=5)
        if 'Messages' in sqs_response:  # the key is absent when the queue is empty
            for message in sqs_response['Messages']:
                receipt_handle = message['ReceiptHandle']
                body = json.loads(message['Body'])
                key = urllib.unquote(body['Records'][0]['s3']['object']['key']).encode("utf-8")
                if not key.endswith('/'):  # skip "directory" placeholder keys
                    key_path = os.path.split(key)[0]
                    # begin download files
                    today = datetime.datetime.now().strftime("%Y-%m-%d")
                    local_path = '/export/{}/{}'.format(today, key_path)
                    local_file = '/export/{}/{}'.format(today, key)
                    if not os.path.exists(local_path):
                        os.makedirs(local_path)
                    try:
                        self.s3_client.download_file(self.s3_bucket, key, local_file)
                    except Exception as e:
                        logging.error(e)
                    else:
                        msg.append((key, local_file, receipt_handle))
        return msg

    def upload_handle(self):
        # Push each downloaded file to Azure Blob and (resumably) to Aliyun OSS.
        for key, local_file, receipt_handle in self._queue_handle():
            self.blob_service.create_blob_from_path(self.azure_container, key, local_file,
                                                    max_connections=4)
            oss2.resumable_upload(self.oss_service, key, local_file,
                                  store=oss2.ResumableStore(root='/tmp/temp'),
                                  multipart_threshold=10 * 1024 * 1024,
                                  part_size=10 * 1024 * 1024,
                                  num_threads=4)


if __name__ == '__main__':
    config = parse_config_file('config/application.yaml')
    obj = Objhandle(config)
    obj.upload_handle()
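For reference, the message body the script unpacks is the standard S3 event notification format; a trimmed, illustrative example (field values are hypothetical):

# The fields the code reads: Records[0].eventName and Records[0].s3.object.key.
example_body = {
    "Records": [{
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "my-source-bucket"},
            "object": {"key": "path/to/photo.jpg", "size": 1024}
        }
    }]
}

Note that approach 1 handles a single batch and exits, and it never deletes the SQS messages it processed, so the same objects will be re-delivered after the visibility timeout. Approach 2 below loops continuously and deletes each message only after a successful upload.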

 

 

Approach 2 (multi-process):

# encoding=utf-8
# Python 2 script; same clients as approach 1, but each worker is a Process
# that loops forever, and SQS messages are deleted only after a successful upload.
import os
import json
import logging
import logging.config
import urllib
import time
import datetime
from multiprocessing import Process

import boto3
import oss2
import yaml
from azure.storage.blob import BlockBlobService

logger = logging.getLogger(__name__)


# set up logging
def setup_logging(default_path="config/logging.yaml", default_level=logging.INFO):
    if os.path.exists(default_path):
        with open(default_path, "r") as f:
            config = yaml.load(f, Loader=yaml.FullLoader)
        logging.config.dictConfig(config)
    else:
        logging.basicConfig(level=default_level)


def parse_config_file(config_file):
    print("LOCAL PATH: " + config_file)
    with open(config_file) as stream:
        config = json.dumps(yaml.load(stream, Loader=yaml.FullLoader))
    return config


class Objhandle(Process):
    def __init__(self, config_str):
        super(Objhandle, self).__init__()
        config = json.loads(config_str)
        self.sqs_accessKey = config['aws']['sqs']['accessKey']
        self.sqs_securityKey = config['aws']['sqs']['securityKey']
        self.sqs_queue = config['aws']['sqs']['queue.sqsurl']
        self.sqs_region = config['aws']['sqs']['region']
        self.s3_bucket = config['aws']['s3']['bucket']
        self.s3_accessKey = config['aws']['s3']['accessKey']
        self.s3_securityKey = config['aws']['s3']['securityKey']
        self.s3_region = config['aws']['s3']['region']
        self.oss_domain = config['aliyun']['ossdomain']
        self.oss_bucket = config['aliyun']['ossbucket']
        self.oss_accessKey = config['aliyun']['ramaccessKeyID']
        self.oss_accessSecret = config['aliyun']['ramaccessKeySecret']
        self.azure_account = config['azure']['account']
        self.azure_container = config['azure']['blobContainer']
        self.azure_accountKey = config['azure']['accountKey']
        self.azure_endpointSuffix = config['azure']['endpointSuffix']
        self.log_config = config['logging']['config']
        self.sqs_client = boto3.client('sqs', self.sqs_region,
                                       aws_access_key_id=self.sqs_accessKey,
                                       aws_secret_access_key=self.sqs_securityKey)
        self.s3_client = boto3.client('s3', self.s3_region,
                                      aws_access_key_id=self.s3_accessKey,
                                      aws_secret_access_key=self.s3_securityKey)
        self.oss_auth = oss2.Auth(self.oss_accessKey, self.oss_accessSecret)
        self.oss_service = oss2.Bucket(self.oss_auth, self.oss_domain, self.oss_bucket)
        self.blob_service = BlockBlobService(account_name=self.azure_account,
                                             account_key=self.azure_accountKey,
                                             endpoint_suffix=self.azure_endpointSuffix)

    def _queue_handle(self):
        # Poll SQS; download objects created by Put events, discard everything else.
        msg = []
        sqs_response = self.sqs_client.receive_message(QueueUrl=self.sqs_queue,
                                                       AttributeNames=['SentTimestamp'],
                                                       MaxNumberOfMessages=10,
                                                       MessageAttributeNames=['All'],
                                                       VisibilityTimeout=15,
                                                       WaitTimeSeconds=5)
        if 'Messages' in sqs_response:
            for message in sqs_response['Messages']:
                receipt_handle = message['ReceiptHandle']
                body = json.loads(message['Body'])
                action = body['Records'][0]['eventName']
                key = urllib.unquote(body['Records'][0]['s3']['object']['key']).encode("utf-8")
                if action == "ObjectCreated:Put" and not key.endswith('/'):
                    key_path = os.path.split(key)[0]
                    # begin download files
                    today = datetime.datetime.now().strftime("%Y-%m-%d")
                    local_path = '/export/temp_file/{}/{}'.format(today, key_path)
                    local_file = '/export/temp_file/{}/{}'.format(today, key)
                    if not os.path.exists(local_path):
                        os.makedirs(local_path)
                    try:
                        self.s3_client.download_file(self.s3_bucket, key, local_file)
                    except Exception as e:
                        logging.error("Download object error %s", e)
                    else:
                        msg.append((key, local_file, receipt_handle))
                else:
                    # Not a plain Put (or a directory placeholder): discard the message.
                    self.sqs_client.delete_message(QueueUrl=self.sqs_queue,
                                                   ReceiptHandle=receipt_handle)
        else:
            time.sleep(20)  # back off while the queue is empty
        logger.info('Queue messages handled')
        return msg

    def run(self):
        while True:
            for key, local_file, receipt_handle in self._queue_handle():
                try:
                    self.blob_service.create_blob_from_path(self.azure_container, key,
                                                            local_file, max_connections=4)
                    self.oss_service.put_object_from_file(key, local_file)
                except Exception as e:
                    logging.error("upload error %s", e)
                else:
                    logger.info("upload succeeded %s", key)
                    # Acknowledge the SQS message only after both uploads succeed,
                    # then remove the local temp copy.
                    self.sqs_client.delete_message(QueueUrl=self.sqs_queue,
                                                   ReceiptHandle=receipt_handle)
                    if os.path.exists(local_file):
                        os.remove(local_file)


if __name__ == '__main__':
    # set logging from logfile (falls back to basicConfig if the file is absent)
    setup_logging(default_path="config/log4j.yaml")
    config = parse_config_file('config/application.yaml')
    for i in range(4):
        p = Objhandle(config)
        p.start()
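The parent in approach 2 starts four workers and then falls off the end of __main__; the non-daemon workers keep running regardless, but joining them explicitly makes the lifecycle clearer. A minimal sketch of an alternative __main__ (an adjustment, not from the original post):

if __name__ == '__main__':
    setup_logging(default_path="config/log4j.yaml")
    config = parse_config_file('config/application.yaml')
    workers = [Objhandle(config) for _ in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()  # block until the workers exit (they loop forever unless killed)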