Netty實現高性能IOT服務器(Groza)之精盡代碼篇中

 

 

運行環境:php

  • JDK 8+html

  • Maven 3.0+java

  • Redispython

技術棧:git

  • SpringBoot 2.0+github

  • Redis (Lettuce客戶端,RedisTemplate模板方法)golang

  • Netty 4.1+web

  • MQTT 3.1.1redis

IDE:docker

  • IDEA或者Eclipse

  • Lombok插件

 

簡介

近年來,物聯網高歌猛進,美國有「工業互聯網」,德國有「工業4.0」,我國也有「中國製造2025」,這背後都是雲計算、大數據。據波士頓諮詢報告,單單中國製造業,雲計算、大數據、人工智能等新技術就能爲其帶來高達6萬億的額外附加值。

國內外巨頭紛紛駐足工業互聯網,國外如亞馬遜AWS、微軟Azure,國內則是三大電信運營商、百度雲、華爲、金山雲等,其中騰訊雲、阿里雲最甚,還拉來了傳統制造大佬,國內巨頭紛紛在物聯網上佈局。在2018雲棲-深圳峯會上,阿里巴巴資深副總裁,阿里雲總裁胡曉明宣佈阿里巴巴將正式進軍IoT。胡曉明表示,IoT是阿里巴巴集團繼電商、金融、物流、雲計算以後的一條新的主賽道。

 

IOT技術窺探

以上這些內容,做者做爲一個開發人員,並非一個投資人員和創業先鋒。並不太關係這些具體細節。我所關心的是如何用技術去實現或者模擬一個支持百萬連接的IOT服務器,並不嚴謹,僅作你們參考。

關於爲何選用下圖的中間件或者對MQTT不太瞭解的話,能夠閱讀我以前的2篇文章:

  1. IOT高性能服務器實現之路

  2. Netty實現高性能IOT服務器(Groza)之手撕MQTT協議篇上

技術輪廓圖

 

 

快速入門

 

運行測試

 

  1. git clone https://github.com/sanshengshui/netty-learning-example
  2. cd netty-iot
  3. 運行 NettyIotApplication
  4. 打開 http://localhost:8080/groza/v1/123456/auth,獲取密碼!
  5. 啓動Eclipse Paho,並填寫用戶名和密碼,便可鏈接。
  6. 另起一個Eclipse Paho,訂閱隨意主題,例如test。另外一個Eclipse Paho發佈主題test。便可收到消息。
  7. 取消主題訂閱,再次發佈消息。就收不到消息。

     

 

有了前面2篇文章的鋪墊並學習了MQTT V3.1.1 協議,說了那麼多,手癢癢的很。

You build it, You run it!

項目結構介紹

 netty-iot
        ├── auth -- 認證
          ├── service -- 用戶名,密碼認證明現類
          ├── util -- 認證工具類
        ├── common -- 公共類
          ├── auth -- 用戶名,密碼認證接口
          ├── message -- 協議存儲實體及接口類
          ├── session -- session存儲實體及接口類
          ├── subscribe -- 訂閱存儲實體及接口類
        ├── config -- Redis配置
        ├── protocol -- MQTT協議實現
        ├── server -- MQTT服務器
        ├── store -- Redis數據存儲
          ├── cache 
          ├── message 
          ├── session
          ├── subscribe
        ├── web -- web服務
        ├── NettyIotApplication -- 服務啓動類

 

Redis

安裝

體驗 Redis 須要使用 Linux 或者 Mac 環境,若是是 Windows 能夠考慮使用虛擬機。主要方式有四種:

  • 使用 Docker 安裝。

  • 經過 Github 源碼編譯。

  • 直接安裝 apt-get install(Ubuntu)、yum install(RedHat) 或者 brew install(Mac)。

  • 若是讀者懶於安裝操做,也可使用網頁版的 Web Redis 直接體驗。

具體操做以下:

