奈學百萬大數據架構師

愛分享 愛生活 加油 2021 

百度網盤java

提取碼:qhhv 程序員

一、HashMap 和 Hashtable 區別 

 HashMap和Hashtable都實現了Map接口,但決定用哪個以前先要弄清楚它們之間的分別。主要的區別有:線程安全性,同步(synchronization),以及速度。web

HashMap幾乎能夠等價於Hashtable,除了HashMap是非synchronized的,並能夠接受null(HashMap能夠接受爲null的鍵值(key)和值(value),而Hashtable則不行)。面試

HashMap是非synchronized,而Hashtable是synchronized,這意味着Hashtable是線程安全的,多個線程能夠共享一個Hashtable;而若是沒有正確的同步的話,多個線程是不能共享HashMap的。Java 5提供了ConcurrentHashMap,它是HashTable的替代,比HashTable的擴展性更好。算法

另外一個區別是HashMap的迭代器(Iterator)是fail-fast迭代器,而Hashtable的enumerator迭代器不是fail-fast的。因此當有其它線程改變了HashMap的結構(增長或者移除元素),將會拋出ConcurrentModificationException,但迭代器自己的remove()方法移除元素則不會拋出ConcurrentModificationException異常。但這並非一個必定發生的行爲,要看JVM。這條一樣也是Enumeration和Iterator的區別。sql

因爲Hashtable是線程安全的也是synchronized,因此在單線程環境下它比HashMap要慢。若是你不須要同步,只須要單一線程,那麼使用HashMap性能要好過Hashtable。數據庫

HashMap不能保證隨着時間的推移Map中的元素次序是不變的。apache

要注意的一些重要術語:編程

sychronized意味着在一次僅有一個線程可以更改Hashtable。就是說任何線程要更新Hashtable時要首先得到同步鎖,其它線程要等到同步鎖被釋放以後才能再次得到同步鎖更新Hashtable。設計模式

Fail-safe和iterator迭代器相關。若是某個集合對象建立了Iterator或者ListIterator,而後其它的線程試圖「結構上」更改集合對象,將會拋出ConcurrentModificationException異常。但其它線程能夠經過set()方法更改集合對象是容許的,由於這並無從「結構上」更改集合。可是假如已經從結構上進行了更改,再調用set()方法,將會拋出IllegalArgumentException異常。

結構上的更改指的是刪除或者插入一個元素,這樣會影響到map的結構。

咱們可否讓HashMap同步?

HashMap能夠經過下面的語句進行同步:

Map m = Collections.synchronizeMap(hashMap);

結論

Hashtable和HashMap有幾個主要的不一樣:線程安全以及速度。僅在你須要徹底的線程安全的時候使用Hashtable,而若是你使用Java 5或以上的話,請使用ConcurrentHashMap吧。

二、Java 垃圾回收機制和生命週期 

C語言:

 
fffc7efead508b7ae9ed540f0a069c43.png
 

Java語言:

 
8f915f9d3f8d0fbc20ef3d21fa27918c.png
 

c的垃圾回收是人工的,工做量大,可是可控性高。

java是自動化的,可是可控性不好,甚至有時會出現內存溢出的狀況,

內存溢出也就是jvm分配的內存中對象過多,超出了最大可分配內存的大小。

提到java的垃圾回收機制就不得不提一個方法: ​  

System.gc()用於調用垃圾收集器,在調用時,垃圾收集器將運行以回收未使用的內存空間。它將嘗試釋放被丟棄對象佔用的內存。

然而System.gc()調用附帶一個免責聲明,沒法保證對垃圾收集器的調用。

因此System.gc()並不能說是完美主動進行了垃圾回收。

做爲java程序員仍是頗有必要了解一下gc,這也是面試過程當中常常出現的一道題目。

 咱們從三個角度來理解gc。

 1jvm怎麼肯定哪些對象應該進行回收

 2jvm會在何時進行垃圾回收的動做

 3jvm究竟是怎麼清楚垃圾對象的

jvm怎麼肯定哪些對象應該進行回收

對象是否會被回收的兩個經典算法:引用計數法,和可達性分析算法。

引用計數法

簡單的來講就是判斷對象的引用數量。實現方式:給對象共添加一個引用計數器,每當有引用對他進行引用時,計數器的值就加1,當引用失效,也就是不在執行此對象是,他的計數器的值就減1,若某一個對象的計數器的值爲0,那麼表示這個對象沒有人對他進行引用,也就是意味着是一個失效的垃圾對象,就會被gc進行回收。

 可是這種簡單的算法在當前的jvm中並無採用,緣由是他並不能解決對象之間循環引用的問題。

 假設有A和B兩個對象之間互相引用,也就是說A對象中的一個屬性是B,B中的一個屬性時A,這種狀況下因爲他們的相互引用,從而是垃圾回收機制沒法識別。

 
47c91ce181668f117e3f8407acdf6530.png
 

由於引用計數法的缺點有引入了可達性分析算法,經過判斷對象的引用鏈是否可達來決定對象是否能夠被回收。可達性分析算法是從離散數學中的圖論引入的,程序把全部的引用關係看做一張圖,經過一系列的名爲GC Roots的對象做爲起始點,從這些節點開始向下搜索,搜索所走過的路徑稱爲引用鏈。當一個對象到 GC Roots 沒有任何引用鏈相連(就是從 GC Roots 到這個對象不可達)時,則證實此對象是不可用的。

如圖:

 
60ee439b62a56c6f034fd0c546f86601.png
 

二在肯定了哪些對象能夠被回收以後,jvm會在何時進行回收

 1會在cpu空閒的時候自動進行回收

 2在堆內存存儲滿了以後

 3主動調用System.gc()後嘗試進行回收

