memcached分佈式主要實現的功能爲:每份數據即有主服務器又有備份服務器,全部的數據會按照必定的策略保存到不一樣的服務器,從而下降外部請求對服務器的壓力。要實現這部分功能須要keepalived和repcached這連個插件。html
一、每一個數據經過客戶端xmemcached,將數據按照哈希一致性分發到不一樣的主服務器。java
二、keepalived的功能主要是用來將外部請求按照必定路由規則分發到主服務器或者備份服務器。node
三、主服務器和備份服務器之間的數據同步時經過repcached實現。python
如今主要分爲連部分進行介紹memcached的分佈式,基礎設備介紹和分佈式配置。linux
XMemcached是一個新java memcached client。經過Xmemcached與memcached進行數據交互。算法
Memcached的分佈只能經過客戶端來實現,XMemcached實現了此功能,而且提供了一致性哈希(consistent hash)算法的實現。編程
XMemcached容許經過設置節點的權重來調節memcached的負載,設置的權重越高,該memcached節點存儲的數據將越多,所承受的負載越大。後端
XMemcached容許經過JMX或者代碼編程實現節點的動態添加或者移除,方便用戶擴展和替換節點等。數組
剛纔已經提到java nio一般對一個memcached節點使用一個鏈接,而XMemcached一樣提供了設置鏈接池的功能,對同一個memcached能夠建立N個鏈接組成鏈接池來提升客戶端在高併發環境下的表現,而這一切對使用者來講倒是透明的。啓用鏈接池的前提條件是保證數據之間的獨立性或者數據更新的同步,對同一個節點的各個鏈接之間是沒有作更新同步的,所以應用須要保證數據之間是相互獨立的或者所有采用CAS更新來保證原子性。緩存
XMemcached一樣支持客戶端的分佈策略,默認分佈的策略是按照key的哈希值模以鏈接數獲得的餘數,對應的鏈接就是將要存儲的節點。若是使用默認的分佈策略,你不須要作任何配置或者編程。
XMemcached一樣支持一致性哈希(consistent hash),經過編程設置:
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddressMap(「192.168.121.16:11211」));
builder.setSessionLocator(new KetamaMemcachedSessionLocator());//設置哈希一致策略
MemcachedClient client=builder.build();
Memcached是經過cas協議實現原子更新,所謂原子更新就是compare and set,原理相似樂觀鎖,每次請求存儲某個數據同時要附帶一個cas值,memcached比對這個cas值與當前存儲數據的cas值是否相等,若是相等就讓新的數據覆蓋老的數據,若是不相等就認爲更新失敗,這在併發環境下特別有用。XMemcached提供了對CAS協議的支持(不管是文本協議仍是二進制協議),CAS協議實際上是分爲兩個步驟:獲取CAS值和嘗試更新。先經過gets方法獲取一個GetsResponse,此對象包裝了存儲的數據和cas值,而後經過cas方法嘗試原子更新,若是失敗打印」cas error」。顯然,這樣的方式很繁瑣,而且若是你想嘗試多少次原子更新就須要一個循環來包裝這一段代碼,所以XMemcached提供了一個*CASOperation*接口包裝了這部分操做,容許你嘗試N次去原子更新某個key存儲的數據,無需顯式地調用gets獲取cas值。所以一個典型的使用場景以下:
GetsResponse<Integer> result = client.gets("a"); long cas = result.getCas(); //嘗試將a的值更新爲2 if (!client.cas("a", 0, 2, cas)) { System.err.println("cas error"); }
Memcached提供了統計協議用於查看統計信息,getStats方法返回一個map,其中存儲了全部已經鏈接而且有效的memcached節點返回的統計信息:
Map<InetSocketAddress,Map<String,String>> result=client.getStats();
你也能夠統計具體的項目,如統計items項目:
Map<InetSocketAddress,Map<String,String>> result=client.getStatsByItem("items");
Memcached 1.4.3開始支持SASL驗證客戶端,在服務器配置啓用SASL以後,客戶端須要經過受權驗證才能夠跟memcached繼續交互,不然將被拒絕請求。XMemcached 1.2.5開始支持這個特性。假設memcached設置了SASL驗證,典型地使用CRAM-MD5或者PLAIN的文本用戶名和密碼的驗證機制,假設用戶名爲cacheuser,密碼爲123456,那麼編程的方式以下:
MemcachedClientBuilder builder = new XMemcachedClientBuilder( AddrUtil.getAddresses("192.168.121.16:11211")); builder.addAuthInfo(AddrUtil.getOneAddress("192.168.121.16:11211"), AuthInfo.typical("cacheuser", "123456"));
傳入一個int數組,裏面的元素就是節點對應的權重值,好比這裏設置"192.168.121.16:11211"節點的權重爲3。注意,xmemcached的權重是經過複製鏈接的多個引用來實現的,好比權重爲3,那麼就複製3個同一個鏈接的引用放在集合中讓MemcachedSessionLocator查找。
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("192.168.121.16:11211"),new int[]{3}); MemcachedClient memcachedClient = builder.build();
MemcachedClient client=new XMemcachedClient(AddrUtil.getAddresses("192.168.121.16:11211")); //Add two new memcached nodes client.addServer("192.168.121.17:11211"); //Remove memcached servers client.removeServer("192.168.121.16:11211");
Xmemcached是基於java nio的client實現,默認對一個memcached節點只有一個鏈接,這在一般狀況下已經有很是優異的表現。可是在典型的高併發環境下,nio的單鏈接也會遇到性能瓶頸。所以XMemcached支持設置nio的鏈接池,容許創建多個鏈接到同一個memcached節點,可是請注意,這些鏈接之間是不一樣步的,所以你的應用須要本身保證數據更新的同步,啓用鏈接池能夠經過下面代碼:
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("192.168.121.16:11211")); builder.setConnectionPoolSize(5);
從1.3版本開始,xmemcached支持failure模式。所謂failure模式是指,當一個memcached節點down掉的時候,發往這個節點的請求將直接失敗,而不是發送給下一個有效的memcached節點。具體能夠看memcached的文檔。默認不啓用failure模式,啓用failure模式能夠經過下列代碼:
MemcachedClientBuilder builder=…… builder.setFailureMode(true);
不只如此,xmemcached還支持主輔模式,你能夠設置一個memcached的節點的備份節點,當主節點down掉的狀況下,會將原本應該發往主節點的請求轉發給standby備份節點。使用備份節點的前提是啓用failure模式。備份節點設置以下:
MemcachedClient builder=new XmemcachedClientBuilder(AddrUtil.getAddressMap("localhost:11211,localhost:11212 host2:11211,host2:11212"));
上面的例子,將localhost:11211的備份節點設置爲localhost:11212,而將host2:11211的備份節點設置爲host2:11212
memcached存儲大數據的效率是比較低的,當數據比較大的時候xmemcached會幫你壓縮在存儲,取出來的時候自動解壓並反序列化,這個大小閥值默認是16K,能夠經過Transcoder接口的setCompressionThreshold(1.2.1引入)方法修改閥值,好比設置爲1K:
memcachedClient.getTranscoder()).setCompressionThreshold(1024);
這個方法是在1.2.1引入到Transcoder接口,在此以前,你須要經過強制轉換來設置:
((SerializingTranscoder)memcachedClient.getTranscoder()).setCompressionThreshold(1024);
Memcached的序列化轉換器在序列化數值類型的時候有個特殊處理,若是前面N個字節都是0,那麼將會去除這些0,縮減後的數據將更小,例如數字3序列化是0x0003,那麼前面3個0將去除掉成一個字節0x3。反序列化的時候將自動在前面根據數值類型補0。這一特性默認是開啓的,若是考慮到與其餘client兼容的話須要關閉此特性能夠經過:
memcachedClient.getTranscoder()).setPackZeros(false);
在官方客戶端有提供一個sanitizeKeys選項,當選擇用URL當key的時候,MemcachedClient會自動將URL encode再存儲。默認是關閉的,想啓用能夠經過:
memcachedClient.setSanitizeKeys(true);
keepalived是以VRRP協議爲實現基礎的,VRRP全稱Virtual Router Redundancy Protocol,即虛擬路由冗餘協議。虛擬路由冗餘協議,能夠認爲是實現路由器高可用的協議,即將N臺提供相同功能的路由器組成一個路由器組,這個組裏面有一個master和多個backup,master上面有一個對外提供服務的vip(該路由器所在局域網內其餘機器的默認路由爲該vip),master會發組播,當backup收不到vrrp包時就認爲master宕掉了,這時就須要根據VRRP的優先級來選舉一個backup當master。這樣的話就能夠保證路由器的高可用了。keepalived主要有三個模塊,分別是core、check和vrrp。core模塊爲keepalived的核心,負責主進程的啓動、維護以及全局配置文件的加載和解析。check負責健康檢查,包括常見的各類檢查方式。vrrp模塊是來實現VRRP協議的。原理圖爲下圖:
keepalived只有一個配置文件keepalived.conf,裏面主要包括如下幾個配置區域,分別是global_defs、static_ipaddress、static_routes、vrrp_script、vrrp_instance和virtual_server。
global_defs { notification_email { a@abc.com b@abc.com ... } notification_email_from alert@abc.com smtp_server smtp.abc.com smtp_connect_timeout 30 enable_traps router_id host163 }
notification_email 故障發生時給誰發郵件通知。
notification_email_from 通知郵件從哪一個地址發出。
smpt_server 通知郵件的smtp地址。
smtp_connect_timeout 鏈接smtp服務器的超時時間。
enable_traps 開啓SNMP陷阱(Simple Network Management Protocol)。
router_id 標識本節點的字條串,一般爲hostname,但不必定非得是hostname。故障發生時,郵件通知會用到。
static_ipaddress { 10.210.214.163/24 brd 10.210.214.255 dev eth0 ... } static_routes { 10.0.0.0/8 via 10.210.214.1 dev eth0 ... }
static_ipaddress和static_routes區域配置的是是本節點的IP和路由信息。若是你的機器上已經配置了IP和路由,那麼這兩個區域能夠不用配置。其實,通常狀況下你的機器都會有IP地址和路由信息的,所以不必再在這兩個區域配置。
vrrp_script chk_http_port { script "</dev/tcp/127.0.0.1/80" interval 1 weight -10 }
用來作健康檢查的,當時檢查失敗時會將vrrp_instance
的priority
減小相應的值。以上意思是若是script
中的指令執行失敗,那麼相應的vrrp_instance
的優先級會減小10個點。
virtual_server IP Port { delay_loop <INT> lb_algo rr|wrr|lc|wlc|lblc|sh|dh lb_kind NAT|DR|TUN persistence_timeout <INT> persistence_granularity <NETMASK> protocol TCP ha_suspend virtualhost <STRING> alpha omega quorum <INT> hysteresis <INT> quorum_up <STRING>|<QUOTED-STRING> quorum_down <STRING>|<QUOTED-STRING> sorry_server <IPADDR> <PORT> real_server <IPADDR> <PORT> { weight <INT> inhibit_on_failure notify_up <STRING>|<QUOTED-STRING> notify_down <STRING>|<QUOTED-STRING> # HTTP_GET|SSL_GET|TCP_CHECK|SMTP_CHECK|MISC_CHECK HTTP_GET|SSL_GET { url { path <STRING> # Digest computed with genhash digest <STRING> status_code <INT> } connect_port <PORT> connect_timeout <INT> nb_get_retry <INT> delay_before_retry <INT> } } }
delay_loop 延遲輪詢時間(單位秒)。
lb_algo 後端調試算法(load balancing algorithm)。
virtualhost 用來給HTTP_GET和SSL_GET配置請求header的。
sorry_server 當全部real server宕掉時,sorry server頂替。
real_server 真正提供服務的服務器。
weight 權重。
notify_up/down 當real server宕掉或啓動時執行的腳本。
健康檢查的方式,N多種方式。
path 請求real serserver上的路徑。
digest/status_code 分別表示用genhash算出的結果和http狀態碼。
connect_port 健康檢查,若是端口通則認爲服務器正常。
connect_timeout,nb_get_retry,delay_before_retry分別表示超時時長、重試次數,下次重試的時間延遲。
vrrp_sync_group VG_1 { group { inside_network # name of vrrp_instance (below) outside_network # One for each moveable IP. ... } notify_master /path/to_master.sh notify_backup /path/to_backup.sh notify_fault "/path/fault.sh VG_1" notify /path/notify.sh smtp_alert } vrrp_instance VI_1 { state MASTER interface eth0 use_vmac <VMAC_INTERFACE> dont_track_primary track_interface { eth0 eth1 } mcast_src_ip <IPADDR> lvs_sync_daemon_interface eth1 garp_master_delay 10 virtual_router_id 1 priority 100 advert_int 1 authentication { auth_type PASS auth_pass 12345678 } virtual_ipaddress { 10.210.214.253/24 brd 10.210.214.255 dev eth0 192.168.1.11/24 brd 192.168.1.255 dev eth1 } virtual_routes { 172.16.0.0/12 via 10.210.214.1 192.168.1.0/24 via 192.168.1.1 dev eth1 default via 202.102.152.1 } track_script { chk_http_port } nopreempt preempt_delay 300 debug notify_master <STRING>|<QUOTED-STRING> notify_backup <STRING>|<QUOTED-STRING> notify_fault <STRING>|<QUOTED-STRING> notify <STRING>|<QUOTED-STRING> smtp_alert }
vrrp_instance用來定義對外提供服務的VIP區域及其相關屬性。
vrrp_rsync_group用來定義vrrp_intance組,使得這個組內成員動做一致。舉個例子來講明一下其功能:
兩個vrrp_instance同屬於一個vrrp_rsync_group,那麼其中一個vrrp_instance發生故障切換時,另外一個vrrp_instance也會跟着切換(即便這個instance沒有發生故障)。
notify_master/backup/fault 分別表示切換爲主/備/出錯時所執行的腳本。
notify 表示任何一狀態切換時都會調用該腳本,而且該腳本在以上三個腳本執行完成以後進行調用,keepalived會自動傳遞三個參數($1 = "GROUP"|"INSTANCE",$2 = name of group or instance,$3 = target state of transition(MASTER/BACKUP/FAULT))。
smtp_alert 表示是否開啓郵件通知(用全局區域的郵件設置來發通知)。
state 能夠是MASTER或BACKUP,不過當其餘節點keepalived啓動時會將priority比較大的節點選舉爲MASTER,所以該項其實沒有實質用途。
interface 節點固有IP(非VIP)的網卡,用來發VRRP包。
use_vmac 是否使用VRRP的虛擬MAC地址。
dont_track_primary 忽略VRRP網卡錯誤。(默認未設置)
track_interface 監控如下網卡,若是任何一個不通就會切換到FALT狀態。(可選項)
mcast_src_ip 修改vrrp組播包的源地址,默認源地址爲master的IP。(因爲是組播,所以即便修改了源地址,該master仍是能收到迴應的)
lvs_sync_daemon_interface 綁定lvs syncd的網卡。
garp_master_delay 當切爲主狀態後多久更新ARP緩存,默認5秒。
repcached的全稱 replication cached是由日本人發明的memcached的高可用性技術,簡稱複製緩衝區技術。它是一個單master單 slave的方案,但它的 master/slave都是可讀寫的,並且能夠相互同步,若是 master 宕機, slave偵測到鏈接斷了,它會自動 listen而成爲 master;而若是 slave壞掉, master也會偵測到鏈接斷,它就會從新 listen等待新的 slave加入。
如今有一臺主服務器IP爲192.168.121.12,從服務器:192.168.121.14,虛擬ip:192.168.121.16。分別在12和14服務器上配置keepalived和memcached-repcached。
安裝
cd /usr/local/softnew wget http://www.keepalived.org/software/keepalived-1.4.1.tar.gz tar -zxvf keepalived-1.4.1.tar.gz cd keepalived-1.4.1 ./configure --prefix=/usr/local/keepalived make make --prefix=/usr/local/keepalived install
配置
tree -l /usr/local/keepalived/etc -- keepalived | |-- keepalived.conf | `-- samples | |-- keepalived.conf.status_code | |-- keepalived.conf.track_interface | |-- keepalived.conf.vrrp | |-- 。。。 |-- rc.d | `-- init.d | `-- keepalived `-- sysconfig `-- keepalived
若是沒有找到init.d目錄,能夠到keepalived-1.4.1/keepalived/etc/init.d/目錄下找keepalivew。
將配置文件拷貝到系統對應的目錄下:
cp /usr/local/keepalived/etc/keepalived.conf /etc/keepalived/keepalived.conf cp /usr/local/keepalived/etc/rc.d/init.d/keepalived /etc/rc.d/init.d/keepalived cp /usr/local/keepalived/etc/sysconfig/keepalived /etc/sysconfig/keepalived
配置文件修改:
! Configuration File for keepalived global_defs { notification_email { acassen@firewall.loc failover@firewall.loc sysadmin@firewall.loc } notification_email_from Alexandre.Cassen@firewall.loc # smtp_server 192.168.200.1 # smtp_connect_timeout 30 router_id LVS_DEVEL vrrp_skip_check_adv_addr # vrrp_strict vrrp_garp_interval 0 vrrp_gna_interval 0 } vrrp_instance VI_1 { state BACKUP interface ens33 virtual_router_id 51 priority 100 advert_int 1 authentication { auth_type PASS auth_pass 1111 } virtual_ipaddress { 192.168.121.16 } } virtual_server 192.168.121.16 11211 { delay_loop 6 lb_algo rr lb_kind NAT persistence_timeout 7200 protocol TCP nat_mask 255.255.255.0 # sorry_server 192.168.200.200 1358 real_server 192.168.121.12 11211 { weight 1 TCP_CHECK { connect_timeout 3 retry 3 delay_before_retry 3 connect_port 11211 } } real_server 192.168.121.14 11211 { weight 1 TCP_CHECK { connect_timeout 3 retry 3 delay_before_retry 3 connect_port 11211 } } }
啓動與中止
service keepalived start #啓動服務 service keepalived stop #中止服務 service keepalived restart #重啓服務
repcached最新下載地址:http://sourceforge.net/projects/repcached/files/repcached有兩種方式,一種經過打補丁形式,另外一種直接安裝融合版的memcached。這裏選擇第二種。
安裝
一、安裝libevent環境。
二、安裝memcached
啓動
啓動master:
./memcached -v -d -p 11211 -l 192.168.121.12 -u root
啓動slave:
./memcached -v -d -p 11211 -l 192.168.121.14 -u root -x 192.168.121.12
package memcached; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import net.rubyeye.xmemcached.MemcachedClient; import net.rubyeye.xmemcached.MemcachedClientBuilder; import net.rubyeye.xmemcached.XMemcachedClientBuilder; import net.rubyeye.xmemcached.auth.AuthInfo; import net.rubyeye.xmemcached.command.BinaryCommandFactory; import net.rubyeye.xmemcached.exception.MemcachedException; import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator; import net.rubyeye.xmemcached.transcoders.CompressionMode; import net.rubyeye.xmemcached.transcoders.SerializingTranscoder; import net.rubyeye.xmemcached.utils.AddrUtil; public class TestCluster { private static Map<Integer, MemcachedClient> clientMap = new HashMap<Integer, MemcachedClient>();//client的集合 private static int maxClient = 3; private static int expireTime = 900;//900s(默認的緩存過時時間) private static int maxConnectionPoolSize = 1;//每一個客戶端池子的鏈接數 private static long op_time = 2000L;//操做超時時間 private static String servers = "192.168.121.16:11211"; private static final String KEY_SPLIT = "-";//用於隔開緩存前綴與緩存鍵值 /** * 構建MemcachedClient的map集合 */ static{ MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddressMap(servers)); builder.setConnectionPoolSize(1);//這個默認也是1 builder.setSessionLocator(new KetamaMemcachedSessionLocator(true));//使用一致性hash算法 // builder.addAuthInfo(AddrUtil.getOneAddress("localhost:11211"), AuthInfo // .typical("cacheuser", "123456"));//對服務進行受權認證 SerializingTranscoder transcoder = new SerializingTranscoder(1024*1024);//序列化轉換器,指定最大的數據大小1M transcoder.setCharset("UTF-8");//默認爲UTF-8,這裏可去掉 transcoder.setCompressionThreshold(1024*1024);//單位:字節,壓縮邊界值,任何一個大於該邊界值(這裏是:1M)的數據都要進行壓縮 transcoder.setCompressionMode(CompressionMode.GZIP);//壓縮算法 builder.setFailureMode(true);//在此模式下,某個節點掛掉的狀況下,往這個節點的請求都將直接拋出MemcachedException的異常。 builder.setTranscoder(transcoder); builder.setCommandFactory(new BinaryCommandFactory());//命令工廠 //構建10個MemcachedCient,並放入clientMap for(int i=0;i<maxClient;i++){ try { MemcachedClient client = builder.build(); client.setOpTimeout(op_time);//設置操做超時時間,默認爲1s clientMap.put(i, client); } catch (IOException e) { e.printStackTrace(); } } } public static MemcachedClient getMemcachedClient(){ /* * Math.random():產生[0,1)之間的小數 * Math.random()*maxClient:[0~maxClient)之間的小數 * (int)(Math.random()*maxClient):[0~maxClient)之間的整數 */ return clientMap.get((int)(Math.random()*maxClient)); } public static boolean setCache(String key,Object value,int exp){ boolean setCacheSuccess = false; try { MemcachedClient client = getMemcachedClient(); setCacheSuccess = client.add(key, exp, value); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } return setCacheSuccess; } /** * 獲取緩存 **/ public static Object getCache(String key){ Object value = null; try { MemcachedClient client = getMemcachedClient(); value = client.get(key); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } return value; } public static void main(String[] args) throws Exception { /*for(int i=0;i<100;i++){ System.out.println(Math.random()); }*/ for(int i=0;i<1000;i++){ System.out.println(TestCluster.setCache("hello"+i, "world"+i,0)); System.out.println(TestCluster.getCache("hello"+i)); Thread.sleep(10000); } /*System.out.println(MemcachedUtil.getCache("hello2")); System.out.println(MemcachedUtil.getCache("hello2")); System.out.println(MemcachedUtil.getCache("hello2")); System.out.println(MemcachedUtil.getCache("hello2")); System.out.println(MemcachedUtil.getCache("hello2"));*/ } }