Java NIO中的SocketChannel是一個連接到TCP網絡套接字的通道。可以通過以下2種方式創建SocketChannel:
打開一個SocketChannel並連接到互聯網上的某臺服務器。
一個新連接到達ServerSocketChannel時,會創建一個SocketChannel。
// Open a client channel and connect it to a remote server.
// InetSocketAddress expects a bare host name (or IP literal), not a URL:
// with an "http://" prefix the name cannot be resolved and connect() fails.
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("jenkov.com", 80));
Java NIO中的 ServerSocketChannel 是一個可以監聽新進來的TCP連接的通道,就像標準IO中的ServerSocket一樣。
// Open a server channel and bind its underlying socket to port 9999.
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));

// Accept connections forever. The channel is never switched to non-blocking
// mode here, so each accept() call waits until a client connects.
while (true) {
    SocketChannel socketChannel = serverSocketChannel.accept();
    // do something with socketChannel...
}
如以下代碼:
MyClient.java
package com.lyx.nio.socket;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Demo client: starts 100 threads, each of which connects to localhost:10000,
 * sends one serialized {@link MyRequestObject} and logs the
 * {@link MyResponseObject} it reads back.
 */
public class MyClient {

    private final static Logger logger = Logger.getLogger(MyClient.class.getName());

    /**
     * Starts the 100 client threads; it does not wait for them to finish.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 100; i++) {
            new Thread(new MyRunnable(i)).start();
        }
    }

    /**
     * Closes the channel if it was opened. A failure to close is only logged,
     * because nothing useful can be done about it at that point.
     */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException ex) {
            logger.log(Level.FINE, "Failed to close client channel", ex);
        }
    }

    /** One request/response exchange over its own connection. */
    private static final class MyRunnable implements Runnable {

        /** Index of this client; used to build the request name and value. */
        private final int idx;

        private MyRunnable(int idx) {
            this.idx = idx;
        }

        /**
         * Connects, sends the request, reads the response and logs both.
         * Every failure is logged; nothing is rethrown to the thread.
         */
        @Override
        public void run() {
            // A SocketChannel is a channel connected to a TCP network socket.
            SocketChannel socketChannel = null;
            try {
                socketChannel = SocketChannel.open();
                // InetSocketAddress is an IP socket address (host + port number).
                SocketAddress socketAddress = new InetSocketAddress("localhost", 10000);
                socketChannel.connect(socketAddress);

                MyRequestObject myRequestObject =
                        new MyRequestObject("request_" + idx, "request_" + idx);
                logger.log(Level.INFO, myRequestObject.toString());
                sendData(socketChannel, myRequestObject);

                MyResponseObject myResponseObject = receiveData(socketChannel);
                logger.log(Level.INFO, myResponseObject.toString());
            } catch (Exception ex) {
                logger.log(Level.SEVERE, "Client " + idx + " failed", ex);
            } finally {
                closeQuietly(socketChannel);
            }
        }

        /**
         * Serializes the request, writes all of its bytes to the channel and
         * then shuts down the output side of the socket.
         *
         * @throws IOException if writing or the shutdown fails
         */
        private void sendData(SocketChannel socketChannel, MyRequestObject myRequestObject)
                throws IOException {
            byte[] bytes = SerializableUtil.toBytes(myRequestObject);
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            // write() may transfer fewer bytes than remain, so loop until drained.
            while (buffer.hasRemaining()) {
                socketChannel.write(buffer);
            }
            // Half-close: MyServer.receiveData reads until end-of-stream, and
            // this is what delivers that end-of-stream to it.
            socketChannel.socket().shutdownOutput();
        }

        /**
         * Reads from the channel until end-of-stream and deserializes the
         * collected bytes into a response.
         *
         * @return the deserialized response
         * @throws IOException if reading from the channel fails
         */
        private MyResponseObject receiveData(SocketChannel socketChannel) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            // A heap buffer lets us copy straight from its backing array.
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (socketChannel.read(buffer) >= 0) {
                // flip() switches the buffer from being filled to being drained:
                // limit becomes the old position and position goes back to 0.
                buffer.flip();
                baos.write(buffer.array(),
                        buffer.arrayOffset() + buffer.position(),
                        buffer.remaining());
                buffer.clear();
            }
            // Deserialize the complete payload.
            return (MyResponseObject) SerializableUtil.toObject(baos.toByteArray());
        }
    }
}
MyServer.java
package com.lyx.nio.socket;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Demo server: listens on port 10000, uses a {@link Selector} to wait for
 * incoming connections and answers each {@link MyRequestObject} with a
 * {@link MyResponseObject}, one connection at a time.
 */
