【Netty】UDP廣播事件

1、前言bootstrap

  前面學習了WebSocket協議,而且經過示例講解了WebSocket的具體使用,接着學習如何使用無鏈接的UDP來廣播事件。網絡

2、UDP廣播事件app

  2.1 UDP基礎dom

  面向鏈接的TCP協議管理端到端的鏈接,在鏈接生命週期中,發送的消息會有序而且可靠地進行傳輸,最後鏈接有序地終止。然而,在無鏈接協議(如UDP)中,沒有持久鏈接的概念,每一個消息(UDP數據報)都是獨立的傳輸,此外,UDP沒有TCP的糾錯機制(即每一個對等體會確認其接收到的分組,而且發送者會重傳未確認的分組)。ide

  UDP的限制比TCP多,可是比TCP快不少,這是由於消除了握手和消息管理的全部開銷,UDP很是適合處理或容忍消息丟失的應用。oop

  2.2 UDP廣播學習

  迄今爲止全部的示例都使用了單播的傳輸模式,其被定義爲將消息發送到由惟一地址標識的單個網絡目的地,有鏈接和無鏈接的協議都支持這種模式,UDP爲多個收件人發送消息提供了額外的傳輸模式:ui

    · 組播--傳輸到定義的主機組。this

    · 廣播--傳輸到網絡(或子網)上的全部主機。編碼

  本章中的示例將經過發送在同一網絡上的全部主機接收的消息來使用UDP廣播。

  2.3 UDP簡單示例

  示例將打開一個文件,並經過UDP將每一行廣播爲指定端口。下圖展現了應用的結構圖。

  

  2.4 LogEvent POJO

  在消息應用中,消息常常以POJO形式展示,LogEvent的POJO以下。  

public final class LogEvent {
    public static final byte SEPARATOR = (byte) ':';
    private final InetSocketAddress source;
    private final String logfile;
    private final String msg;
    private final long received;
    public LogEvent(String logfile, String msg) {
        this(null, -1, logfile, msg);
    }
    public LogEvent(InetSocketAddress source, long received,
        String logfile, String msg) {
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }
    public InetSocketAddress getSource() {
        return source;
    }
    public String getLogfile() {
        return logfile;
    }
    public String getMsg() {
        return msg;
    }
    public long getReceivedTimestamp() {
        return received;
    }
}

  2.5 編寫broadcaster

  Netty提供了許多類來支持UDP應用程序,如Netty的DatagramPacket是DatagramChannel實現與遠程對等體進行通訊的簡單消息容器,咱們須要一個編碼器將EventLog消息轉換爲DatagramPackets,能夠擴展Netty的MessageToMessageEncoder,LogEventEncoder的代碼以下。  

public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;
    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
        LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc()
            .buffer(file.length + msg.length + 1);
        buf.writeBytes(file);
        buf.writeByte(LogEvent.SEPARATOR);
        buf.writeBytes(msg);
        out.add(new DatagramPacket(buf, remoteAddress));
    }
}

  完成編碼器後,便可以開始啓動服務端,其中服務端LogEventBroadcaster的代碼以下。  

public class LogEventBroadcaster {
    private final Bootstrap bootstrap;
    private final File file;
    private final EventLoopGroup group;

    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address));

        this.file = file;
    }

    public void run() throws IOException {
        Channel ch = bootstrap.bind(0).syncUninterruptibly().channel();
        System.out.println("LogEventBroadcaster running");
        long pointer = 0;
        for (;;) {
            long len = file.length();
            if (len < pointer) {
                // file was reset
                pointer = len;
            } else if (len > pointer) {
                // Content was added
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                raf.seek(pointer);
                String line;
                while ((line = raf.readLine()) != null) {
                    ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line));
                }
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.interrupted();
                break;
            }
        }
    }

    public void stop() {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException();
        }

        LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255",
                Integer.parseInt(args[0])), new File(args[1]));
        try {
            broadcaster.run();
        } finally {
            broadcaster.stop();
        }
    }
}

  2.6 編寫monitor

  在應用中

    · 接收由LogEventBroadcaster廣播的UDP DatagramPackets。

    · 將其解碼爲LogEvent。

    · 將LogEvent寫入輸出流System.out。

  下圖展現LogEvent的流動。

  

  LogEventDecoder負責將傳入的DatagramPackets解碼爲LogEvent消息,其代碼以下。  

public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
        ByteBuf data = datagramPacket.content();
        int i = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
        String filename = data.slice(0, i).toString(CharsetUtil.UTF_8);
        String logMsg =  data.slice(i + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);

        LogEvent event = new LogEvent(datagramPacket.recipient(), System.currentTimeMillis(),
                filename,logMsg);
        out.add(event);
    }
}

  而LogEventHandler用於處理LogEvent,其代碼以下。  

public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception {
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceivedTimestamp());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("] : ");
        builder.append(event.getMsg());

        System.out.println(builder.toString());
    }
}

  LogEventMonitor用於將處理器添加至管道中,其代碼以下。  

public class LogEventMonitor {

    private final Bootstrap bootstrap;
    private final EventLoopGroup group;
    public LogEventMonitor(InetSocketAddress address) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new LogEventDecoder());
                        pipeline.addLast(new LogEventHandler());
                    }
                }).localAddress(address);

    }

    public Channel bind() {
        return bootstrap.bind().syncUninterruptibly().channel();
    }

    public void stop() {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
        }
        LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0])));
        try {
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");

            channel.closeFuture().await();
        } finally {
            monitor.stop();
        }
    }
}

  運行LogEventBroadcaster和LogEventMonitor

3、總結

  本篇博文講解了UDP協議,以及其示例,在實際應用中須要根據不一樣的應用場景選擇不一樣的協議,謝謝各位園友的觀看~

相關文章
相關標籤/搜索