三如何回收

 如何回收說的也就是垃圾收集的算法。

算法又有四個:標記-清除算法,複製算法,標記-整理算法,分代收集算法.

 1 標記-清除算法。

 這是最基礎的一種算法,分爲兩個步驟,第一個步驟就是標記,也就是標記處全部須要回收的對象,標記完成後就進行統一的回收掉哪些帶有標記的對象。這種算法優勢是簡單,缺點是效率問題,還有一個最大的缺點是空間問題,標記清除以後會產生大量不連續的內存碎片,當程序在之後的運行過程當中須要分配較大對象時沒法找到足夠的連續內存而形成內存空間浪費。

執行如圖:

 
931b8a41d44348967d0452cbefc3c15a.png
 

2複製算法。

複製將可用內存按容量劃分爲大小相等的兩塊,每次只使用其中的一塊。當這一塊的內存用完了,就將還存活着的對象複製到另一塊上面,而後再把已使用過的內存空間一次清理掉。這樣使得每次都是對其中的一塊進行內存回收,內存分配時也就不用考慮內存碎片等複雜狀況。只是這種算法的代價是將內存縮小爲原來的一半。

複製算法的執行過程如圖:

 
648a64b2ab4965012a0b310803e36062.png
 

複製收集算法在對象存活率較高時就要執行較多的複製操做,效率將會變低。更關鍵的是,浪費了一半的空間。

標記-整理算法:

標記整理算法與標記清除算法很類似,但最顯著的區別是:標記清除算法僅對不存活的對象進行處理,剩餘存活對象不作任何處理,形成內存碎片;而標記整理算法不只對不存活對象進行處理清除,還對剩餘的存活對象進行整理,從新整理,所以其不會產生內存碎片。

 
85af68c7175c13c445ee259387087c7f.png
 

分代收集算法:

分代收集算法是一種比較智能的算法,也是如今jvm使用最多的一種算法,他自己其實不是一個新的算法,而是他會在具體的場景自動選擇以上三種算法進行垃圾對象回收。

那麼如今的重點就是分代收集算法中說的自動根據具體場景進行選擇。這個具體場景究竟是什麼場景。

場景其實指的是針對jvm的哪個區域,1.7以前jvm把內存分爲三個區域:新生代,老年代,永久代。

 
0f7da8cafc1b4074661c48daf65d67f0.png
 

瞭解過場景以後再結合分代收集算法得出結論: 一、在新生代中,每次垃圾收集時都發現有大批對象死去,只有少許存活,那就選用複製算法。只須要付出少許存活對象的複製成本就能夠完成收集。 二、老年代中由於對象存活率高、沒有額外空間對他進行分配擔保,就必須用標記-清除或者標記-整理。

 
a045b7cba8105ecab7fec4fa1e52e985.png
 

注意:

在jdk8的時候java廢棄了永久代,可是並不意味着咱們以上的結論失效,由於java提供了與永久代相似的叫作「元空間」的技術。

廢棄永久代的緣由:因爲永久代內存常常不夠用或發生內存泄露,爆出異常java.lang.OutOfMemoryErroy。元空間的本質和永久代相似。不過元空間與永久代之間最大的區別在於:元空間並不在虛擬機中,而是使用本地內存。也就是不侷限與jvm能夠使用系統的內存。理論上取決於32位/64位系統可虛擬的內存大小。

GC垃圾回收:

    jvm按照對象的生命週期,將內存按「代」劃分(將堆劃分爲多個地址池):新生代、老年代和持久代(jdk1.8後移除持久代);

    在JVM中程序(PC)計數器、JAVA棧、本地方法棧3個區域隨線程而生、隨線程而滅,所以這幾個區域的內存分配和回收都具有肯定性,就不須要過多考慮回收的問題,由於方法結束或者線程結束時,內存天然就跟隨着回收了。而堆和方法區則不同,這部份內存的分配和回收是動態的,正是垃圾收集器所需關注的部分。

    java中新建立的對象會先被放在新生代區域,該區域對象使用頻繁,jvm會在該區域使用不一樣算法回收必定的短時間對象,若是某些對象使用次數達到必定限制後,那麼該對象就會被放入老年代區域,老年代區域要比新生代區域更大一些(堆內存大部分分配給了老年代區域),而持久代保存的是類的元數據、常量、類靜態變量等。  

方法區和永久代的區別:

   對於方法區和永久代的區別的話,人們一直將它們看做一個部件,其實永久代實現了方法區,比做java中類的話,永久代就是接口實現類,方法區就是接口。

finalize()和System.gc()方法介紹:

    提到GC就要提到finalize()方法,該方法是在jvm肯定了一個對象符合GC的條件下執行的,用於對一些外部資源的釋放等操做,可是什麼時候對這個對象回收咱們就不知道了;須要注意的是在jvm調用了該方法後,這個符合GC的對象也不必定最後就被回收了,由於在執行了finalize()方法後因爲在方法體給對該方法進行了一些操做,使得該對象不符合GC的條件,例如將一個引用指向這個對象,最終致使該對象不會被GC,但這也只能求這個對象依次。

    一樣還有System.gc()方法,這個方法的調用,jvm也不會當即執行對對象的回收,gc()僅僅是提醒jvm能夠回收該方法了,但實際上要根據jvm內存需求來肯定何實回收這個能夠回收的對象。

那麼gc()和finalize()的區別是什麼呢?

    首先finalize()方法是jvm調用的,可是在回收期間不必定每一個對象都會調用這個方法進行收尾工做,這也是這個方法不被提倡使用的緣由。而System.gc()方法能夠人爲調用進行標記一個對象能夠被回收。