public class MyServer {

    private final static Logger logger = Logger.getLogger(MyServer.class.getName());

    /**
     * Sets up the listening channel and runs the select loop. The loop ends
     * when {@code select()} returns 0 or a selector-level I/O error occurs.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Selector selector = null;
        ServerSocketChannel serverSocketChannel = null;
        try {
            selector = Selector.open();

            // Create the server socket channel and make it non-blocking, which
            // is required before it can be registered with a selector.
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);

            // Bind the server socket to the local port.
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.socket().bind(new InetSocketAddress(10000));

            // Ask the selector to report this channel as ready whenever a
            // connection is waiting to be accepted.
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            // select() blocks until at least one registered operation is ready.
            while (selector.select() > 0) {
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey readyKey = it.next();
                    // Selected keys must be removed by hand or they are reported again.
                    it.remove();
                    if (!readyKey.isValid() || !readyKey.isAcceptable()) {
                        continue;
                    }
                    try {
                        execute((ServerSocketChannel) readyKey.channel());
                    } catch (IOException ex) {
                        // One failing client must not take the whole server down.
                        logger.log(Level.WARNING, "I/O error while serving a client", ex);
                    } catch (RuntimeException ex) {
                        // e.g. a payload that does not deserialize to a request.
                        logger.log(Level.WARNING, "Invalid request from a client", ex);
                    }
                }
            }
        } catch (ClosedChannelException ex) {
            logger.log(Level.SEVERE, "Server channel was closed unexpectedly", ex);
        } catch (IOException ex) {
            logger.log(Level.SEVERE, "Server failed", ex);
        } finally {
            closeQuietly(selector);
            closeQuietly(serverSocketChannel);
        }
    }

    /**
     * Accepts one pending connection, reads the request, sends the response
     * and closes the connection.
     *
     * @throws IOException if accepting, reading or writing fails
     */
    private static void execute(ServerSocketChannel serverSocketChannel) throws IOException {
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // The server channel is non-blocking, so accept() returns null
                // when no connection is actually pending.
                return;
            }
            MyRequestObject myRequestObject = receiveData(socketChannel);
            logger.log(Level.INFO, myRequestObject.toString());

            MyResponseObject myResponseObject = new MyResponseObject(
                    "response for " + myRequestObject.getName(),
                    "response for " + myRequestObject.getValue());
            sendData(socketChannel, myResponseObject);
            logger.log(Level.INFO, myResponseObject.toString());
        } finally {
            closeQuietly(socketChannel);
        }
    }

    /**
     * Reads from the channel until end-of-stream and deserializes the
     * collected bytes into a request.
     *
     * @return the deserialized request
     * @throws IOException if reading from the channel fails
     */
    private static MyRequestObject receiveData(SocketChannel socketChannel) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // read() fills the buffer and returns the number of bytes read, or -1
        // once the end of the stream has been reached.
        // NOTE(review): the accepted channel is never made non-blocking, so
        // this loop appears to hold the selector thread until the client ends
        // its output; a slow client would delay all others. Confirm intent.
        while (socketChannel.read(buffer) >= 0) {
            buffer.flip();
            baos.write(buffer.array(),
                    buffer.arrayOffset() + buffer.position(),
                    buffer.remaining());
            buffer.clear();
        }
        // SECURITY: this performs Java-native deserialization of bytes taken
        // straight from the network. Do not expose this to untrusted peers
        // without a deserialization filter or a safer wire format.
        return (MyRequestObject) SerializableUtil.toObject(baos.toByteArray());
    }

    /**
     * Serializes the response and writes all of its bytes to the channel.
     *
     * @throws IOException if writing fails
     */
    private static void sendData(SocketChannel socketChannel, MyResponseObject myResponseObject)
            throws IOException {
        byte[] bytes = SerializableUtil.toBytes(myResponseObject);
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        // write() takes a buffer and may transfer only part of it per call.
        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
    }

    /**
     * Closes the resource if it was created. A failure to close is only
     * logged, because nothing useful can be done about it at that point.
     */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ex) {
            logger.log(Level.FINE, "Failed to close " + resource, ex);
        }
    }
}
輔助類
MyRequestObject.java
package com.lyx.nio.socket;

import java.io.Serializable;

/**
 * Serializable request made of a name, a value and a fixed 1024-byte array.
 * The array is allocated by the constructor, has no accessor and, being a
 * non-transient field, is serialized together with the object.
 */
