Previous posts in this series:
Part 1 covered the single-node setup with Docker.
Part 2 covered integrating Kafka with Spring Boot.
Part 3 covered how Kafka works internally.
In this post I'll describe how we tuned Kafka in a real project, looking at three areas: configuration, code, and the cluster. Project background: a data-migration job (I'll write a few follow-up posts on how we migrated millions of records). The main pattern was using Kafka to separate reads from writes: we continuously read from the source database and pushed the records to Kafka through the producer, and the Kafka consumer then wrote them into the new database.
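To make that flow concrete, here is a minimal sketch of the producer half of the pattern. It is illustrative only: the MigrationProducer class, the "migration-topic" name, and the JSON-string record format are placeholders I made up, not the project's actual code.

import java.util.List;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical sketch: forward rows read from the source database to Kafka,
// so that a consumer can write them into the new database asynchronously.
@Service
public class MigrationProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MigrationProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // rows: records already read from the source database, serialized to JSON (placeholder format)
    public void forward(List<String> rows) {
        for (String row : rows) {
            kafkaTemplate.send("migration-topic", row); // placeholder topic name
        }
    }
}

The consumer side of this pipeline is what the rest of the post tunes.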
1. Configuration tuning. Excerpt from the error log:
[2018-09-25 11:23:59.370] ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] LoggingErrorHandler.java:37 - Error while processing: null
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1324)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1185)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688)
    at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.lang.Thread.run(Thread.java:748)
This happens because the producer keeps writing data while the consumer falls behind, so our tuning focuses on the consumer side. The error above points to one parameter, max-poll-records, so the first step was to adjust the number of records fetched per poll. The right value has to be tested per project; we raised it to 100 and adjusted the following properties as well:
# interval (ms) at which consumed offsets are committed automatically
auto-commit-interval: 1000
# earliest:
#   if the partition has a committed offset, consume from it; otherwise start from the beginning
# latest:
#   if the partition has a committed offset, consume from it; otherwise consume only newly produced records
# none:
#   if every partition has a committed offset, consume from those offsets;
#   if any partition lacks a committed offset, throw an exception
auto-offset-reset: latest
# whether offsets of records already fetched by the consumer are committed automatically and periodically; we change this to false
enable-auto-commit: false
Because we use spring-kafka, the kafka-client's enable.auto.commit is set to false, which disables the client's own automatic offset commits. Those automatic commits were exactly what kept failing, leaving the offset stuck, so we switched to spring-kafka's offset-commit mechanism instead. spring-kafka offers several commit strategies, summarized in the sketch below.
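For reference, the commit strategy is chosen through the listener container's AckMode. The sketch below is only an illustration of the options, assuming the AbstractMessageListenerContainer.AckMode enum from the spring-kafka version used in this post; the AckModeExample class name is invented.

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

// Commit strategies offered by spring-kafka, selected via the container AckMode:
//   RECORD           - commit after each record is processed
//   BATCH            - commit after each batch returned by poll() (the default)
//   TIME             - commit once ackTime has elapsed
//   COUNT            - commit once ackCount records have been processed
//   COUNT_TIME       - commit when either the TIME or the COUNT condition is met
//   MANUAL           - the listener calls Acknowledgment.acknowledge(); acks are committed like BATCH
//   MANUAL_IMMEDIATE - acknowledge() triggers an immediate commit
public class AckModeExample {

    static void useManualCommits(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // manual, immediate commits, matching the config class shown later in this post
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    }
}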
Next I also changed Kafka's own configuration: under the config directory of the Kafka installation, edit the producer and consumer properties files (the same values also appear in the spring-kafka consumer configuration later on).
1. session.timeout.ms=100000 (increase the session timeout).
2. request.timeout.ms=110000 (request/socket timeout; Kafka requires it to be larger than session.timeout.ms).
The producer side can also compress messages. If the original payload is XML, it shrinks considerably once compressed. With the old Scala producer, compression is switched on through compression.codec and compressed.topics; with the Java producer that spring-kafka uses, the property is compression.type. GZip and Snappy are both supported.
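As an illustration, here is a hedged sketch of a compressed producer built directly on the Java client. The broker address localhost:9095, the CompressedProducerFactory class name, and the batch and linger values are assumptions for the example, not the project's settings.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch: enable compression on the Java producer.
public class CompressedProducerFactory {

    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9095"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // gzip or snappy; compression is applied per batch on the producer
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        // a larger batch plus a small linger gives the codec more data to compress at once
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        return new KafkaProducer<>(props);
    }
}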
2. Code tuning. Excerpt from the log:
With the configuration tuning above in place, Kafka's consumption capacity did improve, but under a heavy write load the console still kept printing log lines like these:
[2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-4, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXX-0]
[2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-1, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXX-0]
[2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-3, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXXX-0]
[2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXX-0]
[2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXXXX-0]
[2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXXXX-0]
[2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] LogContext.java:336 - [Consumer clientId=consumer-4, groupId=test-consumer-group]
In other words, the consumers kept printing these rebalance messages without throwing errors, yet no data was being written. That made us wonder: beyond configuration, could we commit or consume in batches, the way you batch writes against a database? The official documentation confirms you can, so we changed the code from single-record consumption to batch consumption.
1. Add a config class.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(1500);
        // batch consumption for @KafkaListener; the batch size is governed by
        // ConsumerConfig.MAX_POLL_RECORDS_CONFIG below
        factory.setBatchListener(true);
        // commit offsets manually and immediately via Acknowledgment
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(16);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker IP:port, change me>");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100000);
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 110000);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // number of records fetched per batch
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 150);
        return propsMap;
    }
}
2. Change the consumer code to receive batches.
@KafkaListener(topics = {"<topic name, change me>"})
public void listen(List<ConsumerRecord> records, Acknowledgment ack) {
    try {
        for (ConsumerRecord record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                log.info("----------------- record = " + record);
                log.info("------------------ message = " + message);
            }
        }
    } catch (Exception e) {
        log.error("Kafka processing failed; the failed batch was: {}", records);
        e.printStackTrace();
    } finally {
        // acknowledge in finally: the batch's offset is committed even if processing failed,
        // so failed batches are logged rather than redelivered
        ack.acknowledge();
    }
}
3. Cluster setup
Although the configuration and code tuning helped, consumption was still not fast enough. The common advice online is to increase the number of partitions, which raises the consumer group's parallelism and therefore its consumption throughput. On a single machine, however, raising the partition count alone made no difference for us (we tried), so we built a cluster. Note that we run the Kafka cluster with Docker; the setup follows. docker-compose.yml:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9095:9095"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: <host IP, change me>
      KAFKA_ADVERTISED_PORT: 9095
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://XXXXXXXX:9095
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9095
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_LOG_RETENTION_HOURS: 1
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 100000
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_DELETE_RETENTION_MS: 1000
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  kafka-manager:
    image: sheepkiller/kafka-manager
    links:
      - kafka
      - zookeeper
    environment:
      ZK_HOSTS: zookeeper:2181
      APPLICATION_SECRET: letmein
      KM_ARGS: -Djava.net.preferIPv4Stack=true
    ports:
      - "9000:9000"
1. Start the stack:
docker-compose up -d
2. First change the mapped port in the compose file, then bring up a second broker:
docker-compose scale kafka=2
3. Change the port once more, then scale out to a third broker:
docker-compose scale kafka=3
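The extra brokers only pay off if the topic has enough partitions for the consumer group to share. As a hedged illustration of how such a topic could be created with the Kafka AdminClient (not the project's actual script), here is a sketch; the broker address, the "migration-topic" name, and the partition and replica counts are assumptions.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Sketch: create a topic whose partition count matches the desired consumer parallelism.
public class TopicSetup {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9095"); // placeholder address
        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions let up to 6 consumers in the group read in parallel;
            // replication factor 2 spreads the copies across the brokers
            NewTopic topic = new NewTopic("migration-topic", 6, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}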
That wraps up my notes on Kafka tuning. If you have a better approach, I'd love to hear about it. Feel free to follow my WeChat account 繁榮Aaron and to share this post.