Docker 方式

  # 拉取 redis 鏡像
  > docker pull redis
  # 運行 redis 容器
  > docker run --name myredis -d -p6379:6379 redis
  # 執行容器中的 redis-cli,能夠直接使用命令行操做 redis
  > docker exec -it myredis redis-cli...

 

Github 源碼編譯方式

  # 下載源碼
  > git clone --branch 2.8 --depth 1 git@github.com:antirez/redis.git
  > cd redis
  # 編譯
  > make
  > cd src
  # 運行服務器,daemonize表示在後臺運行
  > ./redis-server --daemonize yes
  # 運行命令行
  > ./redis-cli...

 

直接安裝方式

  # mac
  > brew install redis
  # ubuntu
  > apt-get install redis
  # redhat
  > yum install redis
  # 運行客戶端
  > redis-cli

 

使用

Spring Boot除了支持常見的ORM框架外,更是對經常使用的中間件提供了很是好封裝,隨着Spring Boot2.x的到來,支持的組件愈來愈豐富,也愈來愈成熟,其中對Redis的支持不只僅是豐富了它的API,更是替換掉底層Jedis的依賴,取而代之換成了Lettuce(生菜),你們能夠參考這篇文章對工程進行配置。因此我使用Lettuce做爲客戶端來對個人MQTT協議傳輸的消息進行緩存。

下列的是Redis所對應的操做方式

  • opsForValue: 對應 String(字符串)

  • opsForZSet: 對應 ZSet(有序集合)

  • opsForHash: 對應 Hash(哈希)

  • opsForList: 對應 List(列表)

  • opsForSet: 對應 Set(集合)

  • opsForGeo: 對應 GEO(地理位置)

我主要使用opsForValue,opsForHashopsForZSet,對於字符串。我推薦使用StringRedisTemplate

如下對於opsForValue和opsForHash的基礎操做,我在這裏簡短的講解一下。

 

Redis的Hash數據機構

Redis的散列可讓用戶將多個鍵值對存儲到一個Redis鍵裏面。 public interface HashOperations<H,HK,HV> HashOperations提供一系列方法操做hash:

 java > template.opsForHash().put("books","java","think in java");
  redis-cli > hset books java "think in java"  # 命令行的字符串若是包含空格,要用引號括起來
  (integer) 1
  ------
  java > template.opsForHash().put("books","golang","concurrency in go");
  redis-cli > hset books golang "concurrency in go"
  (integer) 1
  ------
  java > template.opsForHash().put("books","python","python cookbook");
  redis-cli > hset books python "python cookbook"
  (integer) 1
  ------
  java > template.opsForHash().entries("books")
  redis-cli > hgetall books  # entries(),key 和 value 間隔出現
  1) "java"
  2) "think in java"
  3) "golang"
  4) "concurrency in go"
  5) "python"
  6) "python cookbook"
  ------
  java > template.opsForHash().size("books")
  redis-cli > hlen books
  (integer) 3
  ------
  java > template.opsForHash().get("redisHash","age")
  redi-cli > hget books java
  "think in java"
  ------
  java > 
  Map<String,Object> testMap = new HashMap();
        testMap.put("java","effective java");
        testMap.put("python","learning python");
        testMap.put("golang","modern golang programming");
  template.opsForHash().putAll("books",testMap);
  redis-cli > hmset books java "effective java" python "learning python" golang "modern golang programming"  # 批量 set
  OK...

 


Redis的Set數據結構

Redis的Set是string類型的無序集合。集合成員是惟一的,這就意味着集合中不能出現重複的數據。 Redis 中 集合是經過哈希表實現的,因此添加,刪除,查找的複雜度都是O(1)。

java > template.opsForSet().add("python","java","golang")
  redis-cli > sadd books python java golang
  (integer) 3
  ------
  java > template.opsForSet().members("books")
  redis-cli > smembers books  # 注意順序,和插入的並不一致,由於 set 是無序的
  1) "java"
  2) "python"
  3) "golang"
  ------
  java > template.opsForSet().isMember("books","java")
  redis-cli > sismember books java  # 查詢某個 value 是否存在,至關於 contains(o)
  (integer) 1
  ------
  java > template.opsForSet().size("books")
  redis-cli > scard books  # 獲取長度至關於 count()
  (integer) 3
  ------
  java > template.opsForSet().pop("books")
  redis-cli > spop books  # 彈出一個
  "java"...

 

 

