2019年年終總結

概述

北京的冬天颳起風來仍是比較冷的,在這2019年的最後一天,我也該回顧一下這一年的時光了,這一年對我來講都是很重要的一年,既是轉折也是決定,既是順勢也是波折。固然寫一篇隨筆來訴苦是沒有質量的文章。言歸正傳,咱們依然從技術和其餘來完結這篇總結。redis

技術綜述

2019年技術大事記,按照時間順序。sql

mongodb的監控change-stream 多線程開發。mongodb

kafka運維實踐。多線程

hbase運維實踐。運維

spark運維實踐。socket

ESB培訓與運維。ide

MongoDB監控開發

監控主體是mongoDB經過change-stream發送到kafka的數據,對比手段經過mongo的query語句查找op-log的數據,和消費kafka中的數據進行比對,對比過程都放在hbase中對比。邏輯關係間下圖。綠色部分就是監控要作的事情。ui

 監控是多線程的劃分就是根據kafka中的消息隊列topic和處理邏輯區分的,好比要消費topic1的數據,就會有以下線程:消費topic1的線程,查找mongodb中topic1中表的數據的線程,處理hbase中topic1先關數據的線程,就是說三個線程監控一個topic,2個線程從mongo和kafka拉取數據,一個線程處理數據,處理數據在hbase的表中,至關於一個topic對應一個隊列。如今列一下比較重要的代碼。spa

kafka消費比較簡單,使用KafkaConsumer消費者中的低階API assign方法,其中須要注意的是記錄offset到redis,若是redis存在正常offset則使用這個offset,不然從最新位置開始消費。線程

 1         consumer = new KafkaConsumer<String, String>(props);
 2         /*consumer.subscribe(Arrays.asList(topic));
 3         consumer.seekToEnd();*/
 4         TopicPartition partitions = new TopicPartition(topic, partition);
 5         consumer.assign(Arrays.asList(partitions));
 6         if(RedisCheck==0){
 7         //使用assign方式進行消費
 8         consumer.seekToEnd(partitions);
 9         }else{
10         consumer.seek(partitions, RedisCheck);
11         };

mongodb中使用sql語句查找指定表的數據,核心代碼是,須要注意要鏈接secondary的節點:

 

 1     public  void init(){
 2         List<ServerAddress> addrs = new ArrayList<ServerAddress>();
 3         String mongoServer = PosterMogo.MongoServer;
 4         if(mongoServer==null){
 5             mongoServer="host1:27018,host2:27018";
 6         }
 7         if(mongoServer.indexOf(",")>0){
 8             String[] strs = mongoServer.split(",");
 9             for(String str:strs){
10                 addrs.add(new ServerAddress(str.split(":")[0], Integer.valueOf(str.split(":")[1])));
11             }
12         }else{
13             addrs.add(new ServerAddress(mongoServer.split(":")[0], Integer.valueOf(mongoServer.split(":")[1])));
14         }
15         
16         Builder options = new MongoClientOptions.Builder().readPreference(ReadPreference.secondary());
17         options.cursorFinalizerEnabled(true);
18         //options.maxConnectionLifeTime(0);
19         options.connectionsPerHost(3);
20         options.connectTimeout(6000000);
21         options.maxWaitTime(6000000);
22         options.socketTimeout(6000000);
23         options.maxConnectionIdleTime(6600000);
24         options.socketKeepAlive(true).heartbeatFrequency(1000).connectTimeout(6000000);
25         options.threadsAllowedToBlockForConnectionMultiplier(300);
26         
27         options.build();
28         String[] identified = null;
29         if(PosterMogo.MongoIdentify==null){
30             identified="root,admin,842AB52563B13D332916113FB8DFA9BF".split(",");
31         }else{
32         identified = PosterMogo.MongoIdentify.split(",");
33         }
34         String password = null;
35         try {
36         password = DesUtils.decrypt(identified[2]);
37         } catch (Exception e) {
38             Log4jLog.logError(e,"mongoIdentify","init","init");
39         }
40         MongoCredential credential = MongoCredential.createScramSha1Credential(identified[0], identified[1], password.toCharArray());
41         List<MongoCredential> credentials = new ArrayList<MongoCredential>();
42         credentials.add(credential);
43 
44         //經過鏈接認證獲取MongoDB鏈接
45         mongoClient = new MongoClient(addrs,credentials);
46         //getAdminInfo();
47         
48     }

查找hbase中表的數據進行對比的代碼:

 1 public void aggreDataNew(String firstCount,String lastCount,String topic){
 2         
 3         long corDelay=Long.MIN_VALUE;
 4         
 5         String hbaseSub=null;
 6         Table table = HBaseUtil.getTable(tableNameCache);
 7         
 8         Scan scan = new Scan();
 9         scan.setStartRow(Bytes.toBytes(firstCount));
10         scan.setStopRow(Bytes.toBytes(lastCount));
11         //scan.setBatch(50000);
12         scan.setCaching(1000);
13         scan.setCacheBlocks(false);
14 
15         ResultScanner scanner=null;
16         try {
17             scanner = table.getScanner(scan);
18             
19             for(Result rs:scanner){
20                             }
21                  } catch (IOException e) {
22             e.printStackTrace();
23         } finally{
24             if(scanner!=null){scanner.close();}
25             if(table!=null){
26                 try {
27                     table.close();
28                 } catch (IOException e) {
29                     // TODO Auto-generated catch block
30                     e.printStackTrace();
31                 }
32                 }
33         }
34     }

在這裏須要注意這3個線程須要順序限制 因此須要加鎖的代碼,加鎖原理,在第三個線程計算mongodb和kafka的數據的線程,須要線程等待。

kafka運維實踐

這年kafka運維沒有什麼值得寫的問題,簡單來講,包括硬件內存,ISR列表不一致,GC問題等等。

HBASE運維實踐

檢查一致性:hbase hbck

關閉有問題的表:DISABLE TABLE

更新phoenix索引。

SPARK運維實踐

spark也沒有什麼大問題,基本上是hdfs磁盤問題,或者yarn計算節點重啓。

總結

明天就是2020年了,感受2019過的很快,也許由於本身成長速度太慢,安於現狀吧,不激勵本身永遠也不知道本身會成長到什麼地步。共勉!

相關文章
相關標籤/搜索