消息隊列與kafka

14.6 kafka

爲何用消息隊列html

舉例java

好比在一個企業裏,技術老大接到boss的任務,技術老大把這個任務拆分紅多個小任務,完成全部的小任務就算搞定整個任務了。
那麼在執行這些小任務的時候,可能有一個環節很費時間,而且優先級很低,推遲完成也不影響整個任務運轉,那麼技術老大就會將這個很費時間,且不重要的任務,丟給他的小弟去解決,本身繼續完成其餘任務。

轉化爲計算機思想python

那個技術老大就是一個 程序系統,那個小弟就是消息隊列。
當程序系統發現某些任務耗費時間且優先級較低,遲點完成也不影響整個任務,就把這個任務丟給消息隊列。

場景linux

在程序系統中,例如外賣系統,訂單系統,庫存系統,優先級較高
發紅包,發郵件,發短信,app消息推送等任務優先級很低,很適合交給消息隊列去處理,以便於程序系統更快的處理其餘請求。

消息隊列工做流程apache

消息隊列通常有三個角色:
隊列服務端
隊列生產者
隊列消費者
消息隊列工做流程就如同一個流水線,有產品加工,一個輸送帶,一個打包產品
輸送帶就是 不停運轉的消息隊列服務端
加工產品的就是 隊列生產者
在傳輸帶結尾打包產品的 就是隊列消費者

隊列產品bootstrap

RabbitMQ
Erlang編寫的消息隊列產品,企業級消息隊列軟件,支持消息負載均衡,數據持久化等。

ZeroMQ 
saltstack軟件使用此消息,速度最快。

Redis
key-value的系統,也支持隊列數據結構,輕量級消息隊列

Kafka
由Scala編寫,目標是爲處理實時數據提供一個統1、高通量、低等待的平臺

一個app系統消息隊列工做流程緩存

消費者,一個後臺進程,不斷的去檢測消息隊列中是否有消息,有消息就取走,開啓新線程去處理業務,若是沒有一會再來

kafka是什麼

在流式計算中,Kafka通常用來緩存數據,Storm經過消費Kafka的數據進行計算。安全

1)Apache Kafka是一個開源消息系統,由Scala寫成。是由Apache軟件基金會開發的一個開源消息系統項目。服務器

2)Kafka最初是由LinkedIn公司開發,並於 2011年初開源。2012年10月從Apache Incubator畢業。該項目的目標是爲處理實時數據提供一個統1、高通量、低等待的平臺。網絡

3)Kafka是一個分佈式消息隊列。Kafka對消息保存時根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)成爲broker。

4)不管是kafka集羣,仍是producer和consumer都依賴於zookeeper集羣保存一些meta信息,來保證系統可用性。

消息通訊圖


點對點模式(一對一,消費者主動拉取數據,輪詢機制,消息收到後消息清除,ack確認機制)

點對點模型一般是一個基於拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。

這個模型的特色是發送到隊列的消息被一個且只有一個接收者接收處理,即便有多個消息監聽者也是如此。


發佈/訂閱模式(一對多,數據生產後,推送給全部訂閱者)

發佈訂閱模型則是一個基於推送的消息傳送模型。

發佈訂閱模型能夠有多種不一樣的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的全部消息,即便當前訂閱者不可用,處於離線狀態。

消息隊列做用

1)程序解耦

容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。

2)冗餘:

消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。

許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

3)峯值處理能力:

(大白話,就是原本公司業務只須要5臺機器,可是臨時的秒殺活動,5臺機器確定受不了這個壓力,咱們又不可能將總體服務器架構提高到10臺,那在秒殺活動後,機器不就浪費了嗎?所以引入消息隊列)

在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。

若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。

使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。

4)可恢復性:

系統的一部分組件失效時,不會影響到整個系統。

消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。

5)順序保證:

在大多使用場景下,數據處理的順序都很重要。

大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性)

6)緩衝:

有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。

7)異步通訊:

不少時候,用戶不想也不須要當即處理消息。好比發紅包,發短信等流程。

消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。

kafka架構

1)Producer :消息生產者,就是向kafka broker發消息的客戶端。

2)Consumer :消息消費者,向kafka broker取消息的客戶端

3)Topic :主題,能夠理解爲一個隊列。

4) Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給全部的consumer)和單播(發給任意一個consumer)的手段。一個topic能夠有多個CG。topic的消息會複製-給consumer。若是須要實現廣播,只要每一個consumer有一個獨立的CG就能夠了。要實現單播只要全部的consumer在同一個CG。用CG還能夠將consumer進行自由的分組而不須要屢次發送消息到不一樣的topic。

5)Broker :一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker能夠容納多個topic。

6)Partition:爲了實現擴展性,一個很是大的topic能夠分佈到多個broker(即服務器)上,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的總體(多個partition間)的順序。

7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset作名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件便可。固然the first offset就是00000000000.kafka

分佈式模型

​ Kafka每一個主題的多個分區日誌分佈式地存儲在Kafka集羣上,同時爲了故障容錯,每一個(partition)分區都會以副本的方式複製到多個消息代理節點上。