最後咱們從什麼時候回收對象比較,finalize()標記的對象是在被標記後的第二次回收時進行回收,而System.gc()方法沒有這種規定,它只是被標記,什麼時候回收由jvm決定。

代碼示例:

public class Test {

@Override

protected void finalize() throws Throwable {

super.finalize();

System.out.println("調用");

}

public static void main(String[] args){

Test test = new Test();

test=null;

System.gc();

}

}

分析:

    咱們這裏建立了Test類並重寫了finalize()方法,而後我在主方法裏建立了一個Test對象,並使其引用爲空(此時符合回收條件)咱們先調用System.gc()

結果:

    調用

    咱們發現執行了finalize()方法,OK,咱們如今將System.gc()註釋掉,咱們會發現並無輸出「調用」,也就是沒有調用finalize()方法,這就是不必定每一個垃圾對象jvm都會自動調用finalize()方法。

三、怎麼解決 Kafka 數據丟失的問題 

1)消費端弄丟了數據

惟一可能致使消費者弄丟數據的狀況,就是說,你那個消費到了這個消息,而後消費者那邊自動提交了offset,讓kafka覺得你已經消費好了這個消息,其實你剛準備處理這個消息,你還沒處理,你本身就掛了,此時這條消息就丟咯。

這不是同樣麼,你們都知道kafka會自動提交offset,那麼只要關閉自動提交offset,在處理完以後本身手動提交offset,就能夠保證數據不會丟。可是此時確實仍是會重複消費,好比你剛處理完,還沒提交offset,結果本身掛了,此時確定會重複消費一次,本身保證冪等性就行了。

生產環境碰到的一個問題,就是說咱們的kafka消費者消費到了數據以後是寫到一個內存的queue裏先緩衝一下,結果有的時候,你剛把消息寫入內存queue,而後消費者會自動提交offset。

而後此時咱們重啓了系統,就會致使內存queue裏還沒來得及處理的數據就丟失了

2)kafka弄丟了數據

這塊比較常見的一個場景,就是kafka某個broker宕機,而後從新選舉partiton的leader時。你們想一想,要是此時其餘的follower恰好還有些數據沒有同步,結果此時leader掛了,而後選舉某個follower成leader以後,他不就少了一些數據?這就丟了一些數據啊。

生產環境也遇到過,咱們也是,以前kafka的leader機器宕機了,將follower切換爲leader以後,就會發現說這個數據就丟了

因此此時通常是要求起碼設置以下4個參數:

給這個topic設置replication.factor參數:這個值必須大於1,要求每一個partition必須有至少2個副本

在kafka服務端設置min.insync.replicas參數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟本身保持聯繫,沒掉隊,這樣才能確保leader掛了還有一個follower吧

在producer端設置acks=all:這個是要求每條數據,必須是寫入全部replica以後,才能認爲是寫成功了

在producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裏了

咱們生產環境就是按照上述要求配置的,這樣配置以後,至少在kafka broker端就能夠保證在leader所在broker發生故障,進行leader切換時,數據不會丟失

3)生產者會不會弄丟數據

若是按照上述的思路設置了ack=all,必定不會丟,要求是,你的leader接收到消息,全部的follower都同步到了消息以後,才認爲本次寫成功了。若是沒知足這個條件,生產者會自動不斷的重試,重試無限次。

四、zookeeper 是如何保證數據一致性的 

ZooKeeper是個集羣,內部有多個server,每一個server均可以鏈接多個client,每一個client均可以修改server中的數據

ZooKeeper能夠保證每一個server內的數據徹底一致,是如何實現的呢?

答:數據一致性是靠Paxos算法保證的,Paxos能夠說是分佈式一致性算法的鼻祖,是ZooKeeper的基礎

Paxos的基本思路:(深刻解讀zookeeper一致性原理)

假設有一個社團,其中有團員、議員(決議小組成員)兩個角色

團員能夠向議員申請提案來修改社團制度

議員坐在一塊兒,拿出本身收到的提案,對每一個提案進行投票表決,超過半數經過便可生效

爲了秩序,規定每一個提案都有編號ID,按順序自增

每一個議員都有一個社團制度筆記本,上面記着全部社團制度,和最近處理的提案編號,初始爲0

投票經過的規則:

新提案ID 是否大於 議員本中的ID,是議員舉手贊同

若是舉手人數大於議員人數的半數,即讓新提案生效

例如:

剛開始,每一個議員本子上的ID都爲0,如今有一個議員拿出一個提案:團費降爲100元,這個提案的ID自增爲1

每一個議員都和本身ID對比,一看 1>0,舉手贊同,同時修改本身本中的ID爲1

發出提案的議員一看超過半數贊成,就宣佈:1號提案生效

而後全部議員都修改本身筆記本中的團費爲100元

之後任何一個團員諮詢任何一個議員:"團費是多少?",議員能夠直接打開筆記本查看,並回答:團費爲100元

可能會有極端的狀況,就是多個議員一塊兒發出了提案,就是併發的狀況

例如

剛開始,每一個議員本子上的編號都爲0,如今有兩個議員(A和B)同時發出了提案,那麼根據自增規則,這兩個提案的編號都爲1,但只會有一個被先處理

假設A的提案在B的上面,議員們先處理A提案並經過了,這時,議員們的本子上的ID已經變爲了1,接下來處理B的提案,因爲它的ID是1,不大於議員本子上的ID,B提案就被拒絕了,B議員須要從新發起提案

上面就是Paxos的基本思路,對照ZooKeeper,對應關係就是:

團員 -client

議員 -server

議員的筆記本 -server中的數據

提案 -變動數據的請求

提案編號 -zxid(ZooKeeper Transaction Id)

提案生效 -執行變動數據的操做

