注:本文所指版本Kafka 1.1html
Kafka是由Apache開發的一款發佈訂閱消息系統,是分佈式的,分區的重複的日誌服務。java
①、解耦web
容許兩方修改處理過程,只要遵循共同的接口約束。apache
②、靈活性和峯值處理能力json
面對忽然增長的吞吐量有很好應對,發送信息量50M,消費信息量100M。bootstrap
③、消息冗餘後端
消息隊列把數據持久化直到已經徹底被處理。與以往消息「插入 - 獲取 - 刪除」不一樣,在刪除消息時,必須肯定消息已被處理完畢。api
④、擴展性緩存
擴展性較好,只要增長入隊和消費處理過程便可。服務器
⑤、順序保證
針對消息順序的重要性,kafka保證一個partition內的消息有序性。
①、日誌收集
ELK日誌採集框架中,利用kafka同Logstash來收集服務端日誌。
②、消息系統
解耦生產者與消費者,緩存消息,實現異步處理。
③、實現消息 「發佈-訂閱模式「
對於不一樣消費者消費同一消息,利用Kafka實現:同一個topic中的消息只能被同一個Consumer Group中的一個消費者消費,但能夠被多個Consumer Group消費這一消息。
④、用戶活動跟蹤
Kafka常常被用來記錄web用戶或者app用戶的各類活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,而後訂閱者經過訂閱這些topic來作實時的監控分析,或者裝載到Hadoop、數據倉庫中作離線分析和挖掘。
⑤、運營指標
Kafka也常常用來記錄運營監控數據。包括收集各類分佈式應用的數據,生產各類操做的集中反饋,好比報警和報告。
大概介紹些知識點,做爲一個後端開發,瞭解如何用及基本原理就能夠了。
kafka官網給出的交互流程
消息處理流程
producer.send(ProducerRecord<K,V> record); 生產者在發送消息時,沒有找到topic,會自動建立???
auto.create.topics.enable = true (默認爲true,若沒找到topic則自動建立)
注意我這裏有些值寫的是僞代碼,還需封裝到一個公共類中調取。如kafka地址等。
另外,下列代碼全手打,有錯誤的地方請指正。
producer和consumer均依賴 kafka-clients.
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0</version> </dependency>
2.一、producer configs
Properties properties = new Properties(); properties.put("bootstrap.servers", "ipAndPort"); //指定kafka服務地址,集羣的狀況用逗號分隔,如:host1:port1,host2:port2 ... properties.put("acks", "all");//表示完成Requests前須要認可的數量。 0:無需認可直接發送到socket 1:須要leader認可 all/-1:須要所有認可後發送 properties.put("retries", 0);//發生錯誤時,重傳次數。當開啓重傳時,須要將`max.in.flight.requests.per.connection`設置爲1,不然可能致使失序 properties.put("batch.size", 16384); properties.put("linger.ms", 1); //1毫秒,簡單講,就是延時1ms,把期間收集到的全部Requests聚合到一塊兒發送,以此提升吞吐量 properties.put("buffer.memory", 33554432);//默認值就是 33554443,緩存數據的內存大小;若生產速度大於Producer向Broker發送速度,會阻塞超時拋出異常 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //指定扔到kafka的鍵值對中鍵的類型,實例包下還有Long、Double、Short等等 properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //指定鍵值對的值類型,具體同上
2.二、KafkaProducerUtil.java
這裏定義一個Producer的工具類,封裝了producer的初始化及發送消息到指定topic。
public class KafkaProducerUtil { private static Logger logger = Logger.getLogger(KafkaProducerUtil.class); private static Producer<String, String> producer;
/** 作一個簡單的單例模式,實例化producer對象 **/ private static Producer<String, String> getProducer() { if (producer == null) { synchronized (KafkaProducerUtil.class) { if (producer == null) { Properties properties = new Properties(); //寫入配置信息供初始化Producer producer = new KafkaProducer<String, String>(properties); } } } return producer; }
/**
* send record to topic
*/
public static void sendToKafka(String topic, String message, Long timeOut) throws InterruptedException, ExecutionException, TimeoutException { producer = getProducer(); producer.send(new ProducerRecord<String, String>(topic, message)).get(timeOut, TimeUnit.SECONDS); logger.info("sendToKafka:" + message); } }
2.三、KafkaProducerService.java
提供工業務層調用的接口,這裏作了http請求方式的兼容處理。
public interface KafkaProducerService {
/**
* @param httpUrl 經過http發送請求的方式調用地址
* @param code 生成topic
* @param request 請求數據
* @param version 加簽
* @param timeOut 超時時間
*/
void send(String httpUrl, String code, Object request, String version, Long timeout);
}
@Service public class KafkaProducerServiceImpl implements KafkaProducerService { private static Logger logger = LoggerFactory.getLogger(KafkaProducerServiceImpl.class); public void send(String httpUrl, String code, Object request, String version, Long timeout) { String isOpenFlag = "能夠做爲系統參數,不一樣環境有不一樣的啓用程度,開啓走kafka,關閉走http方式"; if ("open" == isOpenFlag) { sendByKafka(httpUrl, code, request, version, timeout); } else { sendByHttp(...); } } private void sendByKafka(String httpUrl, String code, Object request, String version, Long timeout) { // 按必定規則拼接topic String topic = "XXX能夠依環境決定,也能夠依系統決定" + "_" + code; // 取加簽私鑰 String privateKey = "自行封裝"; logger.info("topic:" + topic + "請求報文:" + JSON.toJSONOString(request, SerializerFeature,WriteMapNullValue)); // 報文加簽 Object producerObject = SecurityUtil.digest(request, privateKey, version); String producerRecord = JSON.toJSONString(producerObject, SerializerFeature.WriteNullStringAsEmpty, SerializerFeature.WriteMapNullValue); try { KafkaProducerUtil.sendToKafka(topic, producerRecord, timeout); } catch (RuntimeException e) { //出現異常,改用http調用 logger.error(e.getMessage()); sendByHttp(httpUrl, request, version); } } private void sendByHttp(String httpUrl, Strign request, String version) { //TODO 自行封裝,一般狀況加驗籤,把請求報文打印日誌,轉成json格式發送至api接口,此處不過多贅述。 } }
以上基本知足Producer方使用。
3.一、consumer configs
Properties props = new Properties(); props.put("bootstrap.servers", "IpAndPort"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000");//自動提交時間間隔,前提是 enable.auto.commit設置爲true props.put("session.timeout.ms", "30000"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//指定接受到數據的鍵值對類型 props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", "unique group id");//指定consumer的惟一group
3.二、KafkaConsumerInit.java
consumer方初始化類,封裝參數配置、線程定義,業務接口調用等。
public class KafkaConsumerInit extends Thread { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerInit.class); private final AtomicBoolean closed = new AtomicBoolean(false); KafkaConsumer consumer; //此處格式<topic對應code,對應業務bean名> private Map<String, String> topicsAndBeans; public KafkaConsumerInit(Map<String, String> topicsAndBeans) { super(); // 作轉換,根據部署環境,topic加環境的前綴 Set<String> codes= topicsAndBeans.keySet(); Map<String, String> realTopicsAndBeans = new HashMap<String, String>(); for (String code : codes) { String topic = "XXX能夠依環境決定,也能夠依系統決定" + "_" + code; realTopicsAndBeans.put(topic, topicsAndBeans.get(code)); } this.topicsAndBeans = realTopicsAndBeans; } @Override public void run() { String isOpenFlag = "XXX"; if (!"open".equals(isOpenFlag)) { logger.info("===============配置文件設置 KafkaConsumer 不啓動==============="); return; } logger.info("===============啓動KafkaConsumer==============="); try { Properties props = new Properties(); //TODO 添加consumer configs consumer = new KafkaConsumer<>(props); //給consumer註冊topics 類型Collection<String> consumer.subscribe(topicsAndBeans.keySet()); logger.info("初始化consumer參數"); while (!closed.get()) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { try { logger.info("offset = " + record.offset() + ", value = " + record.value()); logger.info(record.value());
//定義一個統一接口,不一樣業務實現同一接口。 CommunicationConsumerService communicationHandleService = (CommunicationConsumerService) ApplicationContext.getContext() .getBean(topicsAndBeans.get(record.topic())); communicationHandleService.doHandle(record.value()); } catch (Exception e) { logger.error("數據處理異常:" + record.value()); logger.error(e.getMessage(), e); } } sleep(1000); } } catch (WakeupException e) { logger.error(e.getMessage(), e); if (!closed.get()) { throw e; } } catch (InterruptedException e) { logger.error(e.getMessage(), e); } finally { consumer.close(); } } }
3.三、KafkaConsumerListener.java
服務啓動後初始化kafka,這裏是利用基於Spring的ApplicationListener接口實現的,若果這方面知識還不清楚,先請點這裏。
@Component public class KafkaConsumerListener implements ApplicationListener<ApplicationEvent> { private static final Logger logger = LogManager.getLogger(KafkaConsumerListener.class); @Override public void onApplicationEvent(ApplicationEvent event) { logger.info("==============添加訂閱的topic和對應的處理方法==============="); Map<String, String> topicAndBeans = new HashMap<String, String>(); topicAndBeans.put("codeForTopic1", "beanImpl1"); topicAndBeans.put("codeForTopic2", "beanImpl2"); topicAndBeans.put("codeForTopic3", "beanImpl3"); //實例化consumer new KafkaConsumerInit(topicAndBeans).start(); } }
2.3.四、ComsumerService.java
具體業務層實現及封裝調用
public interface ConsumerService { /** * @param message */ void doHandle(String message); }
@Serivce public class BeanImpl1 implements ConsumerService { public void doHandle(String message) { //TODO 集體業務實現 } }
以上基本知足consumer方使用。