Message Queue 之 Kafkahtml
Author: Lijbjava
Email: lijb1121@163.com算法
Message:通常用於系統間通訊,一般系統間通訊之間經過兩種方式發送消息.即時消息:消息發送方和接收方必須同時在線,例如WebService、Dubbo等這些常見的RPC框架通常發送都是即時消息(相似打電話)。離線消息:消息發送方和消息接受方不須要同時在線,實現消息異步接收。例如:Message Queue就是專門用於發送異步消息(相似發短信)。apache
消息隊列:消息遵循先進先出的原則FIFO ,kafka 屬於發佈與訂閱的bootstrap
kafka 特色:架構
1.高吞吐量、低延遲:(把消息書寫到本地磁盤上,持久化消息,利用磁盤的順序讀寫,追加文件)
Message Queue使用場景併發
參考:http://www.cnblogs.com/linjiqin/p/5720865.htmlapp
消息隊列中間件是分佈式系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ負載均衡
Kafka 系統架構框架
生產者:發送消息的進程,將消息發送到topic中 消費者:消費消息的進程,從topic中讀取消息。 ConsumerGroup:由多個消費者組成的組,每個組中的全部消費者共同消費一個完整的topic,彼此消費的消息不重複。 broker:對應一個kafka實例。kafka集羣負責分區leader的選舉和遷移。上右圖中紅線對應的是leader。 record:對應一個消息,key,value,ts(timestamp). topic:kafka對消息進行分類,每一類對應一個topic,一個topic至少被分紅一個分區partition,全部的分區的消息加起來組成一個topic。 replication:副本,每一個分區都有多個副本。 leader、follower:leader負責消息的讀寫,follower負責從leader複製消息。 offset: 偏移量,每一個分區中的offset是有序的,局部性的,對應每一個record的惟一標識。 topic默認的分區策略:根據輸入的key的hash值%分區個數。 zookeeper:負責kafka元數據的管理及Consumer相關數據的管理。 消費者的負載均衡:rang 均分| round-robin 輪詢 總結:當一個ConsumerGroup中,有消費者成員Consumer加入或者離開時,就會觸發kafka分區(partition)的從新分配,也就是partition的均衡,均衡的目的是爲了提高topic的併發(多個線程併發消費分區)消費能力。至於哪一個消費者消費哪個分區,這是有一個算法的,這個算法是可以保證一個分區必定只能被一個消費者消費,而不能被多個消費者消費,還可以保證一個消費者能夠消費多個分區。也就是分區和消費者之間是多對一(包含一對一)的關係。這個算法的最終結果是,當一個消費者組(ConsumerGroup)中消費者成員(Consumer)的數量大於一個topic分區的數量時,多餘的消費者就沒有辦法消費到數據了。
搭建Kaka集羣
搭建zookeeper集羣而且啓動(略)
解壓並安裝kafka集羣(務必配置主機名和IP映射關係)
[root@CentOSX ~]# tar -zxf kafka_2.11-0.11.0.0.tgz -C /usr/ [root@CentOSX ~]# vi /usr/kafka_2.11-0.11.0.0/config/server.properties
#####Server Basics ######## broker.id=0
# 每一個節點都要修改,且不一樣 delete.topic.enable=true
listeners=PLAINTEXT://CentOSC
:9092 # 每一個節點都要修改,且不一樣 ######### Log Basics ########## log.dirs=/usr/kafka-logs
log.retention.hours=168 ########## Zookeeper ########## zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181
啓動Kafka集羣
[root@CentOSX ~]# cd /usr/kafka_2.11-0.11.0.0/ [root@CentOSX kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
關閉kafka
[root@CentOSX ~]# vi /usr/kafka_2.11-0.11.0.0/bin/kafka-server-stop.sh PIDS=$(jps| grep Kafka | awk '{print $1}') # 修改 獲取進程id 號
if [ -z "$PIDS" ]; then echo "No kafka server to stop" exit 1 else kill -s TERM $PIDS fi [root@CentOSX ~]# cd /usr/kafka_2.11-0.11.0.0/ [root@CentOSX kafka_2.11-0.11.0.0]# ./bin/kafka-server-stop.sh
Kafka測試
//建立分區 [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --create --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 --partitions 3 --replication-factor 3 //啓動消費者 [root@CentOSB kafka_2.11-0.11.0.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --from-beginning //啓動生產者 [root@CentOSC kafka_2.11-0.11.0.0]# ./bin/kafka-console-producer.sh --broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 > hello kafka
Topic基本操做
建立topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --create --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 --partitions 3 --replication-factor 3
partitions:分區的個數,replication-factor副本因子
查看topic詳情
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --describe --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 Topic:topic01 PartitionCount:3 ReplicationFactor:3 Configs: Topic: topic01 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: topic01 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic01 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
查看全部Topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --list --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 topic01 topic02 topic03
刪除topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --delete --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic03 Topic topic03 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
修改Topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --alter --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic02 --partitions 2 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
分區數目只容許增長,不容許減小。
Java 鏈接Kafka集羣
<!--kafka依賴--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.2</version> </dependency>
生產者
//1.建立Properties對象 Properties props=new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); //2.序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class); KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(props); //3.封裝Record ProducerRecord<String,String> record= new ProducerRecord<String, String>("topic01","0100","zhangsan 男 18"); //4.發送消息 kafkaProducer.send(record); kafkaProducer.flush(); kafkaProducer.close();
消費者
//1.建立Properties對象 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092"); //2.反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1"); KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(props); //3.訂閱相應的topic kafkaConsumer.subscribe(Arrays.asList("topic01")); //4.開始獲取消息 while(true){ ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); for (ConsumerRecord<String, String> record : records) { String key=record.key(); String value = record.value(); long ts = record.timestamp(); int partition = record.partition(); long offset = record.offset(); System.out.println(key+"=>"+value+"\t offset "+offset+" ,partition:"+partition+"\t"+ts); } }
自定義序列化發送對象
public class ObjectSerializer implements Serializer<Object> { // Serializer該接口是kafka的類 public void configure(Map<String, ?> map, boolean isKey) {} public byte[] serialize(String topic, Object o) { return SerializationUtils.serialize((Serializable) o); } public void close() {} } --- public class ObjectDeserializer implements Deserializer<Object> { public void configure(Map<String, ?> configs, boolean isKey) { } public Object deserialize(String topic, byte[] data) { return SerializationUtils.deserialize(data); } public void close() { } }
如何幹預Kafka分區策略 提升 record 的並行度
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,XxxPartitioner.class) //自定義分區類實現Partitioner接口,而後指定分區策略 public class XxxPartitioner implements Partitioner { public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); return (key.hashCode()&Integer.MAX_VALUE)%numPartitions; } public void close(){ } }
訂閱形式
subscribe(訂閱)方式
props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1"); kafkaConsumer.subscribe(Arrays.asList("topic02"));
優勢:但是自動實現 組內負載均衡和故障轉移。
assign (分配)方式
TopicPartition part02 = new TopicPartition("topic02", 2); // 分區個數 kafkaConsumer.assign(Arrays.asList(part02));
優勢:手動指定分區信息,缺點:沒法實現負載均衡和故障轉移
Offset自動提交
默認客戶端自動開啓了自動提交功能,默認提交時間間隔是5秒鐘,用戶能夠採起手動提交的方式實現。開啓手動提交以下:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//關閉自動提交 //消費代碼後追加 kafkaConsumer.commitAsync();
或者適當調小自動提交時間間隔:
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//關閉自動提交
Kafka Stream-High Level
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Properties; public class KafkaStreamDemo { public static void main(String[] args) { //1.建立Properties對象 Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); //2.添加kafka集羣信息 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092"); //3.添加數據類型 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //4.建立KStreamBuilder並讀取數據文件 KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> textLines = builder.stream("TextLinesTopic"); //5.對數據文件進行切分計算 KTable<String, Long> wordCounts = textLines .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) .groupBy((key, word) -> word) .count("counts"); wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic"); //6.把KStreaamBuilder和Properties對象放入stream流 KafkaStreams streams = new KafkaStreams(builder, props); //7.啓動stream streams.start(); } }