第十三章:經過UDP廣播事件

本章介紹java

UDP介紹算法

UDP程序結構和設計數據庫

日誌事件POJO編程

編寫廣播器bootstrap

編寫監聽者服務器

使用廣播器和監聽者網絡

Summaryapp

前面的章節都是在示例中使用TCP協議,這一章,咱們將使用UDP。UDP是一種無鏈接協議,若須要很高的性能和對數據的完成性沒有嚴格要求,那使用UDP是一個很好的方法。最著名的基於UDP協議的是用來域名解析的DNS。dom

Netty使用了統一的傳輸API,這使得編寫基於UDP的應用程序很容易。能夠重用現有的ChannelHandler和其餘公共組件來編寫另外的Netty程序。看完本章後,你就會知道什麼是無鏈接協議以及爲何UDP可能適合你的應用程序。socket

13.1 UDP介紹

 在深刻探討UDP以前,咱們先了解UDP是什麼,以及UDP有什麼限制或問題。UDP是一種無鏈接的協議,也就是說客戶端和服務器在交互數據以前不會像TCP那樣事先創建鏈接。

 UDP是User Datagram Protocol的簡稱,即用戶數據報協議。UDP有不提供數據報分組、組裝和不能對數據報進行排序的缺點,也就是說,當數據報發送以後是沒法確認數據是否完整到達的。

UDP協議的主要做用是將網絡數據流量壓縮成數據包的形式。一個典型的數據包就是一個二進制數據的傳輸單位。每個數據包的前8個字節用來包含報頭信息,剩餘字節則用來包含具體的傳輸數據。

在選擇使用協議的時候,選擇UDP必需要謹慎。在網絡質量使人十分不滿意的環境下,UDP協議數據包丟失會比較嚴重。可是因爲UDP的特性:它不屬於鏈接型協議,於是具備資源消耗小,處理速度快的優勢,因此一般音頻、視頻和普通數據在傳送時使用UDP較多,由於它們即便偶爾丟失一兩個數據包,也不會對接收結果產生太大影響。好比咱們聊天用的ICQ和QQ就是使用的UDP協議。

13.2 UDP程序結構和設計

本章例子中,程序打開一個文件並將文件內容一行一行的經過UDP廣播到其餘的接收主機。

對於像發送日誌的需求,UDP很是適合這樣的應用程序,並可使用UDP經過網絡發送大量的「事件」。

每一個UDP報文分UDP報頭和UDP數據區兩部分,報頭由四個16位長(2字節)字段組成,分別說明該報文的源端口、目的端口、報文長度以及校驗值;數據庫就是傳輸的具體數據。

UDP有以下特性:

1.UDP是一個無鏈接協議,傳輸數據以前源端和終端不創建鏈接,當它想傳送時就簡單地去抓取來自應用程序的數據,並儘量快地把它扔到網絡上。在發送端,UDP傳送數據的速度僅僅是受應用程序生成數據的速度、計算機的能力和傳輸帶寬的限制;在接收端,UDP把每一個消息段放在隊列中,應用程序每次從隊列中讀一個消息段。

2.因爲傳輸數據不創建鏈接,所以也就不須要維護鏈接狀態,包括收發狀態等,所以一臺服務機可同時向多個客戶機傳輸相同的消息。

3.UDP信息包的標題很短,只有8個字節,相對於TCP的20個字節信息包的額外開銷很小。

4.吞吐量不受擁擠控制算法的調節,只受應用軟件生成數據的速率、傳輸帶寬、源端和終端主機性能的限制。

5.UDP使用盡最大努力交付,即不保證可靠交付,所以主機不須要維持複雜的連接狀態表(這裏面有許多參數)。

6.UDP是面向報文的。發送方的UDP對應用程序交下來的報文,在添加首部後就向下交付給IP層。既不拆分,也不合並,而是保留這些報文的邊界,所以,應用程序須要選擇合適的報文大小。

本章UDP程序例子的示意圖入以下:

從上圖能夠看出,例子程序由兩部分組成:廣播日誌文件和「監控器」,監控器用於接收廣播。爲了簡單,咱們將不作任何形式的身份驗證或加密。

13.3 日誌事件POJO

咱們的應用程序一般須要某種「消息POJO」用於保存消息,咱們把這個消息POJO當作是一個「事件消息」在本例子中咱們也建立一個POJO叫作LogEvent,LogEvent用來存儲事件數據,而後將數據輸出到日誌文件。看下面代碼:

