netty入門及示例

介紹:
初步學習了一下netty,都知道Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。按照 http://netty.io/wiki/user-guide-for-4.x.html所作的介紹  嘗試編寫了幾個demo  最後加入本身寫的pojo對象,這裏只是一些入門介紹 ,對於原理以及它如何高性能,高可靠性 後續會繼續瞭解。後面會參考一個簡單rpc框架的搭建。這裏的版本html

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.6.Final</version>
</dependency>

第一個:咱們嘗試寫一個簡單的服務端程序  而且經過telnet  在服務端打印一些東西java

server端:bootstrap

package com.manyi.iw.agentcall.soa.server.service;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by huhaosumail on 16/12/22.
 */
public class Server {
    private  int  port;

    public Server(int port){
        this.port=port;
    }

    public void  run() throws Exception{
        EventLoopGroup   bossGroup = new NioEventLoopGroup();
        EventLoopGroup   workGroup=new NioEventLoopGroup();
        try{
            ServerBootstrap  b= new ServerBootstrap();

            b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
//                    socketChannel.pipeline().addLast(new TimeEncoder(),new TimeServerHandler());
                    socketChannel.pipeline().addLast(new TimeServerHandler());
                }
            }).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true);

            //綁定和開始接受連接
            ChannelFuture f= b.bind(port).sync();
            f.channel().closeFuture().sync();

        }finally {
            //關閉
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }


    }


    public static void main(String[] args) throws Exception {
        int  port=1080;

        new Server(port).run();

    }
}

serverhandler:服務器

package com.manyi.iw.agentcall.soa.server.service;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Created by huhaosumail on 16/12/23.
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            while (in.isReadable()) {
                System.out.print((char) in.readByte());
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }

    }


    //    @Override
//    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
////        final ByteBuf  time= ctx.alloc().buffer(4);
////        time.writeInt((int)(System.currentTimeMillis()/1000L+2208988800L));
////        final ChannelFuture  f=  ctx.writeAndFlush(time);
////        f.addListener(new ChannelFutureListener() {
////            @Override
////            public void operationComplete(ChannelFuture channelFuture) throws Exception {
////                assert f==channelFuture;
////                ctx.close();
////            }
////        });
//
//        ChannelFuture  f=ctx.writeAndFlush(new Person(26,"胡浩是傻逼"));
//        f.addListener(ChannelFutureListener.CLOSE);
//    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

這裏啓動以後  終端經過telnet localhost 1080  能夠在終端輸入信息 並在控制檯打印  網絡

 

第一個:咱們嘗試編寫一個服務端  寫一些內容發送給客戶端  並打印這個pojo對象  不過這裏我用到序列化框架

不太規範  優先達到效果異步

server:socket

package com.manyi.iw.agentcall.soa.server.service;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by huhaosumail on 16/12/22.
 */
public class Server {
    private  int  port;

    public Server(int port){
        this.port=port;
    }

    public void  run() throws Exception{
        EventLoopGroup   bossGroup = new NioEventLoopGroup();
        EventLoopGroup   workGroup=new NioEventLoopGroup();
        try{
            ServerBootstrap  b= new ServerBootstrap();

            b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new TimeEncoder(),new TimeServerHandler());
//                    socketChannel.pipeline().addLast(new TimeServerHandler());
                }
            }).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true);

            //綁定和開始接受連接
            ChannelFuture f= b.bind(port).sync();
            f.channel().closeFuture().sync();

        }finally {
            //關閉
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }


    }


    public static void main(String[] args) throws Exception {
        int  port=8080;

        new Server(port).run();

    }
}
TimeServerHandler:
package com.manyi.iw.agentcall.soa.server.service;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Created by huhaosumail on 16/12/23.
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            while (in.isReadable()) {
                System.out.print((char) in.readByte());
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }

    }


        @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
//        final ByteBuf  time= ctx.alloc().buffer(4);
//        time.writeInt((int)(System.currentTimeMillis()/1000L+2208988800L));
//        final ChannelFuture  f=  ctx.writeAndFlush(time);
//        f.addListener(new ChannelFutureListener() {
//            @Override
//            public void operationComplete(ChannelFuture channelFuture) throws Exception {
//                assert f==channelFuture;
//                ctx.close();
//            }
//        });

        ChannelFuture  f=ctx.writeAndFlush(new Person(26,"胡浩是傻逼"));
        f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}
TimeEncoder:
package com.manyi.iw.agentcall.soa.server.service;


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
 * Created by huhaosumail on 16/12/23.
 */
public class TimeEncoder extends MessageToByteEncoder<Person> implements Serializable {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Person person, ByteBuf byteBuf) throws Exception {
//          byteBuf.writeInt(person.getAge());
//        byteBuf.writeBytes(person.getName().getBytes());
        ByteArrayOutputStream  bo=new ByteArrayOutputStream();
        ObjectOutputStream oo=new ObjectOutputStream(bo);
        oo.writeObject(person);
        bo.close();
        oo.close();
        byteBuf.writeBytes(bo.toByteArray());
    }
}

client端:ide

package com.manyi.iw.agentcall.soa.server.service;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import sun.misc.Unsafe;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.Selector;

/**
 * Created by huhaosumail on 16/12/23.
 */
public class TimeClient   {

    public static void main(String[] args) throws  Exception{
        EventLoopGroup  workerGroup=new NioEventLoopGroup();
        try{
            Bootstrap  b=new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE,true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new TimeDecoder(),new TimeClientHandler());
                }
            });

            //開啓客戶端
            ChannelFuture  f=b.connect("localhost",8080).sync();

            //等待直到鏈接關閉
            f.channel().closeFuture().sync();

        }finally {
            workerGroup.shutdownGracefully();
        }

    }

}
TimeDecoder:
package com.manyi.iw.agentcall.soa.server.service;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.List;

/**
 * Created by huhaosumail on 16/12/23.
 */
public class TimeDecoder extends ByteToMessageDecoder implements Serializable {


    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if(byteBuf.readableBytes()<4){
            return;
        }
//        list.add(byteBuf.readBytes(4));
//        list.add(new Person(byteBuf.readInt()));

//
//        byte[]  req=new byte[byteBuf.readableBytes()];
//        byteBuf.readBytes(req);
//        list.add(new Person(new String(req,"UTF-8")));

        byte[]  req=new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(req);
        ByteArrayInputStream bi = new ByteArrayInputStream(req);
        ObjectInputStream oi = new ObjectInputStream(bi);
        Person  p=(Person)oi.readObject();
        bi.close();
        oi.close();
        list.add(p);
    }

}
TimeClientHandler:
package com.manyi.iw.agentcall.soa.server.service;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

/**
 * Created by huhaosumail on 16/12/23.
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        ByteBuf f= (ByteBuf)msg;
//        try{
//            long currentTimeMills=(f.readUnsignedInt()-2208988800L)*1000L;
//            System.out.println(new Date(currentTimeMills));
//            ctx.close();
//        }finally {
//            f.release();
//        }
        Person  p=(Person)msg;
        System.out.println("name:"+p.getName()+",age:"+p.getAge());
        ctx.close();

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

 

效果工具

相關文章
相關標籤/搜索