java的網絡工具netty簡介

java的網絡工具netty簡介

         Netty是一個NIO的客服端服務器框架,它能夠簡單、快速的搭建器一個協議包客服端服務器的應用程序。它極大的簡化了TCP和UDP這類的網絡編程。
java

   「快速」和「簡單」並不意味着會讓你的最終應用產生維護性或性能上的問題。Netty 是一個吸取了多種協議的實現經驗,這些協議包括FTP,SMTP,HTTP,各類二進制,文本協議,並通過至關精心設計的項目,最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的性能,穩定性和伸縮性。git

        這裏簡單記錄下學習要點,詳細的講解。能夠看官網(github:https://github.com/netty/netty )或者查看李林鋒的的系列文章http://ifeve.com/author/linfeng/ 。github

        體系結構圖:
編程

        由李林鋒講解的易懂的架構圖:
promise

    一、兩個selector線程:mainReactor處理accpet事件、subReactor處理connection、read、send事件
服務器

    二、業務處理線程池:包括編碼、解碼、業務處理。
網絡


一、官網案例

    a、處理bytes的serverhandler
架構

/**
 * @see 進入的channel:用於處理接受時候的事件處理
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

	/**
	 * @see 當一個channel準備好的時候,發送一個32位的數字
	 */
	public void channelActive(final ChannelHandlerContext ctx) {
		// ByteBuf:沒有了flip()。它只有2個功能:讀、寫
		// 讀:
		// 寫:當你寫的時候,若是讀取下標沒有改變,則繼續增加
		final ByteBuf time = ctx.alloc().buffer(4);
		time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

		final ChannelFuture f = ctx.writeAndFlush(time);
		// 當寫如完成的時候,執行
		f.addListener(new ChannelFutureListener() {

			public void operationComplete(ChannelFuture future) throws Exception {
				// TODO Auto-generated method stub
				assert f == future;
				ctx.close();
			}
		});
	}

	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}

    sever的啓動部分框架

public class TimeServer {

	private int port;

	public TimeServer() {
		this.port = port;
	}

	public void runn() throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {

			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup);
			b.channel(NioServerSocketChannel.class);
			b.childHandler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					// TODO Auto-generated method stub
					ch.pipeline().addLast(new TimeServerHandler());
				}

			});
			b.option(ChannelOption.SO_BACKLOG, 128);
			b.childOption(ChannelOption.SO_KEEPALIVE, true);

			ChannelFuture f = b.bind(port).sync();
			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}

	}

	public static void main(String[] args) {
		try {
			new TimeServer().runn();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}


    b、client部分:處理字節
socket

public class TimeDecoder extends ByteToMessageDecoder {
	/**
	 * @see 定義一個回調的數據累加buff
	 * @see 若是有out,則表示解析成功。
	 */
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		if (in.readableBytes() < 4) {
			return;
		}
		out.add(in.readBytes(4));
	}

}

   client的 channel處理類

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		ByteBuf buf = (ByteBuf) msg;
		try {
			long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
			System.out.println(new Date(currentTimeMillis));
			ctx.close();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			buf.release();
		}
	}

	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}

    client的啓動類:

public class TimeClient {
	public static void main(String[] args) throws InterruptedException {
		String host = args[0];
		int port = Integer.parseInt(args[1]);
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();// 啓動客服端鏈接
			b.group(workerGroup);// 同時用於主線程和工做線程
			b.channel(NioSocketChannel.class);// 客服端須要的channel
			b.option(ChannelOption.SO_KEEPALIVE, true); // socketChannel沒有父類
			
			b.handler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					// TODO Auto-generated method stub
					ch.pipeline().addLast(new TimeClientHandler());
				}
			});
			ChannelFuture f = b.connect(host, port).sync();
			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
		}
	}
}


二、stream處理

    小型buffer的socket傳送流傳輸依據TCP/IP,接受的數據是儲存在一個接受的socket的buffer中。可是,傳送的buffer不是一個隊列包、而是一個隊列btyes。這就意味着,即便你使用兩個包去傳送兩端信息,系統不會將它們視爲兩端信息,而是做爲一串bytes。所以,這不能保證你讀去的數據是你遠程寫入的數據。例如,咱們須要使用系統的TCP/IP棧接受到3個數據包。

     

由於根據流協議,你頗有可能在你的應用中讀取到你下面的部分

因次,在服務器和客服端的接受部分,對接受數據必須定義一個協議的框架(處理方式),這個框架可以被應用程序使用。接收到的部分必須是下面這種方式。

    a、第一種解決方式:

        在TIME client的實例中。咱們一樣是有一個類似的問題,一個很是小的32位bit的整數數據,它不太可能分散。然而,隨着流量的增長,問題是它會碎片化。

        簡單的解決方式,增長一個內部的累加buffer,而後將接受的4bytes傳輸到這個buffer中。在TimeClientHandler直接修改

public class TimeClientHandler2 extends ChannelInboundHandlerAdapter {

	private ByteBuf buf;

	@Override
	public void handlerAdded(ChannelHandlerContext ctx) {
		buf = ctx.alloc().buffer(4);
	}

	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) {
		buf.release();
		buf = null;
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		ByteBuf m = (ByteBuf) msg;
		buf.writeBytes(m);
		m.release();
		if (buf.readableBytes() >= 4) {
			long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
			System.out.println(new Date(currentTimeMillis));
			ctx.close();
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}


    b、第二種就是前面的實例方式、將decode分離出來處理。看起來清晰、方便

三、編解object數據

    object

public class UnixTime {

	private final long value;

	public UnixTime() {
		this(System.currentTimeMillis() / 1000L + 2208988800L);
	}

	public UnixTime(long value) {
		this.value = value;
	}

	public long value() {
		return this.value;
	}

	public String toString() {
		return new Date((value() - 2208988800L) * 1000L).toString();
	}

}

    object:decode

public class TimeDecoder2 extends ByteToMessageDecoder {

	/**
	 * @see 定義一個回調的數據累加buff
	 * @see 若是有out,則表示解析成功。
	 */
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		if (in.readableBytes() < 4) {
			return;
		}
		out.add(new UnixTime(in.readUnsignedInt()));
	}
}

    objec:encode

public class TimeEncoder extends ChannelOutboundHandlerAdapter {

	public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
		UnixTime m = (UnixTime) msg;
		ByteBuf encoded = ctx.alloc().buffer(4);
		encoded.writeInt((int) m.value());
		ctx.write(encoded, promise);
	}

   

    server:handler

public class TimeServerHandler2 extends ChannelInboundHandlerAdapter {

	/**
	 * @see 當一個channel準備好的時候,發送一個32位的數字
	 */
	public void channelActive(final ChannelHandlerContext ctx) {
		// ByteBuf:沒有了flip()。它只有2個功能:讀、寫
		// 讀:
		// 寫:當你寫的時候,若是讀取下標沒有改變,則繼續增加
		final ByteBuf time = ctx.alloc().buffer(4);
		time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

		final ChannelFuture f = ctx.writeAndFlush(new UnixTime());
		// 當寫如完成的時候,執行
		f.addListener(ChannelFutureListener.CLOSE);
	}

	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}

 

client:handler

public class TimeClientHandler3 extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		UnixTime m = (UnixTime) msg;
		System.out.println(m);
		ctx.close();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}
相關文章
相關標籤/搜索