netty無縫切換rabbitmq、activemq、rocketmq實現聊天室單聊、羣聊功能

file

圖片描述

netty的pipeline處理鏈上的handler:須要IdleStateHandler心跳檢測channel是否有效,以及處理登陸認證的UserAuthHandler和消息處理MessageHandlerjava

protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(defLoopGroup,
        //編碼解碼器
        new HttpServerCodec(),
        //將多個消息轉換成單一的消息對象
        new HttpObjectAggregator(65536),
        //支持異步發送大的碼流,通常用於發送文件流
        new ChunkedWriteHandler(),
        //檢測鏈路是否讀空閒,配合心跳handler檢測channel是否正常
        new IdleStateHandler(60, 0, 0),
        //處理握手和認證
        new UserAuthHandler(),
        //處理消息的發送
        new MessageHandler()
    );
}

對於全部連進來的channel,咱們須要保存起來,日後的羣發消息須要依靠這些channelgit

public static void addChannel(Channel channel) {
        String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
        System.out.println("addChannel:" + remoteAddr);
        if (!channel.isActive()) {
            logger.error("channel is not active, address: {}", remoteAddr);
        }
        UserInfo userInfo = new UserInfo();
        userInfo.setAddr(remoteAddr);
        userInfo.setChannel(channel);
        userInfo.setTime(System.currentTimeMillis());
        userInfos.put(channel, userInfo);
    }

登陸後,channel就變成有效的channel,無效的channel以後將會丟棄github

public static boolean saveUser(Channel channel, String nick, String password) {
        UserInfo userInfo = userInfos.get(channel);
        if (userInfo == null) {
            return false;
        }
        if (!channel.isActive()) {
            logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);
            return false;
        }
        // 驗證用戶名和密碼
        if (nick == null || password == null) {
            return false;
        }
        LambdaQueryWrapper<Account> lambdaQueryWrapper = new LambdaQueryWrapper<>();
        lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);
        Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);
        if (account == null) {
            return false;
        }
        // 增長一個認證用戶
        userCount.incrementAndGet();
        userInfo.setNick(nick);
        userInfo.setAuth(true);
        userInfo.setId(account.getId());
        userInfo.setUsername(account.getUsername());
        userInfo.setGroupNumber(account.getGroupNumber());
        userInfo.setTime(System.currentTimeMillis());

        // 註冊該用戶推送消息的通道
        offlineInfoTransmitStatic.registerPull(channel);
        return true;
    }

當channel關閉時,就再也不接收消息。unregisterPull就是註銷信息消費者,客戶端再也不接取聊天消息。此外,從下方有一個加寫鎖的操做,就是爲了不channel還在發送消息時,這邊忽然關閉channel,這樣會致使報錯。app

public static void removeChannel(Channel channel) {
        try {
            logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));
            //加上讀寫鎖保證移除channel時,避免channel關閉時,還有別的線程對其操做,形成錯誤
            rwLock.writeLock().lock();
            channel.close();
            UserInfo userInfo = userInfos.get(channel);
            if (userInfo != null) {
                if (userInfo.isAuth()) {
                    offlineInfoTransmitStatic.unregisterPull(channel);
                    // 減去一個認證用戶
                    userCount.decrementAndGet();
                }
                userInfos.remove(channel);
            }
        } finally {
            rwLock.writeLock().unlock();
        }

    }

爲了無縫切換使用rabbitmq、rocketmq、activemq、不使用中間件存儲和轉發聊天消息這4種狀態,定義以下4個接口。依次是發送單聊消息、羣聊消息、客戶端啓動接收消息、客戶端下線不接收消息。異步

public interface OfflineInfoTransmit {
    void pushP2P(Integer userId, String message);

    void pushGroup(String groupNumber, String message);

    void registerPull(Channel channel);

    void unregisterPull(Channel channel);
}

其中,如何使用rabbitmq、rocketmq、activemq三種中間件中的一種來存儲和轉發聊天消息,它的處理流程以下:oop

  1. 單聊的模型參考線程池的模型,若是用戶在線,直接經過channel發送給用戶。若是用戶離線,則發往中間件存儲,下次用戶上線時直接從中間件拉取消息。這樣作對比全部消息的發送都經過中間件來轉的好處是提高了性能
  2. 羣聊則是徹底經過中間件來轉發消息,消息發送中間件,客戶端從中間件接取消息。若是仍像單聊那樣操做,在線用戶直接經過channel發送,操做過於繁瑣,要判斷這個羣組的哪些用戶是否在線
  3. 若是用戶在線就註冊消費者,從中間件接取消息。不然,就斷開消費者,消息保留在中間件中,以便客戶端下次上線時拉取。這樣就實現了離線消息的接收。
  4. 無論使用哪一種中間件或使用不使用中間件,它的處理流程都遵循上面的3個要求,就能無縫切換上方的4種方法來存儲和轉發消息。須要哪一種方法開啓相應註解便可。

file

項目地址:https://github.com/shuangyueliao/netty-chat性能

相關文章
相關標籤/搜索