因爲最近工做中接觸了比較多關閉消息推送以及異常重發機制的問題,終於得空總結一下經驗java
主動推送:通常爲websocket創建長鏈接實現,此處網上多有各類實現方式。下面貼出本人結合實際應用場景使用的長鏈接方式。web
websocket服務端代碼redis
import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import javax.annotation.PostConstruct; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint(value = "/websocket/{id}") @Component @Slf4j public class WebSocket { // 靜態變量,用來記錄當前在線鏈接數。應該把它設計成線程安全的。 private static int onlineCount = 0; // concurrent包的線程安全Set,用來存放每一個客戶端對應的MyWebSocket對象。 private static ConcurrentHashMap<String, WebSocket> webSocketSet = new ConcurrentHashMap<>(); // 保存容許創建鏈接的id private static List<String> idList = Lists.newArrayList(); private String id = ""; /** * 這裏使用AutoWired注入的bean會出現沒法持續保存而出現null的狀況。 * 具體緣由暫時沒有深究,若是有須要時,能夠再init初始化方法中手動將臨時的beanTmp類存入static常量中便可正常使用該bean類。 * @Autowired * private RedisCacheUtil redisTmp; * private static RedisCacheUtil redis; * */ // 與某個客戶端的鏈接會話,須要經過它來給客戶端發送數據 private Session session; public void closeConn(String appId) { // 關閉鏈接 try { WebSocket socket = webSocketSet.get(appId); if (null != socket) { if (socket.session.isOpen()) { socket.session.close(); } } } catch (IOException e) { System.out.println("IO異常"); e.printStackTrace(); } idList.remove(appId); } /** * 鏈接/註冊時去重 */ public void conn(String appId) { // 去重 if (!idList.contains(appId)) { idList.add(appId); } } /** * 獲取註冊在websocket進行鏈接的id */ public static List<String> getIdList() { return idList; } /** * 初始化方法 * @author caoting * @date 2019年2月13日 */ @PostConstruct public void init() { try { /** * TODO 這裏的設計是在項目啓動時從DB或者緩存中獲取註冊了容許創建鏈接的id * 而後將獲取到的id存入內存--idList * // 從數據庫獲取idList * List<WsIds> ids = wsIdsServiceTmp.selectList(null); */ // TODO 初始化時將剛注入的對象進行靜態保存 // redis = redisTmp; } catch (Exception e) { // TODO 項目啓動錯誤信息 } } /** * 鏈接啓動時查詢是否有滯留的新郵件提醒 * @param id * * @author caoting * @throws IOException * @date 2019年2月28日 */ private void selectOfflineMail(String id) throws IOException { // 查詢緩存中是否存在離線郵件消息 Jedis jedis = redis.getConnection(); try { List<String> mails = jedis.lrange(Constant.MAIL_OFFLINE+id, 0, -1); if (CommomUtil.isNotEmpty(mails)) { for (String mailuuid : mails) { String mail = jedis.get(mailuuid); if (StringUtils.isNotEmpty(mail)) sendToUser(Constant.MESSAGE_MAIL + mail, id); Thread.sleep(1000); } // 發送完成從緩存中移除 jedis.del(Constant.MAIL_OFFLINE+id); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { jedis.close(); } } /** * 鏈接創建成功調用的方法 * @param id */ @OnOpen public void onOpen(@PathParam(value = "id") String id, Session session) { try { // 注:ws-admin是管理員內部使用通道 不受監控 謹慎使用 if (!id.contains(Constant.WS_ADMIN)) { this.session = session; this.id = id;//接收到發送消息的人員編號 // 驗證id是否在容許 if (idList.contains(id)) { // 判斷是否已存在相同id WebSocket socket = webSocketSet.get(id); if (socket == null) { webSocketSet.put(id, this); //加入set中 addOnlineCount(); // 在線數加1 this.sendMessage("Hello:::" + id); System.out.println("用戶"+id+"加入!當前在線人數爲" + getOnlineCount()); // 檢查是否存在離線推送消息 selectOfflineMail(id); } else { this.sendMessage(Constant.MESSAGE_ERROR+"鏈接id重複--鏈接即將關閉"); this.session.close(); } } else { // 查詢數據庫中是否存在數據 WsIds wsIds = wsIdsService.selectByAppId(id); if ( null != wsIds ) { idList.add(id); webSocketSet.put(id, this); //加入set中 addOnlineCount(); // 在線數加1 this.sendMessage("Hello:::" + id); log.debug("用戶"+id+"加入!當前在線人數爲" + getOnlineCount()); // 檢查是否存在離線推送消息 selectOfflineMail(id); } else { // 關閉 this.sendMessage(Constant.MESSAGE_ERROR+"暫無鏈接權限,鏈接即將關閉,請確認鏈接申請是否過時!"); this.session.close(); log.warn("有異常應用嘗試與服務器進行長鏈接 使用id爲:"+id); } } } else { this.session = session; this.id = id;//接收到發送消息的人員編號 webSocketSet.put(id, this); //加入set中 addOnlineCount(); // 在線數加1 this.sendMessage("Hello:::" + id); log.debug("用戶"+id+"加入!當前在線人數爲" + getOnlineCount()); } } catch (IOException e) { e.printStackTrace(); } } /** * 鏈接關閉調用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this.id); // 從set中刪除 subOnlineCount(); // 在線數減1 log.debug("有一鏈接關閉!當前在線人數爲" + getOnlineCount()); } /** * 收到客戶端消息後調用的方法 * * @param message * 客戶端發送過來的消息 */ @OnMessage public void onMessage(String message, Session session) { log.debug("來自客戶端的消息:" + message); // TODO 收到客戶端消息後的操做 } /** * 發生錯誤時調用 */ @OnError public void onError(Session session, Throwable error) { log.debug("發生錯誤"); error.printStackTrace(); } public void sendMessage(String message) throws IOException { this.session.getAsyncRemote().sendText(message); } /** * 發送信息給指定ID用戶,若是用戶不在線則返回不在線信息給本身 * @param message * @param sendUserId * @throws IOException */ public Boolean sendToUser(String message, String sendUserId) throws IOException { Boolean flag = true; WebSocket socket = webSocketSet.get(sendUserId); if (socket != null) { try { if (socket.session.isOpen()) { socket.sendMessage(message); } else { flag = false; } } catch (Exception e) { flag = false; e.printStackTrace(); } } else { flag = false; log.warn("【" + sendUserId + "】 該用戶不在線"); } return flag; } /** * 羣發自定義消息 */ public void sendToAll(String message) throws IOException { for (String key : webSocketSet.keySet()) { try { WebSocket socket = webSocketSet.get(key); if (socket.session.isOpen()) { socket.sendMessage(message); } } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocket.onlineCount++; } public static synchronized void subOnlineCount() { if (WebSocket.onlineCount > 0) WebSocket.onlineCount--; } }
這裏使用的是較爲原始的websocket鏈接方式,事實上springboot已經融合了websocket,工做關係沒有空暫未研究。記錄一下有空了再去寫寫demo。這個socket服務端主要實現了:1.鏈接控制
,創建鏈接時驗證id的合法性。無證鏈接進行異常記錄並關閉鏈接。2.離線消息檢測到上線當即推送
這是消息推送須要實現的基本功能之一了,詳見代碼。3.統計在線人數
依舊是基本功能
下面是websocket服務端配置類WebSocketServerConfig spring
import lombok.extern.slf4j.Slf4j; import org.apache.catalina.session.StandardSessionFacade; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; import javax.servlet.http.HttpSession; import javax.websocket.HandshakeResponse; import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpointConfig; import javax.websocket.server.ServerEndpointConfig.Configurator; @Configuration @Slf4j public class WebSocketServerConfig extends Configurator { @Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { /* 若是沒有監聽器,那麼這裏獲取到的HttpSession是null */ StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession(); if (ssf != null) { HttpSession httpSession = (HttpSession) request.getHttpSession(); // 關鍵操做 sec.getUserProperties().put("sessionId", httpSession.getId()); log.debug("獲取到的SessionID:" + httpSession.getId()); } } /** * 若是使用獨立的servlet容器,而不是直接使用springboot的內置容器 * 就不要注入ServerEndpointExporter,由於它將由容器本身提供和管理。 * 即:生產環境中在獨立的tomcat運行時請註釋掉這個bean * * @return * * @author caoting * @date 2019年2月20日 */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
這裏其實有個坑,就是上述代碼中的bean類
serverEndpointExporter
,開發環境若是不是配置獨立的tomcat運行的話是須要注入的,可是生產環境下在獨立的tomcat容器運行時是須要註釋掉的,不然會報錯。
很重要的session監聽器 RequestListener數據庫
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.servlet.ServletRequestEvent; import javax.servlet.ServletRequestListener; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; /** * 監聽器類:主要任務是用ServletRequest將咱們的HttpSession攜帶過去 * 此註解千萬千萬不要忘記,它的主要做用就是將這個監聽器歸入到Spring容器中進行管理,至關於註冊監聽 */ @Component @Slf4j public class RequestListener implements ServletRequestListener { @Override public void requestInitialized(ServletRequestEvent sre) { // 將全部request請求都攜帶上httpSession HttpSession httpSession = ((HttpServletRequest) sre.getServletRequest()).getSession(); log.debug("將全部request請求都攜帶上httpSession " + httpSession.getId()); } public RequestListener() { } @Override public void requestDestroyed(ServletRequestEvent arg0) { } }
以上就是一個websocket服務端須要的全部配置和類
websocket客戶端代碼apache
import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.curator.shaded.com.google.common.collect.Maps; import redis.clients.jedis.Jedis; import javax.websocket.*; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; /** * @author caoting * @date 2018年9月27日 */ @Slf4j @ClientEndpoint public class MailWebSocketClient { private static RedisCacheUtil redis; protected void setRedis(RedisCacheUtil redisTmp) { redis = redisTmp; } /** * @author caoting * @date 2019年3月11日 */ public static void doSomething() { // TODO 因爲這個類沒有寫初始化方法,可是有些初始化操做必須完成, // 所以在socket配置類中調用此方法能夠完成一些須要初始化注入的操做 } private Session session; @OnOpen public void open(Session session) { log.info("鏈接開啓..."); this.session = session; } @OnMessage public void onMessage(String message) { log.info("來自服務端的消息: " + message); // TODO 對消息進行過濾判斷處理 // 不作過多操做影響性能 直接交給異步任務處理--這個辦法仍是比較low的如今springboot有更好的解決辦法@Async 有空再記錄下多線程異步處理任務調度的相關代碼。 ExecutorService executor = Executors.newSingleThreadExecutor(); FutureTask<Boolean> future = new FutureTask<Boolean>(new Callable<Boolean>() {// 使用Callable接口做爲構造參數 public Boolean call() { return pushMsg(message); } }); executor.execute(future); Boolean res = CommomUtil.timeOutTask(future, executor, 600); if (res != null && res) log.info("操做成功"); else log.info("操做失敗"); } /** * @author caoting * @date 2019年3月11日 */ private Boolean pushMailMsg(String message) { Boolean flag = true; // 推送消息 ReceiverRes resObj = new ReceiverRes(); try { resObj = restTemplate.httpPostMediaTypeJson(url, ReceiverRes.class, message); } catch (Exception e) { // 這裏異常通常是http接口服務宕機了,因此放進緩存在對方上線時進行從新推送 resObj.setCode(500); log.error(e.getMessage(), e); } // ====推送完成後的後續異常檢查與數據重發工做 這裏是一個redis任務調度 處理失敗任務的典型案例 看不懂就刪掉 Integer code = resObj.getCode(); if (code == 500) { // 發送失敗存進redis緩存 按照約定好的狀態碼進行判斷 jedis.lpush(Constant.PUSH_ERROR, mailMapJson); } else { // 發送成功之後查詢之前出錯的數據進行從新推送。--這種辦法只適合消息很頻繁的,畢竟不頻繁的等下次發消息又不知道是什麼時候了,所以須要採用別的方法 while (true) { // 查詢以往的異常發送數據 從新發送 String jsonMap = jedis.rpoplpush(Constant.PUSH_ERROR, Constant.PUSH_ERROR_TMP); if (StringUtils.isEmpty(jsonMap)) { break; } try { errObj = restTemplate.httpPostMediaTypeJson(receiverUrl, ReceiverRes.class, message); } catch (Exception e) { errObj.setCode(500); log.error(e.getMessage(), e); } if (errObj.getCode() == 500) { // 再次失敗 彈回原隊列 jedis.rpoplpush(Constant.PUSH_ERROR_TMP, Constant.PUSH_ERROR); } else { jedis.rpop(Constant.PUSH_ERROR_TMP); } } } return flag; } @OnClose public void onClose() { log.info("長鏈接關閉..."); } @OnError public void onError(Session session, Throwable t) { t.printStackTrace(); } public void send(String message) { this.session.getAsyncRemote().sendText(message); } public void close() throws IOException { if (this.session.isOpen()) { this.session.close(); } } }
上面是websocket客戶端的代碼。其中主要有:一、http推送失敗重發機制
二、redis任務調度經典案例
websocket客戶端配置類WebSocketConfigjson
import com.hnpolice.business.service.ApplicationService; import com.hnpolice.sso.common.ex.BaseException; import com.hnpolice.sso.common.utils.RedisCacheUtil; import com.hnpolice.sync.RestTemplateFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import javax.websocket.ContainerProvider; import javax.websocket.WebSocketContainer; import java.net.URI; @Slf4j @Component public class WebSocketConfig implements ApplicationRunner { @Autowired private RedisCacheUtil redisTmp; private static Boolean isOk; private MailWebSocketClient client; private static WebSocketContainer conmtainer = ContainerProvider.getWebSocketContainer(); @Override public void run(ApplicationArguments args) throws Exception { // 跟隨項目啓動的方法能夠在這裏作一些初始化工做 // websocket客戶端初始化 wsClientInit(); } public void wsClientInit() { try { client = new MailWebSocketClient(); client.setRedis(redisTmp); MailWebSocketClient.dosomething(); conmtainer.connectToServer(client, new URI(##socket服務鏈接地址##)); isOk = true; } catch (Exception e) { isOk = false; log.error(e); } // 斷線重連 while (true) { if (isOk != null && isOk) { try { client.send("ping:"+appId); } catch (Exception e) { isOk = false; } } else { // 系統鏈接失敗進行重試 log.warn("系統鏈接失敗,正在重連..."); try { client.send("ping:"+appId); log.warn("系統重連成功!"); isOk = true; } catch (Exception e) { try { client = new MailWebSocketClient(); conmtainer.connectToServer(client, new URI(mailUrl)); isOk = true; } catch (Exception e1) { isOk = false; } if (isOk != null && isOk) { log.warn("系統重連成功!"); } } } try { Thread.sleep(30000); } catch (InterruptedException e) { log.error(BaseException.collectExceptionStackMsg(e)); e.printStackTrace(); } } } }
這是websocket客戶端的配置類,實現ApplicationRunner 接口是爲了在項目啓動時完成一些初始化工做,並不是必須。主要功能:一、協助websocketCient進行初始化
,二、心跳包檢測,斷線自動重連
消息推送的第二種方式在下篇中再編寫緩存