架構、分佈式、日誌隊列,標題本身都看着唬人,其實就是一個日誌收集的功能,只不過中間加了一個Kafka作消息隊列罷了。java
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。 這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。git
Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,有以下特性:redis
發佈和訂閱消息流,這個功能相似於消息隊列,這也是kafka歸類爲消息隊列框架的緣由spring
以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流apache
能夠再消息發佈的時候進行處理json
在系統或應用程序之間構建可靠的用於傳輸實時數據的管道,消息隊列功能服務器
構建實時的流數據處理程序來變換或處理數據流,數據處理功能網絡
Linux、JDK、Zookeepersession
wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
tar -zxvf kafka_2.11-0.10.0.1.tgz cd kafka_2.11-0.10.0.1
bin 啓動,中止等命令 config 配置文件 libs 類庫
#########################參數解釋############################## broker.id=0 #當前機器在集羣中的惟一標識,和zookeeper的myid性質同樣 port=9092 #當前kafka對外提供服務的端口默認是9092 host.name=192.168.1.170 #這個參數默認是關閉的 num.network.threads=3 #這個是borker進行網絡處理的線程數 num.io.threads=8 #這個是borker進行I/O處理的線程數 log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄能夠配置爲「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,若是配置多個目錄,新建立的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 socket.send.buffer.bytes=102400 #發送緩衝區buffer大小,數據不是一會兒就發送的,先回存儲到緩衝區了到達必定的大小後在發送,能提升性能 socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當數據到達必定大小後在序列化到磁盤 socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 num.partitions=1 #默認的分區數,一個topic默認1個分區數 log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務 replica.fetch.max.bytes=5242880 #取消息的最大直接數 log.segment.bytes=1073741824 #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除 log.cleaner.enable=false #是否啓用log壓縮,通常不用啓用,啓用的話能夠提升性能 zookeeper.connect=192.168.1.180:12181,192.168.1.181:12181,192.168.1.182:1218 #設置zookeeper的鏈接端口、若是非集羣配置一個地址便可 #########################參數解釋##############################
啓動kafka以前要啓動相應的zookeeper集羣、自行安裝,這裏不作說明。數據結構
#進入到kafka的bin目錄 ./kafka-server-start.sh -daemon ../config/server.properties
spring-boot、elasticsearch、kafka
<!-- kafka 消息隊列 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency>
import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; /** * 生產者 * 建立者 科幫網 * 建立時間 2018年2月4日 */ @Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } }
mport java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; /** * 消費者 * 建立者 科幫網 * 建立時間 2018年2月4日 */ @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.servers}") private String servers; @Value("${kafka.consumer.enable.auto.commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return propsMap; } @Bean public Listener listener() { return new Listener(); } }
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.itstyle.es.common.utils.JsonMapper; import com.itstyle.es.log.entity.SysLogs; import com.itstyle.es.log.repository.ElasticLogRepository; /** * 掃描監聽 * 建立者 科幫網 * 建立時間 2018年2月4日 */ @Component public class Listener { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private ElasticLogRepository elasticLogRepository; @KafkaListener(topics = {"itstyle"}) public void listen(ConsumerRecord<?, ?> record) { logger.info("kafka的key: " + record.key()); logger.info("kafka的value: " + record.value()); if(record.key().equals("itstyle_log")){ try { SysLogs log = JsonMapper.fromJsonString(record.value().toString(), SysLogs.class); logger.info("kafka保存日誌: " + log.getUsername()); elasticLogRepository.save(log); } catch (Exception e) { e.printStackTrace(); } } } }
/** * kafka 日誌隊列測試接口 */ @GetMapping(value="kafkaLog") public @ResponseBody String kafkaLog() { SysLogs log = new SysLogs(); log.setUsername("紅薯"); log.setOperation("開源中國社區"); log.setMethod("com.itstyle.es.log.controller.kafkaLog()"); log.setIp("192.168.1.80"); log.setGmtCreate(new Timestamp(new Date().getTime())); log.setExceptionDetail("開源中國社區"); log.setParams("{'name':'碼雲','type':'開源'}"); log.setDeviceType((short)1); log.setPlatFrom((short)1); log.setLogType((short)1); log.setDeviceType((short)1); log.setId((long)200000); log.setUserId((long)1); log.setTime((long)1); //模擬日誌隊列實現 String json = JsonMapper.toJsonString(log); kafkaTemplate.send("itstyle", "itstyle_log",json); return "success"; }
以前簡單的介紹過,JavaWeb項目架構之Redis分佈式日誌隊列,有小夥伴們聊到, Redis PUB/SUB沒有任何可靠性保障,也不會持久化。固然了,原項目中僅僅是記錄日誌,並非十分重要的信息,能夠有必定程度上的丟失
Kafka與Redis PUB/SUB之間最大的區別在於Kafka是一個完整的分佈式發佈訂閱消息系統,而Redis PUB/SUB只是一個組件而已。