java nio模型理解

 

一、tcp信道,具體參數詳情參考apihtml

ServerSocketChannel:建立、接收、關閉、讀寫、阻塞java

SocketChannel:建立、鏈接、關閉、讀寫、阻塞(測試鏈接性)react

二、Selector:建立、關閉選擇器api

案例一:socket

NIOAccepter服務端線程tcp

package com.warehouse.data.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;

import javax.imageio.IIOException;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 9:55
 **/
public class NIOAcceptor extends Thread {
    private Selector selector;
    private ServerSocketChannel channel;
    private NIOReactorPool reactorPool;


    public NIOAcceptor(String name, String host, int port, NIOReactorPool reactorPool) throws IOException {
        super(name);
        this.reactorPool = reactorPool;
        //獲取一個管理通道器
        this.selector = Selector.open();
        //獲取一個接受鏈接socket
        this.channel = ServerSocketChannel.open();
        //設置非阻塞
        this.channel.configureBlocking(false);
        //綁定端口
        this.channel.bind(new InetSocketAddress(host, port));
        //註冊事件
        this.channel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("start NIOAcceptor thread server.");
    }


    @Override
    public void run() {
        final Selector selector = this.selector;
        //輪詢
        for (; ; ) {
            try {
                //阻塞,直到select事件到達
                selector.select(1000L);
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                try {
                    for (SelectionKey key : selectionKeySet) {
                        if (key.isValid() && key.isAcceptable()) {
                            accept(selector);
                        }/* else if (key.isValid() && key.isReadable()) {
                            Processor processor = (Processor) key.attachment();
                            try{
                                processor.process(key);
                            }catch (IOException e){
                                processor.close();
                            }
                        } */else {
                            key.cancel();
                        }
                    }
                } finally {
                    selectionKeySet.clear();
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }


    public void accept(Selector selector) {
        SocketChannel socketChannel = null;
        try {
            System.out.println("accept client success.");
            socketChannel = this.channel.accept();
            socketChannel.configureBlocking(false);

            //單個Reactor線程處理
            //SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
            //selectionKey.attach(new Processor(socketChannel));
            //多個Reactor線程處理
            NIOReactor reactor = this.reactorPool.getNextReactor();
            reactor.postRegister(new Processor(socketChannel));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

NIOReactorPool線程池ide

package com.warehouse.data.nio;

import java.io.IOException;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:50
 **/
public class NIOReactorPool {

    private final NIOReactor[] nioReactors;
    private volatile int nextReactor;

    public NIOReactorPool(String name, int poolSize) throws IOException {
        nioReactors = new NIOReactor[poolSize];
        for (int i = 0; i < poolSize; i++) {
            NIOReactor nioReactor = new NIOReactor(name + "-" + i);
            nioReactors[i] = nioReactor;
            nioReactor.startup();
        }
    }


    public NIOReactor getNextReactor() {
        int i = ++nextReactor;
        if (i > nioReactors.length) {
            i = nextReactor = 0;
        }
        return nioReactors[i];
    }
}

 

NIOReactor線程post

package com.warehouse.data.nio;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:51
 **/
public final class NIOReactor {
    private final String name;
    private final RWThread reactorR;

    public NIOReactor(String name) throws IOException {
        this.name = name;
        this.reactorR = new RWThread();
    }

    public void startup() {
        new Thread(reactorR, this.name + "-RW").start();
    }

    public void postRegister(Processor processor) {
        this.reactorR.registerQueue.offer(processor);
        this.reactorR.selector.wakeup();
    }

    private final class RWThread extends Thread {
        private final Selector selector;
        private final ConcurrentLinkedQueue<Processor> registerQueue;

        public RWThread() throws IOException {
            this.selector = Selector.open();
            this.registerQueue = new ConcurrentLinkedQueue<Processor>();
        }

        @Override
        public void run() {
            final Selector selector = this.selector;
            Set<SelectionKey> selectionKeySet = null;

            for (; ; ) {
                try {
                    selector.select(1000L);
                    register(selector);
                    selectionKeySet = selector.selectedKeys();
                    for (SelectionKey key : selectionKeySet) {
                        Object att = key.attachment();
                        Processor processor = null;
                        try {
                            if (att != null && key.isValid()) {
                                processor = (Processor) att;
                                if (key.isReadable()) {
                                    processor.process(key);
                                }
                                if (key.isWritable()) {

                                }
                            } else {
                                key.channel();
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                            if(processor != null){
                                processor.close();
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if (selectionKeySet != null) {
                        selectionKeySet.clear();
                    }
                }
            }
        }


        private void register(Selector selector) {
            if (this.registerQueue.isEmpty()) {
                return;
            }
            Processor processor = null;
            while ((processor = this.registerQueue.poll()) != null) {
                try {
                    processor.register(selector);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        }
    }


}

 

Processor事件處理測試

 

package com.warehouse.data.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 10:49
 **/
public class Processor {
    private SocketChannel channel;
    private SelectionKey selectionKey;

    public Processor(SocketChannel channel) {
        this.channel = channel;
    }

    public void register(Selector selector) throws ClosedChannelException {
        selectionKey = this.channel.register(selector, SelectionKey.OP_READ, this);
    }

    public void process(SelectionKey key) throws IOException {
        //能夠採用線程池處理
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int count = socketChannel.read(byteBuffer);
        System.out.println(new String(byteBuffer.array()));
    }



    public void close(){
        if(this.channel != null){
            try {
                this.channel.close();
                this.selectionKey.cancel();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Server服務端this

 

package com.warehouse.data.nio;

import java.io.IOException;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:03
 **/
public class Server {

    public static void main(String[] args) throws IOException {
        //5個reactor線程
        NIOReactorPool nioReactorPool = new NIOReactorPool("NIOReactor-IO", 5);
        NIOAcceptor nioAcceptor = new NIOAcceptor("NIOAcceptor-IO", "127.0.0.1", 8888, nioReactorPool);
        nioAcceptor.start();


    }
}

Client客戶端

 

package com.warehouse.data.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * ${DESCRIPTION}
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:00
 **/
public class Client {


    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);


        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));


        if(socketChannel.isConnectionPending()){
            //要finishConnect,不然會出現NotYetConnectedException異常
            socketChannel.finishConnect();
            socketChannel.write(ByteBuffer.wrap(new String("hello world").getBytes()));
        }

    }
}

 

資料:具體能夠參考netty權威指南

http://www.cnblogs.com/good-temper/p/5003892.html

相關文章
相關標籤/搜索