衆所周知周知,疫情仍然在全球各地肆虐。據最新數據統計,截至北京時間 2020-05-28,全球累計確診 5698703 例,累計死亡 352282 例,累計治癒 2415237 例。html
從上面的統計數據,咱們能夠看出,新冠病毒在人與人之間的傳播是極其高效的,且影響範圍廣。若是咱們把「新冠病毒」想象成一小段數據,將「人與人之間傳播」想象成數據交換,那麼,咱們能夠得出結論,在不考慮免疫系統和人爲干預等一些因素,通過反覆迭代,數據(新冠病毒)能夠被髮送(感染)到每一個節點(人)上。node
這個就是今天要介紹的 Gossip 協議,該協議早在 1987 年就被髮表在 ACM 上的論文《Epidemic Algorithms for Replicated Database Maintenance》中。當時主要用在分佈式數據庫系統中各個副本節點間同步數據。git
Gossip 協議分爲 Push-based 和 Pull-based 兩種模式,具體工做流程以下:github
Push-based 的 Gossip 協議:數據庫
Pull-based 的 Gossip 協議,正好相反:微信
這邊簡單分析下 HashiCorp 公司的 Serf 的核心庫 Memberlist。這家公司研發了 Consul(基於 raft 實現的分佈式存儲)、Vagrant(聲明式虛擬機編排)等優秀的產品。最近因爲中美矛盾升級,也陷入到了輿論的漩渦中,爆出禁止在中國使用他們的產品的傳聞。不過,這是題外話。網絡
Memberlist 這個 Golang 的代碼庫,基於 Gossip 協議,實現了集羣內節點發現、 節點失效探測、節點故障轉移、節點狀態同步等。app
其核心實現的大體以下:dom
發送端處理流程:分佈式
// Create a gossip ticker if needed
if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
t := time.NewTicker(m.config.GossipInterval) go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip) m.tickers = append(m.tickers, t)
}
// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now()) // Get some random live, suspect, or recently dead nodes m.nodeLock.RLock() kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool { if n.Name == m.config.Name { return true } switch n.State { case StateAlive, StateSuspect: return false case StateDead: return time.Since(n.StateChange) > m.config.GossipToTheDeadTime default: return true } }) m.nodeLock.RUnlock() // ... for _, node := range kNodes { // Get any pending broadcasts msgs := m.getBroadcasts(compoundOverhead, bytesAvail) if len(msgs) == 0 { return } addr := node.Address() if len(msgs) == 1 { // Send single message as is if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, msgs[0]); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) } } else { // Otherwise create and send a compound message compound := makeCompoundMessage(msgs) if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil { m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) } } }
}
接收端:
// packetListen is a long running goroutine that pulls packets out of the
// transport and hands them off for processing.
func (m *Memberlist) packetListen() {
for { select { case packet := <-m.transport.PacketCh(): m.ingestPacket(packet.Buf, packet.From, packet.Timestamp) case <-m.shutdownCh: return } }
}
func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time) {
// ... // See if there's a checksum included to verify the contents of the message if len(buf) >= 5 && messageType(buf[0]) == hasCrcMsg { crc := crc32.ChecksumIEEE(buf[5:]) expected := binary.BigEndian.Uint32(buf[1:5]) if crc != expected { m.logger.Printf("[WARN] memberlist: Got invalid checksum for UDP packet: %x, %x", crc, expected) return } m.handleCommand(buf[5:], from, timestamp) } else { m.handleCommand(buf, from, timestamp) }
}
看了 Memberlist 的實現,不免會有這樣的疑問,爲何要使用 Gossip 協議,直接在集羣內廣播不香麼?接下來,咱們能夠經過 Gossip 協議的優缺點來分析,使用 Gossip 協議的意義。
優勢:
缺點:
今天對 Gossip 的協議就簡單介紹到這裏,若是有同窗對內容感興趣,能夠回覆評論,咱們私下多多探討和交流。
https://en.wikipedia.org/wiki...
https://github.com/hashicorp/...
https://github.com/hashicorp/...
https://zhuanlan.zhihu.com/p/...
https://www.jianshu.com/p/de7...