Netty網絡編程實戰 - 用Netty手寫實現一個UDP通知消息廣播

前言
程序員

UDP 是面向無鏈接的通信協議,UDP 數據包括目的端口號和源端口號信息, 因爲通信不須要鏈接,因此能夠實現廣播發送。Netty也爲咱們封裝相關支持UDP諸多組件、數據報文和處理器。編程


UDP 通信時不須要接收方確認,屬於不可靠的傳輸,可能會出現丟包現象, 實際應用中要求程序員編程驗證。網絡

UDP 與 TCP 位於同一層,但它無論數據包的順序、錯誤或重發。所以,UDP 不被應用於那些面向鏈接的服務,UDP 主要用於那些面向查詢---應答的服務,例 如 NFS。相對於 FTP 或 Telnet,這些服務須要交換的信息量較小。使用 UDP 的服 務包括 NTP(網絡時間協議)和 DNS(DNS 也使用 TCP),包總量較少的通訊(DNS、 SNMP 等);2.視頻、音頻等多媒體通訊(即時通訊);3.限定於 LAN 等特定網 絡中的應用通訊;4.廣播通訊(廣播、多播)。app

image.png


經常使用的 QQ,就是一個以 UDP 爲主,TCP 爲輔的通信協議。 TCP 和 UDP 的優缺點沒法簡單地、絕對地去作比較:TCP 用於在傳輸層有 必要實現可靠傳輸的狀況;而在一方面,UDP 主要用於那些對高速傳輸和實時 性有較高要求的通訊或廣播通訊。TCP 和 UDP 應該根據應用的目的按需使用。dom


Netty中UDP的核心組件ide

image.png

Netty 的 DatagramPacket 是一個簡單的消息容器,DatagramChannel 實現用它來和遠程 節點通訊。相似於在咱們先前的類比中的明信片,它包含了接收者(和可選的發送者)的地 址以及消息的有效負載自己。oop


讓咱們來運用Netty的核心組件,構建一個基於UDP的多播應用。this


代碼設計實現編碼

 廣播端部分spa

/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知的廣播端
*/
public class NoticeBroadcast {
   //廣播線程組
   private final EventLoopGroup group;
   //廣播啓動器
   private final Bootstrap boot;

   /**
    * 默認構造
    * @param remotePort 接收端端口
    */
   public NoticeBroadcast(int remotePort) {
       this.group = new NioEventLoopGroup();
       this.boot = new Bootstrap();
       //綁定NioDatagramChannel數據報通道
       this.boot.group(group).channel(NioDatagramChannel.class)
       //設置通道用於廣播
       .option(ChannelOption.SO_BROADCAST, true)
       .handler(new NoticeEncoder(new InetSocketAddress(Constant.BROADCAST_IP, remotePort)));
   }

   /**
    * 運行廣播
    */
   public void run() throws Exception {
       int count = 0;
       //綁定廣播通道
       Channel channel = this.boot.bind(0).sync().channel();
       System.out.println("開始運行廣播,發送通知,目標全部主機端口("+Constant.ACCEPTER_PORT+")...");
       //循環廣播通知
       for (;;){
           /**
            * 發送通知到接收端
            */
          channel.writeAndFlush(new Notice(++count, Constant.getNotice(),null));
          //間隔3秒發送
           try {
               Thread.sleep(3000);
           } catch (InterruptedException e) {
               Thread.interrupted();
               e.printStackTrace();
               break;
           }
       }
   }