ZooKeeper中還有一個leader的概念,就是把發起提案的權利收緊了,之前是每一個議員均可以發起提案,如今有了leader,你們就不要七嘴八舌了,先把提案都交給leader,由leader一個個發起提案

Paxos算法就是經過投票、全局編號機制,使同一時刻只有一個寫操做被批准,同時併發的寫操做要去爭取選票,只有得到過半數選票的寫操做纔會被批准,因此永遠只會有一個寫操做獲得批准,其餘的寫操做競爭失敗只好再發起一輪投票

1)一致性保證

Zookeeper是一種高性能、可擴展的服務。Zookeeper的讀寫速度很是快,而且讀的速度要比寫的速度更快。另外,在進行讀操做的時候,ZooKeeper依然可以爲舊的數據提供服務。這些都是因爲ZooKeepe所提供的一致性保證,它具備以下特色:

 順序一致性

客戶端的更新順序與它們被髮送的順序相一致。

原子性

更新操做要麼成功要麼失敗,沒有第三種結果。

單系統鏡像

不管客戶端鏈接到哪個服務器,客戶端將看到相同的ZooKeeper視圖。

 可靠性

一旦一個更新操做被應用,那麼在客戶端再次更新它以前,它的值將不會改變。。這個保證將會產生下面兩種結果:

1.若是客戶端成功地得到了正確的返回代碼,那麼說明更新已經成果。若是不可以得到返回代碼(因爲通訊錯誤、超時等等),那麼客戶端將不知道更新操做是否生效。

2.當從故障恢復的時候,任何客戶端可以看到的執行成功的更新操做將不會被回滾。

 實時性

在特定的一段時間內,客戶端看到的系統須要被保證是實時的(在十幾秒的時間裏)。在此時間段內,任何系統的改變將被客戶端看到,或者被客戶端偵測到。

給予這些一致性保證,ZooKeeper更高級功能的設計與實現將會變得很是容易,例如:leader選舉、隊列以及可撤銷鎖等機制的實現。

2)Leader選舉

ZooKeeper須要在全部的服務(能夠理解爲服務器)中選舉出一個Leader,而後讓這個Leader來負責管理集羣。此時,集羣中的其它服務器則成爲此Leader的Follower。而且,當Leader故障的時候,須要ZooKeeper可以快速地在Follower中選舉出下一個Leader。這就是ZooKeeper的Leader機制,下面咱們將簡單介紹在ZooKeeper中,Leader選舉(Leader Election)是如何實現的。

此操做實現的核心思想是:首先建立一個EPHEMERAL目錄節點,例如「/election」。而後。每個ZooKeeper服務器在此目錄下建立一個SEQUENCE|EPHEMERAL 類型的節點,例如「/election/n_」。在SEQUENCE標誌下,ZooKeeper將自動地爲每個ZooKeeper服務器分配一個比前一個分配的序號要大的序號。此時建立節點的ZooKeeper服務器中擁有最小序號編號的服務器將成爲Leader。

在實際的操做中,還須要保障:當Leader服務器發生故障的時候,系統可以快速地選出下一個ZooKeeper服務器做爲Leader。一個簡單的解決方案是,讓全部的follower監視leader所對應的節點。當Leader發生故障時,Leader所對應的臨時節點將會自動地被刪除,此操做將會觸發全部監視Leader的服務器的watch。這樣這些服務器將會收到Leader故障的消息,並進而進行下一次的Leader選舉操做。可是,這種操做將會致使「從衆效應」的發生,尤爲當集羣中服務器衆多而且帶寬延遲比較大的時候,此種狀況更爲明顯。

在Zookeeper中,爲了不從衆效應的發生,它是這樣來實現的:每個follower對follower集羣中對應的比本身節點序號小一號的節點(也就是全部序號比本身小的節點中的序號最大的節點)設置一個watch。只有當follower所設置的watch被觸發的時候,它才進行Leader選舉操做,通常狀況下它將成爲集羣中的下一個Leader。很明顯,此Leader選舉操做的速度是很快的。由於,每一次Leader選舉幾乎只涉及單個follower的操做。

五、hadoop 和 spark 在處理數據時,處理出現內存溢出的方法有哪些?

1. map過程產生大量對象致使內存溢出

這種溢出的緣由是在單個map中產生了大量的對象致使的。

例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),這個操做在rdd中,每一個對象都產生了10000個對象,這確定很容易產生內存溢出的問題。針對這種問題,在不增長內存的狀況下,能夠經過減小每一個Task的大小,以便達到每一個Task即便產生大量的對象Executor的內存也可以裝得下。具體作法能夠在會產生大量對象的map操做以前調用repartition方法,分區成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。

面對這種問題注意,不能使用rdd.coalesce方法,這個方法只能減小分區,不能增長分區,不會有shuffle的過程。

2.數據不平衡致使內存溢出

數據不平衡除了有可能致使內存溢出外,也有可能致使性能的問題,解決方法和上面說的相似,就是調用repartition從新分區。這裏就再也不累贅了。

3.coalesce調用致使內存溢出

這是我最近才遇到的一個問題,由於hdfs中不適合存小問題,因此Spark計算後若是產生的文件過小,咱們會調用coalesce合併文件再存入hdfs中。可是這會致使一個問題,例如在coalesce以前有100個文件,這也意味着可以有100個Task,如今調用coalesce(10),最後只產生10個文件,由於coalesce並非shuffle操做,這意味着coalesce並非按照我本來想的那樣先執行100個Task,再將Task的執行結果合併成10個,而是從頭到位只有10個Task在執行,本來100個文件是分開執行的,如今每一個Task同時一次讀取10個文件,使用的內存是原來的10倍,這致使了OOM。解決這個問題的方法是令程序按照咱們想的先執行100個Task再將結果合併成10個文件,這個問題一樣能夠經過repartition解決,調用repartition(10),由於這就有一個shuffle的過程,shuffle先後是兩個Stage,一個100個分區,一個是10個分區,就能按照咱們的想法執行。

