第十一章 dubbo通訊框架-netty4

netty4是2.5.6引入的,2.5.6以前的netty用的是netty3。在dubbo源碼中相較於netty3,添加netty4主要僅僅改了兩個類:NettyServer,NettyClient。還有就是編解碼。html

使用方式:java

服務端:bootstrap

1 <dubbo:provider server="netty4"/>

客戶端:socket

1 <dubbo:consumer client="netty4" />

 

1、服務端 - NettyServertcp

  1 package com.alibaba.dubbo.remoting.transport.netty4;
  2 
  3 import com.alibaba.dubbo.common.Constants;
  4 import com.alibaba.dubbo.common.URL;
  5 import com.alibaba.dubbo.common.logger.Logger;
  6 import com.alibaba.dubbo.common.logger.LoggerFactory;
  7 import com.alibaba.dubbo.common.utils.ExecutorUtil;
  8 import com.alibaba.dubbo.common.utils.NetUtils;
  9 import com.alibaba.dubbo.remoting.Channel;
 10 import com.alibaba.dubbo.remoting.ChannelHandler;
 11 import com.alibaba.dubbo.remoting.RemotingException;
 12 import com.alibaba.dubbo.remoting.Server;
 13 import com.alibaba.dubbo.remoting.transport.AbstractServer;
 14 import com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers;
 15 import com.alibaba.dubbo.remoting.transport.netty4.logging.NettyHelper;
 16 
 17 import io.netty.bootstrap.ServerBootstrap;
 18 import io.netty.buffer.PooledByteBufAllocator;
 19 import io.netty.channel.ChannelFuture;
 20 import io.netty.channel.ChannelInitializer;
 21 import io.netty.channel.ChannelOption;
 22 import io.netty.channel.EventLoopGroup;
 23 import io.netty.channel.nio.NioEventLoopGroup;
 24 import io.netty.channel.socket.nio.NioServerSocketChannel;
 25 import io.netty.channel.socket.nio.NioSocketChannel;
 26 import io.netty.util.concurrent.DefaultThreadFactory;
 27 
 28 import java.net.InetSocketAddress;
 29 import java.util.Collection;
 30 import java.util.HashSet;
 31 import java.util.Map;
 32 
 33 public class NettyServer extends AbstractServer implements Server {
 34 
 35     private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
 36 
 37     private Map<String, Channel> channels; // <ip:port, channel>
 38 
 39     private ServerBootstrap bootstrap;
 40 
 41     private io.netty.channel.Channel channel;
 42 
 43     private EventLoopGroup bossGroup;
 44     private EventLoopGroup workerGroup;
 45 
 46     public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
 47         super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
 48     }
 49 
 50     @Override
 51     protected void doOpen() throws Throwable {
 52         NettyHelper.setNettyLoggerFactory();
 53 
 54         bootstrap = new ServerBootstrap();
 55 
 56         bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
 57         workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
 58                 new DefaultThreadFactory("NettyServerWorker", true));
 59 
 60         final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
 61         channels = nettyServerHandler.getChannels();
 62 
 63         bootstrap.group(bossGroup, workerGroup)
 64                 .channel(NioServerSocketChannel.class)
 65                 .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
 66                 .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
 67                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
 68                 .childHandler(new ChannelInitializer<NioSocketChannel>() {
 69                     @Override
 70                     protected void initChannel(NioSocketChannel ch) throws Exception {
 71                         NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
 72                         ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
 73                                 .addLast("decoder", adapter.getDecoder())
 74                                 .addLast("encoder", adapter.getEncoder())
 75                                 .addLast("handler", nettyServerHandler);
 76                     }
 77                 });
 78         // bind
 79         ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
 80         channelFuture.syncUninterruptibly();
 81         channel = channelFuture.channel();
 82 
 83     }
 84 
 85     @Override
 86     protected void doClose() throws Throwable {
 87         try {
 88             if (channel != null) {
 89                 // unbind.
 90                 channel.close();
 91             }
 92         } catch (Throwable e) {
 93             logger.warn(e.getMessage(), e);
 94         }
 95         try {
 96             Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
 97             if (channels != null && channels.size() > 0) {
 98                 for (com.alibaba.dubbo.remoting.Channel channel : channels) {
 99                     try {
100                         channel.close();
101                     } catch (Throwable e) {
102                         logger.warn(e.getMessage(), e);
103                     }
104                 }
105             }
106         } catch (Throwable e) {
107             logger.warn(e.getMessage(), e);
108         }
109         try {
110             if (bootstrap != null) {
111                 bossGroup.shutdownGracefully();
112                 workerGroup.shutdownGracefully();
113             }
114         } catch (Throwable e) {
115             logger.warn(e.getMessage(), e);
116         }
117         try {
118             if (channels != null) {
119                 channels.clear();
120             }
121         } catch (Throwable e) {
122             logger.warn(e.getMessage(), e);
123         }
124     }
125 
126     public Collection<Channel> getChannels() {
127         Collection<Channel> chs = new HashSet<Channel>();
128         for (Channel channel : this.channels.values()) {
129             if (channel.isConnected()) {
130                 chs.add(channel);
131             } else {
132                 channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
133             }
134         }
135         return chs;
136     }
137 
138     public Channel getChannel(InetSocketAddress remoteAddress) {
139         return channels.get(NetUtils.toAddressString(remoteAddress));
140     }
141 
142     public boolean isBound() {
143         return channel.isActive();
144     }
145 }