public class MyRequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;
    private String value;
    // NOTE(review): presumably padding to make the serialized form larger — confirm.
    private byte[] bytes;

    /**
     * Creates a request with the given name and value and a fresh 1024-byte array.
     *
     * @param name  request name
     * @param value request value
     */
    public MyRequestObject(String name, String value) {
        this.bytes = new byte[1024];
        this.value = value;
        this.name = name;
    }

    /** @return the request name */
    public String getName() {
        return this.name;
    }

    /** @param name the new request name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the request value */
    public String getValue() {
        return this.value;
    }

    /** @param value the new request value */
    public void setValue(String value) {
        this.value = value;
    }

    /** @return a one-line description with the name, the value and the array length */
    @Override
    public String toString() {
        final int byteCount = bytes.length;
        return "Request [name: " + name + ", value: " + value + ", bytes: " + byteCount + "]";
    }
}
MyResponseObject.java
package com.lyx.nio.socket;

import java.io.Serializable;

/**
 * Serializable response made of a name, a value and a fixed 1024-byte array.
 * The array is allocated by the constructor, has no accessor and, being a
 * non-transient field, is serialized together with the object.
 */
public class MyResponseObject implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;
    private String value;
    // NOTE(review): presumably padding to make the serialized form larger — confirm.
    private byte[] bytes;

    /**
     * Creates a response with the given name and value and a fresh 1024-byte array.
     *
     * @param name  response name
     * @param value response value
     */
    public MyResponseObject(String name, String value) {
        this.bytes = new byte[1024];
        this.value = value;
        this.name = name;
    }

    /** @return the response name */
    public String getName() {
        return this.name;
    }

    /** @param name the new response name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the response value */
    public String getValue() {
        return this.value;
    }

    /** @param value the new response value */
    public void setValue(String value) {
        this.value = value;
    }

    /** @return a one-line description with the name, the value and the array length */
    @Override
    public String toString() {
        final int byteCount = bytes.length;
        return "Response [name: " + name + ", value: " + value + ", bytes: " + byteCount + "]";
    }
}
SerializableUtil.java
package com.lyx.nio.socket;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * Helpers that convert objects to and from byte arrays using Java's built-in
 * object serialization.
 */
public class SerializableUtil {

    /**
     * Serializes an object into a byte array.
     *
     * @param object the object to serialize; {@code null} is allowed
     * @return the serialized form
     * @throws RuntimeException wrapping the {@link IOException} if the object
     *         cannot be serialized (for example, it is not {@code Serializable})
     */
    public static byte[] toBytes(Object object) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = null;
        try {
            oos = new ObjectOutputStream(baos);
            oos.writeObject(object);
            // Push any buffered data into baos before taking the snapshot.
            oos.flush();
            return baos.toByteArray();
        } catch (IOException ex) {
            String type = (object == null) ? "null" : object.getClass().getName();
            throw new RuntimeException("Failed to serialize " + type + ": " + ex.getMessage(), ex);
        } finally {
            closeQuietly(oos);
        }
    }

    /**
     * Deserializes an object from a byte array produced by {@link #toBytes(Object)}.
     *
     * <p>SECURITY: this is Java-native deserialization. Do not call it with
     * bytes from an untrusted source without additional filtering.
     *
     * @param bytes the serialized form; must not be {@code null}
     * @return the deserialized object, possibly {@code null}
     * @throws NullPointerException if {@code bytes} is {@code null}
     * @throws RuntimeException wrapping the {@link IOException} or
     *         {@link ClassNotFoundException} if the bytes cannot be deserialized
     */
    public static Object toObject(byte[] bytes) {
        if (bytes == null) {
            throw new NullPointerException("bytes must not be null");
        }
        ObjectInputStream ois = null;
        try {
            ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return ois.readObject();
        } catch (IOException ex) {
            throw new RuntimeException(
                    "Failed to deserialize " + bytes.length + " bytes: " + ex.getMessage(), ex);
        } catch (ClassNotFoundException ex) {
            throw new RuntimeException(
                    "Class of serialized object not found: " + ex.getMessage(), ex);
        } finally {
            closeQuietly(ois);
        }
    }

    /**
     * Closes the stream if it was created. The streams here are memory-backed,
     * so a failure to close is deliberately ignored.
     */
    private static void closeQuietly(Closeable stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException ignored) {
            // Nothing to recover: no external resource is held.
        }
    }
}
==========END==========