Oscache分佈式集羣配置總結

因爲項目須要,須要用到oscache的分佈式集羣,爲了實現分佈式環境下消息的通知,目前兩種比較流行的作法是使用 JMSJGROUPS,這兩種方式都在底層實現了廣播發布消息。 因爲JGroups能夠提供可靠的廣播通訊.因此我準備採用JGroupshtml

須要用的jar: oscache.jar 2.4.1緩存組件),jgroups.jar2.8.GAIP組播,commons-loggin.jar1.1(日誌記錄用的),concurrent-1.3.2.jar(線程同步用的)。儘可能使用當前最高版本。java

按照oscache的官方文檔說明,docs/wiki/Documentation.html下的Tutorial3Clustering OSCache,提到了JavaGroups Configurationweb

Just make sure you have jgroups-all.jar file in your classpath (for a webapp put it in WEB-INF/lib), and add the JavaGroups broadcasting listener to your oscache.properties file like this:數據庫

cache.event.listeners=com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListenerapache

In most cases, that's it! OSCache will now broadcast any cache flush events across the LAN. The jgroups-all.jar library is not included with the binary distribution due to its size, however you can obtain it either by downloading the full OSCache distribution, or by visiting the JavaGroups website.緩存

If you want to run more than one OSCache cluster on the same LAN, you will need to use different multicast IP addresses. This allows the caches to exist in separate multicast groups and therefore not interfere with each other. The IP to use can be specified in your oscache.properties file by the cache.cluster.multicast.ip property. The default value is 231.12.21.132, however you can use any class D IP address. Class D address fall in the range 224.0.0.0 through 239.255.255.255.服務器

If you need more control over the multicast configuration (eg setting network timeout or time-to-live values), you can use the cache.cluster.properties configuration property. Use this instead of the cache.cluster.multicast.ip property. The default value is:session

UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;/app

mcast_send_buf_size=150000;mcast_recv_buf_size=80000):/框架

PING(timeout=2000;num_initial_members=3):/

MERGE2(min_interval=5000;max_interval=10000):/

FD_SOCK:VERIFY_SUSPECT(timeout=1500):/

pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):/

UNICAST(timeout=300,600,1200,2400):/

pbcast.STABLE(desired_avg_gossip=20000):/

FRAG(frag_size=8096;down_thread=false;up_thread=false):/

pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)

這些配置你們確定都看過,並且看爛了,可是也的確是要這麼配置的。

個人開發環境是Myeclipse8.0+Spring2.0+Oracle10g,其中還有些其餘框架就不提了,主要是說說如何配置並實現oscache分佈式集羣。

首先要修改oscache.properties配置文件,我使用的是內存緩存,修改的地方不是不少,

cache.memory=truecache.blocking=true這個要打開,同步增長cache 須要用到這個塊),cache.capacity=100000,後面的Clustering 方面的配置就按照上面的官方說明那麼配就能夠了。而後是LOG4J,這個你們應該都用到了的,接着配這個日誌:

log4j.logger.com.opensymphony.oscache.base=INFO,A6

log4j.logger.com.opensymphony.oscache.plugins.clustersupport.AbstractBroadcastingListener=INFO,A6

log4j.logger.com.opensymphony.oscache.general.GeneralCacheAdministrator=INFO,A6

log4j.logger.com.opensymphony.oscache.plugins.clustersupport.ClusterNotification=INFO,A6

log4j.logger.com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListener=INFO,A6

log4j.logger.org.jgroups.blocks.NotificationBus=INFO,A6

//說明:我主要是記錄在使用到這幾類的時候的一些後臺日誌,並且也是必需要記錄這幾個的,否則你怎麼知道集羣相關的信息還有cache有沒有更新同步等。

log4j.appender.A6.Threshold = INFO

log4j.appender.A6=org.apache.log4j.DailyRollingFileAppender

log4j.appender.A6.File=/data/logs/mmbosslogs/oscache.log

log4j.appender.A6.DatePattern  =  '_'yyyy-MM-dd'.log'

