NIO如何多線程操作

由於NIO本身是非阻塞的,所以它的消息選擇器Selector可以在單線程下處理多臺客戶端的連接訪問。

爲了增強NIO的性能,我們加入多線程的操作,當然NIO並不能簡單地把Selector.select()放入Executor.execute(Runnable)的run方法中。

爲完成NIO的多線程,我們應該有一個調度類,一個服務類。

調度類的目的是初始化一定數量的線程,以及線程交接。

package com.netty.nionetty.pool;

import com.netty.nionetty.NioServerBoss;
import com.netty.nionetty.NioServerWorker;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2018-05-17.
 */
public class NioSelectorRunnablePool {
    private final AtomicInteger bossIndex = new AtomicInteger();
    //歡迎線程數組
    private Boss[] bosses;
    private final AtomicInteger workerIndex = new AtomicInteger();
    //工做線程數組
    private Worker[] workers;
    public NioSelectorRunnablePool(Executor boss,Executor worker) {
        initBoss(boss,1);
        initWorker(worker,Runtime.getRuntime().availableProcessors() * 2);
    }
    //初始化1個歡迎線程
    private void initBoss(Executor boss,int count) {
        this.bosses = new NioServerBoss[count];
        for (int i = 0;i < bosses.length;i++) {
            bosses[i] = new NioServerBoss(boss,"boss thread " + (i + 1),this);
        }
    }
    //初始化2倍計算機核數的工做線程
    private void initWorker(Executor worker,int count) {
        this.workers = new NioServerWorker[count];
        for (int i = 0; i < workers.length;i++) {
            workers[i] = new NioServerWorker(worker,"worker thread" + (i + 1),this);
        }
    }
    //交接工做線程(從工做線程數組中挑出)
    public Worker nextWorker() {
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }
    //交接歡迎線程(從歡迎線程數組中挑出)
    public Boss nextBoss() {
        return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
    }
}

另外帶一個歡迎線程接口,一個工作線程接口。

package com.netty.nionetty.pool;

import java.nio.channels.ServerSocketChannel;

/**
 * Contract for a boss ("welcome") selector runnable: it is handed server
 * channels to watch for incoming connections.
 *
 * Created by Administrator on 2018-05-17.
 */
public interface Boss {
    /**
     * Asks this boss to take over the given server channel.
     *
     * @param serverChannel the server channel to be registered with this boss
     */
    void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.netty.nionetty.pool;

import java.nio.channels.SocketChannel;

/**
 * Contract for a worker selector runnable: it is handed client channels
 * to service.
 *
 * Created by Administrator on 2018-05-17.
 */
public interface Worker {
    /**
     * Asks this worker to take over the given client channel.
     *
     * @param channel the client channel to be registered with this worker
     */
    void registerNewChannelTask(SocketChannel channel);
}

有兩種線程(歡迎線程和工作線程),所以我們有一個抽象線程類。

package com.netty.nionetty;

import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Created by Administrator on 2018-05-17.
 */
public abstract class AbstractNioSelector implements Runnable {
    //線程池
    private final Executor executor;
    //NIO消息選擇器
    protected Selector selector;
    protected final AtomicBoolean wakeUp = new AtomicBoolean();
    //線程任務隊列
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
    //線程名
    private String threadName;
    //線程調度器
    protected NioSelectorRunnablePool selectorRunnablePool;
    AbstractNioSelector(Executor executor,String threadName,NioSelectorRunnablePool selectorRunnablePool) {
        this.executor = executor;
        this.threadName = threadName;
        this.selectorRunnablePool = selectorRunnablePool;
        openSelector();
    }
    private void openSelector() {
        try {
            this.selector = Selector.open(); //打開消息選擇器
        } catch (IOException e) {
            e.printStackTrace();
        }
        //把線程放入線程池,開始執行run方法
        executor.execute(this);
    }