netty4的寫法與netty3有很大不一樣,下面是netty3:(http://www.cnblogs.com/java-zhao/p/7625596.htmlide

 1 /**
 2      * 啓動netty服務,監聽客戶端鏈接
 3      */
 4     @Override
 5     protected void doOpen() throws Throwable {
 6         NettyHelper.setNettyLoggerFactory();
 7         ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
 8         ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
 9         ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
10         bootstrap = new ServerBootstrap(channelFactory);
11 
12         final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
13         channels = nettyHandler.getChannels();
14         // https://issues.jboss.org/browse/NETTY-365
15         // https://issues.jboss.org/browse/NETTY-379
16         // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
17         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
18             public ChannelPipeline getPipeline() {
19                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
20                 ChannelPipeline pipeline = Channels.pipeline();
21                 /*int idleTimeout = getIdleTimeout();
22                 if (idleTimeout > 10000) {
23                     pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
24                 }*/
25                 pipeline.addLast("decoder", adapter.getDecoder());
26                 pipeline.addLast("encoder", adapter.getEncoder());
27                 pipeline.addLast("handler", nettyHandler);
28                 return pipeline;
29             }
30         });
31         // bind
32         channel = bootstrap.bind(getBindAddress());
33     }

 

2、客戶端 - NettyClientoop

  1 package com.alibaba.dubbo.remoting.transport.netty4;
  2 
  3 import com.alibaba.dubbo.common.Constants;
  4 import com.alibaba.dubbo.common.URL;
  5 import com.alibaba.dubbo.common.Version;
  6 import com.alibaba.dubbo.common.logger.Logger;
  7 import com.alibaba.dubbo.common.logger.LoggerFactory;
  8 import com.alibaba.dubbo.common.utils.NetUtils;
  9 import com.alibaba.dubbo.remoting.ChannelHandler;
 10 import com.alibaba.dubbo.remoting.RemotingException;
 11 import com.alibaba.dubbo.remoting.transport.AbstractClient;
 12 import com.alibaba.dubbo.remoting.transport.netty4.logging.NettyHelper;
 13 
 14 import io.netty.bootstrap.Bootstrap;
 15 import io.netty.buffer.PooledByteBufAllocator;
 16 import io.netty.channel.Channel;
 17 import io.netty.channel.ChannelFuture;
 18 import io.netty.channel.ChannelInitializer;
 19 import io.netty.channel.ChannelOption;
 20 import io.netty.channel.nio.NioEventLoopGroup;
 21 import io.netty.channel.socket.nio.NioSocketChannel;
 22 import io.netty.util.concurrent.DefaultThreadFactory;
 23 
 24 import java.util.concurrent.TimeUnit;
 25 
 26 public class NettyClient extends AbstractClient {
 27 
 28     private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
 29 
 30     private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
 31 
 32     private Bootstrap bootstrap;
 33 
 34     private volatile Channel channel; // volatile, please copy reference to use
 35 
 36     public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
 37         super(url, wrapChannelHandler(url, handler));
 38     }
 39 
 40     @Override
 41     protected void doOpen() throws Throwable {
 42         NettyHelper.setNettyLoggerFactory();
 43         final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
 44         bootstrap = new Bootstrap();
 45         bootstrap.group(nioEventLoopGroup)
 46                 .option(ChannelOption.SO_KEEPALIVE, true)
 47                 .option(ChannelOption.TCP_NODELAY, true)
 48                 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
 49                 //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
 50                 .channel(NioSocketChannel.class);
 51 
 52         if (getTimeout() < 3000) {
 53             bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
 54         } else {
 55             bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
 56         }
 57 
 58         bootstrap.handler(new ChannelInitializer() {
 59             protected void initChannel(Channel ch) throws Exception {
 60                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
 61                 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
 62                         .addLast("decoder", adapter.getDecoder())
 63                         .addLast("encoder", adapter.getEncoder())
 64                         .addLast("handler", nettyClientHandler);
 65             }
 66         });
 67     }
 68 
 69     protected void doConnect() throws Throwable {
 70         long start = System.currentTimeMillis();
 71         ChannelFuture future = bootstrap.connect(getConnectAddress());
 72         try {
 73             boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);
 74 
 75             if (ret && future.isSuccess()) {
 76                 Channel newChannel = future.channel();
 77                 try {
 78                     // Close old channel
 79                     Channel oldChannel = NettyClient.this.channel; // copy reference
 80                     if (oldChannel != null) {
 81                         try {
 82                             if (logger.isInfoEnabled()) {
 83                                 logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
 84                             }
 85                             oldChannel.close();
 86                         } finally {
 87                             NettyChannel.removeChannelIfDisconnected(oldChannel);
 88                         }
 89                     }
 90                 } finally {
 91                     if (NettyClient.this.isClosed()) {
 92                         try {
 93                             if (logger.isInfoEnabled()) {
 94                                 logger.info("Close new netty channel " + newChannel + ", because the client closed.");
 95                             }
 96                             newChannel.close();
 97                         } finally {
 98                             NettyClient.this.channel = null;
 99                             NettyChannel.removeChannelIfDisconnected(newChannel);
100                         }
101                     } else {
102                         NettyClient.this.channel = newChannel;
103                     }
104                 }
105             } else if (future.cause() != null) {
106                 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
107                         + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
108             } else {
109                 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
110                         + getRemoteAddress() + " client-side timeout "
111                         + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
112                         + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
113             }
114         } finally {
115             if (!isConnected()) {
116                 //future.cancel(true);
117             }
118         }
119     }
120 
121     @Override
122     protected void doDisConnect() throws Throwable {
123         try {
124             NettyChannel.removeChannelIfDisconnected(channel);
125         } catch (Throwable t) {
126             logger.warn(t.getMessage());
127         }
128     }
129 
130     @Override
131     protected void doClose() throws Throwable {
132         //can't shutdown nioEventLoopGroup
133         //nioEventLoopGroup.shutdownGracefully();
134     }
135 
136     @Override
137     protected com.alibaba.dubbo.remoting.Channel getChannel() {
138         Channel c = channel;
139         if (c == null || !c.isActive())
140             return null;
141         return NettyChannel.getOrAddChannel(c, getUrl(), this);
142     }
143 }