4.shuffle後內存溢出

shuffle內存溢出的狀況能夠說都是shuffle後,單個文件過大致使的。在Spark中,join,reduceByKey這一類型的過程,都會有shuffle的過程,在shuffle的使用,須要傳入一個partitioner,大部分Spark中的shuffle操做,默認的partitioner都是HashPatitioner,默認值是父RDD中最大的分區數,這個參數經過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism參數只對HashPartitioner有效,因此若是是別的Partitioner或者本身實現的Partitioner就不能使用spark.default.parallelism這個參數來控制shuffle的併發量了。若是是別的partitioner致使的shuffle內存溢出,就須要從partitioner的代碼增長partitions的數量。

5. standalone模式下資源分配不均勻致使內存溢出

在standalone的模式下若是配置了–total-executor-cores 和 –executor-memory 這兩個參數,可是沒有配置–executor-cores這個參數的話,就有可能致使,每一個Executor的memory是同樣的,可是cores的數量不一樣,那麼在cores數量多的Executor中,因爲可以同時執行多個Task,就容易致使內存溢出的狀況。這種狀況的解決方法就是同時配置–executor-cores或者spark.executor.cores參數,確保Executor資源分配均勻。

6.在RDD中,共用對象可以減小OOM的狀況

這個比較特殊,這裏說記錄一下,遇到過一種狀況,相似這樣rdd.flatMap(x=>for(i <- 1 to 1000) yield (「key」,」value」))致使OOM,可是在一樣的狀況下,使用rdd.flatMap(x=>for(i <- 1 to 1000) yield 「key」+」value」)就不會有OOM的問題,這是由於每次(「key」,」value」)都產生一個Tuple對象,而」key」+」value」,無論多少個,都只有一個對象,指向常量池。具體測試以下:

 
fd02bb3c9c0b7f28b5939ebbf085ef67.png
 

這個例子說明(「key」,」value」)和(「key」,」value」)在內存中是存在不一樣位置的,也就是存了兩份,可是」key」+」value」雖然出現了兩次,可是隻存了一份,在同一個地址,這用到了JVM常量池的知識.因而乎,若是RDD中有大量的重複數據,或者Array中須要存大量重複數據的時候咱們均可以將重複數據轉化爲String,可以有效的減小內存使用.

六、java 實現快速排序 

高快省的排序算法

有沒有既不浪費空間又能夠快一點的排序算法呢?那就是「快速排序」啦!光聽這個名字是否是就以爲很高端呢。

假設咱們如今對「6 1 2 7 9 3 4 5 10 8」這個10個數進行排序。首先在這個序列中隨便找一個數做爲基準數(不要被這個名詞嚇到了,就是一個用來參照的數,待會你就知道它用來作啥的了)。爲了方便,就讓第一個數6做爲基準數吧。接下來,須要將這個序列中全部比基準數大的數放在6的右邊,比基準數小的數放在6的左邊,相似下面這種排列:

3 1 2 5 4 6 9 7 10 8

在初始狀態下,數字6在序列的第1位。咱們的目標是將6挪到序列中間的某個位置,假設這個位置是k。如今就須要尋找這個k,而且以第k位爲分界點,左邊的數都小於等於6,右邊的數都大於等於6。想想,你有辦法能夠作到這點嗎?

排序算法顯神威

方法其實很簡單:分別從初始序列「6 1 2 7 9 3 4 5 10 8」兩端開始「探測」。先從找一個小於6的數,再從找一個大於6的數,而後交換他們。這裏能夠用兩個變量i和j,分別指向序列最左邊和最右邊。咱們爲這兩個變量起個好聽的名字「哨兵i」和「哨兵j」。剛開始的時候讓哨兵i指向序列的最左邊(即i=1),指向數字6。讓哨兵j指向序列的最右邊(即=10),指向數字。

 
50f6b0de6e772c06d175477d367618be.jpeg
 

首先哨兵j開始出動。由於此處設置的基準數是最左邊的數,因此須要讓哨兵j先出動,這一點很是重要(請本身想想爲何)。哨兵j一步一步地向左挪動(即j–),直到找到一個小於6的數停下來。接下來哨兵i再一步一步向右挪動(即i++),直到找到一個數大於6的數停下來。最後哨兵j停在了數字5面前,哨兵i停在了數字7面前。

 
b031ee227541b07a6b92bc7c0bb3bb2c.jpeg
 
 
4acbd95c53f4466bc05d5e2fae934d56.jpeg
 

如今交換哨兵i和哨兵j所指向的元素的值。交換以後的序列以下:

6 1 2 5 9 3 4 7 10 8

 
68d7e024b3d1351d368602a237e156e4.jpeg
 
 
6fbe4556cf5e1996af7e3154d9d4a010.jpeg
 

到此,第一次交換結束。接下來開始哨兵j繼續向左挪動(再友情提醒,每次必須是哨兵j先出發)。他發現了4(比基準數6要小,知足要求)以後停了下來。哨兵i也繼續向右挪動的,他發現了9(比基準數6要大,知足要求)以後停了下來。此時再次進行交換,交換以後的序列以下:

6 1 2 5 4 3 9 7 10 8

第二次交換結束,「探測」繼續。哨兵j繼續向左挪動,他發現了3(比基準數6要小,知足要求)以後又停了下來。哨兵i繼續向右移動,糟啦!此時哨兵i和哨兵j相遇了,哨兵i和哨兵j都走到3面前。說明此時「探測」結束。咱們將基準數6和3進行交換。交換以後的序列以下:

