smart-socket實戰:服務端主動Push消息至客戶端

在通訊場景中比較常見的模式爲客戶端發送請求給服務端,服務端再回以響應。還有一種通訊模式爲服務端主動Push消息給客戶端,這種通訊一般有兩種場景。java

場景一
某個客戶端發送指令給服務端,觸發服務端push消息至其餘客戶端,例如:IM。 git

場景二
服務端基於某種業務場景主動Push消息至相連的客戶端,例如:APP消息推送。 編程


本文以場景一爲例演示如何經過smart-socket實現Push消息下發,首先咱們須要定義三個角色:session

  • SendClient:消息發送者,該客戶端會發送消息至服務端,再由服務端push至其餘客戶端。
  • ReceiverClient:消息接收者,接收服務端Push過來的消息。
  • PushServer:Push服務端,接收 SendClient 發送的消息,並轉發給其餘客戶端ReceiverClient。

第一步:定義協議

通訊編程的首要步驟,則是定義通訊協議。出於演示目的,咱們採用length+data的協議格式,即採用4個字節長度的int值表示消息頭,而該int數值的大小表明着消息體的長度。SendClient與PushServer,PushServer與ReceiverClient皆採用此協議通訊。socket

public class StringProtocol implements Protocol<String> {

    @Override
    public String decode(ByteBuffer readBuffer, AioSession<String> session) {
        int remaining = readBuffer.remaining();
        if (remaining < Integer.BYTES) {
            return null;
        }
        readBuffer.mark();
        int length = readBuffer.getInt();
        if (length > readBuffer.remaining()) {
            readBuffer.reset();
            return null;
        }
        byte[] b = new byte[length];
        readBuffer.get(b);
        readBuffer.mark();
        return new String(b);
    }
}

第二步:Push服務端處理器

PushServer的處理器須要具有如下幾方面能力:ide

  1. 維護全部客戶端鏈接。客戶端與服務端創建鏈接後將 AioSession 存放至 sessionMap 中,斷開鏈接時則從Map中移除掉。
  2. 接收SendClient發送的消息,並Push給其餘客戶端。
public class PushServerProcessorMessage implements MessageProcessor<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushServerProcessorMessage.class);
    private Map<String, AioSession<String>> sessionMap = new ConcurrentHashMap<>();

    @Override
    public void process(AioSession<String> session, String msg) {
        LOGGER.info("收到SendClient發送的消息:{}", msg);
        byte[] bytes = msg.getBytes();
        sessionMap.values().forEach(onlineSession -> {
            if (session == onlineSession) {
                return;
            }
            WriteBuffer writeBuffer = onlineSession.writeBuffer();
            try {
                LOGGER.info("發送Push至ReceiverClient:{}", onlineSession.getSessionID());
                writeBuffer.writeInt(bytes.length);
                writeBuffer.write(bytes);
                writeBuffer.flush();
            } catch (Exception e) {
                LOGGER.error("Push消息異常", e);
            }
        });
    }

    @Override
    public void stateEvent(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) {
        switch (stateMachineEnum) {
            case NEW_SESSION:
                LOGGER.info("與客戶端:{} 創建鏈接", session.getSessionID());
                sessionMap.put(session.getSessionID(), session);
                break;
            case SESSION_CLOSED:
                LOGGER.info("斷開客戶端鏈接: {}", session.getSessionID());
                sessionMap.remove(session.getSessionID());
                break;
            default:
        }
    }
}

第三步:ReceiverClient處理器

本文簡化了消息接收者的處理邏輯,只是打印一行日誌用於觀察。實際應用中須要根據收到的消息執行一些業務邏輯。ui

public class PushClientProcessorMessage implements MessageProcessor<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushClientProcessorMessage.class);

    @Override
    public void process(AioSession<String> session, String msg) {
        LOGGER.info("ReceiverClient:{} 收到Push消息:{}", session.getSessionID(), msg);
    }

    @Override
    public void stateEvent(AioSession<String> session, StateMachineEnum stateMachineEnum, Throwable throwable) {

    }
}

第四步:啓動服務

啓動服務端:PushServer日誌

public class PushServer {
    public static void main(String[] args) throws IOException {
        AioQuickServer<String> server = new AioQuickServer<>(8080, new StringProtocol(), new PushServerProcessorMessage());
        server.start();
    }
}

啓動接收者:ReceiverClientcode

public class ReceiverClient {
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });
        StringProtocol protocol = new StringProtocol();
        PushClientProcessorMessage clientProcessorMessage = new PushClientProcessorMessage();
        AioQuickClient<String>[] clients = new AioQuickClient[4];
        for (int i = 0; i < clients.length; i++) {
            clients[i] = new AioQuickClient<>("localhost", 8080, protocol, clientProcessorMessage);
            clients[i].start(channelGroup);
        }
    }
}

啓動發送者:SenderClientserver

public class SenderClient {
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        StringProtocol protocol = new StringProtocol();
        PushClientProcessorMessage clientProcessorMessage = new PushClientProcessorMessage();
        AioQuickClient<String> clients = new AioQuickClient("localhost", 8080, protocol, clientProcessorMessage);
        AioSession<String> session = clients.start();
        byte[] msg = "HelloWorld".getBytes();
        while (true) {
            WriteBuffer writeBuffer = session.writeBuffer();
            writeBuffer.writeInt(msg.length);
            writeBuffer.write(msg);
            writeBuffer.flush();
            Thread.sleep(1000);
        }
    }
}

第五步:觀察控制檯

SenderClient每秒中發送一條:「HelloWorld」 消息至 PushServer。觀察 PushServer控制檯 能夠看到服務端接收到消息以後,會便可轉發至 ReceiverClient。

而後再去觀察 ReceiverClient控制檯,則會打印服務端Push過來的消息。

最後

本文經過一個簡單的示例,演示了Push服務的實現原理。實際場景下還包括不少可靠性方面的問題須要考慮,感興趣的讀者可自行研究。

本文涉及到的示例代碼可從smart-socket倉庫中下載

相關文章
相關標籤/搜索