package netty.in.action.udp;  
import java.net.InetSocketAddress;  
public class LogEvent {  
    public static final byte SEPARATOR = (byte) '|';  
    private final InetSocketAddress source;  
    private final String logfile;  
    private final String msg;  
    private final long received;  
    public LogEvent(String logfile, String msg) {  
        this(null, -1, logfile, msg);  
    }  
    public LogEvent(InetSocketAddress source, long received, String logfile, String msg) {  
        this.source = source;  
        this.logfile = logfile;  
        this.msg = msg;  
        this.received = received;  
    }  
  
    public InetSocketAddress getSource() {  
        return source;  
    }  
  
    public String getLogfile() {  
        return logfile;  
    }  
  
    public String getMsg() {  
        return msg;  
    }  
  
    public long getReceived() {  
        return received;  
    }  
  
}

接下來的章節,咱們將用這個POJO類來實現具體的邏輯。

13.4 編寫廣播器

咱們要作的是廣播一個DatagramPacket日誌條目,以下圖所示:

上圖顯示咱們有一個從日誌條路到DatagramPacket一對一的關係。如同全部的基於Netty的應用程序同樣,它由一個或多個ChannelHandler和一些實體對象綁定,用於引導該應用程序。首先讓咱們來看看LogEventBroadcaster的ChannelPipeline以及做爲數據載體的LogEvent的流向,看下圖:

上圖顯示,LogEventBroadcaster使用LogEvent消息並將消息寫入本地Channel,全部的信息封裝在LogEvent消息中,這些消息被傳到ChannelPipeline中。流進ChannelPipeline的LogEvent消息被編碼成DatagramPacket消息,最後經過UDP廣播到遠程對等通道。

這能夠歸結爲有一個自定義的ChannelHandler,從LogEvent消息編程成DatagramPacket消息。

回憶咱們在第七章講解的編解碼器,咱們定義個LogEventEncoder,代碼以下:

package netty.in.action.udp;  
  
import io.netty.buffer.ByteBuf;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.socket.DatagramPacket;  
import io.netty.handler.codec.MessageToMessageEncoder;  
import io.netty.util.CharsetUtil;  
  
import java.net.InetSocketAddress;  
import java.util.List;  
  
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {  
      
    private final InetSocketAddress remoteAddress;  
      
    public LogEventEncoder(InetSocketAddress remoteAddress){  
        this.remoteAddress = remoteAddress;  
    }  
  
    @Override  
    protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out)  
            throws Exception {  
        ByteBuf buf = ctx.alloc().buffer();  
        buf.writeBytes(msg.getLogfile().getBytes(CharsetUtil.UTF_8));  
        buf.writeByte(LogEvent.SEPARATOR);  
        buf.writeBytes(msg.getMsg().getBytes(CharsetUtil.UTF_8));  
        out.add(new DatagramPacket(buf, remoteAddress));  
    }  
  
}

下面咱們再編寫一個廣播器:

package netty.in.action.udp;  
  
import io.netty.bootstrap.Bootstrap;  
import io.netty.channel.Channel;  
import io.netty.channel.ChannelOption;  
import io.netty.channel.EventLoopGroup;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.nio.NioDatagramChannel;  
  
import java.io.File;  
import java.io.IOException;  
import java.io.RandomAccessFile;  
import java.net.InetSocketAddress;  
  
public class LogEventBroadcaster {  
  
    private final EventLoopGroup group;  
    private final Bootstrap bootstrap;  
    private final File file;  
  
    public LogEventBroadcaster(InetSocketAddress address, File file) {  
        group = new NioEventLoopGroup();  
        bootstrap = new Bootstrap();  
        bootstrap.group(group).channel(NioDatagramChannel.class)  
                .option(ChannelOption.SO_BROADCAST, true)  
                .handler(new LogEventEncoder(address));  
        this.file = file;  
    }  
  
    public void run() throws IOException {  
        Channel ch = bootstrap.bind(0).syncUninterruptibly().channel();  
        long pointer = 0;  
        for (;;) {  
            long len = file.length();  
            if (len < pointer) {  
                pointer = len;  
            } else {  
                RandomAccessFile raf = new RandomAccessFile(file, "r");  
                raf.seek(pointer);  
                String line;  
                while ((line = raf.readLine()) != null) {  
                    ch.write(new LogEvent(null, -1, file.getAbsolutePath(), line));  
                }  
                ch.flush();  
                pointer = raf.getFilePointer();  
                raf.close();  
            }  
            try {  
                Thread.sleep(1000);  
            } catch (InterruptedException e) {  
                Thread.interrupted();  
                break;  
            }  
        }  
    }  
  
