大數據ETL實踐探索(2)---- python 與aws 交互


大數據ETL 系列文章簡介

本系列文章主要針對ETL大數據處理這一典型場景,基於python語言使用Oracle、aws、Elastic search 、Spark 相關組件進行一些基本的數據導入導出實戰,如:html

  • oracle使用數據泵impdp進行導入操做。
  • aws使用awscli進行上傳下載操做。
  • 本地文件上傳至aws es
  • spark dataframe錄入ElasticSearch

等典型數據ETL功能的探索。python

系列文章:
1.大數據ETL實踐探索(1)---- python 與oracle數據庫導入導出
2.大數據ETL實踐探索(2)---- python 與aws 交互
3.大數據ETL實踐探索(3)---- pyspark 之大數據ETL利器
4.大數據ETL實踐探索(4)---- 之 搜索神器elastic search
5.使用python對數據庫,雲平臺,oracle,aws,es導入導出實戰
6.aws ec2 配置ftp----使用vsftpweb


本文主要介紹,使用python與典型雲平臺aws 進行交互的部分過程和經典代碼shell

簡介與實例

boto3 有了這個包,基本全部和aws 進行交互的庫均可以搞定了數據庫

aws 雲服務提供了一些基礎到高端的組合幫助咱們更好的進行交付,實現本身的想法。 我看過最經典的例子莫過於
利用 AWS Comprehend 打造近實時文本情感分析json

在這裏插入圖片描述

來自aws 官方技術博客的windows

下面咱們給出一些典型例子和場景代碼瀏覽器

讀寫本地數據到aws s3

upload csv to aws

使用awscli上傳大文件,固然直接瀏覽器上傳也行,可是好像超過4g會有問題。
Download Windows Installeroracle

Https://docs.aws.amazon.com/zh_cn/cli/latest/userguide/awscli-install-windows.html#awscli-install-windows-pathide

When installed, use

AWS --version

to confirm whether it is normal

Single file upload eg.

AWS S3 --region cn-north-1 CP CL_CLLI_LOG.csv s3://xxxx/csv/

You can use the notepad++'s block pattern, edit each table into a command, and execute the bat file in the CMD,like below:

aws s3 --region cn-north-1 cp LOG1.csv s3://xxxx/csv/ 
aws s3 --region cn-north-1 cp LOG2.csv s3://xxxx/csv/

使用python 將本地文件寫入s3

def writeJsonToS3(json,aws_access_key,aws_secret_access_key):
    client = boto3.client('s3', 'cn',aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_access_key)
    
    filename = "_".join(['result',datetime.datetime.now().strftime('%Y%m%d%H%M%S'),'score']) + '.csv'
        
        
    bucket_name  = '...'
    route = '...'
        
    client.put_object(Body=json,Bucket=bucket_name, Key=route + filename )
                          
    logger.info("score result Added to S3")
    file_url = "https://.../{0}/{1}".format(bucket_name,filename)
        
    logger.info(image_url)

讀出kinesis 中數據

def get_stream_data(stream_name, limit, timedelta):
    
    client = boto3.client('kinesis', 'cn', aws_access_key_id='',aws_secret_access_key='')
    if stream_name:
        stream = client.describe_stream(StreamName=stream_name)['StreamDescription']

        for shard in stream['Shards']:
            print ("### %s - %s"%(stream_name, shard['ShardId']))
            shard_iterator = client.get_shard_iterator(
                StreamName=stream_name,
                ShardId=shard['ShardId'],
                ShardIteratorType='AT_TIMESTAMP',  #'TRIM_HORIZON'|'LATEST'
                Timestamp=datetime.datetime.utcnow() - datetime.timedelta(minutes=timedelta)
            )['ShardIterator']
            
            for i in range(0,1):
                out = client.get_records(ShardIterator=shard_iterator, Limit=limit)
                if out["Records"]:
                    for record in out["Records"]:
                        data = json.loads(record["Data"])
                        print (data)
                    break
                else:
                    print (out)

                    
    else:
        print ("Need stream name !!!")

本文同步分享在 博客「shiter」(CSDN)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索