Message Queue 之 Kafka

Message Queue 之 Kafkahtml

Author: Lijbjava

Email: lijb1121@163.com算法

Message:通常用於系統間通訊,一般系統間通訊之間經過兩種方式發送消息.即時消息:消息發送方和接收方必須同時在線,例如WebService、Dubbo等這些常見的RPC框架通常發送都是即時消息(相似打電話)。離線消息:消息發送方和消息接受方不須要同時在線,實現消息異步接收。例如:Message Queue就是專門用於發送異步消息(相似發短信)。apache

消息隊列:消息遵循先進先出的原則FIFO ,kafka 屬於發佈與訂閱的bootstrap

kafka 特色:架構

1.高吞吐量、低延遲:(把消息書寫到本地磁盤上,持久化消息,利用磁盤的順序讀寫,追加文件)

Message Queue使用場景併發

參考:http://www.cnblogs.com/linjiqin/p/5720865.htmlapp

消息隊列中間件是分佈式系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ負載均衡

  • 異步通訊

異步通訊

  • 系統間解耦

解耦和

  • 削峯填谷

Kafka 系統架構框架

生產者:發送消息的進程,將消息發送到topic中
消費者:消費消息的進程,從topic中讀取消息。
ConsumerGroup:由多個消費者組成的組,每個組中的全部消費者共同消費一個完整的topic,彼此消費的消息不重複。
broker:對應一個kafka實例。kafka集羣負責分區leader的選舉和遷移。上右圖中紅線對應的是leader。
record:對應一個消息,key,value,ts(timestamp).
topic:kafka對消息進行分類,每一類對應一個topic,一個topic至少被分紅一個分區partition,全部的分區的消息加起來組成一個topic。
replication:副本,每一個分區都有多個副本。
leader、follower:leader負責消息的讀寫,follower負責從leader複製消息。

offset: 偏移量,每一個分區中的offset是有序的,局部性的,對應每一個record的惟一標識。

topic默認的分區策略:根據輸入的key的hash值%分區個數。
zookeeper:負責kafka元數據的管理及Consumer相關數據的管理。

消費者的負載均衡:rang 均分| round-robin 輪詢
總結:當一個ConsumerGroup中,有消費者成員Consumer加入或者離開時,就會觸發kafka分區(partition)的從新分配,也就是partition的均衡,均衡的目的是爲了提高topic的併發(多個線程併發消費分區)消費能力。至於哪一個消費者消費哪個分區,這是有一個算法的,這個算法是可以保證一個分區必定只能被一個消費者消費,而不能被多個消費者消費,還可以保證一個消費者能夠消費多個分區。也就是分區和消費者之間是多對一(包含一對一)的關係。這個算法的最終結果是,當一個消費者組(ConsumerGroup)中消費者成員(Consumer)的數量大於一個topic分區的數量時,多餘的消費者就沒有辦法消費到數據了。

搭建Kaka集羣

  • 搭建zookeeper集羣而且啓動(略)

  • 解壓並安裝kafka集羣(務必配置主機名和IP映射關係)

    [root@CentOSX ~]# tar -zxf kafka_2.11-0.11.0.0.tgz -C /usr/ [root@CentOSX ~]# vi /usr/kafka_2.11-0.11.0.0/config/server.properties

    #####Server Basics ######## broker.id=0 # 每一個節點都要修改,且不一樣 delete.topic.enable=true

    Socket Server Settings

    listeners=PLAINTEXT://CentOSC:9092 # 每一個節點都要修改,且不一樣 ######### Log Basics ########## log.dirs=/usr/kafka-logs

    Log Retention Policy

    log.retention.hours=168 ########## Zookeeper ########## zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181

  • 啓動Kafka集羣

    [root@CentOSX ~]# cd /usr/kafka_2.11-0.11.0.0/ [root@CentOSX kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh -daemon config/server.properties

  • 關閉kafka

    [root@CentOSX ~]# vi /usr/kafka_2.11-0.11.0.0/bin/kafka-server-stop.sh PIDS=$(jps| grep Kafka | awk '{print $1}') # 修改 獲取進程id 號

    if [ -z "$PIDS" ]; then echo "No kafka server to stop" exit 1 else kill -s TERM $PIDS fi [root@CentOSX ~]# cd /usr/kafka_2.11-0.11.0.0/ [root@CentOSX kafka_2.11-0.11.0.0]# ./bin/kafka-server-stop.sh

