1、ElasticSearch的備份與還原html
============================================================================================================================================================
1. 問題
Elasticsearch 副本提供了高可靠性;它們讓你能夠容忍零星的節點丟失而不會中斷服務。可是,副本並不提供對災難性故障的保護。對這種狀況,你須要的是對集羣真正的備份——在某些東西確實出問題的時候有一個完整的拷貝。
2.解決方案
經過快照的方式,將Elasticsearch集羣中的數據,備份到HDFS上,這樣數據即存在於Elasticsearch(簡稱ES)集羣當中,又存在於HDFS上。當ES集羣出現不可恢復性的故障時,能夠將數據從HDFS上快速恢復。
也能夠選擇NFS方式進行備份數據:https://www.cnblogs.com/keithtt/p/7189493.html
三、須要進行的測試
(1)如何進行快照備份
(2)如何進行快照還原
(3)在歷史日誌時間過久,還不想完全刪除掉,想要歸檔,可使用快照方式嗎。若是之後還想用,怎麼還原回來?python
https://www.jianshu.com/p/d2a189f704a2mysql
能夠採用rename_pattern和rename_replacement用於重命名index,由於沒法將index還原到open的index。web
如何刪除N個月前的數據?sql
http://www.javashuo.com/article/p-glbdcfht-cv.html數據庫
http://www.javashuo.com/article/p-zpwkbamy-ck.htmljson
一、已知:能夠全庫備份,也能夠指定index備份。服務器
二、能不能備份指定條件的呢?不知道...意義不大,研究不研究無所謂。restful
三、若是按這個方案,那麼HDFS和ES的服務器在物理上要如何規劃?好比3臺物理服務器(什麼樣的配置CPU、內存、磁盤??)架構
組成VMWARE ESXI虛擬機集羣,2臺NTB的存儲,HDFS和ES應該如何避免放在一個籃子裏?這裏須要規劃和繪製示意圖,要求教育局購買。 存儲的容量如何規劃???好比100T ,教育局會問,能用多久?之後擴容怎麼辦?
是否是須要單獨購買硬件防火牆??DDOS攻擊怎麼辦??
============================================================================================================================================================
2、總體的技術架構圖
3、各業務系統數據聚集的辦法
國內同類型軟件的實現方法:
http://www.ruisoft.com/solution-smartcity-01.html
http://www.chinawiserv.com/home/news/detail/id/526
各業務系統須要提供全量和增量數據上報,增量數據上報確定會要求上報最近新增、修改、刪除的數據,最簡單的辦法是使用觸發器,下面給出測試過的各主流數據庫觸發器方案:
http://www.javashuo.com/article/p-odpecxgl-bx.html
特殊須要說明的是:對於物理性刪除的delete狀況,咱們須要規劃一個deleted表來記錄刪除的主鍵ID。
(1) SqlServer
若是業務庫是SqlServer的話,能夠參考這篇文章:
http://www.cnblogs.com/iampkm/p/4082916.html (時間戳方案)
https://blog.csdn.net/yenange/article/details/49636215 (CDC方案)
(2) Mongodb
若是業務庫是Mongodb的話,能夠參考這篇文章:
https://blog.csdn.net/u013066244/article/details/80004153
(3)Mysql
若是業務庫是Mysql的話,還須要特殊注意下這個:
若是Mysql數據庫版本低於5.6,能夠考慮使用觸發器方式:https://blog.csdn.net/heweimingming/article/details/51315895
(4) Oracle方案
https://blog.csdn.net/strawberry1019/article/details/79422071
僞列的方案不是很優雅,不建議使用,由於alter table無效。
4、數據聚集有兩種方式:
(1)業務系統可能採起提供webservice ,http restful等形式,要求東師理想本身進行數據採集。 這個須要一個個接入。(推薦使用這種!!!)
缺點:須要單獨編碼實現,不編碼沒法完成。
優勢:東師接入能夠經過程序控制數據質量,及時發現問題。
(2) 業務系統經過東師理想大數據中心的API接口自行編碼進行數據上報。
優勢:沒有優點。
缺點:須要單獨編碼實現,不編碼沒法完成。
(3)業務系統不提供webservice,https restful等形式,主動進行數據上報。(前置機)
優勢:東師理想提供前置機代碼,經過簡單配置便可完成上報工做,業務系統無開發量,簡單。
缺點:須要將前置機部署到業務系統中,業務系統會比較反感。
1、全量和增量
其實全量和增量咱們能夠看作從哪一個時間戳開始的問題,從遙遠的古代開始,就是從頭開始。從昨天晚上18點開始,就能夠理解爲增量。其實真正的意思是同樣的。這個開始的位置,是由服務器維護的,業務系統在上報前須要申請開始位置。
2、定時和實時
這類的大數據聚集中心,與東師理想的大數據中心不一樣。東師理想的大數據中心,採用的是準實時的mysql+canal數據上報方式,延遲時間通常在1秒至3秒間(指正常工做狀況下,不出現故障的狀況下)。而長春市教育局的大數據中心,我的理解業務系統的數據聚集能夠是定時的,定時定長最小爲10分鐘,這已經足夠讓其完美的運行,如今流行的ETL工具等,都是按期調度進行數據變動聚集的思路,這無可厚非。
上面這些東西,都在一個主題:業務數據的增(改)、刪。
咱們的設計思路是,不論是增長,仍是修改,這條數據都要從新上報覆蓋掉舊的便可,因此,無所謂增長仍是修改。對於刪除,若是是僞刪除的話,即標識b_deleted=1這樣的刪除,其實就是修改,不是真刪除,沒必要討論。若是是真刪除,咱們要求必須提供在指定時間戳後,獲取刪除掉了哪些主鍵,而後咱們發送到大數據中心,對數據進行修改。
5、python前置機的功能組成
兩個線程
(1) 第一個線程是:正常的增量數據上報,數據掃描時間設置爲10分鐘。
(2) 第二個線程是:心跳,向數據中心報告其存在,1分鐘一次心跳。每次心跳時,向大數據中心獲取是否有任務讓其執行,好比:將某一個記錄上報,從某個時間點開始從新上報(這個能夠是增量,也能夠是全量,看起始時間點)。
這樣作的目的是架構最簡單,最清晰,最不易出錯。
業務系統提供一個只讀賬號給python程序,python程序以源碼形式提供給各業務系統,python程序不會把業務系統不容許的數據內容進行採集。因python程序也可能須要不斷的完善,全部應該是有兩個py文件,一個是update.py ,另外一個是senddata,py. update.py負責向oss檢查是否本地的senddata.py與oss上的同名文件是否md5值一致,不一致則下載回來,保證senddata.py 的自動更新。
6、核心 代碼
一、全量獲取es中的數據
#!user/bin/env python3 # -*- coding: gbk -*- # https://es.xiaoleilu.com/060_Distributed_Search/20_Scan_and_scroll.html from elasticsearch import Elasticsearch from elasticsearch import helpers es_servers = [{ 'host': 'data.edusoa.com', 'port': 9200 }] es = Elasticsearch(hosts=es_servers) index_v="t_wkds_info" doc_type_v="doc" # 方式1:ES的API中提供了scan和scroll,這個方法有點類型傳統數據庫中的遊標。 # query={"query" : {"match_all" : {}}} # scanResp = helpers.scan(es, query, scroll="10m", index=index_v, doc_type=doc_type_v, timeout="10m") # # for resp in scanResp: # print(len(scanResp)) # print(resp) # 方式2:按地區Area_Code='aly'爲例,以PK進行遍歷 query={ "query": { "bool": { "must": [ { "term": { "area_code": "aly" } }, { "range": { "pk": { "gt": 0 } } } ] } }, "size": 500 } # 起始值 startId=0 allCount=0 while startId>-1: query['query']['bool']['must'][1]['range']['pk']['gt']=startId _searched = es.search(index=index_v, doc_type=doc_type_v, body=query) # 輸出查詢到的結果 count=0 for hit in _searched['hits']['hits']: print(hit['_source'], flush=True) count=count+1 allCount=allCount+1 if count>0: startId=_searched['hits']['hits'][count-1]['_source']['pk'] print("allCount="+str(allCount)) else: startId=-1 print("成功結束!")
二、操做Kafka的代碼
http://www.javashuo.com/article/p-riqmndro-kt.html
https://www.lfd.uci.edu/~gohlke/pythonlibs/#pymssql
下載:pymssql-2.1.4.dev5-cp36-cp36m-win_amd64.whl
http://www.javashuo.com/article/p-vrdqibks-o.html
下載:cx_Oracle-6.3.1-cp36-cp36m-win_amd64.whl
https://pypi.org/project/cx_Oracle/
https://blog.csdn.net/mhmds/article/details/53079322
Oracle 11g Client x64
https://download.oracle.com/otn/nt/oracle11g/112010/win64_11gR2_client.zip
http://www.javashuo.com/article/p-nfhkcvmz-c.html