一. 需求背景
最近新接觸一個需求,須要將kafka中的數據實時推送到前端展現。最開始想到的是前端輪詢接口數據,可是沒法保證輪詢的頻率和消費的頻率徹底一致,或形成數據缺失等問題。最終肯定用利用WebSocket實現數據的實時推送。
二. websocket簡介
網上已經有好多介紹WebSocket的文章了,就不詳細介紹了,這裏只作簡單介紹。 WebSocket協議是基於TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通訊——容許服務器主動發送信息給客戶端。
三. 服務端實現
1. pom文件
這裏須要引用三個依賴。第一個爲WebSocket須要的依賴,另外兩個爲kafka的依賴
<dependencies>
<!-- webSocket所需依賴 -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<!-- kafka 所需依賴 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
javascript
2. webSocket服務端實現html
1 //此處定義接口的uri 2 @ServerEndpoint("/wbSocket") 3 public class WebSocket { 4 private Session session; 5 public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>(); //此處定義靜態變量,以在其餘方法中獲取到全部鏈接 6 7 /** 8 * 創建鏈接。 9 * 創建鏈接時入參爲session 10 */ 11 @OnOpen 12 public void onOpen(Session session){ 13 this.session = session; 14 wbSockets.add(this); //將此對象存入集合中以在以後廣播用,若是要實現一對一訂閱,則類型對應爲Map。因爲這裏廣播就能夠了隨意用Set 15 System.out.println("New session insert,sessionId is "+ session.getId()); 16 } 17 /** 18 * 關閉鏈接 19 */ 20 @OnClose 21 public void onClose(){ 22 wbSockets.remove(this);//將socket對象從集合中移除,以便廣播時不發送次鏈接。若是不移除會報錯(須要測試) 23 System.out.println("A session insert,sessionId is "+ session.getId()); 24 } 25 /** 26 * 接收前端傳過來的數據。 27 * 雖然在實現推送邏輯中並不須要接收前端數據,可是做爲一個webSocket的教程或叫備忘,仍是將接收數據的邏輯加上了。 28 */ 29 @OnMessage 30 public void onMessage(String message ,Session session){ 31 System.out.println(message + "from " + session.getId()); 32 } 33 34 public void sendMessage(String message) throws IOException { 35 this.session.getBasicRemote().sendText(message); 36 } 37 }
3. kafka消費者實現前端
1 public class ConsumerKafka extends Thread { 2 3 private KafkaConsumer<String,String> consumer; 4 private String topic = "kafkaTopic"; 5 6 public ConsumerKafka(){ 7 8 } 9 10 @Override 11 public void run(){ 12 //加載kafka消費者參數 13 Properties props = new Properties(); 14 props.put("bootstrap.servers", "localhost:9092"); 15 props.put("group.id", "ytna"); 16 props.put("enable.auto.commit", "true"); 17 props.put("auto.commit.interval.ms", "1000"); 18 props.put("session.timeout.ms", "15000"); 19 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 20 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 21 //建立消費者對象 22 consumer = new KafkaConsumer<String,String>(props); 23 consumer.subscribe(Arrays.asList(this.topic)); 24 //死循環,持續消費kafka 25 while (true){ 26 try { 27 //消費數據,並設置超時時間 28 ConsumerRecords<String, String> records = consumer.poll(100); 29 //Consumer message 30 for (ConsumerRecord<String, String> record : records) { 31 //Send message to every client 32 for (WebSocket webSocket :wbSockets){ 33 webSocket.sendMessage(record.value()); 34 } 35 } 36 }catch (IOException e){ 37 System.out.println(e.getMessage()); 38 continue; 39 } 40 } 41 } 42 43 public void close() { 44 try { 45 consumer.close(); 46 } catch (Exception e) { 47 System.out.println(e.getMessage()); 48 } 49 } 50 51 //供測試用,若經過tomcat啓動需經過其餘方法啓動線程 52 public static void main(String[] args){ 53 ConsumerKafka consumerKafka = new ConsumerKafka(); 54 consumerKafka.start(); 55 } 56 }
P.S. 須要注意的是WebSocket對tomcat版本是有要求的,筆者使用的是7.0.7.8。
四. 前端簡單實現
1 <!DOCTYPE html> 2 <html lang="en"> 3 <head> 4 <meta charset="UTF-8"> 5 <title>WebSocket client</title> 6 <script type="text/javascript"> 7 var socket; 8 if (typeof (WebSocket) == "undefined"){ 9 alert("This explorer don't support WebSocket") 10 } 11 12 function connect() { 13 //Connect WebSocket server 14 socket =new WebSocket("ws://127.0.0.1:8080/wbSocket"); 15 //open 16 socket.onopen = function () { 17 alert("WebSocket is open"); 18 } 19 //Get message 20 socket.onmessage = function (msg) { 21 alert("Message is " + msg); 22 } 23 //close 24 socket.onclose = function () { 25 alert("WebSocket is closed"); 26 } 27 //error 28 socket.onerror = function (e) { 29 alert("Error is " + e); 30 } 31 } 32 33 function close() { 34 socket.close(); 35 } 36 37 function sendMsg() { 38 socket.send("This is a client message "); 39 } 40 </script> 41 </head> 42 <body> 43 <button onclick="connect()">connect</button> 44 <button onclick="close()">close</button> 45 <button onclick="sendMsg()">sendMsg</button> 46 </body> 47 </html>
五. 結語
以上基本能夠實現將kafka數據實時推送到前端。這是筆者第一篇筆記,不足之處請指出、諒解。
源碼:https://github.com/youtNa/webSocketkafka
引用:1.
webSocket百度百科