其中一個節點會做爲主副本(Leader),其餘節點做爲備份副本(Follower,也叫做從副本)。主副本會負責全部的客戶端讀寫操做,備份副本僅僅從主副本同步數據。當主副本出現故障時,備份副本中的一個副本會被選擇爲新的主副本。由於每一個分區的副本中只有主副本接受讀寫,因此每一個服務器端都會做爲某些分區的主副本,以及另一些分區的備份副本,這樣Kafka集羣的全部服務端總體上對客戶端是負載均衡的。

Kafka的生產者和消費者相對於服務器端而言都是客戶端。

Kafka生產者客戶端發佈消息到服務端的指定主題,會指定消息所屬的分區。

生產者發佈消息時根據消息是否有鍵,採用不一樣的分區策略。消息沒有鍵時,經過輪詢方式進行客戶端負載均衡;消息有鍵時,根據分區語義(例如hash)確保相同鍵的消息老是發送到同一分區。

Kafka的消費者經過訂閱主題來消費消息,而且每一個消費者都會設置一個消費組名稱。由於生產者發佈到主題的每一條消息都只會發送給消費者組的一個消費者。

因此,若是要實現傳統消息系統的「隊列」模型,可讓每一個消費者都擁有相同的消費組名稱,這樣消息就會負責均衡到全部的消費者;若是要實現「發佈-訂閱」模型,則每一個消費者的消費者組名稱都不相同,這樣每條消息就會廣播給全部的消費者。

分區是消費者現場模型的最小並行單位。

以下圖(圖1)所示,生產者發佈消息到一臺服務器的3個分區時,只有一個消費者消費全部的3個分區。在下圖(圖2)中,3個分區分佈在3臺服務器上,同時有3個消費者分別消費不一樣的分區。假設每一個服務器的吞吐量時300MB,在下圖(圖1)中分攤到每一個分區只有100MB,而在下圖(圖2)中,集羣總體的吞吐量有900MB。能夠看到,增長服務器節點會提高集羣的性能,增長消費者數量會提高處理性能。

同一個消費組下多個消費者互相協調消費工做,Kafka會將全部的分區平均地分配給全部的消費者實例,這樣每一個消費者均可以分配到數量均等的分區。Kafka的消費組管理協議會動態地維護消費組的成員列表,當一個新消費者加入消費者組,或者有消費者離開消費組,都會觸發再平衡操做。

Kafka的消費者消費消息時,只保證在一個分區內的消息的徹底有序性,並不保證同一個主題匯中多個分區的消息順序。並且,消費者讀取一個分區消息的順序和生產者寫入到這個分區的順序是一致的。好比,生產者寫入「hello」和「Kafka」兩條消息到分區P1,則消費者讀取到的順序也必定是「hello」和「Kafka」。若是業務上須要保證全部消息徹底一致,只能經過設置一個分區完成,但這種作法的缺點是最多隻能有一個消費者進行消費。通常來講,只須要保證每一個分區的有序性,再對消息假設鍵來保證相同鍵的全部消息落入同一分區,就能夠知足絕大多數的應用。

kafka部署啓動

配置jdk環境

下載網址
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
找到
jdk-8u201-linux-x64.tar.gz

解壓縮,配置java環境變量

tar -zxvf jdk-8u201-linux-x64.tar.gz

PATH="$PATH:/opt/jdk1.8.0_201/bin"

配置zookeeper環境,配置環境變量

tar -zxvf zookeeper-3.4.14.tar.gz

PATH="$PATH:/opt/jdk1.8.0_201/bin:/opt/zookeeper-3.4.14/bin"

zookeeper端口解釋

一、2181

二、3888

三、2888

2、3個端口的做用

一、2181:對cline端提供服務

二、3888:選舉leader使用

三、2888:集羣內機器通信使用(Leader監聽此端口)
部署時注意

一、單機單實例,只要端口不被佔用便可

二、單機僞集羣(單機,部署多個實例),三個端口必須修改成組組不同

如:myid1 : 2181,3888,2888

myid2 : 2182,3788,2788

myid3 : 2183,3688,2688

三、集羣(一臺機器部署一個實例)


4、集羣爲大於等於3個基數,如 三、五、7....,不宜太多,集羣機器多了選舉和數據同步耗時時長長,不穩定。目前以爲,三臺選舉+N臺observe很不錯。

啓動安裝zookeeper

本文以standalone模式運行,並不是集羣模式

1.解壓縮zk壓縮包,配置好環境變量
2.在zk解壓縮包目錄下建立 zkData目錄
3.修改zk解壓縮包目錄下conf/zoo_sample.cfg爲zoo.cfg
4.編輯zoo.cfg配置文件,修改代碼

zookeeper-3.4.14/conf/zoo.cfg修改以下參數

dataDir=/opt/zookeeper-3.4.14/zkData
server.2=192.168.119.10:2888:3888 #修改成你本身服務器的ip

參數解釋

Server.A=B:C:D。

A是一個數字,表示這個是第幾號服務器;

