本章介紹java
UDP介紹算法
UDP程序結構和設計數據庫
日誌事件POJO編程
編寫廣播器bootstrap
編寫監聽者服務器
使用廣播器和監聽者網絡
Summaryapp
前面的章節都是在示例中使用TCP協議,這一章,咱們將使用UDP。UDP是一種無鏈接協議,若須要很高的性能和對數據的完成性沒有嚴格要求,那使用UDP是一個很好的方法。最著名的基於UDP協議的是用來域名解析的DNS。dom
Netty使用了統一的傳輸API,這使得編寫基於UDP的應用程序很容易。能夠重用現有的ChannelHandler和其餘公共組件來編寫另外的Netty程序。看完本章後,你就會知道什麼是無鏈接協議以及爲何UDP可能適合你的應用程序。socket
13.1 UDP介紹
在深刻探討UDP以前,咱們先了解UDP是什麼,以及UDP有什麼限制或問題。UDP是一種無鏈接的協議,也就是說客戶端和服務器在交互數據以前不會像TCP那樣事先創建鏈接。
UDP是User Datagram Protocol的簡稱,即用戶數據報協議。UDP有不提供數據報分組、組裝和不能對數據報進行排序的缺點,也就是說,當數據報發送以後是沒法確認數據是否完整到達的。
UDP協議的主要做用是將網絡數據流量壓縮成數據包的形式。一個典型的數據包就是一個二進制數據的傳輸單位。每個數據包的前8個字節用來包含報頭信息,剩餘字節則用來包含具體的傳輸數據。
在選擇使用協議的時候,選擇UDP必需要謹慎。在網絡質量使人十分不滿意的環境下,UDP協議數據包丟失會比較嚴重。可是因爲UDP的特性:它不屬於鏈接型協議,於是具備資源消耗小,處理速度快的優勢,因此一般音頻、視頻和普通數據在傳送時使用UDP較多,由於它們即便偶爾丟失一兩個數據包,也不會對接收結果產生太大影響。好比咱們聊天用的ICQ和QQ就是使用的UDP協議。
13.2 UDP程序結構和設計
本章例子中,程序打開一個文件並將文件內容一行一行的經過UDP廣播到其餘的接收主機。
對於像發送日誌的需求,UDP很是適合這樣的應用程序,並可使用UDP經過網絡發送大量的「事件」。
每一個UDP報文分UDP報頭和UDP數據區兩部分,報頭由四個16位長(2字節)字段組成,分別說明該報文的源端口、目的端口、報文長度以及校驗值;數據庫就是傳輸的具體數據。
UDP有以下特性:
1.UDP是一個無鏈接協議,傳輸數據以前源端和終端不創建鏈接,當它想傳送時就簡單地去抓取來自應用程序的數據,並儘量快地把它扔到網絡上。在發送端,UDP傳送數據的速度僅僅是受應用程序生成數據的速度、計算機的能力和傳輸帶寬的限制;在接收端,UDP把每一個消息段放在隊列中,應用程序每次從隊列中讀一個消息段。
2.因爲傳輸數據不創建鏈接,所以也就不須要維護鏈接狀態,包括收發狀態等,所以一臺服務機可同時向多個客戶機傳輸相同的消息。
3.UDP信息包的標題很短,只有8個字節,相對於TCP的20個字節信息包的額外開銷很小。
4.吞吐量不受擁擠控制算法的調節,只受應用軟件生成數據的速率、傳輸帶寬、源端和終端主機性能的限制。
5.UDP使用盡最大努力交付,即不保證可靠交付,所以主機不須要維持複雜的連接狀態表(這裏面有許多參數)。
6.UDP是面向報文的。發送方的UDP對應用程序交下來的報文,在添加首部後就向下交付給IP層。既不拆分,也不合並,而是保留這些報文的邊界,所以,應用程序須要選擇合適的報文大小。
本章UDP程序例子的示意圖入以下:
從上圖能夠看出,例子程序由兩部分組成:廣播日誌文件和「監控器」,監控器用於接收廣播。爲了簡單,咱們將不作任何形式的身份驗證或加密。
13.3 日誌事件POJO
咱們的應用程序一般須要某種「消息POJO」用於保存消息,咱們把這個消息POJO當作是一個「事件消息」在本例子中咱們也建立一個POJO叫作LogEvent,LogEvent用來存儲事件數據,而後將數據輸出到日誌文件。看下面代碼:
package netty.in.action.udp; import java.net.InetSocketAddress; public class LogEvent { public static final byte SEPARATOR = (byte) '|'; private final InetSocketAddress source; private final String logfile; private final String msg; private final long received; public LogEvent(String logfile, String msg) { this(null, -1, logfile, msg); } public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { this.source = source; this.logfile = logfile; this.msg = msg; this.received = received; } public InetSocketAddress getSource() { return source; } public String getLogfile() { return logfile; } public String getMsg() { return msg; } public long getReceived() { return received; } }
接下來的章節,咱們將用這個POJO類來實現具體的邏輯。
13.4 編寫廣播器
咱們要作的是廣播一個DatagramPacket日誌條目,以下圖所示:
上圖顯示咱們有一個從日誌條路到DatagramPacket一對一的關係。如同全部的基於Netty的應用程序同樣,它由一個或多個ChannelHandler和一些實體對象綁定,用於引導該應用程序。首先讓咱們來看看LogEventBroadcaster的ChannelPipeline以及做爲數據載體的LogEvent的流向,看下圖:
上圖顯示,LogEventBroadcaster使用LogEvent消息並將消息寫入本地Channel,全部的信息封裝在LogEvent消息中,這些消息被傳到ChannelPipeline中。流進ChannelPipeline的LogEvent消息被編碼成DatagramPacket消息,最後經過UDP廣播到遠程對等通道。
這能夠歸結爲有一個自定義的ChannelHandler,從LogEvent消息編程成DatagramPacket消息。
回憶咱們在第七章講解的編解碼器,咱們定義個LogEventEncoder,代碼以下:
package netty.in.action.udp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import java.util.List; public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> { private final InetSocketAddress remoteAddress; public LogEventEncoder(InetSocketAddress remoteAddress){ this.remoteAddress = remoteAddress; } @Override protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out) throws Exception { ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes(msg.getLogfile().getBytes(CharsetUtil.UTF_8)); buf.writeByte(LogEvent.SEPARATOR); buf.writeBytes(msg.getMsg().getBytes(CharsetUtil.UTF_8)); out.add(new DatagramPacket(buf, remoteAddress)); } }
下面咱們再編寫一個廣播器:
package netty.in.action.udp; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.net.InetSocketAddress; public class LogEventBroadcaster { private final EventLoopGroup group; private final Bootstrap bootstrap; private final File file; public LogEventBroadcaster(InetSocketAddress address, File file) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new LogEventEncoder(address)); this.file = file; } public void run() throws IOException { Channel ch = bootstrap.bind(0).syncUninterruptibly().channel(); long pointer = 0; for (;;) { long len = file.length(); if (len < pointer) { pointer = len; } else { RandomAccessFile raf = new RandomAccessFile(file, "r"); raf.seek(pointer); String line; while ((line = raf.readLine()) != null) { ch.write(new LogEvent(null, -1, file.getAbsolutePath(), line)); } ch.flush(); pointer = raf.getFilePointer(); raf.close(); } try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.interrupted(); break; } } } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws Exception { int port = 4096; String path = System.getProperty("user.dir") + "/log.txt"; LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress( "255.255.255.255", port), new File(path)); try { broadcaster.run(); } finally { broadcaster.stop(); } } }
13.5 編寫監聽者
這一節咱們編寫一個監聽者:EventLogMonitor,也就是用來接收數據的程序。EventLogMonitor作下面事情:
接收LogEventBroadcaster廣播的DatagramPacket
解碼LogEvent消息
輸出LogEvent
EventLogMonitor的示意圖以下:
解碼器代碼以下:
package netty.in.action.udp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.CharsetUtil; import java.util.List; public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> { @Override protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception { ByteBuf buf = msg.content(); int i = buf.indexOf(0, buf.readableBytes(), LogEvent.SEPARATOR); String filename = buf.slice(0, i).toString(CharsetUtil.UTF_8); String logMsg = buf.slice(i + 1, buf.readableBytes()).toString(CharsetUtil.UTF_8); LogEvent event = new LogEvent(msg.sender(), System.currentTimeMillis(), filename, logMsg); out.add(event); } }
處理消息的Handler代碼以下:
package netty.in.action.udp; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { @Override protected void channelRead0(ChannelHandlerContext ctx, LogEvent msg) throws Exception { StringBuilder builder = new StringBuilder(); builder.append(msg.getReceived()); builder.append(" ["); builder.append(msg.getSource().toString()); builder.append("] ["); builder.append(msg.getLogfile()); builder.append("] : "); builder.append(msg.getMsg()); System.out.println(builder.toString()); } }
EventLogMonitor代碼以下:
package netty.in.action.udp; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import java.net.InetSocketAddress; public class LogEventMonitor { private final EventLoopGroup group; private final Bootstrap bootstrap; public LogEventMonitor(InetSocketAddress address) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LogEventDecoder()); pipeline.addLast(new LogEventHandler()); } }).localAddress(address); } public Channel bind() { return bootstrap.bind().syncUninterruptibly().channel(); } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws InterruptedException { LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(4096)); try { Channel channel = monitor.bind(); System.out.println("LogEventMonitor running"); channel.closeFuture().sync(); } finally { monitor.stop(); } } }
13.6 使用LogEventBroadcaster和LogEventMonitor
爲避免LogEventMonitor接收不到數據,咱們必須先啓動LogEventMonitor後,再啓動LogEventBroadcaster,輸出內容就不貼圖了,讀者能夠本身運營本例子測試。
13.7 Summary
本章依然沒按照原書中的來翻譯,主要是以一個例子來講明UDP在Netty中的使用。概念性的東西都是從網上覆制的,讀者只須要了解UDP的概念再瞭解清楚例子代碼的含義,並試着運行一些例子。