3 1 2 5 4 6 9 7 10 8

 
f277e824750d67378883e08480fa4cd6.jpeg
 
 
d9b5fa46b31ed943a9a306aa86b90b8b.jpeg
 
 
a9cee21525657fe3e7bb1f5cba9b9a4f.jpeg
 

到此第一輪「探測」真正結束。此時以基準數6爲分界點,6左邊的數都小於等於6,6右邊的數都大於等於6。回顧一下剛纔的過程,其實哨兵j的使命就是要找小於基準數的數,而哨兵i的使命就是要找大於基準數的數,直到i和j碰頭爲止。

OK,解釋完畢。如今基準數6已經歸位,它正好處在序列的第6位。此時咱們已經將原來的序列,以6爲分界點拆分紅了兩個序列,左邊的序列是「3 1 2 5 4」,右邊的序列是「9 7 10 8」。接下來還須要分別處理這兩個序列。由於6左邊和右邊的序列目前都仍是很混亂的。不過沒關係,咱們已經掌握了方法,接下來只要模擬剛纔的方法分別處理6左邊和右邊的序列便可。如今先來處理6左邊的序列現吧。

左邊的序列是「3 1 2 5 4」。請將這個序列以3爲基準數進行調整,使得3左邊的數都小於等於3,3右邊的數都大於等於3。好了開始動筆吧

若是你模擬的沒有錯,調整完畢以後的序列的順序應該是:

2 1 3 5 4

OK,如今3已經歸位。接下來須要處理3左邊的序列「2 1」和右邊的序列「5 4」。對序列「2 1」以2爲基準數進行調整,處理完畢以後的序列爲「1 2」,到此2已經歸位。序列「1」只有一個數,也不須要進行任何處理。至此咱們對序列「2 1」已所有處理完畢,獲得序列是「1 2」。序列「5 4」的處理也仿照此方法,最後獲得的序列以下:

1 2 3 4 5 6 9 7 10 8

對於序列「9 7 10 8」也模擬剛纔的過程,直到不可拆分出新的子序列爲止。最終將會獲得這樣的序列,以下

1 2 3 4 5 6 7 8 9 10

到此,排序徹底結束。細心的同窗可能已經發現,快速排序的每一輪處理其實就是將這一輪的基準數歸位,直到全部的數都歸位爲止,排序就結束了。下面上個霸氣的圖來描述下整個算法的處理過程。

 
5d06fc82aa9417e9eead11bfa67c534d.jpeg
 

這是爲何呢?

快速排序之所比較快,由於相比冒泡排序,每次交換是跳躍式的。每次排序的時候設置一個基準點,將小於等於基準點的數所有放到基準點的左邊,將大於等於基準點的數所有放到基準點的右邊。這樣在每次交換的時候就不會像冒泡排序同樣每次只能在相鄰的數之間進行交換,交換的距離就大的多了。所以總的比較和交換次數就少了,速度天然就提升了。固然在最壞的狀況下,仍多是相鄰的兩個數進行了交換。所以快速排序的最差時間複雜度和冒泡排序是同樣的都是O(N2),它的平均時間複雜度爲O(NlogN)。其實快速排序是基於一種叫作「二分」的思想。咱們後面還會遇到「二分」思想,到時候再聊。先上代碼,以下

代碼實現:

public class QuickSort {
    public static void quickSort(int[] arr,int low,int high){ 
         int i,j,temp,t; 
         if(low>high){ 
             return; 
         } 
         i=low; 
         j=high; 
         //temp就是基準位
         temp = arr[low]; 
         
        while (i<j) { 
             //先看右邊,依次往左遞減 
             while (temp<=arr[j]&&i<j) { 
                     j--; 
             }
             //再看左邊,依次往右遞增 
             while (temp>=arr[i]&&i<j) { 
                     i++; 
             } 
             //若是知足條件則交換 
             if (i<j) {
                 t = arr[j]; 
                 arr[j] = arr[i]; 
                 arr[i] = t; 
              } 
         } 
         //最後將基準爲與i和j相等位置的數字交換 
         arr[low] = arr[i]; 
         arr[i] = temp; 
         //遞歸調用左半數組 
         quickSort(arr, low, j-1); 
         //遞歸調用右半數組 
         quickSort(arr, j+1, high);
 } 
    public static void main(String[] args){ 
         int[] arr = {10,7,2,4,7,62,3,4,2,1,8,9,19}; 
         quickSort(arr, 0, arr.length-1);
         for (int i = 0; i < arr.length; i++) { 
         System.out.println(arr[i]); 
         } 
     }
}

輸出爲

1
2
2
3
4
4
7
7
8
9
10
19
62

七、設計微信羣發紅包數據庫表結構(包含表名稱、字段名稱、類型) 

舉例:

drop table if exists wc_groupsend_rp;
create external table wc_groupsend_rp (
     imid string, --設備ID
     wcid string, --微信號
     wcname string, --微信名
     wcgroupName string, --羣名稱
     rpamount double, --紅包金額
     rpid string, --紅包標識
     rpcount int, --紅包數量
     rptype int, --紅包類型 好比1拼手氣紅包,2爲普通紅包,3爲指定人領取紅包
     giverpdt string, --發紅包時間
    setuprpdt string, --建立紅包時間 點擊紅包按鈕的時間     paydt string, --支付時間
) COMMENT '羣發紅包表'
PARTITIONED BY (`giverpdt` string)
row format delimited fields terminated by '\t';
 drop table if exists wc_groupcash_rp;
