JAVA NIO 實例

JAVA NIO的服務器端實現java

package com.flyer.cn.javaIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EchoServer {
	public static SelectorLoop connectionBell;
	public static SelectorLoop readBell;
	public boolean isReadBellRunning=false;
	private ExecutorService thdPool=Executors.newCachedThreadPool();

	public static void main(String[] args) throws IOException {
		new EchoServer().startServer();
	}
	
	// 啓動服務器
	public void startServer() throws IOException {
		// 準備好一個鬧鐘.當有連接進來的時候響.
		connectionBell = new SelectorLoop();
		
		// 準備好一個鬧裝,當有read事件進來的時候響.
		readBell = new SelectorLoop();
		
		// 開啓一個server channel來監聽
		ServerSocketChannel ssc = ServerSocketChannel.open();
		// 開啓非阻塞模式
		ssc.configureBlocking(false);
		
		ServerSocket socket = ssc.socket();
		socket.bind(new InetSocketAddress("localhost",7878));
		
		// 給鬧鐘規定好要監聽報告的事件,這個鬧鐘只監聽新鏈接事件.
		ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT);
		new Thread(connectionBell,"connectionBell").start();
	}
	
	// Selector輪詢線程類
	public class SelectorLoop implements Runnable {
		private Selector selector;
		private ByteBuffer temp = ByteBuffer.allocate(1024);
		
		public SelectorLoop() throws IOException {
			this.selector = Selector.open();
		}
		
		public Selector getSelector() {
			return this.selector;
		}

		@Override
		public void run() {
			while(true) {
				try {
				    // 阻塞,只有當至少一個註冊的事件發生的時候纔會繼續.
					this.selector.select();
					
					Set<SelectionKey> selectKeys = this.selector.selectedKeys();
					Iterator<SelectionKey> it = selectKeys.iterator();
					while (it.hasNext()) {
						SelectionKey key = it.next();
						it.remove();
						if (key.isAcceptable()) {
							// 這是一個connection accept事件, 而且這個事件是註冊在serversocketchannel上的.
							ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
							// 接受一個鏈接.
							SocketChannel sc = ssc.accept();
							
							// 對新的鏈接的channel註冊read事件. 使用readBell鬧鐘.
							sc.configureBlocking(false);
							sc.register(readBell.getSelector(), SelectionKey.OP_READ);
							
							// 若是讀取線程尚未啓動,那就啓動一個讀取線程.
							synchronized(EchoServer.this) {
								if (!EchoServer.this.isReadBellRunning) {
									EchoServer.this.isReadBellRunning = true;
									new Thread(readBell,"readBell").start();
								}
							}
							
						} 
						else if (key.isReadable()){
							// 這是一個read事件,而且這個事件是註冊在socketchannel上的.
							SocketChannel sc = (SocketChannel) key.channel();
							// 寫數據到buffer
							int count = sc.read(temp);
							if (count < 0) {
								// 客戶端已經斷開鏈接.
								key.cancel();
								sc.close();
								return;
							}
							// 切換buffer到讀狀態,內部指針歸位.
							temp.flip();
							String msg = Charset.forName("UTF-8").decode(temp).toString();
							System.out.println(new Date().toLocaleString()+"Server received ["+msg+"] from client address:" + sc.getRemoteAddress());
							
							// 清空buffer
							temp.clear();
					
						thdPool.submit(new Dispatch(sc,msg));
						}
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
		
		public class Dispatch implements Runnable{
			private SocketChannel sc;
			private String msg;
			public Dispatch(SocketChannel _sc,String _msg){
				this.sc=_sc;
				this.msg=_msg;
			}

			public void run() {
				try{
					Thread.sleep(1000);
					// echo back.
					sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
				}
			catch(Exception ex){
				ex.printStackTrace();
			}
			}
		}
		
		
	}

}

JAVA NIO的客戶端實現服務器

package com.flyer.cn.javaIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class Client implements Runnable {
	// 空閒計數器,若是空閒超過10次,將檢測server是否中斷鏈接.
	private String clientName;
	private static int idleCounter = 0;
	private Selector selector;
	private SocketChannel socketChannel;
	private ByteBuffer temp = ByteBuffer.allocate(1024);

	public static void main(String[] args) throws IOException {
		for(int i=0;i<100;i++){
			Client client= new Client("client"+i);
			new Thread(client).start();
			//client.sendFirstMsg();
		}
	}
	
	public Client(String name) throws IOException {
		this.clientName=name;
		// 一樣的,註冊鬧鐘.
		this.selector = Selector.open();
		
		// 鏈接遠程server
		socketChannel = SocketChannel.open();
		// 若是快速的創建了鏈接,返回true.若是沒有創建,則返回false,並在鏈接後出發Connect事件.
		Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 7878));
		socketChannel.configureBlocking(false);
		SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
		
		if (isConnected) {
			this.sendFirstMsg();
		} else {
			// 若是鏈接還在嘗試中,則註冊connect事件的監聽. connect成功之後會出發connect事件.
		    key.interestOps(SelectionKey.OP_CONNECT);
		}
	}
	
	public void sendFirstMsg() throws IOException {
		String msg = "Hello NIO.From "+this.clientName;
		socketChannel.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
	}

	@Override
	public void run() {
        while (true) {
			try {
				// 阻塞,等待事件發生,或者1秒超時. num爲發生事件的數量.
				int num = this.selector.select(1000);
				if (num ==0) {
					idleCounter ++;
					if(idleCounter >10) {
						// 若是server斷開了鏈接,發送消息將失敗.
						try {
						    this.sendFirstMsg();
						} catch(ClosedChannelException e) {
							e.printStackTrace();
							this.socketChannel.close();
							return;
						}
					}
					continue;
				} else {
					idleCounter = 0;
				}
				Set<SelectionKey> keys = this.selector.selectedKeys();
				Iterator<SelectionKey> it = keys.iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();
					if (key.isConnectable()) {
						// socket connected
						SocketChannel sc = (SocketChannel)key.channel();
						if (sc.isConnectionPending()) {
						    sc.finishConnect();
						}
						// send first message;
						this.sendFirstMsg();
					}
					if (key.isReadable()) {
						// msg received.
						SocketChannel sc = (SocketChannel)key.channel();
						this.temp = ByteBuffer.allocate(1024);
						int count = sc.read(temp);
						if (count<0) {
							sc.close();
							continue;
						}
						// 切換buffer到讀狀態,內部指針歸位.
						temp.flip();
						String msg = Charset.forName("UTF-8").decode(temp).toString();
						System.out.println("Client received ["+msg+"] from server address:" + sc.getRemoteAddress()+new Date().toLocaleString());
						
						Thread.sleep(1000);
						// echo back.
						sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
						
						// 清空buffer
						temp.clear();
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

這個實例在高併發下會產生粘包和段包的問題,解決方法參考下一篇併發

相關文章
相關標籤/搜索