    public void run() {
        Thread.currentThread().setName(this.threadName);
        while (true) {
            try {
                wakeUp.set(false);  //把消息選擇器的狀態定爲未喚醒狀態
                select(selector);   //消息選擇器選擇消息方式
                processTaskQueue(); //由於在主程序中綁定端口的時候已經註冊了接收通道任務線程,因此這裏是讀出任務。
                process(selector);  //任務處理,歡迎線程跟工做線程各不相同
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    //歡迎線程跟工做線程各自添加不一樣的線程,再把消息選擇器喚醒
    protected final void registerTask(Runnable task) {
        taskQueue.add(task);
        Selector selector = this.selector;
        if (selector != null) {
            if (wakeUp.compareAndSet(false,true)) {
                selector.wakeup();
            }
        }else {
            taskQueue.remove(task);
        }
    }
    public NioSelectorRunnablePool getSelectorRunnablePool() {
        return selectorRunnablePool;
    }

    private void processTaskQueue() {
        for (;;) {
            final Runnable task = taskQueue.poll();
            if (task == null) {
                break;
            }
            task.run();
        }
    }
    protected abstract int select(Selector selector) throws IOException;
    protected abstract void process(Selector selector) throws IOException;
}

歡迎線程跟工作線程的具體實現:

package com.netty.nionetty;

import com.netty.nionetty.pool.Boss;
import com.netty.nionetty.pool.NioSelectorRunnablePool;
import com.netty.nionetty.pool.Worker;

import java.io.IOException;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * Created by Administrator on 2018-05-17.
 */
public class NioServerBoss extends AbstractNioSelector implements Boss {
    public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor,threadName,selectorRunnablePool);
    }
    //註冊接收任務,會先調用抽象類,把任務線程先添加到任務隊列,再註冊接收消息類型
    public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            public void run() {
                try {
                    serverChannel.register(selector,SelectionKey.OP_ACCEPT);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select();
    }
    //NIO操做,開始接收,接收後再啓用工做線程,接收線程依然存在,並且工做線程也不斷給到線程池未使用線程
    //具體看初始化的時候初始了多少工做線程,可是是幾個鏈接對應一個工做線程。
    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator();i.hasNext();) {
            SelectionKey key = i.next();
            i.remove();
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            SocketChannel channel = server.accept();
            channel.configureBlocking(false);
            Worker nextWorker = getSelectorRunnablePool().nextWorker();
            nextWorker.registerNewChannelTask(channel);
            System.out.println("新客戶端鏈接");
        }
    }
}
package com.netty.nionetty;

import com.netty.nionetty.pool.NioSelectorRunnablePool;
import com.netty.nionetty.pool.Worker;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * Worker selector loop: reads from the client channels registered with it
 * and answers every received chunk with a short acknowledgement.
 *
 * Created by Administrator on 2018-05-17.
 */
public class NioServerWorker extends AbstractNioSelector implements Worker {
    /**
     * @param executor             executor the loop is submitted to
     * @param threadName           name set on the thread running the loop
     * @param selectorRunnablePool scheduler this worker belongs to
     */
    public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor,threadName,selectorRunnablePool);
    }

    /**
     * Queues a task that registers the client channel for OP_READ on this
     * worker's selector; the registration itself runs on the loop thread.
     *
     * @param channel the accepted client channel
     */
    @Override
    public void registerNewChannelTask(final SocketChannel channel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            @Override
            public void run() {
                try {
                    channel.register(selector,SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * Waits at most 500 ms for a key to become ready.
     */
    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select(500);
    }

    /**
     * Reads once from every selected channel. End-of-stream or an I/O error
     * closes that connection; received data is printed and acknowledged.
     *
     * @param selector the selector owned by this loop
     */
    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> ite = selectedKeys.iterator();
        while (ite.hasNext()) {
            SelectionKey key = ite.next();
            ite.remove();
            SocketChannel channel = (SocketChannel)key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try {
                int ret = channel.read(buffer);
                if (ret < 0) {
                    // -1 signals end-of-stream: the peer closed the connection.
                    disconnect(key, channel);
                } else if (ret > 0) {
                    // Decode only the bytes actually read, not the whole backing array.
                    System.out.println("收到數據:" + new String(buffer.array(), 0, ret));
                    ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
                    channel.write(outBuffer);
                }
                // ret == 0: nothing was available on this pass; the connection stays open.
            } catch (IOException e) {
                // A broken connection must not abort the remaining ready keys.
                e.printStackTrace();
                disconnect(key, channel);
            }
        }
    }

    /**
     * Cancels the key and closes the channel so the socket is released.
     */
    private void disconnect(SelectionKey key, SocketChannel channel) {
        key.cancel();
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("客戶端斷開鏈接");
    }
}

服務類

package com.netty.nionetty;

import com.netty.nionetty.pool.Boss;
import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;

/**
 * Service class: opens a non-blocking server channel, binds it to an address
 * and hands it to a boss taken from the scheduler.
 *
 * Created by Administrator on 2018-05-17.
 */
public class ServerBootstap {
    // Scheduler that supplies the boss runnable.
    private final NioSelectorRunnablePool selectorRunnablePool;

    /**
     * @param selectorRunnablePool scheduler that supplies the boss runnable
     */
    public ServerBootstap(NioSelectorRunnablePool selectorRunnablePool) {
        this.selectorRunnablePool = selectorRunnablePool;
    }

    /**
     * Binds a new non-blocking server channel to {@code localAddress} and
     * registers it with the next boss. An I/O failure is printed, and the
     * half-initialized channel is closed so the socket is not leaked.
     *
     * @param localAddress the address to listen on
     */
    public void bind(final SocketAddress localAddress) {
        ServerSocketChannel serverChannel = null;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(localAddress);
            Boss nextBoss = selectorRunnablePool.nextBoss();
            nextBoss.registerAcceptChannelTask(serverChannel);
        } catch (IOException e) {
            e.printStackTrace();
            if (serverChannel != null) {
                // The channel was opened but could not be configured or bound.
                try {
                    serverChannel.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }
}

主程序

package com.netty.nionetty;

import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

/**
 * Program entry point: builds the selector pool, binds the server to the
 * listening port and prints "start".
 *
 * Created by Administrator on 2018-05-17.
 */
public class Start {
    // Port the server listens on.
    private static final int PORT = 10101;

    public static void main(String[] args) {
        ServerBootstap bootstrap = new ServerBootstap(newSelectorPool());
        InetSocketAddress listenAddress = new InetSocketAddress(PORT);
        bootstrap.bind(listenAddress);
        System.out.println("start");
    }

    /**
     * Creates the scheduler with one cached thread pool for the boss and
     * another one for the workers.
     */
    private static NioSelectorRunnablePool newSelectorPool() {
        return new NioSelectorRunnablePool(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
    }
}

其實最主要的就是在線程調度器中,各種線程已經被初始化並存在於線程池內存中了,所以後面只是把這些線程拿出來,並註冊消息類型,進行處理,這就是NIO的多線程處理了。

相關文章
相關標籤/搜索