create external table wc_groupcash_rp (
    rpid string, --紅包標識
     imid string, --設備ID
     wcid string, --微信號
     wcname string, --微信名
    wcgroupName string, --羣名稱
     cashdt stirng, --紅包領取時間 每領取一次更新一條數據 
     cashcount int, --領取人數
     cashamount double, --領取金額
     cashwcid string, --領取人的微信
     cashwcname string, --領取人微信暱稱
     cashsum double, --已領取總金額
) COMMENT '紅包領取表'
PARTITIONED BY (`rpid` string)
row format delimited fields terminated by '\t'; 

八、如何選型:業務場景、性能要求、維護和擴展性、成本、開源活躍度 

九、Spark如何調優 

1)使用foreachPartitions替代foreach。

原理相似於「使用mapPartitions替代map」,也是一次函數調用處理一個partition的全部數據,而不是一次函數調用處理一條數據。在實踐中發現,foreachPartitions類的算子,對性能的提高仍是頗有幫助的。好比在foreach函數中,將RDD中全部數據寫MySQL,那麼若是是普通的foreach算子,就會一條數據一條數據地寫,每次函數調用可能就會建立一個數據庫鏈接,此時就勢必會頻繁地建立和銷燬數據庫鏈接,性能是很是低下;可是若是用foreachPartitions算子一次性處理一個partition的數據,那麼對於每一個partition,只要建立一個數據庫鏈接便可,而後執行批量插入操做,此時性能是比較高的。實踐中發現,對於1萬條左右的數據量寫MySQL,性能能夠提高30%以上。

 

2)設置num-executors參數

參數說明:該參數用於設置Spark做業總共要用多少個Executor進程來執行。Driver在向YARN集羣管理器申請資源時,YARN集羣管理器會盡量按照你的設置來在集羣的各個工做節點上,啓動相應數量的Executor進程。這個參數很是之重要,若是不設置的話,默認只會給你啓動少許的Executor進程,此時你的Spark做業的運行速度是很是慢的。

 

參數調優建議:該參數設置的太少,沒法充分利用集羣資源;設置的太多的話,大部分隊列可能沒法給予充分的資源。針對數據交換的業務場景,建議該參數設置1-5。

 

3)設置executor-memory參數

參數說明:該參數用於設置每一個Executor進程的內存。Executor內存的大小,不少時候直接決定了Spark做業的性能,並且跟常見的JVM OOM異常也有直接的關聯。

 

參數調優建議:針對數據交換的業務場景,建議本參數設置在512M及如下。

 

4) executor-cores

參數說明:該參數用於設置每一個Executor進程的CPU core數量。這個參數決定了每一個Executor進程並行執行task線程的能力。由於每一個CPU core同一時間只能執行一個task線程,所以每一個Executor進程的CPU core數量越多,越可以快速地執行完分配給本身的全部task線程。

 

參數調優建議:Executor的CPU core數量設置爲2~4個較爲合適。建議,若是是跟他人共享一個隊列,那麼num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,避免影響其餘人的做業運行。

 

5) driver-memory

參數說明:該參數用於設置Driver進程的內存。

 

參數調優建議:Driver的內存一般來講不設置,或者設置512M如下就夠了。惟一須要注意的一點是,若是須要使用collect算子將RDD的數據所有拉取到Driver上進行處理,那麼必須確保Driver的內存足夠大,不然會出現OOM內存溢出的問題。

 

6) spark.default.parallelism

參數說明:該參數用於設置每一個stage的默認task數量。這個參數極爲重要,若是不設置可能會直接影響你的Spark做業性能。

 

參數調優建議:若是不設置這個參數, Spark本身根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。Spark官網建議的設置原則是,設置該參數爲num-executors * executor-cores的2~3倍較爲合適,此時能夠充分地利用Spark集羣的資源。針對數據交換的場景,建議此參數設置爲1-10。

 

7) spark.storage.memoryFraction

參數說明:該參數用於設置RDD持久化數據在Executor內存中能佔的比例,默認是0.6。也就是說,默認Executor 60%的內存,能夠用來保存持久化的RDD數據。根據你選擇的不一樣的持久化策略,若是內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。

 

參數調優建議:若是Spark做業中,有較多的RDD持久化操做,該參數的值能夠適當提升一些,保證持久化的數據可以容納在內存中。避免內存不夠緩存全部的數據,致使數據只能寫入磁盤中,下降了性能。可是若是Spark做業中的shuffle類操做比較多,而持久化操做比較少,那麼這個參數的值適當下降一些比較合適。若是發現做業因爲頻繁的gc致使運行緩慢(經過spark web ui能夠觀察到做業的gc耗時),意味着task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。針對數據交換的場景,建議下降此參數值到0.2-0.4。

 

8) spark.shuffle.memoryFraction

參數說明:該參數用於設置shuffle過程當中一個task拉取到上個stage的task的輸出後,進行聚合操做時可以使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操做。shuffle操做在進行聚合時,若是發現使用的內存超出了這個20%的限制,那麼多餘的數據就會溢寫到磁盤文件中去,此時就會極大地下降性能。

 

參數調優建議:若是Spark做業中的RDD持久化操做較少,shuffle操做較多時,建議下降持久化操做的內存佔比,提升shuffle操做的內存佔比比例,避免shuffle過程當中數據過多時內存不夠用,必須溢寫到磁盤上,下降了性能。若是發現做業因爲頻繁的gc致使運行緩慢,意味着task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。針對數據交換的場景,建議此值設置爲0.1或如下。

 

資源參數參考示例

 

如下是一份spark-submit命令的示例,能夠參考一下,並根據本身的實際狀況進行調節:

