Zookeeper是一個分佈式開源框架,提供了協調分佈式應用的基本服務,它向外部應用暴露一組通用服務——分佈式同步(Distributed Synchronization)、命名服務(Naming Service)、集羣維護(Group Maintenance)等,簡化分佈式應用協調及其管理的難度,提供高性能的分佈式服務。ZooKeeper自己能夠以單機模式安裝運行,不過它的長處在於經過分佈式ZooKeeper集羣(一個Leader,多個Follower),基於必定的策略來保證ZooKeeper集羣的穩定性和可用性,從而實現分佈式應用的可靠性。html
一、zookeeper是爲別的分佈式程序服務的java
二、Zookeeper自己就是一個分佈式程序(只要有半數以上節點存活,zk就能正常服務)node
三、Zookeeper所提供的服務涵蓋:主從協調、服務器節點動態上下線、統一配置管理、分佈式共享鎖、統> 一名稱服務等linux
四、雖說能夠提供各類服務,可是zookeeper在底層其實只提供了兩個功能:程序員
管理(存儲,讀取)用戶程序提交的數據(相似namenode中存放的metadata);
併爲用戶程序提供數據節點監聽服務;redis
Zookeeper集羣的角色: Leader 和 follower
只要集羣中有半數以上節點存活,集羣就能提供服務算法
一、Zookeeper:一個leader,多個follower組成的集羣shell
二、全局數據一致:每一個server保存一份相同的數據副本,client不管鏈接到哪一個server,數據都是一致的數據庫
三、分佈式讀寫,更新請求轉發,由leader實施apache
四、更新請求順序進行,來自同一個client的更新請求按其發送順序依次執行
五、數據更新原子性,一次數據更新要麼成功,要麼失敗
六、實時性,在必定時間範圍內,client能讀到最新數據
一、層次化的目錄結構,命名符合常規文件系統規範(相似文件系統)
二、每一個節點在zookeeper中叫作znode,而且其有一個惟一的路徑標識
三、節點Znode能夠包含數據和子節點(可是EPHEMERAL類型的節點不能有子節點)
節點類型
a、Znode有兩種類型:
短暫(ephemeral)(create -e /app1/test1 「test1」 客戶端斷開鏈接zk刪除ephemeral類型節點)
持久(persistent) (create -s /app1/test2 「test2」 客戶端斷開鏈接zk不刪除persistent類型節點)
b、Znode有四種形式的目錄節點(默認是persistent )
PERSISTENT
PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )
EPHEMERAL
EPHEMERAL_SEQUENTIAL
c、建立znode時設置順序標識,znode名稱後會附加一個值,順序號是一個單調遞增的計數器,由父節點維護
d、在分佈式系統中,順序號能夠被用於爲全部的事件進行全局排序,這樣客戶端能夠經過順序號推斷事件的順序
發佈與訂閱模型,即所謂的配置中心,顧名思義就是發佈者將數據發佈到ZK節點上,供訂閱者動態獲取數據,實現配置信息的集中式管理和動態更新。例如全局的配置信息,服務式服務框架的服務地址列表等就很是適合使用。
這裏說的負載均衡是指軟負載均衡。在分佈式環境中,爲了保證高可用性,一般同一個應用或同一個服務的提供方都會部署多份,達到對等服務。而消費者就需要在這些對等的服務器中選擇一個來執行相關的業務邏輯,其中比較典型的是消息中間件中的生產者,消費者負載均衡。
消息中間件中發佈者和訂閱者的負載均衡,linkedin開源的KafkaMQ和阿里開源的 metaq都是經過zookeeper來作到生產者、消費者的負載均衡。這裏以metaq爲例如講下:
生產者負載均衡:metaq發送消息的時候,生產者在發送消息的時候必須選擇一臺broker上的一個分區來發送消息,所以metaq在運行過程當中,會把全部broker和對應的分區信息所有註冊到ZK指定節點上,默認的策略是一個依次輪詢的過程,生產者在經過ZK獲取分區列表以後,會按照brokerId和partition的順序排列組織成一個有序的分區列表,發送的時候按照從頭至尾循環往復的方式選擇一個分區來發送消息。
消費負載均衡: 在消費過程當中,一個消費者會消費一個或多個分區中的消息,可是一個分區只會由一個消費者來消費。MetaQ的消費策略是:
1. 每一個分區針對同一個group只掛載一個消費者。
2. 若是同一個group的消費者數目大於分區數目,則多出來的消費者將不參與消費。
3. 若是同一個group的消費者數目小於分區數目,則有部分消費者須要額外承擔消費任務。
在某個消費者故障或者重啓等狀況下,其餘消費者會感知到這一變化(經過 zookeeper watch消費者列表),而後從新進行負載均衡,保證全部的分區都有消費者進行消費。
命名服務也是分佈式系統中比較常見的一類場景。在分佈式系統中,經過使用命名服務,客戶端應用可以根據指定名字來獲取資源或服務的地址,提供者等信息。被命名的實體一般能夠是集羣中的機器,提供的服務地址,遠程對象等等——這些咱們均可以統稱他們爲名字(Name)。其中較爲常見的就是一些分佈式服務框架中的服務地址列表。經過調用ZK提供的建立節點的API,可以很容易建立一個全局惟一的path,這個path就能夠做爲一個名稱。
阿里巴巴集團開源的分佈式服務框架Dubbo中使用ZooKeeper來做爲其命名服務,維護全局的服務地址列表, 點擊這裏查看Dubbo開源項目。在Dubbo實現中:
服務提供者在啓動的時候,向ZK上的指定節點/dubbo/${serviceName}/providers目錄下寫入本身的URL地址,這個操做就完成了服務的發佈。
服務消費者啓動的時候,訂閱/dubbo/${serviceName}/providers目錄下的提供者URL地址, 並向/dubbo/${serviceName} /consumers目錄下寫入本身的URL地址。
注意,全部向ZK上註冊的地址都是臨時節點,這樣就可以保證服務提供者和消費者可以自動感應資源的變化。 另外,Dubbo還有針對服務粒度的監控,方法是訂閱/dubbo/${serviceName}目錄下全部提供者和消費者的信息。
ZooKeeper中特有watcher註冊與異步通知機制,可以很好的實現分佈式環境下不一樣系統之間的通知與協調,實現對數據變動的實時處理。使用方法一般是不一樣系統都對ZK上同一個znode進行註冊,監聽znode的變化(包括znode自己內容及子節點的),其中一個系統update了znode,那麼另外一個系統可以收到通知,並做出相應處理
1. 另外一種心跳檢測機制:檢測系統和被檢測系統之間並不直接關聯起來,而是經過zk上某個節點關聯,大大減小系統耦合。
2. 另外一種系統調度模式:某系統有控制檯和推送系統兩部分組成,控制檯的職責是控制推送系統進行相應的推送工做。管理人員在控制檯做的一些操做,其實是修改了ZK上某些節點的狀態,而ZK就把這些變化通知給他們註冊Watcher的客戶端,即推送系統,因而,做出相應的推送任務。
3. 另外一種工做彙報模式:一些相似於任務分發系統,子任務啓動後,到zk來註冊一個臨時節點,而且定時將本身的進度進行彙報(將進度寫回這個臨時節點),這樣任務管理者就可以實時知道任務進度。
總之,使用zookeeper來進行分佈式通知和協調可以大大下降系統之間的耦合
1. 集羣機器監控:這一般用於那種對集羣中機器狀態,機器在線率有較高要求的場景,可以快速對集羣中機器變化做出響應。這樣的場景中,每每有一個監控系統,實時檢測集羣機器是否存活。過去的作法一般是:監控系統經過某種手段(好比ping)定時檢測每一個機器,或者每一個機器本身定時向監控系統彙報「我還活着」。 這種作法可行,可是存在兩個比較明顯的問題:
1. 集羣中機器有變更的時候,牽連修改的東西比較多。
2. 有必定的延時。
利用ZooKeeper有兩個特性,就能夠實現另外一種集羣機器存活性監控系統:
1. 客戶端在節點 x 上註冊一個Watcher,那麼若是 x?的子節點變化了,會通知該客戶端。
2. 建立EPHEMERAL類型的節點,一旦客戶端和服務器的會話結束或過時,那麼該節點就會消失。
例如,監控系統在 /clusterServers 節點上註冊一個Watcher,之後每動態加機器,那麼就往 /clusterServers 下建立一個 EPHEMERAL類型的節點:/clusterServers/{hostname}. 這樣,監控系統就可以實時知道機器的增減狀況,至於後續處理就是監控系統的業務了。
2. Master選舉則是zookeeper中最爲經典的應用場景了。
在分佈式環境中,相同的業務應用分佈在不一樣的機器上,有些業務邏輯(例如一些耗時的計算,網絡I/O處理),每每只須要讓整個集羣中的某一臺機器進行執行,其他機器能夠共享這個結果,這樣能夠大大減小重複勞動,提升性能,因而這個master選舉即是這種場景下的碰到的主要問題。
利用ZooKeeper的強一致性,可以保證在分佈式高併發狀況下節點建立的全局惟一性,即:同時有多個客戶端請求建立 /currentMaster 節點,最終必定只有一個客戶端請求可以建立成功。利用這個特性,就能很輕易的在分佈式環境中進行集羣選取了。
另外,這種場景演化一下,就是動態Master選舉。這就要用到EPHEMERAL_SEQUENTIAL類型節點的特性了。
上文中提到,全部客戶端建立請求,最終只有一個可以建立成功。在這裏稍微變化下,就是容許全部請求都可以建立成功,可是得有個建立順序,因而全部的請求最終在ZK上建立結果的一種可能狀況是這樣: /currentMaster/{sessionId}-1 ,/currentMaster/{sessionId}-2,/currentMaster/{sessionId}-3 ….. 每次選取序列號最小的那個機器做爲Master,若是這個機器掛了,因爲他建立的節點會立刻小時,那麼以後最小的那個機器就是Master了。
1. 在搜索系統中,若是集羣中每一個機器都生成一份全量索引,不只耗時,並且不能保證彼此之間索引數據一致。所以讓集羣中的Master來進行全量索引的生成,而後同步到集羣中其它機器。另外,Master選舉的容災措施是,能夠隨時進行手動指定master,就是說應用在zk在沒法獲取master信息時,能夠經過好比http方式,向一個地方獲取master。
2. 在Hbase中,也是使用ZooKeeper來實現動態HMaster的選舉。在Hbase實現中,會在ZK上存儲一些ROOT表的地址和HMaster的地址,HRegionServer也會把本身以臨時節點(Ephemeral)的方式註冊到Zookeeper中,使得HMaster能夠隨時感知到各個HRegionServer的存活狀態,同時,一旦HMaster出現問題,會從新選舉出一個HMaster來運行,從而避免了HMaster的單點問題
分佈式鎖,這個主要得益於 ZooKeeper 爲咱們保證了數據的強一致性。鎖服務能夠分爲兩類,一個是 保持獨佔,另外一個是 控制時序。
1. 所謂保持獨佔,就是全部試圖來獲取這個鎖的客戶端,最終只有一個能夠成功得到這把鎖。一般的作法是把 zk 上的一個 znode 看做是一把鎖,經過 create znode 的方式來實現。全部客戶端都去建立 /distribute_lock 節點,最終成功建立的那個客戶端也即擁有了這把鎖。
2. 控制時序,就是全部視圖來獲取這個鎖的客戶端,最終都是會被安排執行,只是有個全局時序了。作法和上面基本相似,只是這裏 /distributelock 已經預先存在,客戶端在它下面建立臨時有序節點(這個能夠經過節點的屬性控制:CreateMode.EPHEMERALSEQUENTIAL 來指定)。Zk 的父節點(/distribute_lock)維持一份 sequence, 保證子節點建立的時序性,從而也造成了每一個客戶端的全局時序。
環境要求:必需要有jdk環境,本次講課使用jdk1.8
1.安裝jdk
2.安裝Zookeeper. 在官網http://zookeeper.apache.org/下載zookeeper.我下載的是zookeeper-3.4.6版本。
解壓zookeeper-3.4.6至D:\machine\zookeeper-3.4.6.
在D:\machine 新建data及log目錄。
3.ZooKeeper的安裝模式分爲三種,分別爲:單機模式(stand-alone)、集羣模式和集羣僞分佈模式。ZooKeeper 單機模式的安裝相對比較簡單,若是第一次接觸ZooKeeper的話,建議安裝ZooKeeper單機模式或者集羣僞分佈模式。
安裝單擊模式。 至D:\machine\zookeeper-3.4.6\conf 複製 zoo_sample.cfg 並粘貼到當前目錄下,命名zoo.cfg.
環境要求:必需要有jdk環境,本次講課使用jdk1.8
一共三個節點
(zk服務器集羣規模不小於3個節點),要求服務器之間系統時間保持一致。
vi /etc/profile
export JAVA_HOME=/opt/jdk1.8.0_71
export ZOOKEEPER_HOME=/usr/local/zookeeper
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH
source /etc/profile
cd /usr/local/zookeeper/conf
mv zoo_sample.cfg zoo.cfg
dataDir=/usr/local/zookeeper/data
server.0=bhz:2888:3888 server.1=hadoop1:2888:3888 server.2=hadoop2:2888:3888
進行復制zookeeper目錄到hadoop01和hadoop02
還有/etc/profile文件
把hadoop0一、 hadoop02中的myid文件裏的值修改成1和2
路徑(vi /usr/local/zookeeper/data/myid)
zkServer.sh status 查詢狀態
# The number of milliseconds of each tick tickTime=2000
# The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/home/myuser/zooA/data # the port at which the clients will connect clientPort=2181 # ZooKeeper server and its port no. # ZooKeeper ensemble should know about every other machine in the ensemble # specify server id by creating 'myid' file in the dataDir # use hostname instead of IP address for convenient maintenance server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2988:3988 server.3=127.0.0.1:2088:3088 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir # autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature <br> #autopurge.purgeInterval=1 dataLogDir=/home/myuser/zooA/log
tickTime:心跳時間,爲了確保鏈接存在的,以毫秒爲單位,最小超時時間爲兩個心跳時間
initLimit:多少個心跳時間內,容許其餘server鏈接並初始化數據,若是ZooKeeper管理的數據較大,則應相應增大這個值
clientPort:服務的監聽端口
dataDir:用於存放內存數據庫快照的文件夾,同時用於集羣的myid文件也存在這個文件夾裏(注意:一個配置文件只能包含一個dataDir字樣,即便它被註釋掉了。)
dataLogDir:用於單獨設置transaction log的目錄,transaction log分離能夠避免和普通log還有快照的競爭
syncLimit:多少個tickTime內,容許follower同步,若是follower落後太多,則會被丟棄。
server.A=B:C:D:
A是一個數字,表示這個是第幾號服務器,B是這個服務器的ip地址
C第一個端口用來集羣成員的信息交換,表示的是這個服務器與集羣中的Leader服務器交換信息的端口
D是在leader掛掉時專門用來進行選舉leader所用
ZooKeeper命令行工具相似於Linux的shell環境,不過功能確定不及shell啦,可是使用它咱們能夠簡單的對ZooKeeper進行訪問,數據建立,數據修改等操做. 使用 zkCli.sh -server 127.0.0.1:2181 鏈接到 ZooKeeper 服務,鏈接成功後,系統會輸出 ZooKeeper 的相關環境以及配置信息。
命令行工具的一些簡單操做以下:
1. 顯示根目錄下、文件: ls / 使用 ls 命令來查看當前 ZooKeeper 中所包含的內容
2. 顯示根目錄下、文件: ls2 / 查看當前節點數據並能看到更新次數等數據
3. 建立文件,並設置初始內容: create /zk "test" 建立一個新的 znode節點「 zk 」以及與它關聯的字符串
4. 獲取文件內容: get /zk 確認 znode 是否包含咱們所建立的字符串
5. 修改文件內容: set /zk "zkbak" 對 zk 所關聯的字符串進行設置
6. 刪除文件: delete /zk 將剛纔建立的 znode 刪除
7. 退出客戶端: quit
8. 幫助命令: help
Zookeeper說明
建立節點(znode) 方法:
create:
提供了兩套建立節點的方法,同步和異步建立節點方式。
同步方式:
參數1,節點路徑《名稱) : InodeName (不容許遞歸建立節點,也就是說在父節點不存在
的狀況下,不容許建立子節點)
參數2,節點內容: 要求類型是字節數組(也就是說,不支持序列化方式,若是須要實現序
列化,可以使用java相關序列化框架,如Hessian、Kryo框架)
參數3,節點權限: 使用Ids.OPEN_ACL_UNSAFE開放權限便可。(這個參數通常在權展
沒有過高要求的場景下,不必關注)
參數4,節點類型: 建立節點的類型: CreateMode,提供四種首點象型
PERSISTENT #持久化節點 PERSISTENT_SEQUENTIAL #順序自動編號持久化節點,這種節點會根據當前已存在的節點數自動加 1 EPHEMERAL #臨時節點, 客戶端session超時這類節點就會被自動刪除 EPHEMERAL_SEQUENTIAL #臨時自動編號節點
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
package com.hongmoshui.test; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class Test001 { // 鏈接地址 private static final String ADDRES = "127.0.0.1:2181"; // session 會話 private static final int SESSION_OUTTIME = 2000; // 信號量,阻塞程序執行,用戶等待zookeeper鏈接成功,發送成功信號, private static final CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(ADDRES, SESSION_OUTTIME, new Watcher() { public void process(WatchedEvent event) { // 獲取事件狀態 KeeperState keeperState = event.getState(); // 獲取事件類型 EventType eventType = event.getType(); if (KeeperState.SyncConnected == keeperState) { if (EventType.None == eventType) { countDownLatch.countDown(); System.out.println("zk 啓動鏈接..."); } } } }); // 進行阻塞 countDownLatch.await(); String result = zk.create("/hongmoshui_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(result); zk.close(); } }
// 1. 建立持久節點,而且容許任何服務器能夠操做 String result = zk.create("/hongmoshui_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("result:" + result); // 2. 建立臨時節點 String result = zk.create("/hongmoshui_temp", "temp".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("result:" + result);
在ZooKeeper中,接口類Watcher用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含KeeperState和EventType兩個枚舉類,分別表明了通知狀態和事件類型,同時定義了事件的回調方法:process(WatchedEvent event)。
同一個事件類型在不一樣的通知狀態中表明的含義有所不一樣,表7-3列舉了常見的通知狀態和事件類型。
表7-3 Watcher通知狀態與事件類型一覽
KeeperState |
EventType |
觸發條件 |
說明 |
|
None |
客戶端與服務端成功創建鏈接 |
|
SyncConnected |
NodeCreated |
Watcher監聽的對應數據節點被建立 |
|
|
NodeDeleted |
Watcher監聽的對應數據節點被刪除 |
此時客戶端和服務器處於鏈接狀態 |
|
NodeDataChanged |
Watcher監聽的對應數據節點的數據內容發生變動 |
|
|
NodeChildChanged |
Wather監聽的對應數據節點的子節點列表發生變動 |
|
Disconnected |
None |
客戶端與ZooKeeper服務器斷開鏈接 |
此時客戶端和服務器處於斷開鏈接狀態 |
Expired |
Node |
會話超時 |
此時客戶端會話失效,一般同時也會受到SessionExpiredException異常 |
AuthFailed |
None |
一般有兩種狀況,1:使用錯誤的schema進行權限檢查 2:SASL權限檢查失敗 |
一般同時也會收到AuthFailedException異常 |
表7-3中列舉了ZooKeeper中最多見的幾個通知狀態和事件類型。
回調方法process()
process方法是Watcher接口中的一個回調方法,當ZooKeeper向客戶端發送一個Watcher事件通知時,客戶端就會對相應的process方法進行回調,從而實現對事件的處理。process方法的定義以下:
abstract public void process(WatchedEvent event);
這個回調方法的定義很是簡單,咱們重點看下方法的參數定義:WatchedEvent。
WatchedEvent包含了每個事件的三個基本屬性:通知狀態(keeperState),事件類型(EventType)和節點路徑(path),其數據結構如圖7-5所示。ZooKeeper使用WatchedEvent對象來封裝服務端事件並傳遞給Watcher,從而方便回調方法process對服務端事件進行處理。
提到WatchedEvent,不得不講下WatcherEvent實體。籠統地講,二者表示的是同一個事物,都是對一個服務端事件的封裝。不一樣的是,WatchedEvent是一個邏輯事件,用於服務端和客戶端程序執行過程當中所需的邏輯對象,而WatcherEvent由於實現了序列化接口,所以能夠用於網絡傳輸。
服務端在生成WatchedEvent事件以後,會調用getWrapper方法將本身包裝成一個可序列化的WatcherEvent事件,以便經過網絡傳輸到客戶端。客戶端在接收到服務端的這個事件對象後,首先會將WatcherEvent還原成一個WatchedEvent事件,並傳遞給process方法處理,回調方法process根據入參就可以解析出完整的服務端事件了。
須要注意的一點是,不管是WatchedEvent仍是WatcherEvent,其對ZooKeeper服務端事件的封裝都是機及其簡單的。舉個例子來講,當/zk-book這個節點的數據發生變動時,服務端會發送給客戶端一個「ZNode數據內容變動」事件,客戶端只可以接收到以下信
package com.hongmoshui; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class ZkClientWatcher implements Watcher { // 集羣鏈接地址 private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181"; // 會話超時時間 private static final int SESSIONTIME = 2000; // 信號量,讓zk在鏈接以前等待,鏈接成功後才能往下走. private static final CountDownLatch countDownLatch = new CountDownLatch(1); private static String LOG_MAIN = "【main】 "; private ZooKeeper zk; public void createConnection(String connectAddres, int sessionTimeOut) { try { zk = new ZooKeeper(connectAddres, sessionTimeOut, this); System.out.println(LOG_MAIN + "zk 開始啓動鏈接服務器...."); countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } } public boolean createPath(String path, String data) { try { this.exists(path, true); this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(LOG_MAIN + "節點建立成功, Path:" + path + ",data:" + data); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 判斷指定節點是否存在 * * @param path 節點路徑 */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } public boolean updateNode(String path, String data) throws KeeperException, InterruptedException { exists(path, true); this.zk.setData(path, data.getBytes(), -1); return false; } public void process(WatchedEvent watchedEvent) { // 獲取事件狀態 KeeperState keeperState = watchedEvent.getState(); // 獲取事件類型 EventType eventType = watchedEvent.getType(); // zk 路徑 String path = watchedEvent.getPath(); System.out.println("進入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path); // 判斷是否創建鏈接 if (KeeperState.SyncConnected == keeperState) { if (EventType.None == eventType) { // 若是創建創建成功,讓後程序往下走 System.out.println(LOG_MAIN + "zk 創建鏈接成功!"); countDownLatch.countDown(); } else if (EventType.NodeCreated == eventType) { System.out.println(LOG_MAIN + "事件通知,新增node節點" + path); } else if (EventType.NodeDataChanged == eventType) { System.out.println(LOG_MAIN + "事件通知,當前node節點" + path + "被修改...."); } else if (EventType.NodeDeleted == eventType) { System.out.println(LOG_MAIN + "事件通知,當前node節點" + path + "被刪除...."); } } System.out.println("--------------------------------------------------------"); } public static void main(String[] args) throws KeeperException, InterruptedException { ZkClientWatcher zkClientWatcher = new ZkClientWatcher(); zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME); // boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064"); zkClientWatcher.updateNode("/pa2", "7894561"); } }
多線程爲了可以提升應用程序的運行效率,在一個進程中有多條不一樣的執行路徑,同時並行執行,互不影響。
當多個線程同時共享,同一個全局變量或靜態變量,作寫的操做時,可能會發生數據衝突問題,也就是線程安全問題。可是作讀操做是不會發生數據衝突問題。
使用同步代碼塊或者Lock鎖機制,保證在多個線程共享同一個變量只能有一個線程進行操做
共享內存模型指的就是Java內存模型(簡稱JMM),JMM決定一個線程對共享變量的寫入時,能對另外一個線程可見。從抽象的角度來看,JMM定義了線程和主內存之間的抽象關係:線程之間的共享變量存儲在主內存(main memory)中,每一個線程都有一個私有的本地內存(local memory),本地內存中存儲了該線程以讀/寫共享變量的副本。本地內存是JMM的一個抽象概念,並不真實存在。它涵蓋了緩存,寫緩衝區,寄存器以及其餘的硬件和編譯器優化。
從上圖來看,線程A與線程B之間如要通訊的話,必需要經歷下面2個步驟:
1. 首先,線程A把本地內存A中更新過的共享變量刷新到主內存中去。
2. 而後,線程B到主內存中去讀取線程A以前已更新過的共享變量。
下面經過示意圖來講明這兩個步驟:
如上圖所示,本地內存A和B有主內存中共享變量x的副本。假設初始時,這三個內存中的x值都爲0。線程A在執行時,把更新後的x值(假設值爲1)臨時存放在本身的本地內存A中。當線程A和線程B須要通訊時,線程A首先會把本身本地內存中修改後的x值刷新到主內存中,此時主內存中的x值變爲了1。隨後,線程B到主內存中去讀取線程A更新後的x值,此時線程B的本地內存的x值也變爲了1。
從總體來看,這兩個步驟實質上是線程A在向線程B發送消息,並且這個通訊過程必需要通過主內存。JMM經過控制主內存與每一個線程的本地內存之間的交互,來爲java程序員提供內存可見性保證。
總結:什麼是Java內存模型:java內存模型簡稱jmm,定義了一個線程對另外一個線程可見。共享變量存放在主內存中,每一個線程都有本身的本地內存,當多個線程同時訪問一個數據的時候,可能本地內存沒有及時刷新到主內存,因此就會發生線程安全問題。
在分佈式狀況,生成全局訂單號ID
package com.hongmoshui.distributed; import java.text.SimpleDateFormat; import java.util.Date; //生成訂單類 public class OrderNumGenerator { // 全局訂單id public static int count = 0; public String getNumber() { try { Thread.sleep(200); } catch (Exception e) { } SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"); return simpt.format(new Date()) + "-" + ++count; } }
package com.hongmoshui.distributed; //使用多線程模擬生成訂單號 public class OrderService implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); public void run() { getNumber(); } public void getNumber() { String number = orderNumGenerator.getNumber(); System.out.println(Thread.currentThread().getName() + ",生成訂單ID:" + number); } public static void main(String[] args) { System.out.println("####生成惟一訂單號###"); for (int i = 0; i < 100; i++) { new Thread(new OrderService()).start(); } } }
使用synchronized或者loca鎖
package com.hongmoshui.distributed; //使用多線程模擬生成訂單號 public class OrderSynchronizedService implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); public void run() { getNumber(); } public void getNumber() { synchronized (this) { String number = orderNumGenerator.getNumber(); System.out.println(Thread.currentThread().getName() + ",生成訂單ID:" + number); } } public static void main(String[] args) { System.out.println("####生成惟一訂單號###"); OrderService orderService = new OrderService(); for (int i = 0; i < 100; i++) { new Thread(orderService).start(); } } }
package com.hongmoshui.distributed; import java.util.concurrent.locks.ReentrantLock; public class OrderLockService implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); // 使用lock鎖 private java.util.concurrent.locks.Lock lock = new ReentrantLock(); public void run() { getNumber(); } public void getNumber() { try { // synchronized (this) { lock.lock(); String number = orderNumGenerator.getNumber(); System.out.println(Thread.currentThread().getName() + ",生成訂單ID:" + number); // } } catch (Exception e) { } finally { lock.unlock(); } } public static void main(String[] args) { System.out.println("####生成惟一訂單號###"); OrderService orderService = new OrderService(); for (int i = 0; i < 100; i++) { new Thread(orderService).start(); } } }
在分佈式狀況,生成全局訂單號ID
在分佈式(集羣)環境下,每臺JVM不能實現同步,在分佈式場景下使用時間戳生成訂單號可能會重複
1.使用數據庫實現分佈式鎖
缺點:性能差、線程出現異常時,容易出現死鎖
2.使用redis實現分佈式鎖
缺點:鎖的失效時間難控制、容易產生死鎖、非阻塞式、不可重入
3.使用zookeeper實現分佈式鎖
實現相對簡單、可靠性強、使用臨時節點,失效時間容易控制
分佈式鎖通常用在分佈式系統或者多個應用中,用來控制同一任務是否執行或者任務的執行順序。在項目中,部署了多個tomcat應用,在執行定時任務時就會遇到同一任務可能執行屢次的狀況,咱們能夠藉助分佈式鎖,保證在同一時間只有一個tomcat應用執行了定時任務
使用zookeeper建立臨時序列節點來實現分佈式鎖,適用於順序執行的程序,大致思路就是建立臨時序列節點,找出最小的序列節點,獲取分佈式鎖,程序執行完成以後此序列節點消失,經過watch來監控節點的變化,從剩下的節點的找到最小的序列節點,獲取分佈式鎖,執行相應處理,依次類推……
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
package com.hongmoshui.distributed; public interface Lock { // 獲取到鎖的資源 public void getLock(); // 釋放鎖 public void unLock(); }
package com.hongmoshui.distributed; import org.I0Itec.zkclient.ZkClient; //將重複代碼寫入子類中.. public abstract class ZookeeperAbstractLock implements Lock { // zk鏈接地址 private static final String CONNECTSTRING = "127.0.0.1:2181"; // 建立zk鏈接 protected ZkClient zkClient = new ZkClient(CONNECTSTRING); protected static final String PATH = "/lock"; public void getLock() { if (tryLock()) { System.out.println("##獲取lock鎖的資源####"); } else { // 等待 waitLock(); // 從新獲取鎖資源 getLock(); } } // 獲取鎖資源 abstract boolean tryLock(); // 等待 abstract void waitLock(); public void unLock() { if (zkClient != null) { zkClient.close(); System.out.println("釋放鎖資源..."); } } }
package com.hongmoshui.distributed; import java.util.concurrent.CountDownLatch; import org.I0Itec.zkclient.IZkDataListener; public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock { private CountDownLatch countDownLatch = null; @Override boolean tryLock() { try { zkClient.createEphemeral(PATH); return true; } catch (Exception e) { // e.printStackTrace(); return false; } } @Override void waitLock() { IZkDataListener izkDataListener = new IZkDataListener() { public void handleDataDeleted(String path) throws Exception { // 喚醒被等待的線程 if (countDownLatch != null) { countDownLatch.countDown(); } } public void handleDataChange(String path, Object data) throws Exception { } }; // 註冊事件 zkClient.subscribeDataChanges(PATH, izkDataListener); if (zkClient.exists(PATH)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } } // 刪除監聽 zkClient.unsubscribeDataChanges(PATH, izkDataListener); } }
package com.hongmoshui.distributed; import com.hongmoshui.OrderNumGenerator; public class OrderService implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); // 使用lock鎖 // private // java.util.concurrent.locks.Lock // lock = new ReentrantLock(); private Lock lock = new ZookeeperDistrbuteLock(); public void run() { getNumber(); } public void getNumber() { try { lock.getLock(); String number = orderNumGenerator.getNumber(); System.out.println(Thread.currentThread().getName() + ",生成訂單ID:" + number); } catch (Exception e) { e.printStackTrace(); } finally { lock.unLock(); } } public static void main(String[] args) { System.out.println("####生成惟一訂單號###"); // OrderService orderService = new OrderService(); for (int i = 0; i < 100; i++) { new Thread(new OrderService()).start(); } } }
使用Zookeeper實現負載均衡原理,服務器端將啓動的服務註冊到,zk註冊中心上,採用臨時節點。客戶端從zk節點上獲取最新服務節點信息,本地使用負載均衡算法,隨機分配服務器。
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.8</version> </dependency>
ServerHandler:
package com.hongmoshui.LoadBalance; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; //ServerHandler public class ServerHandler implements Runnable { private Socket socket; public ServerHandler(Socket socket) { this.socket = socket; } public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while (true) { body = in.readLine(); if (body == null) break; System.out.println("Receive : " + body); out.println("Hello, " + body); } } catch (Exception e) { if (in != null) { try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if (out != null) { out.close(); } if (this.socket != null) { try { this.socket.close(); } catch (IOException e1) { e1.printStackTrace(); } this.socket = null; } } } }
ZkServerScoekt:
package com.hongmoshui.LoadBalance; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; //##ServerScoekt服務端 public class ZkServerScoekt implements Runnable { private int port = 18080; public static void main(String[] args) throws IOException { int port = 18080; ZkServerScoekt server = new ZkServerScoekt(port); Thread thread = new Thread(server); thread.start(); } public ZkServerScoekt(int port) { this.port = port; } public void run() { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); System.out.println("Server start port:" + port); Socket socket = null; while (true) { socket = serverSocket.accept(); new Thread(new ServerHandler(socket)).start(); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (serverSocket != null) { serverSocket.close(); } } catch (Exception e2) { } } } }
package com.hongmoshui.LoadBalance; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; public class ZkServerClient { public static List<String> listServer = new ArrayList<String>(); public static void main(String[] args) { initServer(); ZkServerClient client = new ZkServerClient(); BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true) { String name; try { name = console.readLine(); if ("exit".equals(name)) { System.exit(0); } client.send(name); } catch (IOException e) { e.printStackTrace(); } } } // 註冊全部server public static void initServer() { listServer.clear(); listServer.add("127.0.0.1:18080"); } // 獲取當前server信息 public static String getServer() { return listServer.get(0); } public void send(String name) { String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(cfg[0], Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println(name); while (true) { String resp = in.readLine(); if (resp == null) break; else if (resp.length() > 0) { System.out.println("Receive : " + resp); break; } } } catch (Exception e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
package com.hongmoshui.LoadBalance; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import org.I0Itec.zkclient.ZkClient; public class ZkServerScoekt2 implements Runnable { private static int port = 18081; public static void main(String[] args) throws IOException { ZkServerScoekt server = new ZkServerScoekt(port); Thread thread = new Thread(server); thread.start(); } public ZkServerScoekt2(int port) { this.port = port; } public void regServer() { // 向ZooKeeper註冊當前服務器 ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000); String path = "/test/server" + port; if (client.exists(path)) client.delete(path); client.createEphemeral(path, "127.0.0.1:" + port); } public void run() { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); regServer(); System.out.println("Server start port:" + port); Socket socket = null; while (true) { socket = serverSocket.accept(); new Thread(new ServerHandler(socket)).start(); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (serverSocket != null) { serverSocket.close(); } } catch (Exception e2) { } } } }
package com.hongmoshui.LoadBalance; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; public class ZkServerClient2 { public static List<String> listServer = new ArrayList<String>(); public static void main(String[] args) { initServer(); ZkServerClient client = new ZkServerClient(); BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true) { String name; try { name = console.readLine(); if ("exit".equals(name)) { System.exit(0); } client.send(name); } catch (IOException e) { e.printStackTrace(); } } } // 註冊全部server public static void initServer() { final String path = "/test"; final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); List<String> children = zkClient.getChildren(path); listServer.clear(); for (String p : children) { listServer.add((String) zkClient.readData(path + "/" + p)); } // 訂閱節點變化事件 zkClient.subscribeChildChanges("/test", new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { listServer.clear(); for (String p : currentChilds) { listServer.add((String) zkClient.readData(path + "/" + p)); } System.out.println("####handleChildChange()####listServer:" + listServer.toString()); } }); } // 請求次數 private static int count = 1; // 服務數量 private static int serverCount = 2; // 獲取當前server信息 public static String getServer() { String serverName = listServer.get(count % serverCount); ++count; return serverName; } public void send(String name) { String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(cfg[0], Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println(name); while (true) { String resp = in.readLine(); if (resp == null) break; else if (resp.length() > 0) { System.out.println("Receive : " + resp); break; } } } catch (Exception e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
有一個向外提供的服務,服務必須7*24小時提供服務,不能有單點故障。因此採用集羣的方式,採用master、slave的結構。一臺主機多臺備機。主機向外提供服務,備機負責監聽主機的狀態,一旦主機宕機,備機要迅速接代主機繼續向外提供服務。從備機選擇一臺做爲主機,就是master選舉。
右邊三臺主機會嘗試建立master節點,誰建立成功了,就是master,向外提供。其餘兩臺就是slave。
全部slave必須關注master的刪除事件(臨時節點,若是服務器宕機了,Zookeeper會自動把master節點刪除)。若是master宕機了,會進行新一輪的master選舉。本次咱們主要關注master選舉,服務註冊、發現先不討論。
» 領導者(leader),負責進行投票的發起和決議,更新系統狀態
» 學習者(learner),包括跟隨者(follower)和觀察者(observer),follower用於接受客戶端請求並想客戶端返回結果,在選主過程當中參與投票
» Observer能夠接受客戶端鏈接,將寫請求轉發給leader,但observer不參加投票過程,只同步leader的狀態,observer的目的是爲了擴展系統,提升讀取速度
» 客戶端(client),請求發起方
• Zookeeper的核心是原子廣播,這個機制保證了各個Server之間的同步。實現這個機制的協議叫作Zab協
議。Zab協議有兩種模式,它們分別是恢復模式(選主)和廣播模式(同步)。當服務啓動或者在領導者
崩潰後,Zab就進入了恢復模式,當領導者被選舉出來,且大多數Server完成了和leader的狀態同步之後
,恢復模式就結束了。狀態同步保證了leader和Server具備相同的系統狀態。
• 爲了保證事務的順序一致性,zookeeper採用了遞增的事務id號(zxid)來標識事務。全部的提議( proposal)都在被提出的時候加上了zxid。實現中zxid是一個64位的數字,它高32位是epoch用來標識 leader關係是否改變,每次一個leader被選出來,它都會有一個新的epoch,標識當前屬於那個leader的 統治時期。低32位用於遞增計數。 • 每一個Server在工做過程當中有三種狀態: LOOKING:當前Server不知道leader是誰,正在搜尋 LEADING:當前Server即爲選舉出來的leader FOLLOWING:leader已經選舉出來,當前Server與之同步