B是這個服務器的ip地址;

C是這個服務器與集羣中的Leader服務器交換信息的端口;

D是萬一集羣中的Leader服務器掛了,須要一個端口來從新進行選舉,選出一個新的Leader,而這個端口就是用來執行選舉時服務器相互通訊的端口。

集羣模式下配置一個文件myid,這個文件在dataDir目錄下,這個文件裏面有一個數據就是A的值,Zookeeper啓動時讀取此文件,拿到裏面的數據與zoo.cfg裏面的配置信息比較從而判斷究竟是哪一個server。

啓動zk服務端

zkServer.sh start   #啓動
zkServer.sh status  #檢查狀態

kafka部署

下載二進制kafka代碼包
wget http://apache.claz.org/kafka/2.2.0/kafka_2.11-2.2.0.tgz
解壓縮
tar -xf kafka_2.11-2.2.0.tgz
修改kafka服務端配置文件
/opt/kafka_2.11-2.2.0/config/server.properties
#建立kafka日誌文件夾
mkdir -p /opt/kafka_2.11-2.2.0/logs

/opt/kafka_2.11-2.2.0/config/server.properties修改以下參數

若是修改了kafka的啓動地址參數,注意可能出現的權限問題,或者刪除logs目錄下的數據文件

9092是kafka服務端

#broker的全局惟一編號,不能重複
broker.id=0
#是否容許刪除topic
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的線程數量
num.io.threads=8
#發送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接收套接字的緩衝區大小
socket.receive.buffer.bytes=102400
#請求套接字的最大緩衝區大小
socket.request.max.bytes=104857600
#kafka運行日誌存放的路徑
log.dirs=/opt/kafka_2.11-2.2.0/logs
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168

#配置鏈接Zookeeper集羣地址,確保zk正確啓動2181已經打開
zookeeper.connect=192.168.119.10:2181

修改linux的PATH環境變量,支持kafka命令

[root@localhost bin]# echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/opt/jdk1.8.0_201/bin:/opt/zookeeper-3.4.14/bin:/opt/kafka_2.11-2.2.0/bin

啓動kafka服務端,指定配置文件,後臺啓動

[root@localhost kafka_2.11-2.2.0]# kafka-server-start.sh config/server.properties &

看到以下提示,表明kafka啓動成功

[2019-04-12 23:53:33,229] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-04-12 23:53:33,229] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser)
[2019-04-12 23:53:33,231] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

kafka命令行操做

查看當前服務器中的全部topic
[root@localhost kafka_2.11-2.2.0]# kafka-topics.sh --zookeeper 192.168.119.10:2181 --list

建立topic
[root@localhost kafka_2.11-2.2.0]# kafka-topics.sh --zookeeper 192.168.119.10:2181 --create --replication-factor 1 --partitions 1 --topic first

選項說明:
--topic 定義topic名
--replication-factor  定義副本數
--partitions  定義分區數

刪除topic
kafka-topics.sh --zookeeper 192.168.119.10:2181 --delete --topic first
須要server.properties中設置delete.topic.enable=true不然只是標記刪除或者直接重啓。

發送消息,9092是kafka的服務端口
[root@localhost kafka_2.11-2.2.0]# kafka-console-producer.sh --broker-list 192.168.119.10:9092 --topic first
>hello kafka
>chaoge niubi

消費消息,注意kafka的版本,以及新參數特性
[root@localhost kafka_2.11-2.2.0]# kafka-console-consumer.sh --bootstrap-server  192.168.119.10:9092 --from-beginning --topic first
--from-beginning:會把first主題中以往全部的數據都讀取出來。根據業務場景選擇是否增長該配置。
broker
    topic
        partition
        
三者包含關係

python操做kafka

環境準備

[root@localhost pykafka]# python3 -V
Python 3.6.7

啓動好zk,kafka,確保2181端口,9092端口啓動

Python模塊安裝

pip3 install kafka-python

生產者

[root@localhost pykafka]# cat pro.py
import time
from kafka import KafkaProducer
#鏈接上kafka服務端9092端口
producer = KafkaProducer(bootstrap_servers = ['192.168.119.10:9092'])
# 註冊一個主題,名字topic
topic = 'oldboy'

#每秒鐘,寫入一個消息數據
def test():
    print ('begin produce..')
    n = 1
    try:
        while (n<=100):
              #向主題oldboy中發送byte數據
            producer.send(topic, str(n).encode())
            print("send" + str(n))
            n += 1
            time.sleep(0.5)
    except KafkaError as e:
        print(e)
    finally:
          #關閉鏈接
        producer.close()
        print('done')

if __name__ == '__main__':
    test()

消費者

[root@localhost pykafka]# cat consumer.py

from kafka import KafkaConsumer

#connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('oldboy', group_id = 'oldboy_group', bootstrap_servers = ['192.168.119.10:9092'])
try:
    for msg in consumer:
        print(msg)
        print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition,msg.offset, msg.key, msg.value))
except KeyboardInterrupt as  e:
    print(e)
相關文章
相關標籤/搜索