   /**
    * 中止運行
    */
   public void stop(){
       try {
           this.group.shutdownGracefully();
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:廣播運行器
*/
public class BroadcastRunner {
   /**
    * 運行消息廣播
    * @param args
    */
   public static void main(String[] args) {
       NoticeBroadcast broadcast = null;
       try {
           broadcast = new NoticeBroadcast(Constant.ACCEPTER_PORT);
           broadcast.run();
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           broadcast.stop();
       }
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知編碼器
*/
public class NoticeEncoder extends MessageToMessageEncoder<Notice> {
   //目的地
   private final InetSocketAddress target;

   public NoticeEncoder(InetSocketAddress target) {
       this.target = target;
   }

   /**
    * 編碼方法實現
    * @param ctx 處理器上下文
    * @param notice 通知對象
    * @param list 集合
    * @throws Exception
    */
   protected void encode(ChannelHandlerContext ctx, Notice notice, List<Object> list) throws Exception {
       //內容數據
       byte[] bytes = notice.getContent().getBytes(CharsetUtil.UTF_8);
       //定義緩衝:一個int型+一個long型+內容長度+分隔符
       int capacity = 4+8+bytes.length+1;
       ByteBuf buf = ctx.alloc().buffer(capacity);
       //寫通知id
       buf.writeInt(notice.getId());
       //發送時間
       buf.writeLong(notice.getTime());
       //分隔符
       buf.writeByte(Notice.SEPARATOR);
       //內容
       buf.writeBytes(bytes);
       //加入消息列表
       list.add(new DatagramPacket(buf, target));
   }
}


接收端部分

/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知接收器
*/
public class NoticeAccepter {
   //通知線程組
   private final EventLoopGroup group;
   //啓動器
   private final Bootstrap boot;
   public NoticeAccepter() {
       this.group = new NioEventLoopGroup();
       this.boot = new Bootstrap();
       this.boot.group(this.group)
               .channel(NioDatagramChannel.class)
               //開啓通道底層廣播
               .option(ChannelOption.SO_BROADCAST, true)
               //端口重用
               .option(ChannelOption.SO_REUSEADDR, true)
               .handler(new ChannelInitializer<Channel>() {
                   @Override
                   protected void initChannel(Channel channel) throws Exception {
                       ChannelPipeline pipeline = channel.pipeline();
                       pipeline.addLast(new NoticeDecoder());
                       pipeline.addLast(new NoticeChannelHanler());
                   }
               })
               .localAddress(Constant.ACCEPTER_PORT);
   }

   /**
    * 運行接收器
    */
   public void run(){
       try {
           //設置不間斷接收消息,並綁定通道
           Channel channel = this.boot.bind().syncUninterruptibly().channel();
           System.out.println("接收器啓動,端口("+ Constant.ACCEPTER_PORT+"),等待接收通知...");
           //通道阻塞,直到關閉
           channel.closeFuture().sync();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }finally {
          this.stop();
       }
   }

   /**
    * 中止接收消息
    */
   public void stop(){
       try {
           this.group.shutdownGracefully();
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知通道處理器
*/
public class NoticeChannelHanler extends SimpleChannelInboundHandler<Notice> {
   /**
    * 接收廣播傳遞過來的報文
    * @param channelHandlerContext
    * @param notice
    * @throws Exception
    */
   @Override
   protected void channelRead0(ChannelHandlerContext channelHandlerContext, Notice notice) throws Exception {
       StringBuffer buffer = new StringBuffer();
       buffer.append("時間[");
       buffer.append(notice.getTime());
       buffer.append("],廣播源[");
       buffer.append(notice.getSource().toString());
       buffer.append("]=====[");
       buffer.append(notice.getId());
       buffer.append("]=====通知內容:");
       buffer.append(notice.getContent());
       //打印接收到的數據
       System.out.println(buffer.toString());
   }

   /**
    * 異常捕獲
    * @param ctx 上下文
    * @param cause
    * @throws Exception 異常信息
    */
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       ctx.close();
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知解碼器
*/
public class NoticeDecoder extends MessageToMessageDecoder<DatagramPacket> {

   /**
    * 解碼器核心實現
    * @param channelHandlerContext 處理器上下文
    * @param datagramPacket 數據報
    * @param list 消息列表
    * @throws Exception
    */
   @Override
   protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> list) throws Exception {
       //數據報內容
       ByteBuf data = datagramPacket.content();
       //通知id
       int id = data.readInt();
       //發送時間
       long time = data.readLong();
       //分隔符
       data.readByte();
       //當前索引
       int idx = data.readerIndex();
       //通知內容
       String content = data.slice(idx, data.readableBytes()).toString(CharsetUtil.UTF_8);
       //加入消息列表
       list.add(new Notice(id,content, datagramPacket.sender()));
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:消息接收器啓動器
*/
public class AccepterRunner {
   /**
    * 運行通知接收任務
    * @param args
    */
   public static void main(String[] args) {
       NoticeAccepter accepter = null;
       try {
           accepter = new NoticeAccepter();
           accepter.run();
       } catch (Exception e) {
           e.printStackTrace();
       }finally {
           accepter.stop();
       }
   }
}


其它部分

/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知信息
*/
public class Notice {
   public int getId() {
       return id;
   }

   public long getTime() {
       return time;
   }

   public String getContent() {
       return content;
   }

   public InetSocketAddress getSource() {
       return source;
   }
   //通知id
   private final int id;
   //發送時間
   private final long time;
   //通知內容
   private final String content;
   //來源地址
   private final InetSocketAddress source;
   //分隔符
   public static final byte SEPARATOR = (byte) ':';
   public Notice(int id, String content, InetSocketAddress source) {
       this.id = id;
       this.content = content;
       this.source = source;
       this.time = System.currentTimeMillis();
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:系統常量
*/
public class Constant {
   /**
    * 廣播地址
    */
   public static final String BROADCAST_IP = "255.255.255.255";
   /**
    * 接收者端口(固定的)
    */
   public static final int ACCEPTER_PORT = 8700;
   /**
    * 通知池
    */
   private static final String[] NOTICE_POOL = {
           "多國疫情忽然反彈,北京下一步怎麼辦?",
           "端午假期,這位政法委書記去了邊境",
           "省委書記、省長們的端午假期",
           "人社部迴應圖書館留言大叔 送吳桂春們求職就業指南",
           "北京通報病例狀況,有句話出現十屢次",
           "北京26日新增病例活動軌跡公佈!涉及這些地方!\n",
           "蚊蠅增可能是否會傳播新冠病毒?官方迴應來了",
           "俄羅斯將擴大直接對華供應食品地區名單",
           "一圖看懂:新發地到底有多大多複雜?",
           "倒閉、虧損、壞帳,影視行業如何「活下去」?",
           "警察獻血反被辱!香港醫管局:致歉並展開調查",
           "華爲獲准在英國建研發中心 美官員「打招呼」過問",
           "105所高校經過認證!教育部公佈一份重磅名單",
           "北京新增確診17例:北京16天確診297例",
           "印度陸軍司令:已在中印實控線作長期準備",
           "印度增兵邊境作出兩冒險動做 中方須作衝突升級準備",
           "中印對峙印度下一步會有何行動?偷襲奇襲撈一把就走",
           "印度大批軍機飛向拉達克想洗刷恥辱 直升機緊急着陸",
           "蓬佩奧爲什麼反對伊朗買殲10?將威脅美在波斯灣秩序",
           "美國防受權法呼籲美軍醫療船安慰號及仁慈號停靠臺灣",
           "印度造艦能力有多強?媒體:像當年的中日韓值得看好",
           "中國6代戰機究竟長啥樣:機頭尖銳無平尾",
           "殲16電子戰機有多強:干擾距離翻倍 優於美軍EA-18G",
           "我軍PCL09車載炮爲什麼上高原 高低搭配火力覆蓋沒死角",
           "胡錫進:這時候誰願意去美國?簽證留給黃之鋒吧",
           "美方因涉港問題對中方官員實施簽證限制 中方迴應",
           "我軍爲什麼選擇6-25高炮放棄單35 根本緣由並不在火炮",
           "在中國問題上 短視的是莫迪買單的是印度",
           "解放軍駐吉布提基地官兵已換裝星空迷彩服",
           "印度陸軍司令向防長彙報:在中印實控線作長期準備",
           "印度多架軍機在中印邊境密集活動 直升機緊急着陸",
           "我軍PCL09車載炮爲什麼上高原 高低搭配火力覆蓋沒死角",
           "蓬佩奧爲什麼反對伊朗買殲10?將威脅美在波斯灣秩序",
           "美國防受權法呼籲美軍醫療船安慰號及仁慈號停靠臺灣"
   };
   /**
    * 獲取消息
    */
   public static String getNotice(){
       Random r = new Random();
       return NOTICE_POOL[r.nextInt(NOTICE_POOL.length)];
   }
}


運行驗證

UDP廣播端模式,廣播端和接收端並沒有嚴格地啓動順序;通常來講爲了不開始消息接收不到的問題,可先啓動接收端等待。接收端開多個模擬驗證。

image.png

image.png

image.png

image.png


總結

由於UDP廣播模式的發送針對局域網全部主機IP,因此更適合在公司內部使用項目,相似通知模塊和須要全體一塊兒接收的業務場景。但同時鑑於UDP是面向無鏈接的,消息的發送沒有對端的應答等機制。因此是不可靠的傳輸協議,你們在項目中要評估好業務場景。固然也能夠做爲輔助手段和TCP協議結合使用爲最佳。

相關文章
相關標籤/搜索