log4j.appender.A6.layout=org.apache.log4j.PatternLayout

log4j.appender.A6.MaxFileSize = 1024KB

log4j.appender.A6.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n

//日誌記錄配置本身按本身的路徑去配吧

而後是配置commons-logging.properties

org.apache.commons.logging.Log=org.apache.commons.logging.impl.Log4JLogger

這樣就OK了。

在項目中的每一個頁面須要cache的地方配置好cachekey還有自動刷新的時間,默認是3600秒,key最好精確一點,好比在默認的URI後面再加上其餘的ID參數等,我是這樣配置的:

String cachekey=request.getRequestURI()+"|"+albumid+pageno;

int cacheTime = 3600;

<cache:cachetime="<%=cacheTime%>" key="<%=cachekey%>" >

2個服務器上,把須要用到cache緩存的頁面按照上面的配置都配置好,而後就能夠部署並啓動服務器了。

最開始我只實現了同步Flush,就是在一個節點服務器上清空某個頁面的cache,而後通知其餘集羣節點也去Flush這個頁面,也就是當這個頁面再次被訪問時,這個頁面將會從新去數據庫讀數據,cache也將更新。我作了一個只用來Flush緩存的頁面,在裏面用到了

GeneralCacheAdministratorflushAll()方法。2行代碼就OK了。

補充:我在sys-service中加入了:

<beanid="cacheAdministrator"

class="com.opensymphony.oscache.general.GeneralCacheAdministrator"

destroy-method="destroy"/>

GeneralCacheAdministratorcom.opensymphony.oscache.general.GeneralCacheAdministrator中。

在這個時候我用,jgroups.包是2.2.8版本的。

這些準備工做作好之後,啓動服務器,而後我就去訪問一個頁面, 固然這個頁面有用到cache緩存。若是正常,就會在服務器後臺打印以下信息:

-----------------------------------------------------------------

GMS: address=ThinkPad-48956, cluster=OSCacheBus, physical address=192.168.100.85:1351

-------------------------------------------------------------------上面的信息就說明JavaGroupsBroadcastingListener已初始化啓動完成。

可是沒有打印出這個信息,就說明出問題了。固然我第一次測試的時候並無出現上面的信息,這時候我從日誌記錄裏看到了報錯信息,就是LOG4J裏配置的osache.log

ERROR AbstractCacheAdministrator:330 - Could not initialize listener 'com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListener'. Listener ignored.

