由於NIO自己是非阻塞的,因此他的消息選擇器Selector能夠在單線程下鏈接多臺客戶端的訪問。java
爲了增強NIO的性能,咱們加入多線程的操做,固然NIO並不能簡單的把Selector.select()放入Executor.execute(Runnable)的run方法中。bootstrap
爲完成NIO的多線程,咱們應該有一個調度類,一個服務類。數組
調度類的目的是初始化必定數量的線程,以及線程交接。多線程
package com.netty.nionetty.pool; import com.netty.nionetty.NioServerBoss; import com.netty.nionetty.NioServerWorker; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; /** * Created by Administrator on 2018-05-17. */ public class NioSelectorRunnablePool { private final AtomicInteger bossIndex = new AtomicInteger(); //歡迎線程數組 private Boss[] bosses; private final AtomicInteger workerIndex = new AtomicInteger(); //工做線程數組 private Worker[] workers; public NioSelectorRunnablePool(Executor boss,Executor worker) { initBoss(boss,1); initWorker(worker,Runtime.getRuntime().availableProcessors() * 2); } //初始化1個歡迎線程 private void initBoss(Executor boss,int count) { this.bosses = new NioServerBoss[count]; for (int i = 0;i < bosses.length;i++) { bosses[i] = new NioServerBoss(boss,"boss thread " + (i + 1),this); } } //初始化2倍計算機核數的工做線程 private void initWorker(Executor worker,int count) { this.workers = new NioServerWorker[count]; for (int i = 0; i < workers.length;i++) { workers[i] = new NioServerWorker(worker,"worker thread" + (i + 1),this); } } //交接工做線程(從工做線程數組中挑出) public Worker nextWorker() { return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; } //交接歡迎線程(從歡迎線程數組中挑出) public Boss nextBoss() { return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; } }
另外帶一個歡迎線程接口,一個工做線程接口socket
package com.netty.nionetty.pool; import java.nio.channels.ServerSocketChannel; /** * Created by Administrator on 2018-05-17. */ public interface Boss { void registerAcceptChannelTask(ServerSocketChannel serverChannel); }
package com.netty.nionetty.pool; import java.nio.channels.SocketChannel; /** * Created by Administrator on 2018-05-17. */ public interface Worker { void registerNewChannelTask(SocketChannel channel); }
有兩種線程(歡迎線程和工做線程),因此咱們有一個抽象線程類ide
package com.netty.nionetty; import com.netty.nionetty.pool.NioSelectorRunnablePool; import java.io.IOException; import java.nio.channels.Selector; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; /** * Created by Administrator on 2018-05-17. */ public abstract class AbstractNioSelector implements Runnable { //線程池 private final Executor executor; //NIO消息選擇器 protected Selector selector; protected final AtomicBoolean wakeUp = new AtomicBoolean(); //線程任務隊列 private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>(); //線程名 private String threadName; //線程調度器 protected NioSelectorRunnablePool selectorRunnablePool; AbstractNioSelector(Executor executor,String threadName,NioSelectorRunnablePool selectorRunnablePool) { this.executor = executor; this.threadName = threadName; this.selectorRunnablePool = selectorRunnablePool; openSelector(); } private void openSelector() { try { this.selector = Selector.open(); //打開消息選擇器 } catch (IOException e) { e.printStackTrace(); } //把線程放入線程池,開始執行run方法 executor.execute(this); } public void run() { Thread.currentThread().setName(this.threadName); while (true) { try { wakeUp.set(false); //把消息選擇器的狀態定爲未喚醒狀態 select(selector); //消息選擇器選擇消息方式 processTaskQueue(); //由於在主程序中綁定端口的時候已經註冊了接收通道任務線程,因此這裏是讀出任務。 process(selector); //任務處理,歡迎線程跟工做線程各不相同 } catch (IOException e) { e.printStackTrace(); } } } //歡迎線程跟工做線程各自添加不一樣的線程,再把消息選擇器喚醒 protected final void registerTask(Runnable task) { taskQueue.add(task); Selector selector = this.selector; if (selector != null) { if (wakeUp.compareAndSet(false,true)) { selector.wakeup(); } }else { taskQueue.remove(task); } } public NioSelectorRunnablePool getSelectorRunnablePool() { return selectorRunnablePool; } private void processTaskQueue() { for (;;) { final Runnable task = taskQueue.poll(); if (task == null) { break; } task.run(); } } protected abstract int select(Selector selector) throws IOException; protected abstract void process(Selector selector) throws IOException; }
歡迎線程跟工做線程的具體實現性能
package com.netty.nionetty; import com.netty.nionetty.pool.Boss; import com.netty.nionetty.pool.NioSelectorRunnablePool; import com.netty.nionetty.pool.Worker; import java.io.IOException; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; /** * Created by Administrator on 2018-05-17. */ public class NioServerBoss extends AbstractNioSelector implements Boss { public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor,threadName,selectorRunnablePool); } //註冊接收任務,會先調用抽象類,把任務線程先添加到任務隊列,再註冊接收消息類型 public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) { final Selector selector = this.selector; registerTask(new Runnable() { public void run() { try { serverChannel.register(selector,SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } @Override protected int select(Selector selector) throws IOException { return selector.select(); } //NIO操做,開始接收,接收後再啓用工做線程,接收線程依然存在,並且工做線程也不斷給到線程池未使用線程 //具體看初始化的時候初始了多少工做線程,可是是幾個鏈接對應一個工做線程。 @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (Iterator<SelectionKey> i = selectedKeys.iterator();i.hasNext();) { SelectionKey key = i.next(); i.remove(); ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel channel = server.accept(); channel.configureBlocking(false); Worker nextWorker = getSelectorRunnablePool().nextWorker(); nextWorker.registerNewChannelTask(channel); System.out.println("新客戶端鏈接"); } } }
package com.netty.nionetty; import com.netty.nionetty.pool.NioSelectorRunnablePool; import com.netty.nionetty.pool.Worker; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; /** * Created by Administrator on 2018-05-17. */ public class NioServerWorker extends AbstractNioSelector implements Worker { public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor,threadName,selectorRunnablePool); } public void registerNewChannelTask(final SocketChannel channel) { final Selector selector = this.selector; registerTask(new Runnable() { public void run() { try { channel.register(selector,SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } @Override protected int select(Selector selector) throws IOException { return selector.select(500); } @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey)ite.next(); ite.remove(); SocketChannel channel = (SocketChannel)key.channel(); int ret = 0; boolean failure = true; ByteBuffer buffer = ByteBuffer.allocate(1024); try { ret = channel.read(buffer); failure = false; } catch (IOException e) { e.printStackTrace(); } if (ret <= 0 || failure) { key.cancel(); System.out.println("客戶端斷開鏈接"); }else { System.out.println("收到數據:" + new String(buffer.array())); ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes()); channel.write(outBuffer); } } } }
服務類this
package com.netty.nionetty; import com.netty.nionetty.pool.Boss; import com.netty.nionetty.pool.NioSelectorRunnablePool; import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.ServerSocketChannel; /** * Created by Administrator on 2018-05-17. */ public class ServerBootstap { private NioSelectorRunnablePool selectorRunnablePool; public ServerBootstap(NioSelectorRunnablePool selectorRunnablePool) { this.selectorRunnablePool = selectorRunnablePool; } public void bind(final SocketAddress localAddress) { try { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(localAddress); Boss nextBoss = selectorRunnablePool.nextBoss(); nextBoss.registerAcceptChannelTask(serverChannel); } catch (IOException e) { e.printStackTrace(); } } }
主程序atom
package com.netty.nionetty; import com.netty.nionetty.pool.NioSelectorRunnablePool; import java.net.InetSocketAddress; import java.util.concurrent.Executors; /** * Created by Administrator on 2018-05-17. */ public class Start { public static void main(String[] args) { NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); ServerBootstap bootstrap = new ServerBootstap(nioSelectorRunnablePool); bootstrap.bind(new InetSocketAddress(10101)); System.out.println("start"); } }
其實最主要的就是在線程調度器中,各類線程已經被初始化存在於線程池內存中了,因此後面只是把這些線程拿出來,並註冊消息類型,進行處理,這就是NIO的多線程處理了。spa