Batch processing usually works on one file or a batch of files; no matter how large the files are, the data is finite and measurable.
MapReduce, Hive, Spark Core, Spark SQL
Stream processing works on data that keeps arriving, like a continuously flowing stream (streaming data).
Storm, Spark Streaming
Message
Queue
Message Queue (MQ)
Queue, Topic, Publisher, Subscriber
Each message can have multiple consumers, and they do not affect one another. For example, when I publish a Weibo post, everyone who follows me can see it (see the sketch after this list).
High throughput
Durability
Distributed
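To make the publish/subscribe idea above concrete, here is a minimal sketch with two console consumers in different consumer groups both receiving the same published message. It assumes a topic t1 on the cluster set up later in this article, and two small files group-a.properties / group-b.properties that each contain only a group.id line (these file names are illustrative):

# terminal 1: a consumer in group-a (group-a.properties contains: group.id=group-a)
kafka-console-consumer.sh --zookeeper uplooking03 --topic t1 --consumer.config group-a.properties
# terminal 2: a consumer in group-b (group-b.properties contains: group.id=group-b)
kafka-console-consumer.sh --zookeeper uplooking03 --topic t1 --consumer.config group-b.properties
# terminal 3: publish one message; both consumers print it, independently of each other
echo "hello subscribers" | kafka-console-producer.sh --broker-list uplooking03:9092 --topic t1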
What components does a message queue need?
Topic
Broker (message broker)
Partition (a physical partition)
Message
Prepare Kafka
Unpack Kafka
Rename the unpacked directory
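A minimal sketch of the unpack and rename steps, assuming the Kafka 0.10.0.1 tarball (the exact archive name and the /opt target directory are assumptions):

# unpack the downloaded archive into /opt
tar -zxvf kafka_2.10-0.10.0.1.tgz -C /opt
# rename the versioned directory to the plain /opt/kafka path used below
mv /opt/kafka_2.10-0.10.0.1 /opt/kafka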
Configure the environment variables
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
Edit server.properties
broker.id=1
log.dirs=/opt/kafka/logs
zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
listeners=PLAINTEXT://:9092
Start the kafka-server service
kafka-server-start.sh [-daemon] server.properties
Stop the Kafka service
kafka-server-stop.sh
For the other machines in the cluster, you only need to change the corresponding ==broker.id== on each machine (see the sketch below).
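For example, a minimal sketch, assuming server.properties sits under /opt/kafka/config on every host and that uplooking04 and uplooking05 get ids 2 and 3 (the id-to-host mapping is only illustrative):

# on uplooking04: give this broker a unique id
sed -i 's/^broker.id=.*/broker.id=2/' /opt/kafka/config/server.properties
# on uplooking05: likewise
sed -i 's/^broker.id=.*/broker.id=3/' /opt/kafka/config/server.properties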
Create a topic
kafka-topics.sh --create --topic t1 --partitions 3 --replication-factor 1 --zookeeper uplooking03:2181,uplooking04:2181
==Note: when creating a topic, the replication-factor cannot exceed the number of broker servers.==
List topics
kafka-topics.sh --list --zookeeper uplooking03
View the details of a specific topic
kafka-topics.sh --describe --topic t1 --zookeeper uplooking04:2181
PartitionCount: the number of partitions for the topic
ReplicationFactor: the replication factor of the topic, i.e. the number of replicas
Partition: the partition number, increasing from 0
Leader: the broker.id currently acting as leader for this partition
Replicas: the broker.ids holding this partition's replicas, as a list; the first entry is the acting leader
Isr: the list of broker.ids currently available (in sync) in the Kafka cluster
Modify a topic (the replication-factor cannot be changed, and the number of partitions can only be increased, never decreased)
kafka-topics.sh --alter --topic t1 --partitions 4 --zookeeper uplooking03
Delete a topic
kafka-topics.sh --delete --topic t1 --zookeeper uplooking03
Note: this delete only marks the topic for deletion. To actually remove it you must set a property: configure delete.topic.enable=true in server.properties; otherwise the topic is merely marked as deleted.
After changing this configuration, the Kafka service must be restarted (see the sketch below).
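A minimal sketch of that change and restart, assuming server.properties lives under /opt/kafka/config:

# once this property is set, deleting a topic really removes it instead of only marking it
echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties
# restart the broker so the new setting takes effect
kafka-server-stop.sh
kafka-server-start.sh -daemon /opt/kafka/config/server.properties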
Console producer (send messages to a topic):
kafka-console-producer.sh --topic t1 --broker-list uplooking03:9092,uplooking04:9092,uplooking05:9092
Console consumer (read messages from a topic):
kafka-console-consumer.sh --zookeeper uplooking03 --topic t1
--from-beginning: consume from the beginning of the topic (see the sketch below)
--blacklist: blacklist filter (kafka-console-consumer.sh --zookeeper uplooking03 --blacklist t1,t3)
--whitelist: whitelist filter (kafka-console-consumer.sh --zookeeper uplooking03 --whitelist t2)
Note: only one of --topic, --blacklist, or --whitelist may be given at a time.
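For example, a small sketch of replaying everything already stored in t1 with the same old-style consumer used above:

# read t1 from its very first message instead of only new ones
kafka-console-consumer.sh --zookeeper uplooking03 --topic t1 --from-beginning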
Configure the Flume agent configuration file
touch flume-kafka.properties
# Description of each component
# a1 is the name of the agent
# r1 is the name of a1's source
# c1 is the name of a1's channel
# k1 is the name of a1's sink
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe the source; its type is netcat
a1.sources.r1.type = netcat
# The network IP address and port the source listens on
a1.sources.r1.bind = uplooking01
a1.sources.r1.port = 44444

# Describe the sink; its type is kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hadoop
a1.sinks.k1.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 2

# Describe the channel; it stores the data temporarily in memory
a1.channels.c1.type = memory
# Maximum capacity of the in-memory channel: 1000 events
a1.channels.c1.capacity = 1000
# Up to 100 events are handled per transaction
a1.channels.c1.transactionCapacity = 100

# Wire a1's components together: the source and the sink both point to the same channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume to begin collecting data
[root@uplooking01:/opt/flume/conf] flume-ng agent --name a1 --conf-file flume-kafka.properties
Start the Kafka console-consumer tool
[root@uplooking03:/opt/flume/conf] kafka-console-consumer.sh --zookeeper uplooking03 --topic hadoop
Send data to the source that Flume is listening on
[root@uplooking03:/] nc uplooking01 44444
Add the Kafka client dependency to the project's pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.1</version>
</dependency>
Create the producer configuration file producer.properties
bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
Create a producer and send data to the topic
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.util.Properties;

public class MyKafkaProducer {
    public static void main(String[] args) throws IOException {
        Properties prop = new Properties();
        // load the broker addresses and serializers from producer.properties on the classpath
        prop.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(prop);
        // send one record with key "name" and value "admin123" to the topic "hadoop"
        kafkaProducer.send(new ProducerRecord<String, String>("hadoop", "name", "admin123"));
        // flush pending records and release the producer's resources
        kafkaProducer.close();
    }
}
Create the consumer configuration file consumer.properties
zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
group.id=test-consumer-group
bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Create a message consumer to consume data from the topic
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;

public class MyKafkaConsumer {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        // load group.id and deserializers from consumer.properties on the classpath
        prop.load(MyKafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(prop);
        // subscribe to the "hadoop" topic
        Collection<String> topics = new ArrayList<String>();
        topics.add("hadoop");
        kafkaConsumer.subscribe(topics);
        // poll forever and print the value of every record received
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
Custom partitioner (MyCustomPartition)
package com.uplooking.bigdata.kafka.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyCustomPartition implements Partitioner {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // get the number of partitions; partition numbers start from 0
        int partitionSize = cluster.partitionCountForTopic(topic);
        int keyHash = Math.abs(key.hashCode());
        int valueHash = Math.abs(value.hashCode()); // computed but unused here; partitioning is by key only
        // route the record to a partition based on the key's hash
        return keyHash % partitionSize;
    }

    public void close() {
    }

    public void configure(Map<String, ?> configs) {
    }
}
Configure the custom partitioner (producer.properties)
partitioner.class=com.uplooking.bigdata.kafka.partition.MyCustomPartition