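Put simply, gossip is a decentralized, peer-to-peer protocol for broadcasting data. You can think of it as the way a virus spreads: A infects B, B goes on to infect C, and so on.
The protocol itself imposes only a few simple constraints. The time for a state update to reach everyone grows logarithmically with the number of participating hosts, and it does not matter if some nodes go down or some messages are lost. Many distributed systems use gossip to solve problems they run into. For example, the service discovery framework Consul uses a gossip protocol (Serf) to manage host membership and to broadcast messages across the cluster, and Cassandra uses it for node discovery, health checks, and so on.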
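To make the logarithmic growth concrete, here is a small simulation. It is a sketch under simplifying assumptions, not code from any real implementation: in each round every informed node pushes the update to one peer chosen uniformly at random, with no failures and no lost messages. The class `GossipSpread` and its method are hypothetical names.

```java
import java.util.Random;

/** Hypothetical push-gossip simulation: every informed node tells one random peer per round. */
final class GossipSpread {
    /** Returns the number of rounds until all n nodes are informed, starting from node 0. */
    static int roundsToInformAll(int n, long seed) {
        Random random = new Random(seed);
        boolean[] informed = new boolean[n];
        informed[0] = true;
        int informedCount = 1;
        int rounds = 0;
        while (informedCount < n) {
            // Snapshot, so nodes informed during this round only start gossiping in the next one.
            boolean[] snapshot = informed.clone();
            for (int node = 0; node < n; node++) {
                if (!snapshot[node]) continue;
                int peer = random.nextInt(n); // may pick itself or an already informed node
                if (!informed[peer]) {
                    informed[peer] = true;
                    informedCount++;
                }
            }
            rounds++;
        }
        return rounds;
    }
}
```

Because the informed set can at most double in one round, at least log2(n) rounds are always needed; in this model the simulation typically finishes within a small multiple of that.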
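First, the system needs a few seed nodes configured, say A and B. Every participating node maintains the state of all nodes as node -> (Key, Value, Version), where a larger version number means newer data. Node P can directly update only its own state; the data it holds about other nodes can be updated only indirectly, through the gossip protocol.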
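The node -> (Key, Value, Version) bookkeeping can be sketched roughly as follows. This is an illustration only: `VersionedView`, `Versioned`, and their methods are hypothetical names, and a single per-node counter is assumed as the version source.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-node view: node name -> key -> (value, version). */
final class VersionedView {
    static final class Versioned {
        final String value;
        final int version;
        Versioned(String value, int version) { this.value = value; this.version = version; }
    }

    private final String self;
    private final Map<String, Map<String, Versioned>> states = new HashMap<>();
    private int nextVersion = 1;

    VersionedView(String self) { this.self = self; }

    /** A node may write directly only to its own state; each write gets a larger version. */
    void updateOwn(String key, String value) {
        states.computeIfAbsent(self, n -> new HashMap<>()).put(key, new Versioned(value, nextVersion++));
    }

    /** State about other nodes arrives via gossip and is kept only if its version is newer. */
    boolean applyRemote(String node, String key, Versioned incoming) {
        if (node.equals(self)) return false; // gossip never overwrites our own state
        Map<String, Versioned> entries = states.computeIfAbsent(node, n -> new HashMap<>());
        Versioned current = entries.get(key);
        if (current != null && current.version >= incoming.version) return false;
        entries.put(key, incoming);
        return true;
    }

    /** Returns the stored entry, or null if nothing is known for that node and key. */
    Versioned get(String node, String key) {
        Map<String, Versioned> entries = states.get(node);
        return entries == null ? null : entries.get(key);
    }
}
```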
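The rough flow is as follows:
① SYN: node A sends a message to a few randomly chosen nodes. It can send only digests here, i.e. leave out the values, to keep the message small.
② ACK: when node B receives the message, it merges it with its local state by comparing versions; the larger version is the newer data. For example, if node A sends C(key, value, 2) to node B while B locally stores C(key, value1, 3), B's version is newer, so the merged result is B's local data, which is then sent back to A.
③ ACK2: node A receives the ACK message and applies it to its local data.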
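The three steps above can be sketched with two in-memory maps standing in for nodes A and B. This is a simplified illustration, not Cassandra's wire format: all class and method names are hypothetical, versions are assumed to start at 1 (0 means "unknown"), and only keys present in A's digest are exchanged.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical three-message exchange between two in-memory peers. */
final class ThreeStepGossip {
    static final class Item {
        final String value;
        final int version;
        Item(String value, int version) { this.value = value; this.version = version; }
    }

    /** ACK payload: data where B is newer, plus digests (key -> B's version) where B is older. */
    static final class Ack {
        final Map<String, Item> newer = new HashMap<>();
        final Map<String, Integer> wanted = new HashMap<>();
    }

    /** Step 1 (SYN): A sends only key -> version, no values. */
    static Map<String, Integer> syn(Map<String, Item> a) {
        Map<String, Integer> digest = new HashMap<>();
        a.forEach((key, item) -> digest.put(key, item.version));
        return digest;
    }

    /** Step 2 (ACK): B compares the digest versions with its local store. */
    static Ack ack(Map<String, Item> b, Map<String, Integer> digest) {
        Ack ack = new Ack();
        digest.forEach((key, remoteVersion) -> {
            Item local = b.get(key);
            int localVersion = local == null ? 0 : local.version;
            if (localVersion > remoteVersion) ack.newer.put(key, local);
            else if (localVersion < remoteVersion) ack.wanted.put(key, localVersion);
        });
        return ack;
    }

    /** Step 3 (ACK2): A applies B's newer data and returns what B asked for. */
    static Map<String, Item> ack2(Map<String, Item> a, Ack ack) {
        merge(a, ack.newer);
        Map<String, Item> reply = new HashMap<>();
        for (String key : ack.wanted.keySet()) reply.put(key, a.get(key));
        return reply;
    }

    /** Keeps, per key, whichever side has the larger version. */
    static void merge(Map<String, Item> local, Map<String, Item> incoming) {
        incoming.forEach((key, item) -> {
            Item current = local.get(key);
            if (current == null || current.version < item.version) local.put(key, item);
        });
    }
}
```

With A holding C(key, value, 2) and B holding C(key, value1, 3), `ack` puts B's entry into `newer`, and after `ack2` node A stores value1 at version 3, matching the example in step ②. B finishes the round by calling `merge` on the ACK2 reply.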
A sends GossipDigestSyn => B runs GossipDigestSynVerbHandler
B sends GossipDigestAck => A runs GossipDigestAckVerbHandler
A sends GossipDigestAck2 => B runs GossipDigestAck2VerbHandler
All three classes implement the IVerbHandler interface and are registered as handlers with MessagingService:
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
This way, when the messaging module receives a message it calls the corresponding handler, as the code below shows:
IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
if (verbHandler == null) {
    // unknown verbs are not processed
    logger.trace("Unknown verb {}", verb);
    return;
}

try {
    verbHandler.doVerb(message, id);
} catch (IOException ioe) {
    handleFailure(ioe);
    throw new RuntimeException(ioe);
} catch (TombstoneOverwhelmingException | IndexNotAvailableException e) {
    handleFailure(e);
    logger.error(e.getMessage());
} catch (Throwable t) {
    handleFailure(t);
    throw t;
}
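The register-then-dispatch pattern can be reduced to a few lines. The following is a stripped-down sketch, not Cassandra's MessagingService: `VerbDispatcher`, its `Verb` enum, and the `Handler` interface are hypothetical, and payloads are plain strings to keep the example self-contained.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical, stripped-down verb-to-handler registry. */
final class VerbDispatcher {
    enum Verb { GOSSIP_DIGEST_SYN, GOSSIP_DIGEST_ACK, GOSSIP_DIGEST_ACK2, UNKNOWN }

    interface Handler { String handle(String payload); }

    private final Map<Verb, Handler> handlers = new EnumMap<>(Verb.class);

    /** Registers one handler per verb; a second registration for the same verb is rejected. */
    void register(Verb verb, Handler handler) {
        if (handlers.putIfAbsent(verb, handler) != null)
            throw new IllegalStateException("handler already registered for " + verb);
    }

    /** Returns the handler's result, or null when no handler is registered for the verb. */
    String dispatch(Verb verb, String payload) {
        Handler handler = handlers.get(verb);
        if (handler == null) return null; // unknown verbs are ignored
        return handler.handle(payload);
    }
}
```

An `EnumMap` is a natural fit here because the set of verbs is fixed at compile time.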
The actual initialization happens in org.apache.cassandra.service.StorageService#initServer() (declared as public synchronized void initServer() throws ConfigurationException), which calls prepareToJoin() to try to join the gossip cluster.
private void prepareToJoin() throws ConfigurationException {
    // joined is volatile to guarantee visibility; if we have already joined the cluster, skip everything
    if (!joined) {
        /* .... omitted ... */
        if (!MessagingService.instance().isListening())
            // start listening for messages
            MessagingService.instance().listen();

        // give this node an identity
        UUID localHostId = SystemKeyspace.getLocalHostId();

        /*
         * A shadow round fetches all the node information held by the nodes we talk to
         */
        if (replacing) {
            localHostId = prepareForReplacement();
            appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));

            if (!DatabaseDescriptor.isAutoBootstrap()) {
                // Will not do replace procedure, persist the tokens we're taking over locally
                // so that they don't get clobbered with auto generated ones in joinTokenRing
                SystemKeyspace.updateTokens(bootstrapTokens);
            } else if (isReplacingSameAddress()) {
                //only go into hibernate state if replacing the same address (CASSANDRA-8523)
                logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " +
                            "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
                            "repair must be run after the replacement process in order to make this node consistent.",
                            DatabaseDescriptor.getReplaceAddress());
                appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
            }
        } else {
            checkForEndpointCollision(localHostId);
        }

        // have to start the gossip service before we can see any info on other nodes. this is necessary
        // for bootstrap to get the load info it needs.
        // (we won't be part of the storage ring though until we add a counterId to our state, below.)
        // Seed the host ID-to-endpoint map with our own ID.
        getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress());
        appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
        appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId));
        appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
        appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());

        // load the persisted ring state. This used to be done earlier in the init process,
        // but now we always perform a shadow round when preparing to join and we have to
        // clear endpoint states after doing that.
        loadRingState();

        logger.info("Starting up server gossip");
        // start gossip (its scheduled task and so on)
        Gossiper.instance.register(this);
        Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
        gossipActive = true;
        // gossip snitch infos (local DC and rack)
        gossipSnitchInfo();
        // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
        Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates
        LoadBroadcaster.instance.startBroadcasting();
        HintsService.instance.startDispatch();
        BatchlogManager.instance.start();
    }
}

public synchronized Map<InetAddress, EndpointState> doShadowRound() {
    buildSeedsList();
    // it may be that the local address is the only entry in the seed
    // list in which case, attempting a shadow round is pointless
    if (seeds.isEmpty())
        return endpointShadowStateMap;

    seedsInShadowRound.clear();
    endpointShadowStateMap.clear();
    // build an empty SYN message, which marks this as a shadow round
    List<GossipDigest> gDigests = new ArrayList<GossipDigest>();

    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                           DatabaseDescriptor.getPartitionerName(),
                                                           gDigests);

    MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                          digestSynMessage,
                                                                          GossipDigestSyn.serializer);

    inShadowRound = true;
    int slept = 0;
    try {
        while (true) {
            /*
             * On the first pass and every five seconds after that, send a shadow round SYN to all
             * seed nodes to try to fetch the state of every node. Exit once the maximum delay
             * (30 s by default) is exceeded or the goal has been reached.
             */
            if (slept % 5000 == 0) {
                logger.trace("Sending shadow round GOSSIP DIGEST SYN to seeds {}", seeds);

                for (InetAddress seed : seeds)
                    MessagingService.instance().sendOneWay(message, seed);
            }

            Thread.sleep(1000);
            if (!inShadowRound)
                break;

            slept += 1000;
            if (slept > StorageService.RING_DELAY) {
                // if we don't consider ourself to be a seed, fail out
                if (!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
                    throw new RuntimeException("Unable to gossip with any seeds");

                logger.warn("Unable to gossip with any seeds but continuing since node is in its own seed list");
                inShadowRound = false;
                break;
            }
        }
    } catch (InterruptedException wtf) {
        throw new RuntimeException(wtf);
    }

    return ImmutableMap.copyOf(endpointShadowStateMap);
}
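The timing of the retry loop in doShadowRound() is easier to see with the sleeping taken out. The sketch below models only the loop arithmetic: `ShadowRoundTimeline` and `answeredAfter` are hypothetical, `Thread.sleep(1000)` is replaced by a counter, and the "continue anyway if this node is its own seed" branch is left out, so the model always fails once the delay is exceeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Hypothetical model of the shadow-round retry loop, with sleeping replaced by a counter. */
final class ShadowRoundTimeline {
    /**
     * Returns the elapsed times (ms) at which a SYN would be sent to the seeds.
     * answeredAfter reports whether an ACK has arrived once the given number of ms has elapsed.
     * Throws IllegalStateException when ringDelayMs is exceeded without an answer.
     */
    static List<Integer> sendTimes(int ringDelayMs, IntPredicate answeredAfter) {
        List<Integer> sends = new ArrayList<>();
        int slept = 0;
        while (true) {
            if (slept % 5000 == 0) sends.add(slept); // first pass, then every 5 s
            slept += 1000;                           // stands in for Thread.sleep(1000)
            if (answeredAfter.test(slept)) return sends;
            if (slept > ringDelayMs)
                throw new IllegalStateException("Unable to gossip with any seeds");
        }
    }
}
```

For example, if the first ACK arrives 12 seconds in, SYNs go out at 0, 5000, and 10000 ms and the loop exits on the next one-second poll.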
Gossiper#start() starts a scheduled task, GossipTask, which by default runs once per second and sends the SYN messages:
/*
 * It is best to give every thread pool a name, which makes troubleshooting easier. It is also best to
 * set an explicit queue size rather than rely on the default unbounded queues in Executors. On shutdown,
 * take care to handle interrupts properly: many people just catch Exception and log it, which is not a
 * good way to handle it. What I usually do after catching InterruptedException is decide, based on the
 * business scenario, whether to restore the interrupt flag via interrupt(), and then, once the current
 * round of work is finished, decide whether to exit.
 */
private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("GossipTasks");

public void start(int generationNbr, Map<ApplicationState, VersionedValue> preloadLocalStates) {
    buildSeedsList();
    /* initialize the heartbeat state for this localEndpoint */
    maybeInitializeLocalState(generationNbr);
    EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
    localState.addApplicationStates(preloadLocalStates);

    //notify snitches that Gossiper is about to start
    DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
    if (logger.isTraceEnabled())
        logger.trace("gossip started with generation {}", localState.getHeartBeatState().getGeneration());

    scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(),
                                                          Gossiper.intervalInMillis,
                                                          Gossiper.intervalInMillis,
                                                          TimeUnit.MILLISECONDS);
}
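The advice in the comment above about named pools and interrupt handling can be sketched with the JDK classes alone. This is one possible shape, not Cassandra's DebuggableScheduledThreadPoolExecutor; `NamedScheduler` and its methods are hypothetical helpers.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: a named single-thread scheduler that shuts down cleanly. */
final class NamedScheduler {
    /** Creates a scheduler whose threads are called name:1, name:2, ... */
    static ScheduledThreadPoolExecutor create(String name) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, name + ":" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        return new ScheduledThreadPoolExecutor(1, factory);
    }

    /** Stops the scheduler; returns true if it terminated within the timeout. */
    static boolean shutdown(ScheduledThreadPoolExecutor executor, long timeoutMs) {
        executor.shutdownNow();
        try {
            return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag so callers can see the interrupt
            return false;
        }
    }
}
```

Note that `ScheduledThreadPoolExecutor` always uses its own unbounded delay queue, so the "set an explicit queue size" advice applies to plain `ThreadPoolExecutor` instances rather than to scheduled ones.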
So how is GossipTask implemented internally?
private class GossipTask implements Runnable {
    public void run() {
        try {
            // wait until MessagingService has started listening
            MessagingService.instance().waitUntilListening();
            // take the lock
            taskLock.lock();

            // Bump the heartbeat counter, which is used for failure detection. A scheduled task polls
            // this map and checks the time of the latest heartbeat; if the gap to the current time is
            // unreasonable, we can consider the node down and move it to a separate queue, then check
            // again after a while to see whether it has recovered.
            endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
            if (logger.isTraceEnabled())
                logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
            final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
            // randomly select some nodes and build the digest list
            Gossiper.instance.makeRandomGossipDigest(gDigests);

            if (gDigests.size() > 0) {
                // build the message; note that the verb here is GOSSIP_DIGEST_SYN
                GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                                       DatabaseDescriptor.getPartitionerName(),
                                                                       gDigests);
                MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                                      digestSynMessage,
                                                                                      GossipDigestSyn.serializer);
                /*
                 * Send the message to one live node, chosen at random as follows:
                 *   int index = (size == 1) ? 0 : random.nextInt(size);
                 *   InetAddress to = liveEndpoints.get(index);
                 * Returns true if the chosen node is a seed.
                 */
                boolean gossipedToSeed = doGossipToLiveMember(message);

                // randomly decide whether to also gossip to a node that is down
                maybeGossipToUnreachableMember(message);

                /*
                 * See this issue: https://issues.apache.org/jira/browse/CASSANDRA-150
                 */
                if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
                    maybeGossipToSeed(message);

                doStatusCheck();
            }
        } catch (Exception e) {
            JVMStabilityInspector.inspectThrowable(e);
            logger.error("Gossip error", e);
        } finally {
            taskLock.unlock();
        }
    }
}
// GossipDigestSynVerbHandler
public void doVerb(MessageIn<GossipDigestSyn> message, int id) {
    InetAddress from = message.from;
    if (logger.isTraceEnabled())
        logger.trace("Received a GossipDigestSynMessage from {}", from);
    if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound()) {
        if (logger.isTraceEnabled())
            logger.trace("Ignoring GossipDigestSynMessage because gossip is disabled");
        return;
    }

    GossipDigestSyn gDigestMessage = message.payload;
    /* messages from a different cluster are not processed */
    if (!gDigestMessage.clusterId.equals(DatabaseDescriptor.getClusterName())) {
        logger.warn("ClusterName mismatch from {} {}!={}", from, gDigestMessage.clusterId, DatabaseDescriptor.getClusterName());
        return;
    }

    if (gDigestMessage.partioner != null && !gDigestMessage.partioner.equals(DatabaseDescriptor.getPartitionerName())) {
        logger.warn("Partitioner mismatch from {} {}!={}", from, gDigestMessage.partioner, DatabaseDescriptor.getPartitionerName());
        return;
    }

    List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();

    /* if both the sender and the receiver are in a shadow round, send back an empty ack */
    if (!Gossiper.instance.isEnabled() && Gossiper.instance.isInShadowRound()) {
        // a genuine syn (as opposed to one from a node currently
        // doing a shadow round) will always contain > 0 digests
        if (gDigestList.size() > 0) {
            logger.debug("Ignoring non-empty GossipDigestSynMessage because currently in gossip shadow round");
            return;
        }

        logger.debug("Received a shadow round syn from {}. Gossip is disabled but " +
                     "currently also in shadow round, responding with a minimal ack", from);
        // new ArrayList<>() and new HashMap<>() allocate fresh objects just to carry nothing (their
        // default capacities are 10 and 16, allocated lazily on first insert); Collections.emptyList()
        // and Collections.emptyMap() could be considered instead
        MessagingService.instance()
                        .sendOneWay(new MessageOut<>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                     new GossipDigestAck(new ArrayList<>(), new HashMap<>()),
                                                     GossipDigestAck.serializer),
                                    from);
        return;
    }

    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        for (GossipDigest gDigest : gDigestList) {
            sb.append(gDigest);
            sb.append(" ");
        }
        logger.trace("Gossip syn digests are : {}", sb);
    }

    /*
     * The work below is similar to a merge in git. As described above, a larger version means the
     * holder has newer information about that node. Here we do a diff: if your version is larger than
     * my local one, I send a request asking you to send me that node's information; if my version is
     * larger than yours, my information is newer, so I tell you that yours needs updating.
     * Then a GossipDigestAck message is sent back.
     */
    doSort(gDigestList);

    List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
    Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
    Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
    logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
    MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                                                    new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
                                                                                    GossipDigestAck.serializer);
    if (logger.isTraceEnabled())
        logger.trace("Sending a GossipDigestAckMessage to {}", from);
    MessagingService.instance().sendOneWay(gDigestAckMessage, from);
}
The core of the implementation is Gossiper#examineGossiper:
void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap) {
    if (gDigestList.size() == 0) {
        /*
         * An empty list means this is a shadow round, so we have to send over
         * the state of every node we know about.
         */
        logger.debug("Shadow request received, adding all states");
        for (Map.Entry<InetAddress, EndpointState> entry : endpointStateMap.entrySet()) {
            gDigestList.add(new GossipDigest(entry.getKey(), 0, 0));
        }
    }
    for (GossipDigest gDigest : gDigestList) {
        int remoteGeneration = gDigest.getGeneration();
        int maxRemoteVersion = gDigest.getMaxVersion();
        /* Get state associated with the end point in digest */
        EndpointState epStatePtr = endpointStateMap.get(gDigest.getEndpoint());
        /*
            Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
            then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
            request all the data for this endpoint.
        */
        if (epStatePtr != null) {
            int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
            /* get the max version of all keys in the state associated with this endpoint */
            int maxLocalVersion = getMaxEndpointStateVersion(epStatePtr);
            if (remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion)
                continue;

            if (remoteGeneration > localGeneration) {
                /* we request everything from the gossiper */
                requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
            } else if (remoteGeneration < localGeneration) {
                /* send all data with generation = localgeneration and version > 0 */
                sendAll(gDigest, deltaEpStateMap, 0);
            } else if (remoteGeneration == localGeneration) {
                /*
                    If the max remote version is greater then we request the remote endpoint send us all the data
                    for this endpoint with version greater than the max version number we have locally for this
                    endpoint.
                    If the max remote version is lesser, then we send all the data we have locally for this endpoint
                    with version greater than the max remote version.
                */
                if (maxRemoteVersion > maxLocalVersion) {
                    deltaGossipDigestList.add(new GossipDigest(gDigest.getEndpoint(), remoteGeneration, maxLocalVersion));
                } else if (maxRemoteVersion < maxLocalVersion) {
                    /* send all data with generation = localgeneration and version > maxRemoteVersion */
                    sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
                }
            }
        } else {
            /* We are here since we have no data for this endpoint locally so request everything. */
            requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
        }
    }
}
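The branching in examineGossiper boils down to a comparison of (generation, max version) pairs. The sketch below restates those rules as a pure function so they can be read at a glance; `DigestDecision` and its `Action` names are hypothetical labels for the branches, not identifiers from Cassandra.

```java
/** Hypothetical distillation of the comparison rules into a pure function. */
final class DigestDecision {
    enum Action { NOTHING, REQUEST_ALL, SEND_ALL, REQUEST_NEWER, SEND_NEWER }

    /**
     * Decides what to do for one digest.
     * hasLocal is false when we hold no state at all for the endpoint.
     */
    static Action decide(boolean hasLocal, int localGeneration, int localMaxVersion,
                         int remoteGeneration, int remoteMaxVersion) {
        if (!hasLocal) return Action.REQUEST_ALL;                            // nothing known locally
        if (remoteGeneration > localGeneration) return Action.REQUEST_ALL;   // peer saw a newer generation
        if (remoteGeneration < localGeneration) return Action.SEND_ALL;      // peer is on an older generation
        if (remoteMaxVersion > localMaxVersion) return Action.REQUEST_NEWER; // ask for versions above ours
        if (remoteMaxVersion < localMaxVersion) return Action.SEND_NEWER;    // send versions above theirs
        return Action.NOTHING;                                               // both sides agree
    }
}
```

A shadow-round digest of (endpoint, 0, 0) compared against any locally known endpoint with a positive generation lands in the `SEND_ALL` branch, which is how the empty SYN ends up pulling the full state.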
// GossipDigestAckVerbHandler
public void doVerb(MessageIn<GossipDigestAck> message, int id) {
    InetAddress from = message.from;
    if (logger.isTraceEnabled())
        logger.trace("Received a GossipDigestAckMessage from {}", from);
    if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound()) {
        if (logger.isTraceEnabled())
            logger.trace("Ignoring GossipDigestAckMessage because gossip is disabled");
        return;
    }

    GossipDigestAck gDigestAckMessage = message.payload;
    List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
    Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
    logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());

    if (Gossiper.instance.isInShadowRound()) {
        if (logger.isDebugEnabled())
            logger.debug("Received an ack from {}, which may trigger exit from shadow round", from);

        // if it is empty, the peer is also in a shadow round; no problem, we will retry anyway
        Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
        return;
    }

    if (epStateMap.size() > 0) {
        /*
         * firstSynSendAt is updated when the first SYN message is sent. If this ACK
         * predates our first SYN, it is stale, so ignore it.
         */
        if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0) {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
            return;
        }

        /* failure detection related; set aside for now */
        Gossiper.instance.notifyFailureDetector(epStateMap);
        /* merge the state received from the remote node into the local state, similar to the operation above */
        Gossiper.instance.applyStateLocally(epStateMap);
    }

    /*
     * Build a GossipDigestAck2 message that carries the node state the peer asked for
     */
    Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
    for (GossipDigest gDigest : gDigestList) {
        InetAddress addr = gDigest.getEndpoint();
        EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
        if (localEpStatePtr != null)
            deltaEpStateMap.put(addr, localEpStatePtr);
    }

    MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
                                                                                       new GossipDigestAck2(deltaEpStateMap),
                                                                                       GossipDigestAck2.serializer);
    if (logger.isTraceEnabled())
        logger.trace("Sending a GossipDigestAck2Message to {}", from);
    MessagingService.instance().sendOneWay(gDigestAck2Message, from);
}
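Two pieces of the ACK handler are worth isolating: picking the state newer than the version the peer already has, and the guard that drops ACKs nobody asked for. The sketch below models both on plain maps and longs; `DeltaSelector` is hypothetical and only approximates what `getStateForVersionBiggerThan` is used for here, it is not that method's implementation.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for selecting the state newer than a requested version. */
final class DeltaSelector {
    /** Returns the entries whose version is greater than the given version, or null if there are none. */
    static Map<String, Integer> newerThan(Map<String, Integer> keyToVersion, int version) {
        Map<String, Integer> delta = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : keyToVersion.entrySet())
            if (entry.getValue() > version) delta.put(entry.getKey(), entry.getValue());
        return delta.isEmpty() ? null : delta;
    }

    /** Mirrors the stale-ACK guard: true when no SYN was sent yet, or the clock reads before the first SYN. */
    static boolean isUnrequested(long nowNanos, long firstSynSendAtNanos) {
        return firstSynSendAtNanos == 0 || nowNanos - firstSynSendAtNanos < 0;
    }
}
```

The subtraction form `now - first < 0` follows the usual advice for `System.nanoTime()` values, which should be compared by difference rather than directly because the counter may wrap.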
// GossipDigestAck2VerbHandler
public void doVerb(MessageIn<GossipDigestAck2> message, int id) {
    if (logger.isTraceEnabled()) {
        InetAddress from = message.from;
        logger.trace("Received a GossipDigestAck2Message from {}", from);
    }
    if (!Gossiper.instance.isEnabled()) {
        if (logger.isTraceEnabled())
            logger.trace("Ignoring GossipDigestAck2Message because gossip is disabled");
        return;
    }
    Map<InetAddress, EndpointState> remoteEpStateMap = message.payload.getEndpointStateMap();
    Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
    /* merge the received node state into the local state */
    Gossiper.instance.applyStateLocally(remoteEpStateMap);
}
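Structurally the source is very clear, and the logic of each step is fairly easy to follow. It is really much like TCP's three-way handshake:
① A picks a random peer B and tells it some of what A knows (the selection could be ordered by time, scored by version, and so on; see the paper for details).
② B receives it and compares it with its own local state. Whatever B holds that is newer than A's, it sends back to A; for whatever B holds that is older, it asks A to send the newer data in the next step.
③ A merges locally, then sends B the information B asked for.
④ B merges locally.
⑤ Done.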