北京的冬天颳起風來仍是比較冷的,在這2019年的最後一天,我也該回顧一下這一年的時光了,這一年對我來講都是很重要的一年,既是轉折也是決定,既是順勢也是波折。固然寫一篇隨筆來訴苦是沒有質量的文章。言歸正傳,咱們依然從技術和其餘來完結這篇總結。redis
2019年技術大事記,按照時間順序。sql
mongodb的監控change-stream 多線程開發。mongodb
kafka運維實踐。多線程
hbase運維實踐。運維
spark運維實踐。socket
ESB培訓與運維。ide
監控主體是mongoDB經過change-stream發送到kafka的數據,對比手段經過mongo的query語句查找op-log的數據,和消費kafka中的數據進行比對,對比過程都放在hbase中對比。邏輯關係間下圖。綠色部分就是監控要作的事情。ui
監控是多線程的劃分就是根據kafka中的消息隊列topic和處理邏輯區分的,好比要消費topic1的數據,就會有以下線程:消費topic1的線程,查找mongodb中topic1中表的數據的線程,處理hbase中topic1先關數據的線程,就是說三個線程監控一個topic,2個線程從mongo和kafka拉取數據,一個線程處理數據,處理數據在hbase的表中,至關於一個topic對應一個隊列。如今列一下比較重要的代碼。spa
kafka消費比較簡單,使用KafkaConsumer消費者中的低階API assign方法,其中須要注意的是記錄offset到redis,若是redis存在正常offset則使用這個offset,不然從最新位置開始消費。線程
1 consumer = new KafkaConsumer<String, String>(props); 2 /*consumer.subscribe(Arrays.asList(topic)); 3 consumer.seekToEnd();*/ 4 TopicPartition partitions = new TopicPartition(topic, partition); 5 consumer.assign(Arrays.asList(partitions)); 6 if(RedisCheck==0){ 7 //使用assign方式進行消費 8 consumer.seekToEnd(partitions); 9 }else{ 10 consumer.seek(partitions, RedisCheck); 11 };
mongodb中使用sql語句查找指定表的數據,核心代碼是,須要注意要鏈接secondary的節點:
1 public void init(){ 2 List<ServerAddress> addrs = new ArrayList<ServerAddress>(); 3 String mongoServer = PosterMogo.MongoServer; 4 if(mongoServer==null){ 5 mongoServer="host1:27018,host2:27018"; 6 } 7 if(mongoServer.indexOf(",")>0){ 8 String[] strs = mongoServer.split(","); 9 for(String str:strs){ 10 addrs.add(new ServerAddress(str.split(":")[0], Integer.valueOf(str.split(":")[1]))); 11 } 12 }else{ 13 addrs.add(new ServerAddress(mongoServer.split(":")[0], Integer.valueOf(mongoServer.split(":")[1]))); 14 } 15 16 Builder options = new MongoClientOptions.Builder().readPreference(ReadPreference.secondary()); 17 options.cursorFinalizerEnabled(true); 18 //options.maxConnectionLifeTime(0); 19 options.connectionsPerHost(3); 20 options.connectTimeout(6000000); 21 options.maxWaitTime(6000000); 22 options.socketTimeout(6000000); 23 options.maxConnectionIdleTime(6600000); 24 options.socketKeepAlive(true).heartbeatFrequency(1000).connectTimeout(6000000); 25 options.threadsAllowedToBlockForConnectionMultiplier(300); 26 27 options.build(); 28 String[] identified = null; 29 if(PosterMogo.MongoIdentify==null){ 30 identified="root,admin,842AB52563B13D332916113FB8DFA9BF".split(","); 31 }else{ 32 identified = PosterMogo.MongoIdentify.split(","); 33 } 34 String password = null; 35 try { 36 password = DesUtils.decrypt(identified[2]); 37 } catch (Exception e) { 38 Log4jLog.logError(e,"mongoIdentify","init","init"); 39 } 40 MongoCredential credential = MongoCredential.createScramSha1Credential(identified[0], identified[1], password.toCharArray()); 41 List<MongoCredential> credentials = new ArrayList<MongoCredential>(); 42 credentials.add(credential); 43 44 //經過鏈接認證獲取MongoDB鏈接 45 mongoClient = new MongoClient(addrs,credentials); 46 //getAdminInfo(); 47 48 }
查找hbase中表的數據進行對比的代碼:
1 public void aggreDataNew(String firstCount,String lastCount,String topic){ 2 3 long corDelay=Long.MIN_VALUE; 4 5 String hbaseSub=null; 6 Table table = HBaseUtil.getTable(tableNameCache); 7 8 Scan scan = new Scan(); 9 scan.setStartRow(Bytes.toBytes(firstCount)); 10 scan.setStopRow(Bytes.toBytes(lastCount)); 11 //scan.setBatch(50000); 12 scan.setCaching(1000); 13 scan.setCacheBlocks(false); 14 15 ResultScanner scanner=null; 16 try { 17 scanner = table.getScanner(scan); 18 19 for(Result rs:scanner){ 20 } 21 } catch (IOException e) { 22 e.printStackTrace(); 23 } finally{ 24 if(scanner!=null){scanner.close();} 25 if(table!=null){ 26 try { 27 table.close(); 28 } catch (IOException e) { 29 // TODO Auto-generated catch block 30 e.printStackTrace(); 31 } 32 } 33 } 34 }
在這裏須要注意這3個線程須要順序限制 因此須要加鎖的代碼,加鎖原理,在第三個線程計算mongodb和kafka的數據的線程,須要線程等待。
這年kafka運維沒有什麼值得寫的問題,簡單來講,包括硬件內存,ISR列表不一致,GC問題等等。
檢查一致性:hbase hbck
關閉有問題的表:DISABLE TABLE
更新phoenix索引。
spark也沒有什麼大問題,基本上是hdfs磁盤問題,或者yarn計算節點重啓。
明天就是2020年了,感受2019過的很快,也許由於本身成長速度太慢,安於現狀吧,不激勵本身永遠也不知道本身會成長到什麼地步。共勉!