標籤(空格分隔): 大數據平臺構建css
- 一: Streamset 簡介與系統環境介紹
- 二: 安裝軟件準備
- 三: 在CDH5.14.4 集成使用StreamSets
- 四: streamsets 基本使用案例運行
1.1: StreamSet 簡介html
StreamSets由Informatica前首席產品官Girish Pancha和Cloudera前開發團隊負責人Arvind Prabhakar於2014年創立。他們成立該公司主要是應對來自動態數據(data in motion)的挑戰 - 包括數據源,數據處理和數據自己,這是一個稱爲「數據漂移「(https://streamsets.com/reports/data-drift/)的問題。StreamSets設想從頭開始管理數據流,避免已有產品和工具的缺陷,並啓用一種管理動態數據(data in motion)的新方法。 最新的產品StreamSets Dataflow Performance Manager,也叫DPM,主要用於構建端到端的數據流。DPM是一個運行控制中心,能夠讓你映射(數據流),內置的測量和監測確保持續的數據傳輸和控制動態數據(data in motion)的性能。首先,它將你不一樣的數據流映射到支持你的每一個關鍵業務流程的拓撲中。而後監測這些拓撲的平常運行狀況,根據掌握的性能狀況,以知足應用的SLA爲目標,確保你始終提供及時和可信的數據。 StreamSet的架構處理數據流程
下載:StreamSet https://archives.streamsets.com/index.html 下載文件: STREAMSETS-3.0.0.0.jar STREAMSETS_DATACOLLECTOR-3.0.0.0-el7.parcel manifest.json
yum install -y httpd* service httpd start chkconfig httpd on mkdir -p /var/×××w/html/streamsets cp -p STREAMSETS-3.0.0.jar /opt/cloudera/csd/ chown cloudera-scm:cloudrea-scm -R /opt/cloudera/csd/ mv manifest.json /var/×××w/html/streamset/ mv STREAMSETS_DATACOLLECTOR-3.3.0-el7.parcel /var/×××w/html/streamset/ 從啓cdh的CM 服務器 service cloudera-scm-server restart
重啓cloudera-scm-server cd /etc/init.d/ ./cloudera-scm-server restart
默認用戶名:admin 密碼:admin
準備工做: 從官網下載測試數據 https://×××w.streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv
建立測試目錄並賦予權限: mkdir -p /flyfish/test_stream mkdir /flyfish/test_stream/data mkdir /flyfish/test_stream/error mkdir /flyfish/test_stream/out chmod -R 777 /flyfish/test_stream
將測試數據拷貝到 /flyfish/test_stream/data 目錄下 cp -p nyc_taxi_data.csv /flyfish/test_stream/data
點擊dataFormat 標籤,修改選擇以下選擇
預覽文件
添加流選擇器 ${record:value('/payment_type') == 'CRD'}
腳本放在 Jython >configuration>Jython>Script 中
try: for record in records: cc = record.value['credit_card'] if cc == '': error.write(record, "Payment type was CRD, but credit card was null") continue cc_type = '' if cc.startswith('4'): cc_type = 'Visa' elif cc.startswith(('51','52','53','54','55')): cc_type = 'MasterCard' elif cc.startswith(('34','37')): cc_type = 'AMEX' elif cc.startswith(('300','301','302','303','304','305','36','38')): cc_type = 'Diners Club' elif cc.startswith(('6011','65')): cc_type = 'Discover' elif cc.startswith(('2131','1800','35')): cc_type = 'JCB' else: cc_type = 'Other' record.value['credit_card_type'] = cc_type output.write(record) except Exception as e: error.write(record, e.message)
使用Field Masker來屏蔽信用卡的信息
配置寫入目的地
流程預覽測試
添加Expression Evaluator處理器
編輯流運行與輸出