MQTT

MQTT是一種輕量級的發佈/訂閱消息傳遞協議,最初由IBM和Arcom(後來成爲Eurotech的一部分)於1998年左右建立。如今,MQTT 3.1.1規範已由OASIS聯盟標準化。

 

客戶端下載

 

對於MQTT客戶端,我選用Eclipse Paho,Eclipse Paho項目提供針對物聯網(IoT)的新的,現有的和新興的應用程序的MQTT和MQTT-SN消息傳遞協議的開源客戶端實現。具體下載地址,你們根據本身的操做系統自行下載。

 

MQTT控制報文

  ├── Connect -- 鏈接服務端
  ├── DisConnect -- 斷開鏈接
  ├── PingReq -- 心跳請求
  ├── PubAck -- 發佈確認
  ├── PubComp -- 發佈完成(QoS2,第散步)
  ├── Publish -- 發佈消息
  ├── PubRec -- 發佈收到(QoS2,第一步)
  ├── PubRel -- 發佈釋放(QoS2,第二步)
  ├── Subscribe -- 訂閱主題
  ├── UnSubscribe -- 取消訂閱

 

Connect

讓咱們對照着MQTT 3.1.1協議來實現客戶端Connect協議。

  1. 當咱們對消息解碼時,若是協議名不正確服務端能夠斷開客戶端的鏈接,按照本規範,服務端不能繼續處理CONNECT報。

  2. 服務端使用客戶端標識符 (ClientId) 識別客戶端。鏈接服務端的每一個客戶端都有惟一的客戶端標識符(ClientId)。

    // 消息解碼器出現異常
              if (msg.decoderResult().isFailure()) {
                  Throwable cause = msg.decoderResult().cause();
                  if (cause instanceof MqttUnacceptableProtocolVersionException) {
                      // 不支持的協議版本
                      MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                              new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                              new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null);
                      channel.writeAndFlush(connAckMessage);
                      channel.close();
                      return;
                  } else if (cause instanceof MqttIdentifierRejectedException) {
                      // 不合格的clientId
                      MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                              new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                              new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
                      channel.writeAndFlush(connAckMessage);
                      channel.close();
                      return;
                  }
                  channel.close();
                  return;
              }

     

  3. clientId爲空或null的狀況, 這裏要求客戶端必須提供clientId, 無論cleanSession是否爲1, 此處沒有參考標準協議實現

          
      if (StrUtil.isBlank(msg.payload().clientIdentifier())) {
                  MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                          new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                          new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
                  channel.writeAndFlush(connAckMessage);
                  channel.close();
                  return;
              }

     

  4. 用戶名和密碼驗證, 這裏要求客戶端鏈接時必須提供用戶名和密碼, 不論是否設置用戶名標誌和密碼標誌爲1, 此處沒有參考標準協議實現

              
       String username = msg.payload().userName();
                 String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
                 if (!grozaAuthService.checkValid(username,password)) {
                     MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                             new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                             new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
                     channel.writeAndFlush(connAckMessage);
                     channel.close();
                     return;
                 }

     

  1. 若是會話中已存儲這個新鏈接的clientId, 就關閉以前該clientId的鏈接

      if (grozaSessionStoreService.containsKey(msg.payload().clientIdentifier())){
                  SessionStore sessionStore = grozaSessionStoreService.get(msg.payload().clientIdentifier());
                  Channel previous = sessionStore.getChannel();
                  Boolean cleanSession = sessionStore.isCleanSession();
                  if (cleanSession){
                      grozaSessionStoreService.remove(msg.payload().clientIdentifier());
                      grozaSubscribeStoreService.removeForClient(msg.payload().clientIdentifier());
                      grozaDupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier());
                      grozaDupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier());
                  }
                  previous.close();
              }

     

     

  2. 處理遺囑信息

     SessionStore sessionStore = new SessionStore(msg.payload().clientIdentifier(), channel, msg.variableHeader().isCleanSession(), null);
              if (msg.variableHeader().isWillFlag()){
                  MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                          new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.valueOf(msg.variableHeader().willQos()),msg.variableHeader().isWillRetain(),0),
                          new MqttPublishVariableHeader(msg.payload().willTopic(),0),
                          Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes())
                  );
                  sessionStore.setWillMessage(willMessage);
              }

     

  3. 處理鏈接心跳包

     if (msg.variableHeader().keepAliveTimeSeconds() > 0){
                  if (channel.pipeline().names().contains("idle")){
                      channel.pipeline().remove("idle");
                  }
                  channel.pipeline().addFirst("idle",new IdleStateHandler(0, 0, Math.round(msg.variableHeader().keepAliveTimeSeconds() * 1.5f)));
              }

     

    至此存儲會話消息及返回接受客戶端鏈接 將clientId存儲到channel的map中
  4. grozaSessionStoreService.put(msg.payload().clientIdentifier(),sessionStore);
              channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier());
              Boolean sessionPresent = grozaSessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession();
              MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                      new MqttFixedHeader(MqttMessageType.CONNACK,false,MqttQoS.AT_MOST_ONCE,false,0),
                      new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED,sessionPresent),
                      null
              );
              channel.writeAndFlush(okResp);

     

  5. 若是cleanSession爲0, 須要重發同一clientId存儲的未完成的QoS1和QoS2的DUP消息

       if (!msg.variableHeader().isCleanSession()){
                  List<DupPublishMessageStore> dupPublishMessageStoreList = grozaDupPublishMessageStoreService.get(msg.payload().clientIdentifier());
                  List<DupPubRelMessageStore> dupPubRelMessageStoreList = grozaDupPubRelMessageStoreService.get(msg.payload().clientIdentifier());
                  dupPublishMessageStoreList.forEach(dupPublishMessageStore -> {
                      MqttPublishMessage publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage(
                              new MqttFixedHeader(MqttMessageType.PUBLISH,true,MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()),false,0),
                              new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(),dupPublishMessageStore.getMessageId()),
                              Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes())
                      );
                      channel.writeAndFlush(publishMessage);
                  });
                  dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> {
                      MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
                              new MqttFixedHeader(MqttMessageType.PUBREL,true,MqttQoS.AT_MOST_ONCE,false,0),
                              MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()),
                              null
                      );
                      channel.writeAndFlush(pubRelMessage);
                  });
              }

     

    其餘MQTT報文你們對照着工程並對照着MQTT v3.1.1自行查看!

