Netty是Trustin Lee在2004年開發的一款高性能的網絡應用程序框架。相比於JDK自帶的NIO,Netty做了相當多的增強,且隔離了JDK NIO的實現細節,API也比較友好,還支持流量整形等高級特性。在我們常見的一些開源項目中已經廣泛地應用到了Netty,比如Dubbo、Elasticsearch、ZooKeeper等。
Netty的具體開發
提示:因代碼相對較多,這裏只展示其主要部分,至於項目中用到的編解碼器、工具類,請直接拉到最後下載源碼!也歡迎順手給個Star~
需要的依賴
<dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> <version>4.1.1</version> </dependency> <dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-jmx</artifactId> <version>4.1.1</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.29.Final</version> </dependency>
Client端代碼
package com.example.nettydemo.client; import com.example.nettydemo.client.codec.*; import com.example.nettydemo.client.codec.dispatcher.OperationResultFuture; import com.example.nettydemo.client.codec.dispatcher.RequestPendingCenter; import com.example.nettydemo.client.codec.dispatcher.ResponseDispatcherHandler; import com.example.nettydemo.common.RequestMessage; import com.example.nettydemo.common.string.StringOperation; import com.example.nettydemo.util.IdUtil; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import javax.net.ssl.SSLException; import java.util.concurrent.ExecutionException; public class Client { public static void main(String[] args) throws InterruptedException, ExecutionException, SSLException { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); //客戶端鏈接服務器最大容許時間,默認爲30s bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000); //10s NioEventLoopGroup group = new NioEventLoopGroup(); try { bootstrap.group(group); RequestPendingCenter requestPendingCenter = new RequestPendingCenter(); LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO); bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new FrameDecoder()); pipeline.addLast(new FrameEncoder()); pipeline.addLast(new ProtocolEncoder()); pipeline.addLast(new ProtocolDecoder()); pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter)); pipeline.addLast(new OperationToRequestMessageEncoder()); // pipeline.addLast(loggingHandler); } }); 
//鏈接服務 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888); //由於future是異步執行,因此須要先鏈接上後,再進行下一步操做 channelFuture.sync(); long streamId = IdUtil.nextId(); /** * 發送數據測試,按照定義的規則組裝數據 */ // OrderOperation orderOperation = new OrderOperation(1001, "你好啊,hi"); RequestMessage requestMessage = new RequestMessage(streamId, new StringOperation(1234, "你好啊,hi")); //將future放入center OperationResultFuture operationResultFuture = new OperationResultFuture(); requestPendingCenter.add(streamId, operationResultFuture); //發送消息 for (int i = 0; i < 10; i++) { channelFuture.channel().writeAndFlush(requestMessage); } //阻塞等待結果,結果來了以後會調用ResponseDispatcherHandler去set結果
// OperationResult operationResult = operationResultFuture.get();
// //將結果打印
// System.out.println("返回:"+operationResult);bootstrap
channelFuture.channel().closeFuture().get(); } finally { group.shutdownGracefully(); } }
}
Server端代碼
package com.example.nettydemo.server; import com.example.nettydemo.server.codec.FrameDecoder; import com.example.nettydemo.server.codec.FrameEncoder; import com.example.nettydemo.server.codec.ProtocolDecoder; import com.example.nettydemo.server.codec.ProtocolEncoder; import com.example.nettydemo.server.handler.MetricsHandler; import com.example.nettydemo.server.handler.ServerIdleCheckHandler; import com.example.nettydemo.server.handler.ServerProcessHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.traffic.GlobalTrafficShapingHandler; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor; import lombok.extern.slf4j.Slf4j; import javax.net.ssl.SSLException; import java.security.cert.CertificateException; import java.util.concurrent.ExecutionException; /** * netty server 入口 */ @Slf4j public class Server { public static void main(String... 
args) throws InterruptedException, ExecutionException, CertificateException, SSLException { ServerBootstrap serverBootstrap = new ServerBootstrap(); //設置channel模式,由於是server因此使用NioServerSocketChannel serverBootstrap.channel(NioServerSocketChannel.class); //最大的等待鏈接數量 serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024); //設置是否啓用 Nagle 算法:用將小的碎片數據鏈接成更大的報文 來提升發送效率。 //若是須要發送一些較小的報文,則須要禁用該算法 serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true); //設置netty自帶的log,並設置級別 serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); //thread //用戶指定線程名 NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss")); NioEventLoopGroup workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker")); UnorderedThreadPoolEventExecutor businessGroup = new UnorderedThreadPoolEventExecutor(10, new DefaultThreadFactory("business")); //只能使用一個線程,因GlobalTrafficShapingHandler比較輕量級 NioEventLoopGroup eventLoopGroupForTrafficShaping = new NioEventLoopGroup(0, new DefaultThreadFactory("TS")); try { //設置react方式 serverBootstrap.group(bossGroup, workGroup); //metrics MetricsHandler metricsHandler = new MetricsHandler(); //trafficShaping流量整形 //long writeLimit 寫入時控制, long readLimit 讀取時控制 具體設置看業務修改 GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroupForTrafficShaping, 10 * 1024 * 1024, 10 * 1024 * 1024); //log LoggingHandler debugLogHandler = new LoggingHandler(LogLevel.DEBUG); LoggingHandler infoLogHandler = new LoggingHandler(LogLevel.INFO); //設置childHandler,按執行順序放 serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("debugLog", debugLogHandler); pipeline.addLast("tsHandler", globalTrafficShapingHandler); pipeline.addLast("metricHandler", metricsHandler); pipeline.addLast("idleHandler", new ServerIdleCheckHandler()); 
pipeline.addLast("frameDecoder", new FrameDecoder()); pipeline.addLast("frameEncoder", new FrameEncoder()); pipeline.addLast("protocolDecoder", new ProtocolDecoder()); pipeline.addLast("protocolEncoder", new ProtocolEncoder()); pipeline.addLast("infoLog", infoLogHandler); //對flush加強,減小flush次數犧牲延遲加強吞吐量 pipeline.addLast("flushEnhance", new FlushConsolidationHandler(10, true)); //爲業務處理指定單獨的線程池 pipeline.addLast(businessGroup, new ServerProcessHandler());//businessGroup, } }); //綁定端口並阻塞啓動 ChannelFuture channelFuture = serverBootstrap.bind(8888).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); businessGroup.shutdownGracefully(); eventLoopGroupForTrafficShaping.shutdownGracefully(); } }
}
最後
以上介紹了Netty的基本用法,在代碼中也做了一部分的關鍵註釋,但可能還會有許多不足,也不可能滿足所有人的要求,大家可根據自己的實際需求去改造此項目。