在通訊場景中比較常見的模式爲客戶端發送請求給服務端,服務端再回以響應。還有一種通訊模式爲服務端主動Push消息給客戶端,這種通訊一般有兩種場景。java
場景一
某個客戶端發送指令給服務端,觸發服務端push消息至其餘客戶端,例如:IM。 git
場景二
服務端基於某種業務場景主動Push消息至相連的客戶端,例如:APP消息推送。 編程
本文以場景一爲例演示如何經過smart-socket實現Push消息下發,首先咱們須要定義三個角色:session
通訊編程的首要步驟,則是定義通訊協議。出於演示目的,咱們採用length+data的協議格式,即採用4個字節長度的int值表示消息頭,而該int數值的大小表明着消息體的長度。SendClient與PushServer,PushServer與ReceiverClient皆採用此協議通訊。socket
public class StringProtocol implements Protocol<String> { @Override public String decode(ByteBuffer readBuffer, AioSession<String> session) { int remaining = readBuffer.remaining(); if (remaining < Integer.BYTES) { return null; } readBuffer.mark(); int length = readBuffer.getInt(); if (length > readBuffer.remaining()) { readBuffer.reset(); return null; } byte[] b = new byte[length]; readBuffer.get(b); readBuffer.mark(); return new String(b); } }
PushServer的處理器須要具有如下幾方面能力:ide
public class PushServerProcessorMessage implements MessageProcessor<String> { private static final Logger LOGGER = LoggerFactory.getLogger(PushServerProcessorMessage.class); private Map<String, AioSession<String>> sessionMap = new ConcurrentHashMap<>(); @Override public void process(AioSession<String> session, String msg) { LOGGER.info("收到SendClient發送的消息:{}", msg); byte[] bytes = msg.getBytes(); sessionMap.values().forEach(onlineSession -> { if (session == onlineSession) { return; } WriteBuffer writeBuffer = onlineSession.writeBuffer(); try { LOGGER.info("發送Push至ReceiverClient:{}", onlineSession.getSessionID()); writeBuffer.writeInt(bytes.length); writeBuffer.write(bytes); writeBuffer.flush(); } catch (Exception e) { LOGGER.error("Push消息異常", e); } }); } @Override public void stateEvent(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { switch (stateMachineEnum) { case NEW_SESSION: LOGGER.info("與客戶端:{} 創建鏈接", session.getSessionID()); sessionMap.put(session.getSessionID(), session); break; case SESSION_CLOSED: LOGGER.info("斷開客戶端鏈接: {}", session.getSessionID()); sessionMap.remove(session.getSessionID()); break; default: } } }
本文簡化了消息接收者的處理邏輯,只是打印一行日誌用於觀察。實際應用中須要根據收到的消息執行一些業務邏輯。ui
public class PushClientProcessorMessage implements MessageProcessor<String> { private static final Logger LOGGER = LoggerFactory.getLogger(PushClientProcessorMessage.class); @Override public void process(AioSession<String> session, String msg) { LOGGER.info("ReceiverClient:{} 收到Push消息:{}", session.getSessionID(), msg); } @Override public void stateEvent(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) { } }
啓動服務端:PushServer日誌
public class PushServer { public static void main(String[] args) throws IOException { AioQuickServer<String> server = new AioQuickServer<>(8080, new StringProtocol(), new PushServerProcessorMessage()); server.start(); } }
啓動接收者:ReceiverClientcode
public class ReceiverClient { public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r); } }); StringProtocol protocol = new StringProtocol(); PushClientProcessorMessage clientProcessorMessage = new PushClientProcessorMessage(); AioQuickClient<String>[] clients = new AioQuickClient[4]; for (int i = 0; i < clients.length; i++) { clients[i] = new AioQuickClient<>("localhost", 8080, protocol, clientProcessorMessage); clients[i].start(channelGroup); } } }
啓動發送者:SenderClientserver
public class SenderClient { public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { StringProtocol protocol = new StringProtocol(); PushClientProcessorMessage clientProcessorMessage = new PushClientProcessorMessage(); AioQuickClient<String> clients = new AioQuickClient("localhost", 8080, protocol, clientProcessorMessage); AioSession<String> session = clients.start(); byte[] msg = "HelloWorld".getBytes(); while (true) { WriteBuffer writeBuffer = session.writeBuffer(); writeBuffer.writeInt(msg.length); writeBuffer.write(msg); writeBuffer.flush(); Thread.sleep(1000); } } }
SenderClient每秒中發送一條:「HelloWorld」 消息至 PushServer。觀察 PushServer控制檯 能夠看到服務端接收到消息以後,會便可轉發至 ReceiverClient。
而後再去觀察 ReceiverClient控制檯,則會打印服務端Push過來的消息。
本文經過一個簡單的示例,演示了Push服務的實現原理。實際場景下還包括不少可靠性方面的問題須要考慮,感興趣的讀者可自行研究。
本文涉及到的示例代碼可從smart-socket倉庫中下載