netty3:http://www.cnblogs.com/java-zhao/p/7811040.htmlthis

 1 protected void doOpen() throws Throwable {
 2         NettyHelper.setNettyLoggerFactory();
 3         bootstrap = new ClientBootstrap(channelFactory);
 4         // config
 5         // @see org.jboss.netty.channel.socket.SocketChannelConfig
 6         bootstrap.setOption("keepAlive", true);
 7         bootstrap.setOption("tcpNoDelay", true);
 8         bootstrap.setOption("connectTimeoutMillis", getTimeout());
 9         final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
10         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
11             public ChannelPipeline getPipeline() {
12                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
13                 ChannelPipeline pipeline = Channels.pipeline();
14                 pipeline.addLast("decoder", adapter.getDecoder());
15                 pipeline.addLast("encoder", adapter.getEncoder());
16                 pipeline.addLast("handler", nettyHandler);
17                 return pipeline;
18             }
19         });
20     }
21 
22 protected void doConnect() throws Throwable {
23         long start = System.currentTimeMillis();
24         ChannelFuture future = bootstrap.connect(getConnectAddress());
25         try {
26             boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
27 
28             if (ret && future.isSuccess()) {
29                 Channel newChannel = future.getChannel();
30                 newChannel.setInterestOps(Channel.OP_READ_WRITE);
31                 try {
32                     // 關閉舊的鏈接
33                     Channel oldChannel = NettyClient.this.channel; // copy reference
34                     if (oldChannel != null) {
35                         try {
36                             if (logger.isInfoEnabled()) {
37                                 logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
38                             }
39                             oldChannel.close();
40                         } finally {
41                             NettyChannel.removeChannelIfDisconnected(oldChannel);
42                         }
43                     }
44                 } finally {
45                     if (NettyClient.this.isClosed()) {
46                         try {
47                             if (logger.isInfoEnabled()) {
48                                 logger.info("Close new netty channel " + newChannel + ", because the client closed.");
49                             }
50                             newChannel.close();
51                         } finally {
52                             NettyClient.this.channel = null;
53                             NettyChannel.removeChannelIfDisconnected(newChannel);
54                         }
55                     } else {
56                         NettyClient.this.channel = newChannel;
57                     }
58                 }
59             } else if (future.getCause() != null) {
60                 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
61                         + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
62             } else {
63                 throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
64                         + getRemoteAddress() + " client-side timeout "
65                         + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
66                         + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
67             }
68         } finally {
69             if (!isConnected()) {
70                 future.cancel();
71             }
72         }
73     }

還有就是編解碼。url

後續會作netty4源碼閱讀計劃。spa

相關文章
相關標籤/搜索