Netty網絡應用框架和Mina相似,都封裝了Socket。javascript
Mina入門
長鏈接的基本概念
鏈接後一直與服務器保持長期鏈接 html
長鏈接的基本原理
底層都是基於TCP/IP協議
經過Socket,ServerSocket與服務器保持鏈接
服務端通常使用ServerSocket創建監聽,監聽客戶端與之鏈接
客戶端使用Socket,指定端口和IP地址,與服務端進行鏈接java
長鏈接的意義
經過長鏈接,能夠實現服務器主動向客戶端推送消息。
經過長鏈接,能夠減小客戶端對服務器的輪詢,減少服務器的壓力
通訊效率遠高於HTTPreact
Mina的優點
很是適合C/S架構的通信框架
apache出品
使用很是簡單,減少學習成本git
Mina總體講解
客戶端和服務端通訊時主要通過三個步驟:
1.服務器將數據發送到Session中,2.Session將數據發送到過濾鏈中,3.過濾鏈將數據過濾之後才發送到客戶端web
Mian核心類講解
IOService接口、及其相關子類
Responsibilities: 職責
AbstractIoService:默認的方法、默認的成員變量、過濾鏈、線程池等.
IOAcceptor接口、及其相關子類,服務器端最重要的類
//接口繼承關係 //支持TCP協議 //UDP協議
IOService<——IOAcceptor<——SocketAcceptor、DatagramAcceptor
SocketAcceptor<——NioSocketAcceptor(類、TCP協議的監聽器)
DatagramAcceptor<——NioDatagramAcceptor(類、UDP協議的監聽器) 算法
IOConnector接口/及其相關子類
//支持TCP協議 //UDP協議
IOService<——IOConnector<——SocketConnector、DatagramConnector
SocketConnector<——NioSocketConnector(類)
DatagramConnector<——DatagramConnector(類)spring
Filter接口、及其相關子類
LoggingFilter: 記錄mina全部日誌
ProtocolCodecFilter: 數據轉化過濾器
CompressionFilter: 數據壓縮過濾器
//HTTPS之因此支持加密傳輸,就是由於它在HTTP協議和TCP協議之間加了一層SSL協議數據加密
SSLFilter數據加密過濾器
經過繼承IoFilterAdapter本身能夠實現過濾器apache
IOSession類、
狀態創建後會返回一個 IOSession的對象,以後就能夠read()/write()讀寫數據到服務器,也能夠closed掉
receive buffer size:設置接收數據緩存區大小
sending buffer size:設置數據發送緩存區大小
ldel time: 設置狀態恢復時間
write timeout: 設置寫數據超時時間,等
Handler類、應用層較重要類、全部業務邏輯都要在Handler中完成
sessionCreated/sessionOper/sessionClosed 事件監聽
messageReceived/messageSend 事件監聽
exceptionCaught 異常監聽編程
Mina服務器搭建
調用Mina爲咱們提供的服務器API。
IOAcceptor acceptor = new NioSocketAcceptor();
//添加的日誌過濾器
acceptor.getFilterChain().addLast("logger",new LoggingFilter());
acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
//添加事件處理
acceptor.setHandler( new DemoServerHandler());
acceptor.getSessionConfig().SetReadBufferSize(2048);設置讀緩存區大小
acceptor.getSessionConfig().SetIdleTime(IdleStatus.BOTH_IDLE,10);//Session的空閒時間
//監聽客戶端的連接
try{
acceptor.bind(new InetSocketAddress(9123));//指定監聽端口
}catch(Exception e){
}
//該類,負責session對象的建立監聽以及消息發送和接收的監聽
private static class DemeServerHandler extends IoHandlerAdapter{
//session建立之後,會調這個方法
@Override
public void sessionCreated(IoSession session)throws Exception{
super.sessionCreated(session);
}
//session打開後,會調這個方法
@Override
public void sessionOpened(IoSession session)throws Exception{
super.sessionOpened(session);
}
//服務器接收到消息後,會調這個方法
@Override
public void messageReceived(IoSession sessionm,Object message)throws Exception{
super.messageReceived(session,message);
String str = message.toString();
Date date = new Date();
session.write(date.toString());
System.out.println("接收到的數據:"+str);
}
//服務器發送消息後,會調這個方法
@Override
public void messageSent(IoSession session,Object message)throws Exception{
super.messageSent(session,message);
}
//session關閉後,會調這個方法
@Override
public void sessionClosed(IoSession session)throws Exception{
super.sessionClosed(session);
}
}
Mina客戶端搭建
建立一個Service,用來與遠程服務器鏈接
封裝一個ConnectionManager類來提供與服務器的鏈接與斷開方法
在Service中啓動線程,調用ConnectionManager完成鏈接的建立
//構建者模式,
public class ConnectionConfig{
private Context context;
private String ip;
private int port;
private int readBufferSize;
private long connectionTimeout;
public static class Builder{
private Context context;
private String ip = "192.168.1.16";
private int port = 9123;
private int readBufferSize = 10240;
private long connectionTimeout = 10000;
public Builder(Context context){
this.context = context;
}
//構建者標準寫法
public Builder setIP(String ip){
this.ip = ip;
return this;
}
public Builder setPort(int port){
this.port = port;
return this;
}
public Builder setReadBufferSize(int size){
this.readBufferSize = size;
return this;
}
public Builder setConnectionTimeout(int timeout){
this.connectionTimeout = timeout;
return this;
}
private void applyConfig(ConnectionConfig config){
config.context = this.context;
config.ip = this.ip;
config.port = this.port;
config.connectionTimeout = this.connectionTimeout;
}
public ConnectionConfig builder(){
ConnectionConfig config = new ConnectionConfig();
applyConfig(config);
return config;
}
}
}
public class ConnectionManager{
public static final String BROADCAST_ACTION = "com.commonlibrary.mina";
public static final String MESSAGE = "message";
//動態的爲鏈接配置、是構建者模式、更爲靈活
private ConnectionConfig mConfig;
//確保不會內存溢出、用弱引用進行包裝
private WeakReference<Context> mContext;
//鏈接對象
private NioSocketConnector mConnection;
//session對象
private IoSession mSession;
//服務器地址
private InetSocketAddress mAddress;
public ConnectionManager(ConnectionConfig config){
this.mConfig = config;
this.mContext = new WeakReference<Context>(config.getContext());
init();
}
private void init(){//初始化方法
mAddress = new InetSocketAddress(mConfig.getIp(),mConfig.getPort());
mConnection = new NioSocketConnector();
//配置參數
mConnection.getSessionConfig().setReadBufferSize(mConfig.getReadBufferSize());
//配置過濾器
mConnection.getFilterChain().addLast("logging", new LoggingFilter());
mConnection.getFilterChain().addLast("codec", new ProtocolCodecFilter(
new ObjectSerializationCodecFactory()));
//業務處理類Handler
mConnection.setHandler(new DefaultHandler(mContext.get()));
}
public boolean connect(){//鏈接方法
try{
ConnectFuture future = mConnection.connect();
future.awaitUniterruptibly();
mSession = future.getSession();
}catch(Exception e){
return false;
}
return mSession == null ? false : true;
}
public void disConnection(){//斷開鏈接方法
mConnection.dispose();
mConnection = null;
mSession = null;
mAddress = null;
mContext = null;
}
private static class DefultHandler extends IoHandlerAdapter{
private Context mContext;
DefaultHandler(Context context){
this.mContext = context
}
@Override
public void sessionOpened(IoSession session)throws Exception{
//將咱們的session保存到個人session manager類中,
}
@Override
public void messageReceived(IoSession session,Object message)throws Exception{
if (mContext != null){
//局部廣播、安全
Intent intent = new Intent(BROADCAST_ACTION);
intent.putExtra(MESSAGE,message.toString());
LocalBroadcastManager.getInstance(mContext).sendBroadcast(intent);
}
}
}
}
public class minaService extends Service{
private ConnectionThread thread;
@Oversion
public void onCreate(){
super.onCreate();
thread = new ConnectionThread("mina",getApplicationContext());//建立時初始化線程
thread.start();//打開
}
@Oversion
public int onStartCommand(Intent intent,int flags,int startId){
return super.onStartCommand(intent,flags,startId);
}
@Oversion
public void onDestroy(){//關閉釋放掉
super.onDestroy();
thread.disConnection();
}
@Nullable
@Oversion
public IBinder onBind(Intent intent){
return null;
}
//線程類,負責調用connection manager類來完成與服務器的鏈接
public class ConnectionThread extends HandlerThread{
private Context context;
boolean isConnection;
ConnectionManager mManager;
ConnectionThread (String name,Context context){
super(name);
this.context = context;
ConnectionConfig_config = new ConnectionConfig().Builder(context)
.setIP("192.168.1.16")
.setPort(9123)
.setReadBufferSize(10240)
.setConnectionTimeout(10000).builder();
}
@Override
protected void onLooperPrepared(){/開始鏈接服務器
for(;;){
isConnection = mManager.connect();//完成服務器的鏈接
if(isConnection){
break;
}
try{
Threaddd.sleep(3000);
}catch(Exception e){
}
}
}
@Override
public void disConnection(){//斷開鏈接
mManager.disConnection();//完成服務器的斷開
}
}
}
Mina客戶端與服務器通訊
public class SessionManager{
private static SessionManager mInstance = null;
private IoSession mSession;//與服務器通訊的對象
public static SessionManager getInstance(){
if(mInstance == null){
synchronized(SessionManager.class){
if(mInstance == null){
mInstance = new SessionManager();
}
}
}
return mInstance;
}
private SessionManager(){}
public void setSession(IoSession session){this.mSession = session;}
public void writeToServer(Object msg){//將對象寫到服務器
if(mSesssion != null){
mSession.write(msg);
}
}
public void closeSession(){
if(mSession != null){
mSession.closeOnFlush();
}
}
public void removeSession(){this.mSession = null;}
}
//Mina測試類
public class MinaTestActivity extends BaseActivity implements View.OnClickListener{
@Bind(R.id.start_servie_view)
protected TextView mConnectView;
@Bind(R.id.send_view)
protected TextView mSendView;
//自定義了一個廣播接收器
private MessageBroadcastReveiver receiver = new MessageBroadcastReveiver();
@Override
protected void onCreate(Bundle savedInstanceState){
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_mina_layout);
initButterknife();
initView();
registerBroadcast();註冊廣播接收器
}
private void registerBroadcast(){
IntentFilter filter = new IntentFilter("com.commonlibrary.mina");
LocalBroadcastManager.getInstance(this)
.registerReceiver(receiver,filter);
}
private void unregisterBroadcast(){
LocalBroadcastManager.getInstance(this)
.unregisterReceiver(receiver);
}
private void initView(){
mConnectView.setOnClickListener(this);
mSendView.setOnClickListener(this);
}
@Override
protected void onDestroy(){
super.onDestroy();
stopService(new Intent(this,MinaService.class));//關閉,MianService
unregisterBroadcast()//清空動態註冊的廣播接收器
}
@Overrid
public void onClick(View v){
switch(v.getId()){
case R.id.send_view:
SessionManager.getInstance().writeToServer("123");發送
break;
case R.id.start_servie_view:
Intent intent = new Intent(this,MinaService.class);啓動
startService(intent);
break;
}
}
//接收mina發送來的消息,並更新UI
private class MessageBroadcastReveiver extends BroadcastReceiver{
@Override
public void onReceive(Context context,Intent,intent){
setTitle(intent.getStringExtra("message"));
}
}
}
-----------------------------------------------------------------------------------------------------
阿里巴巴的分佈式服務框架Dubbo,底層通信框架使用了Netty
Netty介紹
高性能事件驅動、異步非堵塞,Jboss開源,Java所寫
支持http,webSocket,破肉的霸服,百納瑞,UDP,TCP
基於NIO的客戶端,服務端編程框架
Netty使用場景
多線程併發領域
大數據領域
異步通訊領域
IO通訊
BIO: 客戶端的個數和服務端的個數相同。 阻塞同步的
僞異步IO:線程池負責鏈接,線程池阻塞。 阻塞同步的
NIO: 緩存區Buffer對象,任何操做都是:讀到緩衝區和寫到緩衝, 非阻塞同步的
通道Channel,讀,寫,能夠兩者同時進行,
多路複用器Selector,會不斷的輪詢註冊的'拆No'。
AIO: 主動通知程序,讀寫方法異步,鏈接註冊讀寫事件和回調函數 非阻塞異步的
Netty入門
API簡單,入門門檻低,性能高,成熟穩定
WebSocket入門
H5提出的的協議規範,
握手機制
解決客戶端與服務端實時通信技術
服務器主動傳送數據給客戶端
WebSocket創建鏈接:1.客戶端發起握手請求,2.服務端響應請求,3.創建鏈接
WebSocket生命週期:1打開事件,2.消息事件,3.錯誤事件,4.關閉事件
WebSocket關閉鏈接:兩種關閉方式:1.服務器關閉底層TCP鏈接,2.客戶端發起TCP Close
Netty實現WebSocket通訊案例
Netty開發服務端:
/**
* 存儲整個工程的全局配置 類
* @author AAA
*
*/
public class NettyConfig {
private static final String GlobalEventExecutor = null;
/**
* 存儲每個客戶端接入進來時的channel對象
*/
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
/**
*接收/處理/響應客戶端websocket請求的核心業務處理類
* @author AAA
*
*/
public class MyWebSocketHandler extends SimpleChannelInboundHandler<Object> {
private WebSocketServerHandshaker handshaker;
private static final String WEB_SOCKET_URL="ws://localhost:8080/";
//客戶端與服務端建立鏈接的時候調用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.group.add(ctx.channel());
System.out.println("客戶端與服務端鏈接開啓");
}
//客戶端與服務端斷開鏈接的時候調用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.group.remove(ctx.channel());
System.out.println("客戶端與服務端鏈接關閉");
}
//服務端接收客戶端發送過來的結束以後調用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//工程出現異常的時候調用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
//服務端處理客戶端websocket請求的核心方法
@Override
protected void messageReceived()(ChannelHandlerContext context,Object msg) throws Exception {
if(msg instanceof FullHttpRequest){//處理客戶端向服務端發起http握手請求的業務
handHttpRequest(context,(FullHttpRequest) msg);//調用 客戶端向服務端發起http握手請求的業務
}else if(msg instanceof WebSocketFrame){//處理websocket鏈接業務
handWebsocketFrame(context,(WebSocketFrame) msg);
}
}
//處理客戶端與服務端以前的websocket業務
private void handWebsocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame){
//判斷是不是關閉websocket指令
if(frame instanceof CloseWebSocketFrame){
handshaker.close(ctx.channel(),(CloseWebSocketFrame) frame.retain());
}
//判斷是不是ping消息
if(frame instanceof CloseWebSocketFrame){
ctx.channel().write(new PingWebSocketFrame(frame.content().retain()));
}
//判斷是不是二進制消息,若是是二進制消息,拋出異常
if(!(frame instanceof TextWebSocketFrame)){
System.out.println("目前咱們不支持二進制消息");
throw new RuntimeException("{"+this.getClass().getName()+"},不支持消息");
}
//返回應答消息
String request = ((TextWebSocketFrame) frame).text();
System.out.println("服務端收到客戶端的消息");
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
+ctx.channel().id()
//羣發,服務端向每一個鏈接上來的客戶端羣發消息
NettyConfig.group.writeAndFlush(tws);
}
//處理客戶端向服務端發起http握手請求的業務
private void handHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){
if(!req.getDecoderResult().isSuccess()||!("websocket".equals(req.headers().get("upgrade")))){
sendHttResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory
(WEB_SOCKET_URL, null, false);
handshaker = wsFactory.newHandshaker(req);
if(handshaker == null){
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel())
}else{
handshaker.handshake(ctx.channel(), req);
}
}
//服務端向客戶端響應信息
private void sendHttResponse(ChannelHandlerContext ctx,FullHttpRequest req,
DefaultFullHttpResponse res){
if(res.getStatus().code() != 200){
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(),CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
//服務端向客戶端發送數據
ChannelFuture f = ctx.channel().writeAndFlush(res);
if(res.getStatus().code() != 200){
f.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
protected void channelRead0(ChannelHandlerContext arg0, Object arg1) throws Exception {
// TODO Auto-generated method stub
}
}
/**
* 初始化鏈接時,加載各個組件
* @author AAA
*
*/
public class MyWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel e) throws Exception {
e.pipeline().addLast("http-codec",new HttpServerCodec());
e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
e.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
e.pipeline().addLast("handler",new MyWebSocketHandler());
}
}
/**
* WebSocket啓動類
* @author AAA
*
*/
public class Main {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup()
try{
ServerBootstrap b =new ServerBootstrap();
b.group(bossGroup,workGroup);
b.channel(NioSctpServerChannel.class);
b.childHandler(new MyWebSocketChannelHandler());
System.out.println("服務端開啓等待客戶端鏈接。。。");
Channel ch = b.bind(8888).sync().channel();
ch.closeFuture().sync();
}catch(Exception e ){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
HTML開發客戶端:
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset = utf-8"/>
<title>WebSocket客戶端</title>
<script type="text/javascript">
var socket;
if(!window.WebSocket){
window.WebSocket = window.MozWebSocket;
}
if(window.WebSocket){
socket = new WebSocket("ws://localhost:8888/websocket");
socket.onmessage = function(event){
var ta = document.getElementById('responseContent');
ta.value += event.data + "\r\n";
};
socket.onopen = function(event){
var ta = document.getElementById('responseContent');
ta.value = "你當前的瀏覽器支持WebSocket,請繼續操做\r\n";
};
socket.onclose = function(event){
var ta = document.getElementById('responseContent');
ta.value = null;
ta.value = "WebSocket鏈接已關閉\r\n";
};
}else{
alert("您的瀏覽器不支持WebSocket");
}
function send(message){
if(!window.WebSocket){
return;
}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}else{
alert("WebSocket鏈接沒有創建成功");
}
}
</script>
</head>
<body>
<form onsubmit="return false;">
<input type="text" name="message" value=""/>
<br/><br/>
<input type="button" value="發送WebSocket請求消息" onclick="send(this.form.message.value)">
<hr color="red">
<h2>客戶端接收到服務端返回的應答消息</h2>
<textarea id="responseContent" style="width:1024px;height:300px"></textarea>
</form>
</body>
</html>
實現服務端與客戶端的實時交互
---------------------------------------------------------------------------------------------------------
Java讀源碼之Netty深刻剖析
Netty是什麼?
異步事件驅動框架,用於快速開發高性能客戶端和服務端
封裝了JDK底層BIO和NIO模型,提供高度能夠的API
自帶編解碼器解決拆包粘包問題,用戶只要關心業務邏輯
精心設計的reactor線程模型支持高併發海量鏈接
自帶何種協議棧讓你處理任何一種通用協議都幾乎不用親自動手
Netty基本組件
服務端首先去監聽服務端口、客戶端去鏈接服務端、服務端每次寫數據、每隔五秒寫一次
監聽端口 NioEventLoop:新鏈接的接入、鏈接當前存在的數據流讀寫
新鏈接 Channel: 讀寫操做
接收數據 ByteBuf: 服務端接收客戶端數據
業務邏輯 ChannelHandler:
發送數據
業務邏輯 ChannelHanler:
Netty和Mina的對比:
一、都是Trustin Lee的做品,Netty更晚;
二、Mina將內核和一些特性的聯繫過於緊密,使得用戶在不須要這些特性的時候沒法脫離,相比下性能會有所降低,Netty解決了這個設計問題;
三、Netty的文檔更清晰,不少Mina的特性在Netty裏都有;
四、Netty更新週期更短,新版本的發佈比較快;
五、它們的架構差異不大,Mina靠apache生存,而Netty靠jboss,和jboss的結合度很是高,Netty有對google protocal buf的支持,有更完整的ioc容器支持(spring,guice,jbossmc和osgi);
六、Netty比Mina使用起來更簡單,Netty裏你能夠自定義的處理upstream events 或/和 downstream events,可使用decoder和encoder來解碼和編碼發送內容;
七、Netty和Mina在處理UDP時有一些不一樣,Netty將UDP無鏈接的特性暴露出來;而Mina對UDP進行了高級層次的抽象,能夠把UDP當成"面向鏈接"的協議,而要Netty作到這一點比較困難。
Netty服務端啓動
分爲4個過程:1.建立服務端Channel
bind()用戶代碼入口
initAndRegister()初始化並註冊
newChannel()建立服務端channel
//反射建立服務端Channel
newSocket()經過jdk來建立底層jdk channel
NioServerSocketChannelConfig()tcp參數配置類
AbstractNioChannel()
configureBlocking(false)(非)阻塞模式
AbstractChannel()建立id、unsafe、pipeline(重點、用於邏輯處理)
2.初始化服務端Channel
init()初始化入口
set ChannelOptions、ChannelAttrs
set ChildOptions、ChlidAttrs
config handler配置服務端pipeline
add ServerBootstrapAcceptor添加鏈接器
3.註冊selector
AbstractChannel.register(channel)入口
this.eventLoop = eventLoop綁定線程
resgiterO()實際註冊
doRegister()調用jdk底層註冊
invokeHandlerAddedIfNeeded()事件的回調
fireChannelRegistered()傳播事件
4.端口綁定
AbstractUnsafe.bind()入口
doBind()
JavaChannel.bing()jdk底層綁定
pipeline.fireChannelActive()傳播事件
HeadContext.readIfIsAutoRead()從新綁定新鏈接
服務端啓動核心路徑總結:newChannel()——》init()——》doBind()——》register()
默認狀況下,Netty服務端器多少線程?什麼時候啓動?
Netty是如何解決jdk空輪訓bug的?
Netty如何保證異步串行無鎖化?
NioEventLoop建立
new NioEventLoopGroup()線程組,默認2*cpu
new ThreadPerTaskExecutor()線程建立器
for(){newChild()}構造NioEventLoop
chooserFactory.newChooser()線程選擇器
ThreadPerTaskExecuteor線程執行器
//每次執行任務都會建立一個線程實體
//NioEventLoop線程命名規則nioEventLoop-1-xx
newchild()
//保存線程執行器ThreadPerTaskExecutor
//建立一個MpscQueue
//建立一個selector
chooserFactory.newChooser()
//爲新鏈接綁定NioEventLoop
NioEventLoop啓動:分爲兩種觸發器
服務端啓動綁定端口
bind()->execute(task)入口
startThread()->doStartThread()建立線程
ThreadPerTaskExecutor.execute()
thread = Thread.currentThread()
NioEventLoop.run啓動(Neety運轉的核心)
run()-> for(;;)
select()檢查是否有io事件
//deadline以及任務穿插邏輯處理
//阻塞是select
//避免jdk空輪詢的bug
processSelectedKeys()處理io事件
//selected keySet優化
//processSelectedKeysOptimized()對key作處理
runAllTasks()(執行邏輯)處理異步任務隊列
//task的分類和添加
//任務的聚合
//任務的執行
新鏈接接入經過chooser綁定一個NioEventLoop
第五章 Netty新鏈接接入
兩個問題:
Netty是在哪裏檢測有新鏈接接入的?
新鏈接是怎樣註冊到NioEventLoop線程的?
Netty新鏈接接入處理邏輯:
檢測新鏈接
processSelectedKey(key,channel)入口
NioMessageUnsafe.read()
doReadMessages()while循環
javaChannel.accept();
建立NioSocketChannel
new NioSocketChannel(parent,ch)入口
AbstractNioByteChannel(p,ch,op_read)
configureBlocking(false) & save op
create id,unsafe,pipeline
new NioSocketChannelConfig()
setTcpNoDelay(true)禁止Nagle算法
Channel的分類
NioServerSocketChannel
NioSocketChannel
Unsafe
Channel的層級
AbstractChannel
AbstractNioChannel
AbstractNioByteChannel
NioSocketChannel
NioByteUnsafe
NioSocketChannelConfig
AbstractNioMessageChannel
NioMessageUnsafe
NioServerSocketChannel
NioServerSocketChannelConfig
分配線程及註冊selector
服務端Channel的pipeline構成
Head - ServerBootstrapAcceptor - Tail
ServerBootstrapAcceptor
//添加childHandler
//設置options和attrs
//NioEventLoop並註冊selector
向selector註冊讀事件
NioSocketChannel讀事件的註冊
第六章pipeline
三個問題:
netty是如何判斷ChannelHandler類型的?
對於ChannelHandler的添加應該遵循什麼樣的順序?
用戶手動觸發事件傳播,不動的觸發方式有什麼的區別?
pipline初始化:
pipline在建立Channel的時候被建立
pipline節點數據結構:ChannelHandlerContext
pipline中的兩大哨兵:head和tail
添加ChannelHandler
//判斷是否重複添加
//建立節點並添加至鏈表
//回調添加完成事件
刪除ChannelHandler
//找到節點
//鏈接的刪除
//回調刪除Handler事件
inBound事件的傳播
//何爲inBound事件以及ChannelInboundHandler
//ChannelRead事件的傳播
//SimpleInBoundHandler處理器
outBound事件的創博
//可謂outBound事件以及ChannelOutBoundHandler
//write()事件的傳播
異常的傳播
//異常觸發鏈
//異常處理的最佳實踐
pipeline總結:pipeline的初始化、pipeline的數據結構是雙向列表結構
添加刪除ChannelHandler
默認pipeline中存在兩種類型節點:Head和Tail
三個問題:
netty是如何判斷ChannelHandler類型的?
對於ChannelHandler的添加應該遵循什麼樣的順序?
用戶手動觸發事件傳播,不一樣的觸發方式有什麼樣的區別
第七章ByteBuf(內存分配、最爲底層的、
主要負責把數據從底層讀到ByteBuf傳遞應用程序,處理完後封裝爲ByteBuf寫會到Io)
三個問題:
Netty的內存類別有哪些?
如何減小多線程內存分配之間的競爭?
不一樣大小的內存如何進行分配的?
內存與內存管理器的抽象
不一樣規格大小和不一樣類別的內存的分配策略
內存的回收過程
ByteBuf結構:
0 <= readerIndex <= writerIndex <= capacity
(讀數據) (寫數據) (擴容)
read,write,set方法
mark和reset方法(保持讀或寫完數據、後能夠返回原樣)
ByteBuf分類:
AbstractByteBuf
PooledHeapByteBuf
UnpooledUnsafeDirectByteBuf
UnpooledDirectByteBuf
PooledUnsafeHeapByteBuf
UnpooledHeapByteBuf
PooledUnsafeDirectByteBuf
PooledDirectByteBuf
UnpooledUnsafeHeapByteBuf
Pooled和Unpooled(預先分配、直接分配)
Unsafe和非Unsafe(直接拿到ByteBuf內存、不會依賴JDK底層的Unsafe對象)
Heap和Direct (在堆上內存分配的、調用JDK的API內存分配的、)
ByteBufAllocator(內存分配管理器)
ByteBufAllocator功能:分配內存、在堆上內存分配、對外內存分配
AbstractByteBufAllocator
UnpooledByteBufAllocator
//heap內存的分配
//direct內存的分配
PooledByteBufAllocator
//拿到線程局部緩存PoolThreadCache
//在線程局部緩存的Area上進行內存分配
directArena分配direct內存的流程
//從對象池裏面拿到PooledByteBuf進行復用
//從緩存上進行內存分配
//從內存堆裏面進行內存分配
內存規格介紹:0、tiny 512B、small 8k、 normal 16M、 huge
(0--8k SubPage) (Page) (Chunk)
緩存數據結構:MemoryRegionCache
queue: chunk handler chunk handler chunk handler
sizeClass: tiny(0~512B) small(512B~8k) normal(8k~16M)
size: N*16B 512B、1k、2k、4k 8k、1六、32k
命中緩存的分配流程
找到對應size的MemoryRegionCache
從queue中彈出一個entry給ByteBuf初始化
將彈出的entry扔到對象池進行服用
arena/chunk /page/subpage
page級別的內存分配:allocateNormal()
嘗試在現有的chunk上分配
建立一個chunk進行內存分配
初始化PooledByteBuf
subpage級別的內存分配:allocateTiny()
定位一個Subpage對象
初始化Subpage
初始化PooledByteBuf
ByteBuf的釋放:
連續的內存區段加到緩存
標記連續的內存區段爲未使用
ByteBuf加到對象池
總結:
ByteBuf的api和分類
分配pooled內存的總步驟
不一樣規格的pooled內存分配與釋放
第八章 Netty解碼(把二進制數據流解析成一個定義協議的數據包,也就是ByteBuf)
兩個問題:
解碼器抽象的解碼過程
答:(ByteToMessageDecoder)
netty裏面有哪些拆箱即用的解碼器
答:有以下四中!!!
解碼器基類
ByteToMessageDecoder解碼步驟
累加字節流
調用子類的decode方法進行解析
將解析到的ByteBuf向下傳播
Netty中常見的解碼器分析
基於固定長度解碼器(netty中較爲簡單的解碼器)
基於行解碼器: LineBasedFrameDecoder
基於分隔符解碼器:DelimiterBasedFrameDecoder
基於長度域解碼器:LengthFieldBasedFrameDecoder
長度域解碼器步驟
計算須要抽取的數據包長度
跳過字節邏輯處理
丟棄模式下的處理
第九章 Netty編碼
一個問題:
如何把對象變成字節流,最終寫到socket底層?
write And Flush()
Head - encoder - ... - biz - Tail
從tail節點開始往前傳播
逐個調用channelHandler的write方法
逐個調用channelHandler的flush方法
編碼器處理邏輯:MessageToByteEncoder
//匹配對象
//分配內存
//編碼實現
//釋放對象
//傳播數據
//釋放內存
write-寫buffer隊列
direct化ByteBuf(若是不是對外內存、就轉換爲對外內存)
插入寫隊列
設置寫狀態
flush-刷新buffer隊列
添加刷新標誌並設置寫狀態
遍歷buffer隊列,過濾ByteBuf
調用jdk底層api進行自旋寫
第十章 Netty性能優化工具類解析
兩大性能優化工具類:
FastThreadLocal的實現機制
FastThreadLocal的建立
FastThreadLocal的get()方法實現
獲取ThreadLocalMap
直接經過索引取出對象
初始化
FastThreadlocal的set()方法實現
獲取ThreadLocalMap
直接經過索引set對象
Recycler(對象池,減小GC的壓力、不須要每次new對象、)
Recycler的建立
Thread
ratioMask控制對象回收頻率
maxCapacity池子最大大小
maxDelayedQueues
(head)(prev)(cursor )
availableSharedCapacity 緩存最大的個數
Recycler獲取對象
獲取當前線程的Stack
從Stack裏面彈出對象
建立對象並綁定到Stack
回收對象到Recycler
同線程回收對象
異線程回收對象:一個對象在一個線程中建立、在另外一個線程中回收
獲取WeakOrderQueue
建立WeakOrderQueue
將對象追加到WeakOrderQueue
異線程收割對象
第十一章 Netty設計模式的應用
單例模式:ReadTimeoutException/MqttEncoder
一個類全局只有一個對象
延遲建立
避免線程安全問題
策略模式:DefaultEventExecutorChooserFactory
封裝一系列可相互替換的算法家族
動態選擇某一個策略
裝飾者模式:WrappedByteBuf
觀察者模式:
觀察者和被觀察者
觀察者訂閱消息,被觀察者發佈消息
訂閱則能收到,取消訂閱則收不到
迭代器模式:
迭代器接口
對容器裏面各個對象進行訪問
責任鏈模式:(使得多個對象都有機會處理同一個對象,)
責任處理器接口 :ChannelHandler
建立鏈、添加刪除責任處理器接口 :ChannelPipeline
上下文 :ChannelHandlerContext
責任終止機制
第十二章 Netty高併發性能調優
單機百萬鏈接調優
如何模擬百萬鏈接
/data/centos6.5/server/server.jar
netty-study mvn package -DskipTests maven打成jar包:
突破局部文件句柄限制
ulimit -n
vi /etc/security/limits.conf添加以下:
* hard nofile 1000000 (最大文件數)
* soft nofile 1000000
exit退出虛擬機
server vagrant reload重啓虛擬機
server vagrant ssh登陸虛擬機
./start.sh啓動客戶端/服務端
突破全局文件句柄限制
cat /proc/sys/fs/file-max
sudo -s
echo 20000 > /proc/sys/fs/file-max
sudo vi /etc/sysctl.conf添加以下:
fs.file-max = 1000000
sudo sysctl -p
Netty應用級別性能調優
完結!
-----------------------------------------------------------------------------------
在我看來Channel、Buffer、Selector構成了核心的APIJava NIO的主要Channel實現:包含UDP/TCP/網絡IO/文件IO FileChannel DatagramChannel SocketChannel ServerSocketChannelJava NIO的主要Buffer的實現: ByteBuffer CharBuffer DoubleBuffer FloatBuffer intBuffer LongBuffer ShortBuffer MappedByteBuffer,表示內存映射文件,S