Kafka測試

//建立分區
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --create --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 --partitions 3 --replication-factor 3 
//啓動消費者
[root@CentOSB kafka_2.11-0.11.0.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --from-beginning
//啓動生產者
[root@CentOSC kafka_2.11-0.11.0.0]# ./bin/kafka-console-producer.sh --broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01  
> hello kafka

Topic基本操做

  • 建立topic

    [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --create --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 --partitions 3 --replication-factor 3

partitions:分區的個數,replication-factor副本因子

  • 查看topic詳情

    [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --describe --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 Topic:topic01 PartitionCount:3 ReplicationFactor:3 Configs: Topic: topic01 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: topic01 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic01 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

  • 查看全部Topic

    [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --list --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 topic01 topic02 topic03

  • 刪除topic

    [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --delete --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic03 Topic topic03 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.

  • 修改Topic

    [root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --alter --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic02 --partitions 2 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected

分區數目只容許增長,不容許減小。

Java 鏈接Kafka集羣

<!--kafka依賴-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.2</version>
</dependency>

生產者

//1.建立Properties對象
Properties props=new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
//2.序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(props);
 //3.封裝Record
ProducerRecord<String,String> record=
                new ProducerRecord<String, String>("topic01","0100","zhangsan 男 18");
 //4.發送消息
kafkaProducer.send(record);
kafkaProducer.flush();
kafkaProducer.close();

消費者

//1.建立Properties對象
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
//2.反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1");

KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(props);
//3.訂閱相應的topic
kafkaConsumer.subscribe(Arrays.asList("topic01"));
//4.開始獲取消息
while(true){
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        String key=record.key();
        String value = record.value();
        long ts = record.timestamp();
        int partition = record.partition();
        long offset = record.offset();
        System.out.println(key+"=>"+value+"\t offset "+offset+" ,partition:"+partition+"\t"+ts);
    }
        }

自定義序列化發送對象

public class ObjectSerializer implements Serializer<Object> { // Serializer該接口是kafka的類
    public void configure(Map<String, ?> map, boolean isKey) {}

    public byte[] serialize(String topic, Object o) {
        return SerializationUtils.serialize((Serializable) o);
    }

    public void close() {}
}

---
public class ObjectDeserializer implements Deserializer<Object> {
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }

    public void close() {

    }
}

如何幹預Kafka分區策略 提升 record 的並行度

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,XxxPartitioner.class)
//自定義分區類實現Partitioner接口,而後指定分區策略
public class XxxPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return (key.hashCode()&Integer.MAX_VALUE)%numPartitions;
    }

    public void close(){
      
    }
}

訂閱形式

  • subscribe(訂閱)方式

    props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1"); kafkaConsumer.subscribe(Arrays.asList("topic02"));

優勢:但是自動實現 組內負載均衡和故障轉移。

  • assign (分配)方式

    TopicPartition part02 = new TopicPartition("topic02", 2); // 分區個數 kafkaConsumer.assign(Arrays.asList(part02));

優勢:手動指定分區信息,缺點:沒法實現負載均衡和故障轉移

Offset自動提交

默認客戶端自動開啓了自動提交功能,默認提交時間間隔是5秒鐘,用戶能夠採起手動提交的方式實現。開啓手動提交以下:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//關閉自動提交
//消費代碼後追加
kafkaConsumer.commitAsync();

或者適當調小自動提交時間間隔:

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//關閉自動提交

Kafka Stream-High Level

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;
public class KafkaStreamDemo {
    public static void main(String[] args) {
        //1.建立Properties對象
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        //2.添加kafka集羣信息
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        //3.添加數據類型
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		//4.建立KStreamBuilder並讀取數據文件
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        //5.對數據文件進行切分計算
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count("counts");
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
		//6.把KStreaamBuilder和Properties對象放入stream流
        KafkaStreams streams = new KafkaStreams(builder, props);
        //7.啓動stream
        streams.start();
    }
}
相關文章
相關標籤/搜索