java nio廣播服務器

最近看了《Netty in Action》,對 Netty 的線程模型不太理解,因此學習了一下 Java NIO 的知識,並利用 Java NIO 寫了個簡單的服務器以便於理解。

Java NIO 有 3 個重要的概念:Channels、Buffers、Selectors。通過它們,我們可以用單個線程監聽多個數據通道。
Java NIO 可以進行阻塞的 IO 操作,也可以進行非阻塞的 IO 操作,我們更多是使用非阻塞式的操作。

參考文章

Java NIO 系列教程

NIO 入門

完整代碼(碼雲)

測試工具

USR-TCP232
SocketTool2

服務器端使用兩個線程:一個線程負責 accept 連接,另一個線程負責處理接收到的數據。

accept代碼

package me.zingon.nioserver;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;

/**
 * Accept loop of the broadcast server: listens on a TCP port and hands every
 * accepted connection over to the processing thread through a shared queue.
 *
 * <p>Meant to be run on its own thread. The listening channel is opened by
 * {@link #run()}, not by the constructor.
 *
 * <p>Created by zhidi on 2018-6-29.
 */
public class SocketAccepter implements Runnable{

    // Port to listen on; 9999 unless replaced by the constructor argument.
    private Integer port = 9999;

    // Listening channel; opened when run() starts.
    private ServerSocketChannel server=null;

    // Accepted connections, handed over to whoever drains this queue.
    private BlockingQueue<SocketChannel> queue=null;

    // Selector the listening channel registers OP_ACCEPT with.
    Selector acceptSelector=Selector.open();

    /**
     * @param port  TCP port to listen on
     * @param queue queue that receives every accepted connection
     * @throws IOException if the accept selector cannot be opened
     */
    public SocketAccepter(Integer port, BlockingQueue<SocketChannel> queue) throws IOException {
        this.port = port;
        this.queue = queue;
    }