    public void stop() {  
        group.shutdownGracefully();  
    }  
  
    public static void main(String[] args) throws Exception {  
        int port = 4096;  
        String path = System.getProperty("user.dir") + "/log.txt";  
        LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress(  
                "255.255.255.255", port), new File(path));  
        try {  
            broadcaster.run();  
        } finally {  
            broadcaster.stop();  
        }  
    }  
  
}

13.5 編寫監聽者

這一節咱們編寫一個監聽者:EventLogMonitor,也就是用來接收數據的程序。EventLogMonitor作下面事情:

接收LogEventBroadcaster廣播的DatagramPacket

解碼LogEvent消息

輸出LogEvent

EventLogMonitor的示意圖以下:

解碼器代碼以下:

package netty.in.action.udp;  
  
import io.netty.buffer.ByteBuf;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.socket.DatagramPacket;  
import io.netty.handler.codec.MessageToMessageDecoder;  
import io.netty.util.CharsetUtil;  
  
import java.util.List;  
  
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {  
  
    @Override  
    protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out)  
            throws Exception {  
        ByteBuf buf = msg.content();  
        int i = buf.indexOf(0, buf.readableBytes(), LogEvent.SEPARATOR);  
        String filename = buf.slice(0, i).toString(CharsetUtil.UTF_8);  
        String logMsg = buf.slice(i + 1, buf.readableBytes()).toString(CharsetUtil.UTF_8);  
        LogEvent event = new LogEvent(msg.sender(),  
                System.currentTimeMillis(), filename, logMsg);  
        out.add(event);  
    }  
  
}

處理消息的Handler代碼以下:

package netty.in.action.udp;  
  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.SimpleChannelInboundHandler;  
  
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {  
  
    @Override  
    protected void channelRead0(ChannelHandlerContext ctx, LogEvent msg) throws Exception {  
        StringBuilder builder = new StringBuilder();  
        builder.append(msg.getReceived());  
        builder.append(" [");  
        builder.append(msg.getSource().toString());  
        builder.append("] [");  
        builder.append(msg.getLogfile());  
        builder.append("] : ");  
        builder.append(msg.getMsg());  
        System.out.println(builder.toString());  
    }  
}

EventLogMonitor代碼以下:

package netty.in.action.udp;  
  
import io.netty.bootstrap.Bootstrap;  
import io.netty.channel.Channel;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.ChannelOption;  
import io.netty.channel.ChannelPipeline;  
import io.netty.channel.EventLoopGroup;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.nio.NioDatagramChannel;  
  
import java.net.InetSocketAddress;  
  
public class LogEventMonitor {  
  
    private final EventLoopGroup group;  
    private final Bootstrap bootstrap;  
  
    public LogEventMonitor(InetSocketAddress address) {  
        group = new NioEventLoopGroup();  
        bootstrap = new Bootstrap();  
        bootstrap.group(group).channel(NioDatagramChannel.class)  
                .option(ChannelOption.SO_BROADCAST, true)  
                .handler(new ChannelInitializer<Channel>() {  
                    @Override  
                    protected void initChannel(Channel channel) throws Exception {  
                        ChannelPipeline pipeline = channel.pipeline();  
                        pipeline.addLast(new LogEventDecoder());  
                        pipeline.addLast(new LogEventHandler());  
                    }  
                }).localAddress(address);  
    }  
  
    public Channel bind() {  
        return bootstrap.bind().syncUninterruptibly().channel();  
    }  
  
    public void stop() {  
        group.shutdownGracefully();  
    }  
  
    public static void main(String[] args) throws InterruptedException {  
          
        LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(4096));  
        try {  
            Channel channel = monitor.bind();  
            System.out.println("LogEventMonitor running");  
            channel.closeFuture().sync();  
        } finally {  
            monitor.stop();  
        }  
    }  
}

13.6 使用LogEventBroadcaster和LogEventMonitor

爲避免LogEventMonitor接收不到數據,咱們必須先啓動LogEventMonitor後,再啓動LogEventBroadcaster,輸出內容就不貼圖了,讀者能夠本身運營本例子測試。

13.7 Summary

本章依然沒按照原書中的來翻譯,主要是以一個例子來講明UDP在Netty中的使用。概念性的東西都是從網上覆制的,讀者只須要了解UDP的概念再瞭解清楚例子代碼的含義,並試着運行一些例子。

相關文章
相關標籤/搜索