Kafka是分佈式發佈-訂閱消息系統。它最初由LinkedIn公司開發,使用Scala語言編寫,以後成爲Apache項目的一部分,目前是Apache的一個頂級項目。java
Kafka是一個分佈式的、可分區的、可複製的消息系統。它提供了普通消息系統的功能,但具備本身獨特的設計。bash
它提供了相似於JMS(Java消息隊列規範)的特性,可是在設計實現上徹底不一樣,此外它並非JMS規範的實現。服務器
kafka對消息保存時根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣由多個kafka實例組成,每一個實例(server)稱爲broker。網絡
不管是kafka集羣,仍是producer和consumer都依賴於zookeeper來保證系統可用性集羣保存一些meta信息。session
總結:併發
Kafka是分佈式消息隊列,按topic分類存放數據,擁有Producer、Consumer、Broker三個角色,使用zookeeper作爲集羣的協調工具。負載均衡
理論上Kafka每秒能夠生產約25萬消息(50MB),每秒處理55萬消息(110MB),生產環境中在這個速度上下浮動,這個速度就至關於硬盤IO的速度。異步
可進行持久化操做。將消息持久化到磁盤,所以可用於批量消費,例如ETL,以及實時應用程序。經過將數據持久化到硬盤以及replication防止數據丟失。分佈式
全部的producer、broker和consumer均可以有多個,均爲分佈式的。無需停機便可擴展機器。函數
消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。
Kafka將消息以topic爲單位進行概括。topic之間的數據是相互隔離的。
將向Kafka topic發佈消息的程序稱爲producers。
將預訂topics並消費消息的程序稱爲consumer。
Kafka以集羣的方式運行,能夠由一個或多個服務組成,每一個服務叫作一個broker。
producers經過網絡將消息發送到Kafka集羣,集羣向消費者提供消息。
客戶端和服務端經過TCP協議通訊。Kafka提供了Java客戶端,而且對多種語言都提供了支持。
一個topic是對一組消息的概括。
Kafka針對對每一個topic的日誌進行了分區。
kafka中的分區是負載均衡和失敗恢復的基本單位。
每一個分區都由一系列有序的、不可變的消息組成,這些消息被連續的追加到分區中。分區中的每一個消息都有一個連續的序列號叫作offset,用來在分區中惟一標識一個消息。在一個可配置的時間段內,Kafka集羣保留全部發布的消息,無論這些消息有沒有被消費。
好比,若是消息的保存策略被設置爲2天,那麼在一個消息被髮布的兩天時間內,它都是能夠被消費的。過時以後它將被丟棄以釋放空間。Kafka的性能是和數據量無關的常量級的,因此保留太多的數據並非問題。
每一個分區在Kafka集羣的若干服務中都有副本,這樣這些持有副本的服務能夠共同處理數據和請求,副本數量是能夠配置的。副本使Kafka具有了容錯能力。
每一個分區都由一個服務器做爲「leader」,零或若干服務器做爲「followers」,leader負責處理消息的讀和寫,followers則去同步leader的數據,並對外提供讀操做。若是leader down了,followers中的一臺則會自動成爲leader。
集羣中的每一個服務器都會同時扮演兩個角色:做爲它所持有的一部分分區的leader,同時做爲其餘分區的followers,這樣集羣就據有較好的負載均衡。
將日誌分區能夠達到如下目的:
首先這使得每一個日誌的數量不會太大,能夠在單個服務上保存。另外每一個分區能夠單獨發佈和消費,爲併發操做topic提供了一種可能。
分區是負載均衡失敗恢復分佈式數據存儲的基本單元。
Producer將消息發佈到它指定的topic中,並負責決定發佈到哪一個分區。一般簡單的由負載均衡機制隨機選擇分區,但也能夠經過特定的分區函數選擇分區。使用的更多的是第二種。
實際上每一個consumer惟一須要維護的數據是消息在日誌中的位置,也就是offset。這個offset由consumer來維護,通常狀況下隨着consumer不斷的讀取消息,這offset的值會不斷增長,但其實consumer能夠以任意的順序讀取消息,好比它能夠將offset設置成爲一箇舊的值來重讀以前的消息。
以上特色的結合,使Kafka consumers很是的輕量級,它們能夠在不對集羣和其餘consumer形成影響的狀況下讀取消息。你可使用命令行來"tail"消息而不會對其餘正在消費消息的consumer形成影響。
消費消息一般有兩種模式:隊列模式(queuing)和發佈-訂閱模式(publish-subscribe)。
隊列模式中,多個consumers能夠同時從服務端讀取消息,每一個消息只被其中一個consumer讀到。
通俗的來說,consumers之間是競爭關係,都在從borker中搶數據,而這個數據只有一份,誰搶到就是誰的。
發佈-訂閱模式中消息被廣播到全部的consumer中。這種模式之下,每一個consumers都能獲得相同的消息數據。
Consumers能夠加入一個consumer group,此組就是用來實現以上兩種模式的。
若是全部的consumer都在一個組中,這就成爲了傳統的隊列模式,在各consumer中實現負載均衡。
組內的Consumer是處在隊列模式下,共同競爭一個topic內的消息,topic中的消息將被分發到組中的一個成員中,同一條消息只發往其中的一個消費者。同一組中的consumer能夠在不一樣的程序中,也能夠在不一樣的機器上。
若是全部的consumer都不在不一樣的組中,這就成爲了發佈-訂閱模式,全部的消息都被分發到全部的consumer中。
而若是有多個Consumer group來消費相同的Topic中的消息,則組和組之間使用的就是發佈訂閱模式,是一個共享數據的狀態。每個組均可以獲取到這個主題中的全部消息。
常見的應用方式是,每一個topic都有若干數量的consumer組來消費,每一個組都是一個邏輯上的「訂閱者」,爲了容錯和更好的穩定性,每一個組都由若干consumer組成,在組內競爭實現負載均衡。實現了組內競爭負載均衡,組間共享互不影響,這其實就是一個發佈-訂閱模式,只不過訂閱者是個組而不是單個consumer。
相比傳統的消息系統,Kafka能夠很好的保證有序性。
傳統的隊列在服務器上保存有序的消息,若是多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分發消息。雖然服務器按順序發佈消息,可是消息是被異步的分發到各consumer上,因此當消息到達時可能已經失去了原來的順序,這意味着併發消費將致使順序錯亂。爲了不故障,這樣的消息系統一般使用「專用consumer」的概念,其實就是隻容許一個消費者消費消息,固然這就意味着失去了併發性。
在這方面Kafka作的更好,經過分區的概念,Kafka能夠在多個consumer組併發的狀況下提供較好的有序性和負載均衡。將每一個分區分只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就能夠順序的消費這個分區的消息。由於有多個分區,依然能夠在多個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就容許多少併發消費。
Kafka只能保證一個分區以內消息的有序性,在不一樣的分區之間是不能夠的,這已經能夠知足大部分應用的需求。若是須要topic中全部消息的有序性,那就只能讓這個topic只有一個分區,固然也就只有一個consumer組消費它。
爲何大數據環境下的消息隊列常選擇kafka?
分佈式存儲數據,提供了更好的性能、可靠性、可擴展能力。
利用磁盤存儲數據,且按照主題、分區來分佈式存放數據,持久化存儲,提供海量數據存儲能力。
採用磁盤存儲數據,連續進行讀寫保證性能,性能和磁盤的性能相關和數據量的大小無關。
下載Kafka安裝包,上傳到Linux服務器。
解壓:
tar -zxvf kafka_2.9.2-0.8.1.1.tgz
解壓完成就至關於安裝完畢,不過還要進行響應的配置。
修改server.properties文件。
log.dirs=/tmp/kafka-logs
此選項配置的是Kafka的數據存儲位置,須要更改。
修改zookeeper.properties配置文件。此文件是Kafka內置的Zookeeper的配置文件。Kafka爲了保證軟件的獨立性,本身內置了一個Zookeeper,因此使用爲分佈式的狀況下,不用專門安裝Zookeeper。
以下項配置的是zookeeper的數據存儲位置,默認在/tmp中,須要修改。
dataDir=/tmp/zookeeper
啓動zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties &
啓動kafka:
bin/kafka-server-start.sh config/server.properties
在config目錄下,修改server.properties,在文件中修改以下參數:
broker.id=0 #當前server編號 port=9092 #使用的端口 log.dirs=/tmp/kafka-logs-1 #日誌存儲目錄 zookeeper.connect=yun01:2181
集羣中broker.id要具備惟一性。
日誌存放目錄須要更改成規劃目錄,不能使用默認的/tmp目錄。
Zookeeper須要配置Zookeeper集羣中全部服務器的ip或主機名:端口。
①啓動zookeeper
在各個機器上執行以下啓動命令:
zkServer.sh start
②啓動kafka
bin/kafka-server-start.sh ../config/server.properties &
Kafka服務默認啓動會佔用控制檯,因此能夠後臺運行。
建立一個擁有3個副本的topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topicname
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看每一個節點的信息:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topicname
向topic發送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname
消費消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topicname
製造宕機,查看Kafka的容錯性。
kill -9 7564 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topicname bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topicname
啓動kafka:
啓動zookeeper:
zkServer.sh start
啓動kafka:
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
使用命令行producer從文件中或者從標準輸入中讀取消息併發送到服務端:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
啓動命令行consumer讀取消息並輸出到標準輸出:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
建立java工程,導入kafka相關包,jar包存在於Kafka安裝包的libs目錄中,拷貝時注意,裏面不僅有jar包,還含有其餘類型的文件,只拷貝jar包便可。
/** 接收數據 */ @Test public void ConsumerReceive() throws Exception{ Properties properties = new Properties(); properties.put("zookeeper.connect", "yun01:2181,yun02:2181,yun03:2181");//聲明zk // 必需要使用別的組名稱, 若是生產者和消費者都在同一組,則不能訪問同一組內的topic數據 properties.put("group.id", "group2xx"); properties.put("auto.offset.reset", "smallest"); // properties.put("zookeeper.session.timeout.ms", "400"); // properties.put("zookeeper.sync.time.ms", "200"); // properties.put("auto.commit.interval.ms", "1000"); // properties.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("my-replicated-topic", 1); // 一次從主題中獲取一個數據 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); // 獲取每次接收到的這個數據 KafkaStream<byte[], byte[]> stream = messageStreams.get("my-replicated-topic").get(0); ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while(iterator.hasNext()){ System.out.println("receive:" + new String(iterator.next().message())); } }
/** 發送數據 */ @Test public void ProducerSend(){ Properties props = new Properties(); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "192.168.242.101:9092"); Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props )); producer.send(new KeyedMessage<Integer, String>("my-replicated-topic","message~xxx123asdf")); producer.close(); }
下一篇:Kafka的存儲機制以及可靠性