netty的pipeline處理鏈上的handler:須要IdleStateHandler心跳檢測channel是否有效,以及處理登陸認證的UserAuthHandler和消息處理MessageHandlerjava
protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(defLoopGroup, //編碼解碼器 new HttpServerCodec(), //將多個消息轉換成單一的消息對象 new HttpObjectAggregator(65536), //支持異步發送大的碼流,通常用於發送文件流 new ChunkedWriteHandler(), //檢測鏈路是否讀空閒,配合心跳handler檢測channel是否正常 new IdleStateHandler(60, 0, 0), //處理握手和認證 new UserAuthHandler(), //處理消息的發送 new MessageHandler() ); }
對於全部連進來的channel,咱們須要保存起來,日後的羣發消息須要依靠這些channelgit
public static void addChannel(Channel channel) { String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel); System.out.println("addChannel:" + remoteAddr); if (!channel.isActive()) { logger.error("channel is not active, address: {}", remoteAddr); } UserInfo userInfo = new UserInfo(); userInfo.setAddr(remoteAddr); userInfo.setChannel(channel); userInfo.setTime(System.currentTimeMillis()); userInfos.put(channel, userInfo); }
登陸後,channel就變成有效的channel,無效的channel以後將會丟棄github
public static boolean saveUser(Channel channel, String nick, String password) { UserInfo userInfo = userInfos.get(channel); if (userInfo == null) { return false; } if (!channel.isActive()) { logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick); return false; } // 驗證用戶名和密碼 if (nick == null || password == null) { return false; } LambdaQueryWrapper<Account> lambdaQueryWrapper = new LambdaQueryWrapper<>(); lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password); Account account = accountMapperStatic.selectOne(lambdaQueryWrapper); if (account == null) { return false; } // 增長一個認證用戶 userCount.incrementAndGet(); userInfo.setNick(nick); userInfo.setAuth(true); userInfo.setId(account.getId()); userInfo.setUsername(account.getUsername()); userInfo.setGroupNumber(account.getGroupNumber()); userInfo.setTime(System.currentTimeMillis()); // 註冊該用戶推送消息的通道 offlineInfoTransmitStatic.registerPull(channel); return true; }
當channel關閉時,就再也不接收消息。unregisterPull就是註銷信息消費者,客戶端再也不接取聊天消息。此外,從下方有一個加寫鎖的操做,就是爲了不channel還在發送消息時,這邊忽然關閉channel,這樣會致使報錯。app
public static void removeChannel(Channel channel) { try { logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel)); //加上讀寫鎖保證移除channel時,避免channel關閉時,還有別的線程對其操做,形成錯誤 rwLock.writeLock().lock(); channel.close(); UserInfo userInfo = userInfos.get(channel); if (userInfo != null) { if (userInfo.isAuth()) { offlineInfoTransmitStatic.unregisterPull(channel); // 減去一個認證用戶 userCount.decrementAndGet(); } userInfos.remove(channel); } } finally { rwLock.writeLock().unlock(); } }
爲了無縫切換使用rabbitmq、rocketmq、activemq、不使用中間件存儲和轉發聊天消息這4種狀態,定義以下4個接口。依次是發送單聊消息、羣聊消息、客戶端啓動接收消息、客戶端下線不接收消息。異步
public interface OfflineInfoTransmit { void pushP2P(Integer userId, String message); void pushGroup(String groupNumber, String message); void registerPull(Channel channel); void unregisterPull(Channel channel); }
其中,如何使用rabbitmq、rocketmq、activemq三種中間件中的一種來存儲和轉發聊天消息,它的處理流程以下:oop