As message-oriented middleware, Kafka is used for asynchronous message delivery.
Its function is similar to JMS: producers put messages onto a queue, and consumers fetch data from the queue.
The implementation, however, is completely different.
1. A producer connects to a broker in order to send messages.
2. A producer publishes messages to a specific topic.
3. Messages sent to a topic can be handled by different brokers.
4. A consumer connects to a broker to listen on the topics it subscribes to.
5. Brokers are managed through ZooKeeper and notify each other of their running state.
6. If the broker a producer or consumer connects to does not host the relevant topic, the message is automatically routed to the broker that does, and the producer or consumer remembers that broker for subsequent requests.
1. After being sent, messages are appended in order to a partition.
2. How long messages are retained on a partition is controlled by configuration.
3. Each consumer can start consuming from a different position (offset) in a partition, and messages can be consumed repeatedly.
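Point 3 above is what makes replaying messages possible: a consumer group with no committed offset can be told to start from the earliest retained record. A minimal sketch of the relevant consumer settings, using plain `java.util.Properties` (the class name `ReplayConfig` and the group id are made up for illustration; the broker address mirrors the one used later in this post):

```java
import java.util.Properties;

public class ReplayConfig {

    // Consumer properties that let a fresh consumer group re-read a partition
    // from the beginning of whatever the broker still retains.
    static Properties replayProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "106.15.205.155:9092");
        props.put("group.id", groupId);
        // With no committed offset for this group, "earliest" makes the
        // consumer start from the oldest retained record — this is the
        // "repeated consumption" behaviour described above.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(replayProps("replay-group").getProperty("auto.offset.reset")); // prints "earliest"
    }
}
```

Passing these properties to a `KafkaConsumer` under a brand-new group id replays the partition from the start; an existing group simply resumes from its committed offset.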
Download:
curl -L -O http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
Edit the configuration:
vi kafka/config/server.properties
# Log directory
log.dirs=/tmp/kafka/logs
# Register with ZooKeeper
zookeeper.connect=106.15.205.155:2181
Allocate heap memory as an environment variable (not in server.properties; too small a heap, e.g. well under 256M, can cause out-of-memory errors):
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
Configure data cleanup (to keep the disk from filling up):
vi config/server.properties
# Delete data by age
log.retention.hours=168
# Maximum size of a single log segment file
log.segment.bytes=1073741824
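The two numbers above are easier to read in familiar units: 168 hours is exactly one week, and 1073741824 bytes is exactly 1 GiB. A quick arithmetic check (class and method names are made up for illustration):

```java
public class RetentionMath {

    // log.retention.hours expressed in days
    static long retentionDays(long hours) {
        return hours / 24;
    }

    public static void main(String[] args) {
        System.out.println(retentionDays(168));                 // prints 7  (one week)
        System.out.println(1073741824L == 1024L * 1024 * 1024); // prints true (exactly 1 GiB)
    }
}
```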
Start:
bin/kafka-server-start.sh config/server.properties &
Stop:
bin/kafka-server-stop.sh
Create a topic:
bin/kafka-topics.sh --create --zookeeper 106.15.205.155:2181 --replication-factor 1 --partitions 1 --topic test
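The `--partitions` flag above decides how many partitions keyed messages can be spread over. Kafka's default partitioner maps a non-null key to a partition with a murmur2 hash of the key bytes; the sketch below uses `Arrays.hashCode` as a simplified stand-in (so the actual partition numbers will differ from a real broker's) purely to illustrate that the same key always lands on the same partition:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    // Simplified stand-in for Kafka's default key-based partitioning.
    // Real Kafka uses murmur2; Arrays.hashCode is used here only so the
    // example is self-contained. The mask keeps the hash non-negative.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With --partitions 1, as in the create command above, every key maps to 0.
        System.out.println(partitionFor("any-key", 1)); // prints 0
        // Same key, same partition — the mapping is deterministic.
        System.out.println(partitionFor("user-42", 3) == partitionFor("user-42", 3)); // prints true
    }
}
```

Messages sent without a key (as in the Spring example later in this post) are instead distributed across partitions by the producer rather than by key hashing.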
List all topics:
bin/kafka-topics.sh --list --zookeeper 106.15.205.155:2181
Send messages with the console producer:
bin/kafka-console-producer.sh --broker-list 106.15.205.155:9092 --topic test
Receive messages with the console consumer:
bin/kafka-console-consumer.sh --zookeeper 106.15.205.155:2181 --topic test --from-beginning
Delete a topic:
./bin/kafka-topics.sh --delete --zookeeper 106.15.205.155:2181 --topic test
Using Kafka from Spring
Add the Maven dependency:
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>
Message producer:
Configuration class:
@Configuration
public class KafkaProducerConfig {

    private String bootstrapServers = "IP:9092";

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Sender class:
@Component
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {
        // No key is given, so the producer chooses the partition itself
        kafkaTemplate.send("topic_1", "luxiaotao");
    }
}
Message consumer
Configuration class:
@Configuration
public class KafkaConsumerConfig {

    private String bootstrapServers = "IP:9092";
    private String topic = "topic_1";

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Consumer group id (this example reuses the topic name)
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, topic);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Receiver class:
@Component
public class KafkaReceive {

    @KafkaListener(topics = {"topic_1"})
    public void receive(String content) {
        System.out.println("=============================" + content + "============================");
    }
}
That covers the basic installation and usage of Kafka. Thanks for reading.