用戶名密碼認證

 /**
   * 用戶名和密碼認證服務
   * @author 穆書偉
   */
  @Service
  public class AuthServiceImpl implements GrozaAuthService {
      private RSAPrivateKey privateKey;
  ​
      @Override
      public boolean checkValid(String username, String password) {
          if (StringUtils.isEmpty(username)){
              return false;
          }
          if (StringUtils.isEmpty(password)){
              return false;
          }
          RSA rsa = new RSA(privateKey,null);
          String value = rsa.encryptBcd(username, KeyType.PrivateKey);
          return value.equals(password) ? true : false;
      }
  ​
      @PostConstruct
      public void init() {
          privateKey = IoUtil.readObj(AuthServiceImpl.class.getClassLoader().getResourceAsStream("keystore/auth-private.key"));
      }
  }

 

其餘

關於Netty實現高性能IOT服務器(Groza)之精盡代碼篇中詳解到這裏就結束了。

原創不易,若是感受不錯,但願給個推薦!您的支持是我寫做的最大動力!

下文會帶你們推動Netty實現MQTT協議的IOT服務器。

版權聲明:

做者:穆書偉

博客園出處:https://www.cnblogs.com/sanshengshui

github出處:https://github.com/sanshengshui    

我的博客出處:https://sanshengshui.github.io/

相關文章
相關標籤/搜索