./bin/spark-submit \

  --master yarn-cluster \

  --num-executors 1 \

  --executor-memory 512M \

  --executor-cores 2 \

  --driver-memory 512M \

  --conf spark.default.parallelism=2 \

  --conf spark.storage.memoryFraction=0.2 \

  --conf spark.shuffle.memoryFraction=0.1 \

十、Flink和spark的通訊框架有什麼異同 

 

首先它們有哪些共同點?flink和spark都是apache 軟件基金會(ASF)旗下頂級項目,都是通用數據處理平臺。它們能夠應用在不少的大數據應用和處理環境。而且有以下擴展:

 

 
a181113bbf64860b03293b7be5ed86bb.png
 

而且二者都可在不依賴於其餘環境的狀況下運行於standalone模式,或是運行在基於hadoop(YARN,HDFS)之上,因爲它們均是運行於內存,因此他們表現的都比hadoop要好不少。

然而它們在實現上仍是有不少不一樣點:

在spark 1.5.x以前的版本,數據集的大小不能大於機器的內存數。

Flink在進行集合的迭代轉換時能夠是循環或是迭代計算處理。這使得Join算法、對分區的連接和重用以及排序能夠選擇最優算法。固然flink也是一個很強大的批處理工具。flink的流式處理的是真正的流處理。流式數據一但進入就實時進行處理,這就容許流數據靈活地在操做窗口。它甚至能夠在使用水印的流數中處理數據(It is even capable of handling late data in streams by the use of watermarks)。此外,flink的代碼執行引擎還對現有使用storm,mapreduce等有很強的兼容性。

Spark 在另外一方面是基於 彈性分佈式數據集(RDD),這(主要的)給於spark基於內存內數據結構的函數式編程。它能夠經過固定的內存給於大批量的計算。spark streaming 把流式數據封裝成小的批處理,也就是它收集在一段時間內到達的全部數據,並在收集的數據上運行一個常規批處理程序。同時一邊收集下一個小的批處理數據。

十一、Java的代理 

代理模式是什麼

代理模式是一種設計模式,簡單說便是在不改變源碼的狀況下,實現對目標對象功能擴展

好比有個歌手對象叫Singer,這個對象有一個唱歌方法叫sing()。

public class Singer{
    public void sing(){
        System.out.println("唱一首歌");
    }
}

假如你但願,經過你的某種方式生產出來的歌手對象,在唱歌先後還要想觀衆問好和答謝,也即對目標對象Singer的sing方法進行功能擴展。

public void sing(){
     System.out.println("向觀衆問好");
     System.out.println("唱一首歌");
     System.out.println("謝謝你們");

可是每每你又不能直接對源代碼進行修改,多是你但願原來的對象還保持原來的樣子,又或許你提供的只是一個可插拔的插件,甚至你有可能都不知道你要對哪一個目標對象進行擴展。這時就須要用到java的代理模式了。網上好多用生活中的經理人的例子來解釋「代理」,看似通俗易懂,但我以爲不適合程序員去理解。程序員應該從代碼的本質入手。

 

Java的三種代理模式

想要實現以上的需求有三種方式,這一部分咱們只看三種模式的代碼怎麼寫,先不涉及實現原理的部分。

1.靜態代理

 

 

public interface ISinger {
    voidsing();
}
/**
*  目標對象實現了某一接口
*/
public class Singer implements ISinger{
    public void sing(){
         System.out.println("唱一首歌");
    }
}
/**
*  代理對象和目標對象實現相同的接口
*/
public class SingerProxy implementsI Singer{
    //接收目標對象,以便調用sing方法
    private ISinger target;
    public UserDaoProxy(ISinger target){
        this.target=target;
    }
    //對目標對象的sing方法進行功能擴展
    public void sing() {
        System.out.println("向觀衆問好");
        target.sing();
        System.out.println("謝謝你們");
    }
 }

 

 

測試

 

 

/**
* 測試類
*/
public classTest {
    public static void main(String[] args) {
        //目標對象
         ISinger target =newSinger();
        //代理對象
         ISinger proxy =newSingerProxy(target);
        //執行的是代理的方法
        proxy.sing();
    }
 }

 

 

  總結:其實這裏作的事情無非就是,建立一個代理類SingerProxy,繼承了ISinger接口並實現了其中的方法。只不過這種實現特地包含了目標對象的方法,正是這種特徵使得看起來像是「擴展」了目標對象的方法。假使代理對象中只是簡單地對sing方法作了另外一種實現而沒有包含目標對象的方法,也就不能算做代理模式了。因此這裏的包含是關鍵。

缺點:這種實現方式很直觀也很簡單,但其缺點是代理對象必須提早寫出,若是接口層發生了變化,代理對象的代碼也要進行維護。若是能在運行時動態地寫出代理對象,不但減小了一大批代理類的代碼,也少了不斷維護的煩惱,不過運行時的效率一定受到影響。這種方式就是接下來的動態代理。

 

2.動態代理(也叫JDK代理)

 跟靜態代理的前提同樣,依然是對Singer對象進行擴展

 

 

public interface ISinger {
    void sing();
}
/**
*  目標對象實現了某一接口
*/
public class Singer implements ISinger{
    public void sing(){
         System.out.println("唱一首歌");
    }
}

 

 

這回直接上測試,因爲java底層封裝了實現細節(以後會詳細講),因此代碼很是簡單,格式也基本上固定。

調用Proxy類的靜態方法newProxyInstance便可,該方法會返回代理類對象

static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces,InvocationHandler h )

接收的三個參數依次爲:

 ·ClassLoader loader:指定當前目標對象使用類加載器,寫法固定

·Class<?>[] interfaces:目標對象實現的接口的類型,寫法固定

·InvocationHandler h:事件處理接口,需傳入一個實現類,通常直接使用匿名內部類

相關文章
相關標籤/搜索