com.opensymphony.oscache.base.InitializationException: Initialization failed: ChannelException: failed loading class: java.lang.ClassNotFoundException: [Lorg.jgroups.Address;

[Lorg.jgroups.Address這個東東在網上搜了很久,有些老外說是JDKBUG,不支持,緣由是ClassLoader.loadClass is not supposed to support the array class name
syntax,可是jdk一直沒有修復這個bug。其次網上還有兄弟說
運行ClassConfiguratormain函數也有問題,這段代碼之前是沒有問題,確定是環境的問題。
而後繼續查找,發現是jg-magic-map.xml的問題網上說刪除<class>
       <description>Object Array</description>
       <class-name>[Ljava.lang.Object;</class-name>
       <preload>true</preload>
       <magic-number>37</magic-number>
</class>
就ok。我在一些國外的網站上也查了,有些老外也是這麼說的,把這個Lorg.jgroups.Address屬性給他刪除了就能夠了,可是我測試發現,沒用,你把這個刪除後,它又來報錯了:
看看日誌:

Could not initialize listener 'com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListener'. Listener ignored.

com.opensymphony.oscache.base.InitializationException: Initialization failed: ChannelException: failed loading class: java.lang.ClassNotFoundException: [Ljava.lang.Object;

一樣 [Ljava.lang.Object也在jg-magic-map.xml裏,這個是JAVA的基礎對象包,咋可能不認識。。。JavaGroupsBroadcastingListener仍是不能正常初始化鬱悶了。。。

接着幾天一直想着這個問題,網上慢慢的焦急的搜尋着答案。一直沒搜到,後來直接去了JGROUPS的官網,看到有最新版2.8.GA,換個高級版本是否是就沒問題了?

換上去了,而後我仍是去訪問有cache緩存的那個頁面,果真換了這個高級版本的包還真沒這個問題了。

-----------------------------------------------------------------

GMS: address=ThinkPad-48956, cluster=OSCacheBus, physical address=192.168.100.85:1351

--------------------------------------------------------------這個信息在服務器後臺打印出來了,監聽啓動OK了,並且從新訪問這個頁面的時候,打開速度仍是很快,說明cache生效了。再

INFO JavaGroupsBroadcastingListener:117 - JavaGroups clustering support started successfully

INFO AbstractBroadcastingListener:40 - AbstractBroadcastingListener registered

一切正常,並且監聽也已註冊,註冊就是在知道有你這個監聽點,好比另外一個服務器啓動,也初始化正常,也註冊了,這個服務器就知道集羣中又多了一個節點。以下日誌:

INFO JavaGroupsBroadcastingListener:189 - A new member at address 'ThinkPad-49675' has joined the cluster

若是有一個註冊過的服務器掛了或者是關閉了,就會看到以下日誌信息:

INFO JavaGroupsBroadcastingListener:132 - JavaGroups shutting down...

INFO JavaGroupsBroadcastingListener:201 - Member at address 'ThinkPad-30262' left the cluster

INFO JavaGroupsBroadcastingListener:144 - JavaGroups shutdown complete.

下面接着測試下同步FLUSH,打開那個用來FLUSH緩存的頁面(刷新範圍是appliction級的,也就是所有的cache都清空),頁面運行完成後這個服務器的全部cache都清空了,而後去看另外一臺服務器上的oscache日誌:

INFO AbstractBroadcastingListener:174 - Cluster notification (type=4, data=Wed Jan 13 13:02:34 CST 2010) was received.

從com.opensymphony.oscache.plugins.clustersupport下的ClusterNotification類中能夠得知類型4就是 public static final int FLUSH_CACHE = 4; Specifies a notification message indicating that an entire cache should be flushed.在原代碼裏能夠看到的。

而後去打開本服務器的那個剛纔加載過cache的頁面,打開不快了,服務器後臺在刷查詢語句呢,這就對了,FLUSH功能實現了;而後打開另外一個節點服務器也是打開剛纔那個頁面,一樣的效果,也是從新讀數據庫,同FLUSH緩存已經OK

根據項目須要,同步FLUSH不能知足,須要作到cache同步增長,同更新等功能,也就是說當羣全部節點都已經正常啓動監聽,其中某一個節點新增了一個cache,就會廣播通知其餘節點,並增長這個cache

測試過程當中發現,一個節點 Cache.putInCache(key,content) 加入緩存後,
在另外一個節點並無到任何數據同步信息。
因而翻看了osccache Cache AbstractBroadcastingListener源碼發現;
在Cache的事件處理中,AbstractBroadcastingListener只是提供了 cacheflushXXX 相關的信息通知,
cacheEntryAddedcacheEntryRemovedcacheEntryUpdated都未作處理。
 public void cacheFlushed(CachewideEvent event) {
       if (!Cache.NESTED_EVENT.equals(event.getOrigin()) && !CLUSTER_ORIGIN.equals(event.getOrigin())) {
           if (log.isDebugEnabled()) {
               log.debug("cacheFushed called (" + event + ")");
           }

   //此處發送廣播消息通知其餘節點
           sendNotification(new ClusterNotification(ClusterNotification.FLUSH_CACHE, event.getDate()));
       }
   }

   // --------------------------------------------------------
   // The remaining events are of no interest to this listener
   // --------------------------------------------------------
   public void cacheEntryAdded(CacheEntryEvent event) {
   }

   public void cacheEntryRemoved(CacheEntryEvent event) {
   }

   public void cacheEntryUpdated(CacheEntryEvent event) {
   }

然來要本身來實現這個功能。。。那就實現吧。

要想實現這個幾個功能,必需要看看這個幾個類:

AbstractBroadcastingListener.java

//類說明:Implementation of a CacheEntryEventListener. It broadcasts the flush events

 across a cluster to other listening caches. Note that this listener cannot  be used in  conjection with session caches.

ClusterNotification.java,

//類說明: A notification message that holds information about a cache event. This  class is <code>Serializable</code> to allow it to be sent across the  network to other machines running in a cluster.

Cache.java,

//類說明:Provides an interface to the cache itself. Creating an instance of this class  will create a cache that behaves according to its construction parameters. The public API provides methods to manage objects in the cache and configure  any cache event listeners.

CacheEntry.java,

//類說明: A CacheEntry instance represents one entry in the cache. It holds the object that  is being cached, along with a host of information about that entry such as the  cache key, the time it was cached, whether the entry has been flushed or not and  the groups it belongs to.

CacheEntryEvent.java

//類說明: CacheEntryEvent is the object created when an event occurs on a  cache entry (Add, update, remove, flush). It contains the entry itself and  its map.

還有jgroups裏面很重要的幾個類:

NotificationBus

//類說明:This class provides notification sending and handling capability.

* Producers can send notifications to all registered consumers.

* Provides hooks to implement shared group state, which allows an

* application programmer to maintain a local cache which is replicated

* by all instances. NotificationBus sits on

* top of a channel, however it creates its channel itself, so the

* application programmers do not have to provide their own channel.

(org.jgroups.util;Util

//類說明:Collection of various utility routines that can not be assigned to other classes.

Util 類裏有不少共用方法,例如在序列化的時候把對象轉成流對象,反序列化的時候把流對象讀成對象等,後面遇到的問題就和這個有關係。以上幾個類是我遇到問題後,順藤摸瓜着慢慢找問題的時候看到的,要想了解oscachejgroups消息同步是怎樣一個順序流程,就必須瞭解這幾個類,當中還有些和event事件監聽有關係的這裏就不一一提到,有空能夠在SVN上把源代碼拉下來看看就知道了。

仍是來接着說實現要實現的功能,瞭解了上面幾個類的功能和總體的流程後,我實現的代碼以下:

public void cacheEntryAdded(CacheEntryEvent event)

{if(!CLUSTER_ORIGIN.equals(event.getOrigin()))

{sendNotification(new ClusterNotification(ClusterNotification.CLUSTER_ENTRY_ADD,new CacheEntryEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN)));}

}

public void cacheEntryRemoved(CacheEntryEvent event)

{if(!CLUSTER_ORIGIN.equals(event.getOrigin()))

{sendNotification(new ClusterNotification(ClusterNotification.CLUSTER_ENTRY_DELETE,new CacheEntryEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN)));}}

public void cacheEntryUpdated(CacheEntryEvent event)

{System.out.println("origin:"+event.getOrigin());if(!CLUSTER_ORIGIN.equals(event.getOrigin()))

{sendNotification(new ClusterNotification(ClusterNotification.CLUSTER_ENTRY_UPDATE,new CacheEntryEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN)));}

}

代碼寫好後,接着就來調試了,我打開了2myeclipse,端口一個是80一個8080,啓動完成後,消息廣播監聽也都正常初始化,日誌裏也都看到2個節點已經加入到集羣中。而後選擇其中一個服務器,訪問一個帶有cache緩存的頁面,若是能把這個cache同步到其餘節點的話,去訪問另外一個服務器上的名字想同的頁面時,應該很快打開的,但是我去打開的時候,仍是很慢,在從數據庫裏讀數據,而沒有增長緩存成功。查看日誌文件oscache.log,發現有報錯: ERROR NotificationBus:292 - exception=java.lang.IllegalArgumentException: java.lang.NullPointerExceptioncache同步增長不成功,只能跟蹤代碼看看問題出在那。重複上面的操做,這時event監聽器監聽到了當前的操做,而且跳到cacheEntryAdded這個方法中,而後就調用sendNotification方法給其餘節點發消息,這個方法sendNotification(ClusterNotification message);須要一個MESSAGE對象,而

new  ClusterNotification(ClusterNotification.CLUSTER_ENTRY_ADD,new CacheEntryEvent(event.getMap(),event.getEntry(),CLUSTER_ORIGIN)));

就是構造一個message對象,

接着就去jgroups中去運行  sendNotification(Serializable n) { sendNotification(null, n); }這個方法,而後在sendNotification(Address dest, Serializable n) 這個方法中去構造一個對象流,代碼以下:

public void sendNotification(Address dest, Serializable n) {

       Message msg=null;

       byte[] data=null;

       Info info;

       try {

           if(n == null) return;

           info=new Info(Info.NOTIFICATION, n);

           data=Util.objectToByteBuffer(info);//在公共方法裏來把這個對象轉成bytebuffer

           msg=new Message(dest, null, data);

           if(channel == null) {

               if(log.isErrorEnabled()) log.error("channel is null. Won't send notification");

               return;

           }

           channel.send(msg);

       }

       catch(Throwable ex) {

           if(log.isErrorEnabled()) log.error("error sending notification", ex);

       }

   }

在Util.objectToByteBuffer(info);中有這段代碼,也就是按流程會走到這裏

else { // will throw an exception if object is not serializable

               out_stream.write(TYPE_SERIALIZABLE);

               out=new ObjectOutputStream(out_stream);

               ((ObjectOutputStream)out).writeObject(obj);

注意out_stream.write(TYPE_SERIALIZABLE);在將INFO對象寫到流對象的時候,在以前加了一個標示位TYPE_SERIALIZABLE,也就是說這個對象流裏不徹底是對象。這樣一來就得看看後面在反序列化對象的時候是怎麼操做的。

這個消息發出去後,在另外一個myecilipse中進入了我設置的斷點 public void receive(Message msg);裏面有段   obj=msg.getObject();,就是調objectFromByteBuffer(byte[] buffer, int offset, int length)方法就是處理這個的,看看具體代碼:

case TYPE_SERIALIZABLE: // the object is Externalizable or Serializable

                   in=in_stream; // changed Nov 29 2004 (bela)

                   ObjectInputStream inn= new ObjectInputStream(in);

                   retval=inn.readObject();

                   break;

在執行retval=inn.readObject();的時候進到了異常處理:

catch(Throwable ex) {

           if(log.isErrorEnabled()) log.error("exception=" + ex);

       }

通常狀況下序列化裏應該是一個完整的對象,可是這裏不全是對象,是否是那個標示位的緣由呢,我把代碼改了下,讓它把對象流寫到我本地的一個文件中,而後我在寫了個反序列化的類,發現反不出來,直接報錯,接着我修改代碼,在把對象讀到對象流並寫到本地文件的時候,沒有把標示位寫進去,而後我再反序列化,發現正常,能夠獲得info對象。

接下來修改objectFromByteBuffer方法:

case TYPE_SERIALIZABLE: // the object is Externalizable or Serializable

                   // changed Nov 29 2004 (bela)

byte[] result2=new byte[buffer.length-1];

                   System.arraycopy(buffer,1,result2,0,result2.length);  

                   ByteArrayInputStream in_stream1=new ByteArrayInputStream(result2, offset, result2.length);

                   in=in_stream1;

                   ObjectInputStream inn= new ObjectInputStream(in);

                  // System.out.println(inn.readObject());

                   retval=inn.readObject();

                  // retval=(Info)inn.readObject();

                   break;

反序列化的時候把標示位給它去掉,而後再去讀對象,改了後,程序在這裏終於正常的走下去了。而後就到了case ClusterNotification.CLUSTER_ENTRY_ADD:

System.out.println("cluster cache add:"+message.getData());

if(message.getData()instanceof ClusterNotification)

{

CacheEntryEvent event=(CacheEntryEvent)message.getData();

cache.putInCache(event.getKey(),event.getEntry().getContent(),null,null,CLUSTER_ORIGIN);

}

break;

在執行cache.putInCache的時候會拋出一個異常,試了不少次,結果同樣,後來我就把

配置文件中CACHE ALGORITHM 修改成

cache.algorithm=com.opensymphony.oscache.base.algorithm.UnlimitedCache

重複上面的操做,日誌中顯示

INFO AbstractBroadcastingListener:174 - Cluster notification (type=9, data=key=/MMBOSS/rdp/simple/v1.0/Search.jsp_GET__42Lrd1K4b5IDrON_wukcrQ==1) was received.

服務器後臺信息顯示:

cluster cache add:key=/MMBOSS/rdp/simple/v1.0/Search.jsp_GET__42Lrd1K4b5IDrON_wukcrQ==1

obj=type= NOTIFICATION, notification=type=9, data=key=//MMBOSS/rdp/simple/v1.0/alum.jsp

而後訪問這個頁面,一點就開,同步增長cache完成。

考慮到之後的集羣環境,若是節點不少,每當更新一個欄目的時候,都去所有刷新,這個就有點浪費了,因此以前用的application級所有刷新就不是很方便使用了,須要更加精確。

接下來我想到,cache標籤有個refresh屬性,能夠本身控制何時刷新,接下來我就修改了cache標籤:

<cache:cachetime="<%=cacheTime%>" key="<%=cachekey%>" refresh='<%=getNeedRefresh(request.getParameter("needRefresh"))%>' >

getNeedRefresh()是我寫的一個方法,主要是處理url中傳進來的needRefresh參數,把這個string轉換成boolean型,由於refresh屬性中只有truefalse,並且是boolean型。這樣設定後,若是有刷新某個欄目,只須要在他的url地址中加入一個請求參數needRefresh=true就能夠了,例如:

先打開這個欄目,控制檯就出現以下信息:

origin:CLUSTER

obj=type=NOTIFICATION,notification=type=9, data=key=//MMBOSS/rdp/simple/v1.0/Search.jsp|2308

cluster cache add:key=//MMBOSS/rdp/simple/v1.0/Search.jsp|2308

消息顯示有個新欄目的cache數據產生了,而且發消息通知cluster每一個節點來增長這個數據爲datacacheorigin是範圍,type=9是消息類型,9cache增長,我在ClusterNotification這個類中增長了幾個消息類型:

   public final static int ACTION_ADD_OBJ=5;

   public final static int ACTION_UPDATE_OBJ=6;

   public final static int ACTION_DELETE_OBJ=7;

   public final static int ACTION_FLUSH_OBJ=8;

   public final static int CLUSTER_ENTRY_ADD=9;

   public final static int CLUSTER_ENTRY_UPDATE=10;

   public final static int CLUSTER_ENTRY_DELETE=11;

接下來試下但個更新參數:

http://localhost:8080/MMBOSS/rdp/simple/v1.0/Search.jsp?groupcode=2308&needRefresh=true

當這個請求成功發送給服務器後,服務器就會去從新讀這個欄目,而且從新去數據庫查詢此欄目的相關數據信息。控制檯信息以下:

origin:CLUSTER

obj=type=NOTIFICATION,notification=type=10, data=key=//MMBOSS/rdp/simple/v1.0/Search.jsp|2308

cluster cache update:key=//MMBOSS/rdp/simple/v1.0/Search.jsp|2308

日誌消息爲:

INFO AbstractBroadcastingListener:174 - Cluster notification (type=10, data=key=//MMBOSS/rdp/simple/v1.0/Search.jsp|2308) was received.

經過雙開myeclipse,重複上面的操做,獲得的結果是,同步更新功能已實現。後面還將這個刷新控制作了一個專門的頁面,把帶有請求參數needRefresh=true的欄目URL地址在後臺運行,在前臺頁面是看不到更新的過程,用戶只須要刷新下頁面,更新後的數據就展示出來了。

oscache的功能就剩下Group相關的應用沒有實現,例如(cacheGroupAddedcacheGroupEntryAdded等)因爲時間問題就沒繼續研究下去。

相關文章
相關標籤/搜索