一、UDP的基礎知識數據庫
咱們將會把重點放在一個無鏈接協議即用戶數據報協議(UDP)上,它一般用在性能相當重要而且可以容忍必定的數據報丟失的狀況下。bootstrap
面向鏈接的傳輸(如TCP)管理了兩個網絡端點之間的鏈接的創建,在鏈接的生命週期內的有序和可靠的消息傳輸,以及最後,鏈接的有序終止。相比之下,在相似於UDP這樣的無鏈接協議中,並無持久化鏈接這樣的概念,而且每一個消息(一個UDP數據報)都是一個單獨的傳輸單元。安全
此外,UDP也沒有TCP的糾錯機制,其中每一個節點都將確認它們所接收到的包,而沒有被確認的包將會被髮送方從新傳輸。服務器
經過類比,TCP鏈接就像打電話,其中一系列的有序消息將會在兩個方法上流動,相反,UDP則相似於往郵箱中投入一疊明信片。你沒法知道它們將以何種順序到達它們的目的地,或者它們是否全部的都可以到達它們的目的地。網絡
UDP的這些方面可能會讓你感受到嚴重的侷限性,可是它們也解釋了爲什麼它會比TCP快那麼多:全部的握手以及消息管理機制的開銷已經被消除了。顯然,UDP很適合那些可以處理或者容忍消息丟失的應用程序,但可能不適合那些處理金融交易的應用程序。app
二、UDP廣播dom
到目前爲止,咱們全部的例子採用的都是一種叫作單播的傳輸模式,定義爲發送消息給一個由惟一的地址所標識的單一的網絡目的地。面向鏈接的協議和無鏈接協議都支持這種模式。socket
UDP提供了向多個接收者發送消息的額外傳輸模式:ide
多播——傳輸到一個預約義的主機組函數
廣播——傳輸到網絡(或者子網)上的全部主機
示例應用程序將經過發送可以被同一個網絡中的全部主機所接收的消息來演示UDP廣播的使用。爲此,咱們將使用特殊的受限廣播地址或者零網絡地址255.255.255.255.發送到這個地址的消息都將會被定向給本地網絡(0.0.0.0)上的全部主機,而不會被路由器轉發給其餘的網絡。
三、UDP示例應用程序
咱們的示例程序將打開一個文件,隨後將會經過UDP把每一行都做爲一個消息廣播到一個指定的端口。若是你熟悉類UNIX操做系統,你可能會認識到這是標準的syslog實用程序的一個很是簡化的版本。UDP很是適合於這樣的應用程序,由於考慮到日誌文件自己已經被存儲在了文件系統中,所以,偶爾丟失日誌文件中的一兩行是能夠容忍的。此外,該應用程序還提供了極具備價值的高效處理大量數據的能力。
接收方是怎麼樣呢?經過UDP廣播,只需簡單地經過在指定的端口上啓動一個監聽程序,即可以建立一個事件監視器來接收日誌消息。須要注意的是,這樣的輕鬆訪問性也帶來了潛在的安全隱患,這也就是爲什麼在不安全的環境中並不傾向於使用UDP廣播的緣由之一。出於一樣的緣由,路由器一般也會阻止廣播消息,並將它們限制在它們的來源網絡上。
發佈/訂閱模式 : 相似於syslog這樣的應用程序一般會被歸類爲發佈/訂閱模式:一個生產者或者服務發佈事件,而多個客戶端進行訂閱以接收它們。
下圖展現了整個系統的一個高級別試圖,其由一個廣播者以及一個或者多個事件監聽器所組成。廣播者將監聽新內容的出現,當它出現時,則經過UDP將它做爲一個廣播消息進行傳輸。
全部的該UDP端口上監聽的事件監聽器都將會接收到廣播消息。
爲了簡單起見,咱們將不會爲咱們的示例程序添加身份認證、驗證或者加密。可是,要加入這些功能並使得其成爲一個健壯的、可用的實用程序應該也不難。
四、消息POJO:LogEvent
在消息處理應用程序中,數據一般由POJO表示,除了實際上的消息內容,其還包含配置或處理消息。在這個應用程序中,咱們將會把消息做爲事件處理,而且因爲該數據來自於日誌文件,因此咱們將它稱爲LogEvent。如下代碼展現了這個簡單的POJO的詳細信息。
public final class LogEvent { public static final byte SEPARATOR = (byte) ':'; private final InetSocketAddress source; private final String logfile; private final String msg; private final long received; //用於傳出消息的構造函數 public LogEvent(String logfile, String msg) { this(null,-1,logfile,msg); } //用於傳入消息的構造函數 public LogEvent(InetSocketAddress source,long received, String logfile, String msg) { this.source = source; this.logfile = logfile; this.msg = msg; this.received = received; } public InetSocketAddress getSource() { return source; } public String getLogfile() { return logfile; } public String getMsg() { return msg; } public long getReceived() { return received; } }
五、編寫廣播者
Netty提供了大量的類來支持UDP應用程序的編寫。Netty的DatagramPacket是一個簡單的消息容器,DatagramChannel實現用它來和遠程節點通訊。相似於在咱們先前的類比中的明信片,它包含了接收者(和可選的發送者)的地址以及消息的有效負載自己。
要將LogEvent消息轉換爲DatagramPacket,咱們將須要一個編碼器。可是沒有必要從頭開始編寫咱們本身的。咱們將擴展Netty的MessageToMessageEncoder。
下圖展現了正在廣播的3個日誌條目,每個都將經過一個專門的DatagramPacket進行廣播。
下圖呈現了該LogEventBroadcaster的ChannelPipeline的一個高級別試圖,展現了LogEvent消息是如何流經它的。
正如你所看到的,全部的將要被傳輸的數據都被封裝在了LogEvent消息中。LogEventBroadcaster將把這些寫入到Channel中,並經過ChannelPipeline發送它們,在那裏它們將會被轉碼爲DatagramPacket消息。最後,它們都將經過UDP被廣播,並由遠程節點所捕獲。
如下代碼展現了咱們自定義版本的MessageToMessageEncoder,其將執行剛纔所描述的轉換。
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent>{ private final InetSocketAddress remoteAddress; //LogEventEncoder建立了即將被髮送到指定的InetSocketAddress的DatagramPacket消息 public LogEventEncoder(InetSocketAddress remoteAddress){ this.remoteAddress = remoteAddress; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object> out) throws Exception { byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8); byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8); ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1); //將文件名寫入到ByteBuf中 buf.writeBytes(file); //添加一個SEPARATOR buf.writeByte(LogEvent.SEPARATOR); //將日誌消息寫入ByteBuf中 buf.writeBytes(msg); //將一個擁有數據和目的地地址的新DatagramPacket添加到出站的消息列表中 out.add(new io.netty.channel.socket.DatagramPacket(buf,remoteAddress)); } }
在LogEventEncoder被實現以後,咱們已經準備好了引導該服務器,其包括設置各類各樣的ChannelOption,以及在ChannelPipeline中安裝所須要的ChannelHandler。這將經過主類LogEventBroadcaster完成。以下代碼所示。
public class LogEventBroadcaster { private final EventLoopGroup group; private final Bootstrap bootstrap; private final File file; public LogEventBroadcaster(InetSocketAddress address, File file){ group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); //引導該NioDatagramChannel(無鏈接) bootstrap.group(group).channel(NioDatagramChannel.class) //設置SO_BROADCAST套接字選項 .option(ChannelOption.SO_BROADCAST,true) .handler(new LogEventEncoder(address)); this.file = file; } public void run() throws Exception{ //綁定Channel Channel ch = bootstrap.bind(0).sync().channel(); long pointer = 0; //啓動主處理循環 for (;;){ long len = file.length(); if (len < pointer){ //file was reset //若是有必要,將文件指針設置到該文件的最後一個字符 pointer = len; } else if (len > pointer){ //Content was added RandomAccessFile raf = new RandomAccessFile(file,"r"); //設置當前的文件指針,以確保沒有任何的舊日誌被髮送 raf.seek(pointer); String line; while((line = raf.readLine()) != null){ //對於每條日誌條目。,寫入一個LogEvent到Channel中 ch.writeAndFlush(new LogEvent(null,-1,file.getAbsolutePath(),line)); } //存儲其在文件中的當前位置 pointer = raf.getFilePointer(); raf.close(); } try { //休眠1秒,若是被中斷,則退出循環,不然從新處理它 Thread.sleep(1000); }catch (InterruptedException e){ Thread.interrupted(); break; } } } public void stop(){ group.shutdownGracefully(); } public static void main(String[] args) throws Exception{ if (args.length != 2){ throw new IllegalArgumentException(); } LogEventBroadcaster broadcaster = new LogEventBroadcaster( new InetSocketAddress("255.255.255.255",Integer.parseInt(args[0])),new File(args[1])); try { broadcaster.run(); }finally { broadcaster.stop(); } } }
六、編寫監視器
目標是將netcat替換爲一個更加完整的事件消費者,咱們稱之爲LogEventMonitor。這個程序將:
(1)接收有LogEventBroadcaster廣播的UDP DatagramPacket
(2)將它們解碼爲LogEvent消息
(3)將LogEvent消息寫到System.out
和以前同樣,該邏輯由一組自定義的ChannelHandler實現——對於咱們的解碼器來講,咱們將擴展MessageToMessageDecoder。下圖描繪LogEventMonitor的ChannelPipeline,而且展現了LogEvnet是如何流經它的。
ChannelPipeline中的第一個解碼器LogEventDecoder負責傳入的DatagramPacket解碼爲LogEvent消息(一個用於轉換入站數據的任何Netty應用程序的典型設置)
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket>{ @Override protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> out) throws Exception { //獲取對DatagramPacket中的數據的引用 ByteBuf data = datagramPacket.content(); //獲取該SEPARATOR的索引 int idx = data.indexOf(0,data.readableBytes(),LogEvent.SEPARATOR); //提取文件名 String fileName = data.slice(0,idx).toString(CharsetUtil.UTF_8); //提取日誌消息 String logMsg = data.slice(idx + 1,data.readableBytes()).toString(CharsetUtil.UTF_8); //構建一個新的LogEvent對象,而且將它添加到列表中 LogEvent event = new LogEvent(datagramPacket.sender(),System.currentTimeMillis(),fileName,logMsg); out.add(event); } }
第二個ChannelHandler的工做是對第一個ChannelHandler所建立的LogEvent消息執行一些處理。在這個場景下,它只是簡單地將它們寫到System.out。在真實世界的應用程序中,你可能須要聚合來源於不一樣日誌文件的事件,或者將它們發佈到數據庫中。
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent>{ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //當異常發生時,打印棧跟蹤信息,並關閉對應的Channel cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception { //建立StringBuilder,而且構建輸出的字符串 StringBuilder builder = new StringBuilder(); builder.append(event.getReceived()); builder.append(" ["); builder.append(event.getSource().toString()); builder.append("] ["); builder.append(event.getLogfile()); builder.append("] : "); builder.append(event.getMsg()); //打印LogEvent的數據 System.out.println(builder.toString()); } }
LogEventHandler將以一種簡單易讀的格式打印LogEvent消息,如今咱們須要將咱們的LogEventDecoder和LogEventHandler安裝到ChannelPipeline中。
public class LogEventMonitor { private final EventLoopGroup group; private final Bootstrap bootstrap; public LogEventMonitor(InetSocketAddress address){ group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST,true) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LogEventDecoder()); pipeline.addLast(new LogEventHandler()); } }).localAddress(address); } public Channel bind(){ return bootstrap.bind().syncUninterruptibly().channel(); } public void stop(){ group.shutdownGracefully(); } public static void main(String[] args) throws Exception{ if (args.length != 1){ throw new IllegalArgumentException("Usage:LoEventMonitor <port>"); } LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0]))); try { Channel channel = monitor.bind(); System.out.println("LogEventMonitor running"); channel.closeFuture().sync(); }finally { monitor.stop(); } }