最近不是一直在學習大數據框架和引用嘛(我是按照尚硅谷B站視頻先學習過一遍路線,之後找準方向研究),除了本身手動利用Kafka和HDFS寫一個簡單的分佈式文件傳輸(分佈式課程開放性實驗,恰好用上了所學的來練練手)之外,還學習這個學習路線一個項目,電信客服實戰。在這個項目裏面仍是學習到了很多內容,包括Java上不足的不少地方、Java工程開發上的要求和少數框架的複習。不排除本身太菜,啥都不知道,認爲一些常見的東西不常見的狀況(哭html
鑑於網上有不少相似的內容,這裏我只是將我學習和復code的過程當中,學習到的知識和遇到問題的解決方案寫下,以做記錄和回顧。
1.個人手敲代碼,2.老師源碼,含數據和筆記(提取碼:pfbv)
聲明一下,沒有任何廣告意思,這種渠道是很容易找到而且也不少的,我只是剛好學習了這個,而且我以爲還不錯(求生欲極強java
項目主要是模擬電信中的信息部門,從生產環境中獲取通話信息數據,根據業務需求存儲和分析數據。node
統計天天、每個月以及每一年的每一個人的通話次數及時長。git
編寫代碼的流程分爲四步:①數據生產,②數據消費,③數據分析,④數據展現。我在學習過程當中,沒有學習數據展現部分。github
主要任務是利用contact.log(聯繫人文件)的數據,生成不一樣聯繫人之間通話記錄的流程。
這個Part,老師有句話我以爲很在理,「大數據開發人員雖然無論數據怎麼來的,怎麼出去的,可是必須知道和了解這個過程才能按照需求code」。腦海中閃過中間件redis
在之前的編程學習過程當中,老是一古腦兒的猛寫代碼,雖然我自認爲我在咱們宿舍已是模塊化思想最爲嚴重的了,可是從未接觸到面向接口編程。這學期也學習了軟件工程,(雖然咱們學得很水),這門課雖然不是在教咱們寫代碼,但倒是教咱們如何正確的作項目和寫代碼(暈。
面向接口編程也是如此,在這個項目中,瞭解了咱們的數據來源和需求後,第一步要作的是弄清楚須要的對象和須要的功能,即接口,在共同的模塊中肯定好接口和接口的方法簽名,接下來纔是對接口模塊的實現和實現業務。
在這個項目中,創建了一個ct-common模塊做爲公共模塊,簡單介紹幾個:編程
接口或抽象類 | 描述 |
---|---|
Val | 通常數據都須要的實現的接口,只包括名稱意義上的獲取值value()方法 |
DataIn | 數據輸入接口,功能有設置輸入路徑,讀取數據,故存在setPath()和read()方法 |
Producer | 數據生產者接口,功能有獲取輸入信息,設置生產輸出和生產,故存在setIn()和和setOut()和produce()方法 |
下圖是ct-common的代碼結構:數組
這個思想其實我在以前的編碼過程當中就有點領悟了,之因此在這裏提出,是由於在這個跟進過程當中,更加體會到Java編碼就是各類對象組合調用的含義。或許是老師項目拉得太快,讓我感受本身太菜,skrskr
我以前編碼過程當中,也會不停的封裝對象,但通常都是那些很明顯的功能集成對象,更別說是對數據進行封裝成數據集成對象了。換句話說,就是我以前封裝的對象都是含有必定動做的(除了getter&setter)。可是對於一些對象之間傳遞的數據,若是每次都傳相同的數據而且數量>1的話,最好的是封裝成對象,提升擴展性。在業務須要增添一個數據傳遞的狀況下,封裝數據對象只須要更改對象的屬性和對象的構成,不然每一個傳遞的地方(語句)都須要增添傳遞的數據變量。緩存
下面用一張圖表示,在該項目中封裝Calllog和Contact對象的效果:
若是不封裝對象,若是聯繫人對象裏面在加入一個new item(好比性別),那麼幾乎全部的地方都須要修改;反之,只須要在Contact類中增添new item屬性和在Calllog中增添A.new&B.new屬性,以及修改構造方法就能夠了,同時在Producer過程當中,沒有增添和修改過多代碼。網絡
在這個Part中主要仍是熟悉任務就能夠完成,沒遇到什麼問題。若是不用上述的tricks那這不就是一個讀入文件和寫入文件的代碼嘛(我一main方法就能搞定),可是用了以後感受就明顯不一樣,更加工程化,邏輯感更強。
主要操做是利用Flume和Kafka將收集不斷生產的數據,而且將數據插入到HBase中。
主要是學到了一些新的知識,還有知識的簡單運用,我並無深究這些新概念(估計得學到頭禿)。
類加載器:類加載器是負責將多是網絡上、也多是磁盤上的class文件加載到內存中。併爲其生成對應的java.lang.class對象。
有三種類加載器,分別按照順序是啓動類加載器BootstrapClassLoader、擴展類加載器Extension ClassLoader和系統類加載器App ClassLoader。還存在一種雙親委派模型,簡單的意思就是說當一個類加載器收到加載請求時,首先會向上層(父)類加載器發出加載請求。而且每個類加載器都是如此,因此每一個類加載器的請求都會被傳遞到最頂層的類加載器中,一開始我以爲很麻煩,不過這確實能夠避免類的重複加載。
在電信客服的項目中,類加載器被用於加載resource文件夾的配置文件。
Properties prop = new Properties(); // 利用類加載器獲取配置文件 prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));
ThreadLocal:這是一個線程內維護的存儲變量數組。舉個簡單的比方,在Java運行的時候有多個線程,存在一個Map<K,V>,K就是每一個線程的Id,V則是每一個線程內存儲的數據變量。
這是多線程相同的變量的訪問衝突問題解決方法之一,是經過給每一個線程單獨一份存儲空間(犧牲空間)來解決訪問衝突;而熟悉的Synchronized經過等待(犧牲時間)來解決訪問衝突。同時ThreadLocal還具備線程隔離的做用,即A線程不能訪問B線程的V。
在電信客服的項目中,ThreadLocal被用來持久化Connection和Admin鏈接。由於在HBase的DDL和DML操做中,不一樣的操做都須要用到鏈接,因此將其和該線程進行綁定,加快獲取的鏈接的速度和減小內存佔用。固然也能夠直接new 幾個對象,最後統一關閉。
// 經過ThreadLocal保證同一個線程中能夠不重複建立鏈接和Admin。 private ThreadLocal<Connection> connHolder = new ThreadLocal<Connection>(); private ThreadLocal<Admin> adminHolder = new ThreadLocal<Admin>(); private Connection getConnection() throws IOException { Connection conn = connHolder.get(); if (conn == null) { Configuration conf = HBaseConfiguration.create(); conn = ConnectionFactory.createConnection(conf); connHolder.set(conn); } return conn; } private Admin getAdmin() throws IOException { Admin admin = adminHolder.get(); if (admin == null) { getConnection(); admin = connHolder.get().getAdmin(); adminHolder.set(admin); } return admin; }
分區鍵的設計通常是機器數量。rowKey的設計基於表的分區數,而且知足長度原則(10~100KB便可,最好是8的倍數)、惟一性原則和散列性原則(負載均衡,防止出現數據熱點)
本項目中共6個分區,故分區號爲"0|"、"1|"、"2|"、"3|"、"4|"。舉一個例子,3****的第二位不管是任何數字都會小於"|"(第二大的字符),因此"2|"<"3****"<"3|",故分到第四個分區。
設計好了分區鍵後,rowKey的設計主要是根據業務需求哪些數據須要彙集在一塊兒方便查詢,那就利用那些數據設計數據的分區號。
數據含有主叫用戶(13312341234)、被叫用戶(14443214321)、通話日期(20181010)和通話時長(0123)。業務要求咱們將常常須要統計一個用戶在某一月內的通話記錄,即主叫用戶和通話日期中的年月是關鍵數據。根據這些數據計算分區號,保證同一用戶在同一月的通話記錄在HBase上是緊鄰的(還有一個前提要求是rowkey還必須是分,分區號+主叫用戶+通話日期+others,不然在一個分區上仍是有多是亂的)。下面是計算分區號的代碼:
/** * 計算獲得一條數據的分區編號 * * @param tel 數據的主叫電話 * @param date 數據的通話日期 * @return regionNum 分區編號 */ protected int genRegionNum(String tel, String date) { // 獲取電話號碼的隨機部分 String userCode = tel.substring(tel.length() - 4); // 獲取年月 String yearMonth = date.substring(0, 6); // 哈希 int userCodeHash = userCode.hashCode(); int yearMonthHash = yearMonth.hashCode(); // crc 循環冗餘校驗 int crc = Math.abs(userCodeHash ^ yearMonthHash); // 取餘,保證分區號在分區鍵範圍內 int regionNum = crc & ValueConstants.REGION_NUMS; return regionNum; }
例子:查詢13312341234用戶在201810的通話記錄
startKey <- genRegionNum("13312341234","201810")+""+"13312341234"+"_"+"201810"
endKey <- genRegionNum("13312341234","201810")+""+"13312341234"+"_"+"201810"+"|"
電信客服中一般須要計算兩個客戶之間親密度,計算的數據來源於二者的通話記錄。舉個例子,計算A和B的親密度,那麼須要A和B之間的通話記錄,特別注意的是不只須要A call B的記錄,還須要B call A的記錄。
就比如MySQL中的觸發器同樣,MySQL的觸發器有針對update、insert和delete的,還有before和after等等,協處理器也有相似的對應函數。好比,在本項目中,須要的是再插入一條數據後,協處理器被觸發插入另一條「重複數據」,因此複寫的方法是postPut。
設計具體邏輯是:根據插入的Put得到插入的數據信息,而後判斷插入的標誌位Flag是否是1,若是是1,則插入另一條重複數據。
下面是代碼:
public class InsertCalleeCoprocessor extends BaseRegionObserver { /** * 這是HBase上的協處理器方法,在一次Put以後接下來的動做 * * @param e * @param put * @param edit * @param durability * @throws IOException */ @Override public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { // 1. 獲取表對象 Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue())); // 2. 構造Put // 在rowKey中存在不少數據信息,這一點就不具有普適性 String values = Bytes.toString(put.getRow()); String[] split = values.split("_"); String call1 = split[1]; String call2 = split[2]; String callTime = split[3]; String duration = split[4]; String flag = split[5]; // 在協處理器中也發生了Put操做,可是此時的Put不引起協處理器再次響應 // 必須得關閉錶鏈接 if ("0".equals(flag)) { table.close(); return; } CoprocessorDao dao = new CoprocessorDao(); String rowKey = dao.genRegionNums(call2, callTime) + "_" + call2 + "_" + call1 + "_" + callTime + "_" + duration + "_" + "0"; Put calleePut = new Put(Bytes.toBytes(rowKey)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("call1"), Bytes.toBytes(call2)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("call2"), Bytes.toBytes(call1)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("callTime"), Bytes.toBytes(callTime)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("duration"), Bytes.toBytes(duration)); calleePut.addColumn(Bytes.toBytes(Names.CF_CALLEE.getValue()), Bytes.toBytes("flag"), Bytes.toBytes("0")); // 3. 插入Put table.put(calleePut); // 4. 關閉資源,不然內存會溢出 table.close(); } private class CoprocessorDao extends BaseDao { public int genRegionNums(String tel, String date) { return super.genRegionNum(tel, date); } } }
須要注意的問題:
這個流程是我學習最多的流程,除了複習這個大數據框架的API,更多的是對個人Java有了更多的拓展。除了上述提到的,還有一些註解,泛型和泛型的PECS原則等等。另外就是學習怎麼一步步排除錯誤和尋找本身的(低級)錯誤的方法了,這種DeBug的方式對於我來講很新鮮。
同時利用redis緩存數據,利用MapReduce將HBase中的數據提取到MySQL中。
出現的問題:MapReduce任務執行成功,可是MySQL中未插入數據,同時查看MapReduce8088端口,看不到日誌,顯示no log for container available。
問題分析:
1.觀察MapReduce的任務,發現Reduce的確是正確輸出了字節,可是MySQL沒有插入數據,那隻能多是編寫的OutputFormat出現了問題。
2.no log for container available, 在網上查閱資料提示有多是內存不足的問題。
3.查看MapReduce的Reduce任務,發現是在nodemanager是在slave1上運行,而slave1只分配了2G內存。
4.kill slave1和slave2的nodemanager,只運行master的nodemanager,由於master我分配了4G內存。
5.查看日誌成功,尋找錯誤。
6.發現是MySQL語句出現了語法錯誤????????(離譜,就**離譜)
7.修改MySQL語句,任務成功執行。
這是個人一個學習上手的大數據項目,雖然簡單可是也學習很多。作這個項目的時候是考試周,也算是忙裏偷閒完成了!主要是這個項目和咱們小隊準備參加的服創大賽的項目很相似,也算是提早練練手,熟悉下基本的流程。不過咱們小隊的項目最好仍是得上Spark和好的機器(虛擬機老拉跨,因此繼續學習!!!