大數據ETL 系列文章簡介
本系列文章主要針對ETL大數據處理這一典型場景,基於python語言使用Oracle、aws、Elastic search 、Spark 相關組件進行一些基本的數據導入導出實戰,如:html
- oracle使用數據泵impdp進行導入操做。
- aws使用awscli進行上傳下載操做。
- 本地文件上傳至aws es
- spark dataframe錄入ElasticSearch
等典型數據ETL功能的探索。python
系列文章:
1.大數據ETL實踐探索(1)---- python 與oracle數據庫導入導出
2.大數據ETL實踐探索(2)---- python 與aws 交互
3.大數據ETL實踐探索(3)---- pyspark 之大數據ETL利器
4.大數據ETL實踐探索(4)---- 之 搜索神器elastic search
5.使用python對數據庫,雲平臺,oracle,aws,es導入導出實戰
6.aws ec2 配置ftp----使用vsftpweb
本文主要介紹,使用python與典型雲平臺aws 進行交互的部分過程和經典代碼shell
簡介與實例
boto3 有了這個包,基本全部和aws 進行交互的庫均可以搞定了數據庫
aws 雲服務提供了一些基礎到高端的組合幫助咱們更好的進行交付,實現本身的想法。 我看過最經典的例子莫過於
利用 AWS Comprehend 打造近實時文本情感分析json
來自aws 官方技術博客的windows
下面咱們給出一些典型例子和場景代碼瀏覽器
讀寫本地數據到aws s3
upload csv to aws
使用awscli上傳大文件,固然直接瀏覽器上傳也行,可是好像超過4g會有問題。
Download Windows Installeroracle
When installed, use
AWS --version
to confirm whether it is normal
Single file upload eg.
AWS S3 --region cn-north-1 CP CL_CLLI_LOG.csv s3://xxxx/csv/
You can use the notepad++'s block pattern, edit each table into a command, and execute the bat file in the CMD,like below:
aws s3 --region cn-north-1 cp LOG1.csv s3://xxxx/csv/ aws s3 --region cn-north-1 cp LOG2.csv s3://xxxx/csv/
使用python 將本地文件寫入s3
def writeJsonToS3(json,aws_access_key,aws_secret_access_key): client = boto3.client('s3', 'cn',aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_access_key) filename = "_".join(['result',datetime.datetime.now().strftime('%Y%m%d%H%M%S'),'score']) + '.csv' bucket_name = '...' route = '...' client.put_object(Body=json,Bucket=bucket_name, Key=route + filename ) logger.info("score result Added to S3") file_url = "https://.../{0}/{1}".format(bucket_name,filename) logger.info(image_url)
讀出kinesis 中數據
def get_stream_data(stream_name, limit, timedelta): client = boto3.client('kinesis', 'cn', aws_access_key_id='',aws_secret_access_key='') if stream_name: stream = client.describe_stream(StreamName=stream_name)['StreamDescription'] for shard in stream['Shards']: print ("### %s - %s"%(stream_name, shard['ShardId'])) shard_iterator = client.get_shard_iterator( StreamName=stream_name, ShardId=shard['ShardId'], ShardIteratorType='AT_TIMESTAMP', #'TRIM_HORIZON'|'LATEST' Timestamp=datetime.datetime.utcnow() - datetime.timedelta(minutes=timedelta) )['ShardIterator'] for i in range(0,1): out = client.get_records(ShardIterator=shard_iterator, Limit=limit) if out["Records"]: for record in out["Records"]: data = json.loads(record["Data"]) print (data) break else: print (out) else: print ("Need stream name !!!")
本文同步分享在 博客「shiter」(CSDN)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。