    /**
     * Opens the listening channel, then accepts connections forever and puts
     * them on the queue. Returns early only when the listening channel cannot
     * be set up.
     */
    @Override
    public void run() {
        try {
            this.server = ServerSocketChannel.open();
            // Non-blocking mode is required before registering with a selector.
            this.server.configureBlocking(false);
            this.server.register(acceptSelector, SelectionKey.OP_ACCEPT);
            this.server.bind(new InetSocketAddress(port));
            System.out.println("服務器在 "+port +"端口啓動");
        } catch (IOException e) {
            e.printStackTrace();
            // Without a bound listening channel there is nothing to accept;
            // release what was opened and stop instead of looping on a dead channel.
            if (this.server != null) {
                try {
                    this.server.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
            return;
        }

        while (true){
            try {
                // Block until a connection is pending rather than spinning on selectNow().
                acceptSelector.select();
            } catch (IOException e) {
                e.printStackTrace();
                continue;
            }

            Iterator<SelectionKey> iterator = acceptSelector.selectedKeys().iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                // Remove first so no code path can leave a stale key in the selected set.
                iterator.remove();
                if (!key.isValid() || !key.isAcceptable()) {
                    continue;
                }
                acceptPending((ServerSocketChannel) key.channel());
            }
        }
    }

    /**
     * Accepts one pending connection from the given listening channel and
     * queues it. A connection that cannot be queued is closed.
     */
    private void acceptPending(ServerSocketChannel ssc) {
        try {
            SocketChannel sc = ssc.accept();
            if (sc == null) {
                // Non-blocking accept() returns null when nothing is pending any more.
                return;
            }
            if (queue.offer(sc)) {
                System.out.println("服務器接收鏈接 :" + sc);
            } else {
                // A full bounded queue must not kill the accept thread; drop the connection.
                System.err.println("Connection queue is full, rejecting " + sc);
                sc.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

處理線程

package me.zingon.nioserver;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;

/**
 * Processing loop of the broadcast server: registers connections taken from
 * the shared queue, reads incoming data from them, and sends each received
 * message to every channel that is currently writable.
 *
 * <p>Meant to be run on its own thread. The loop polls with
 * {@code selectNow()} and never blocks, so it keeps the thread busy even when
 * idle.
 *
 * <p>Created by zhidi on 2018-6-29.
 */
public class SocketLoop implements Runnable {

    // One fixed charset for decoding and encoding, so text survives the round trip
    // regardless of the platform default.
    private static final Charset CHARSET = Charset.forName("UTF-8");

    // Accepted connections waiting to be registered.
    private BlockingQueue<SocketChannel> queue;

    // Selector tracking OP_READ for every registered channel.
    private Selector readSelector=Selector.open();
    // Selector tracking OP_WRITE for every registered channel.
    private Selector writeSelector=Selector.open();

    // Scratch buffer reused for every read.
    private ByteBuffer readBuf=ByteBuffer.allocate(1024*64);

    // Messages received and not yet broadcast.
    private Queue<String> msgQueue=new LinkedList<>();


    /**
     * @param queue queue the accepted connections are taken from
     * @throws IOException if one of the selectors cannot be opened
     */
    public SocketLoop(BlockingQueue<SocketChannel> queue) throws IOException {
        this.queue = queue;
    }

    /** Runs forever: register new channels, read from ready ones, broadcast one message. */
    @Override
    public void run() {
        System.out.println("服務器處理線程啓動");
        while (true){
            registerNewChannel();
            try {
                readFromChannels();
                writeToChannels();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Drains the connection queue and registers each channel for read and
     * write readiness. A channel that cannot be registered is closed.
     */
    private void registerNewChannel(){
        SocketChannel sc;
        while( (sc = queue.poll()) != null){
            try {
                sc.configureBlocking(false);
                sc.register(readSelector, SelectionKey.OP_READ);
                sc.register(writeSelector,SelectionKey.OP_WRITE);
            } catch (IOException e) {
                e.printStackTrace();
                // A half-registered channel would otherwise stay open and unused.
                closeQuietly(sc);
            }
        }
    }

    /**
     * Reads from every read-ready channel and appends the decoded text to
     * msgQueue. Channels that reached end-of-stream or fail are closed.
     *
     * @throws IOException if selecting on the read selector fails
     */
    private void readFromChannels() throws IOException {
        readSelector.selectNow();
        Iterator<SelectionKey> iterator = readSelector.selectedKeys().iterator();
        while (iterator.hasNext()){
            SelectionKey key =  iterator.next();
            // Remove first so no code path can leave a stale key in the selected set.
            iterator.remove();
            if(!key.isValid()){
                continue;
            }
            SocketChannel channel = (SocketChannel) key.channel();
            try {
                readBuf.clear();
                int bytesRead = channel.read(readBuf);
                if (bytesRead < 0) {
                    // End of stream: the peer closed the connection. Left open, the
                    // channel would stay read-ready and be polled forever.
                    closeQuietly(channel);
                    continue;
                }
                if (bytesRead == 0) {
                    // Nothing arrived; do not queue an empty message.
                    continue;
                }
                readBuf.flip();
                // Each read is decoded on its own, so a multi-byte character split
                // across two reads is not reassembled.
                String msg = CHARSET.decode(readBuf).toString();
                System.out.println(msg);
                msgQueue.add(msg);
            } catch (IOException e) {
                e.printStackTrace();
                closeQuietly(channel);
            }
        }
    }

    /**
     * Takes one message from msgQueue, if any, and writes it, prefixed with
     * the current thread's name, to every write-ready channel. The message
     * stays queued while no channel is write-ready.
     *
     * @throws IOException if selecting on the write selector fails
     */
    private void writeToChannels() throws IOException {
        if (msgQueue.isEmpty()) {
            return;
        }
        writeSelector.selectNow();
        Iterator<SelectionKey> iterator = writeSelector.selectedKeys().iterator();
        if (!iterator.hasNext()) {
            return;
        }

        String msg=msgQueue.poll();
        // Encode once; every receiver gets its own buffer view over the same bytes.
        byte[] payload=(Thread.currentThread().getName()+msg).getBytes(CHARSET);
        while (iterator.hasNext()){
            SelectionKey key =  iterator.next();
            iterator.remove();
            if(!key.isValid()){
                continue;
            }
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer bf=ByteBuffer.wrap(payload);
            try {
                while(bf.hasRemaining()) {
                    channel.write(bf);
                }
            } catch (IOException e) {
                // The try wraps the whole write loop: a failed channel is abandoned
                // instead of being retried forever after it has been closed.
                e.printStackTrace();
                closeQuietly(channel);
            }
        }
    }

    /** Closes the channel, which also cancels its selector keys; a close failure is only logged. */
    private void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索