此篇文章會詳細解讀由BIO到NIO的逐步演進的心靈路程,爲Reactor-Netty 庫的講解鋪平道路。java
關於Java編程方法論-Reactor與Webflux
的視頻分享,已經完成了Rxjava 與 Reactor,b站地址以下:linux
Rxjava源碼解讀與分享:www.bilibili.com/video/av345…編程
Reactor源碼解讀與分享:www.bilibili.com/video/av353…數組
咱們經過一個BIO的Demo來展現其用法:bash
//服務端
public class BIOServer {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;//服務端Socket
Socket socket = null;//客戶端socket
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
serverSocket = new ServerSocket(port);
System.out.println(stringNowTime() + ": serverSocket started");
while(true)
{
socket = serverSocket.accept();
System.out.println(stringNowTime() + ": id爲" + socket.hashCode()+ "的Clientsocket connected");
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id爲" + socket.hashCode() + " "+inputContent);
count++;
}
System.out.println("id爲" + socket.hashCode()+ "的Clientsocket "+stringNowTime()+"讀取結束");
}
} catch (IOException e) {
e.printStackTrace();
}finally{
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return format.format(new Date());
}
public static void main(String[] args) {
BIOServer server = new BIOServer();
server.initBIOServer(8888);
}
}
// 客戶端
public class BIOClient {
public void initBIOClient(String host, int port) {
BufferedReader reader = null;
BufferedWriter writer = null;
Socket socket = null;
String inputContent;
int count = 0;
try {
reader = new BufferedReader(new InputStreamReader(System.in));
socket = new Socket(host, port);
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
System.out.println("clientSocket started: " + stringNowTime());
while (((inputContent = reader.readLine()) != null) && count < 2) {
inputContent = stringNowTime() + ": 第" + count + "條消息: " + inputContent + "\n";
writer.write(inputContent);//將消息發送給服務端
writer.flush();
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
socket.close();
reader.close();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return format.format(new Date());
}
public static void main(String[] args) {
BIOClient client = new BIOClient();
client.initBIOClient("127.0.0.1", 8888);
}
}
複製代碼
經過上面的例子,咱們能夠知道,不管是服務端仍是客戶端,咱們關注的幾個操做有基於服務端的serverSocket = new ServerSocket(port)
serverSocket.accept()
,基於客戶端的Socket socket = new Socket(host, port);
以及二者都有的讀取與寫入Socket數據的方式,即經過流來進行讀寫,這個讀寫難免經過一箇中間字節數組buffer來進行。服務器
因而,咱們經過源碼來看這些相應的邏輯。咱們先來看ServerSocket.java
這個類的相關代碼。 咱們查看ServerSocket.java
的構造器能夠知道,其最後依然會調用它的bind
方法:數據結構
//java.net.ServerSocket#ServerSocket(int)
public ServerSocket(int port) throws IOException {
this(port, 50, null);
}
public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
setImpl();
if (port < 0 || port > 0xFFFF)
throw new IllegalArgumentException(
"Port value out of range: " + port);
if (backlog < 1)
backlog = 50;
try {
bind(new InetSocketAddress(bindAddr, port), backlog);
} catch(SecurityException e) {
close();
throw e;
} catch(IOException e) {
close();
throw e;
}
}
複製代碼
按照咱們的Demo和上面的源碼可知,這裏傳入的參數endpoint並不會爲null,同時,屬於InetSocketAddress
類型,backlog大小爲50,因而,咱們應該關注的主要代碼邏輯也就是getImpl().bind(epoint.getAddress(), epoint.getPort());
:多線程
public void bind(SocketAddress endpoint, int backlog) throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isBound())
throw new SocketException("Already bound");
if (endpoint == null)
endpoint = new InetSocketAddress(0);
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
if (epoint.isUnresolved())
throw new SocketException("Unresolved address");
if (backlog < 1)
backlog = 50;
try {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkListen(epoint.getPort());
// 咱們應該關注的主要邏輯
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
} catch(SecurityException e) {
bound = false;
throw e;
} catch(IOException e) {
bound = false;
throw e;
}
}
複製代碼
這裏getImpl()
,由上面構造器的實現中,咱們有看到setImpl();
,可知,其factory
默認爲null,因此,這裏咱們關注的是SocksSocketImpl
這個類,建立其對象,並將當前ServerSocket
對象設定其中,這個設定的源碼請在SocksSocketImpl
的父類java.net.SocketImpl
中查看。 那麼getImpl也就明瞭了,其實就是咱們Socket的底層實現對應的實體類了,由於不一樣的操做系統內核是不一樣的,他們對於Socket的實現固然會各有不一樣,咱們這點要注意下,這裏針對的是win下面的系統。異步
/** * The factory for all server sockets. */
private static SocketImplFactory factory = null;
private void setImpl() {
if (factory != null) {
impl = factory.createSocketImpl();
checkOldImpl();
} else {
// No need to do a checkOldImpl() here, we know it's an up to date
// SocketImpl!
impl = new SocksSocketImpl();
}
if (impl != null)
impl.setServerSocket(this);
}
/** * Get the {@code SocketImpl} attached to this socket, creating * it if necessary. * * @return the {@code SocketImpl} attached to that ServerSocket. * @throws SocketException if creation fails. * @since 1.4 */
SocketImpl getImpl() throws SocketException {
if (!created)
createImpl();
return impl;
}
/** * Creates the socket implementation. * * @throws IOException if creation fails * @since 1.4 */
void createImpl() throws SocketException {
if (impl == null)
setImpl();
try {
impl.create(true);
created = true;
} catch (IOException e) {
throw new SocketException(e.getMessage());
}
}
複製代碼
咱們再看SocksSocketImpl
的bind方法實現,而後獲得其最後無非是調用本地方法bind0
。socket
//java.net.AbstractPlainSocketImpl#bind
/** * Binds the socket to the specified address of the specified local port. * @param address the address * @param lport the port */
protected synchronized void bind(InetAddress address, int lport) throws IOException {
synchronized (fdLock) {
if (!closePending && (socket == null || !socket.isBound())) {
NetHooks.beforeTcpBind(fd, address, lport);
}
}
socketBind(address, lport);
if (socket != null)
socket.setBound();
if (serverSocket != null)
serverSocket.setBound();
}
//java.net.PlainSocketImpl#socketBind
@Override
void socketBind(InetAddress address, int port) throws IOException {
int nativefd = checkAndReturnNativeFD();
if (address == null)
throw new NullPointerException("inet address argument is null.");
if (preferIPv4Stack && !(address instanceof Inet4Address))
throw new SocketException("Protocol family not supported");
bind0(nativefd, address, port, useExclusiveBind);
if (port == 0) {
localport = localPort0(nativefd);
} else {
localport = port;
}
this.address = address;
}
//java.net.PlainSocketImpl#bind0
static native void bind0(int fd, InetAddress localAddress, int localport, boolean exclBind) throws IOException;
複製代碼
這裏,咱們還要了解的是,使用了多線程只是可以實現對"業務邏輯處理"的多線程,可是對於數據報文的接收仍是須要一個一個來的,也就是咱們上面Demo中見到的accept以及read方法阻塞問題,多線程是根本解決不了的,那麼首先咱們來看看accept爲何會形成阻塞,accept方法的做用是詢問操做系統是否有新的Socket套接字信息從端口XXX處發送過來,注意這裏詢問的是操做系統,也就是說Socket套接字IO模式的支持是基於操做系統的,若是操做系統沒有發現有套接字從指定端口XXX鏈接進來,那麼操做系統就會等待,這樣accept方法就會阻塞,他的內部實現使用的是操做系統級別的同步IO。
因而,咱們來分析下ServerSocket.accept
方法的源碼過程:
public Socket accept() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isBound())
throw new SocketException("Socket is not bound yet");
Socket s = new Socket((SocketImpl) null);
implAccept(s);
return s;
}
複製代碼
首先進行的是一些判斷,接着建立了一個Socket對象(爲何這裏要建立一個Socket對象,後面會講到),執行了implAccept方法,來看看implAccept方法:
/** * Subclasses of ServerSocket use this method to override accept() * to return their own subclass of socket. So a FooServerSocket * will typically hand this method an <i>empty</i> FooSocket. On * return from implAccept the FooSocket will be connected to a client. * * @param s the Socket * @throws java.nio.channels.IllegalBlockingModeException * if this socket has an associated channel, * and the channel is in non-blocking mode * @throws IOException if an I/O error occurs when waiting * for a connection. * @since 1.1 * @revised 1.4 * @spec JSR-51 */
protected final void implAccept(Socket s) throws IOException {
SocketImpl si = null;
try {
if (s.impl == null)
s.setImpl();
else {
s.impl.reset();
}
si = s.impl;
s.impl = null;
si.address = new InetAddress();
si.fd = new FileDescriptor();
getImpl().accept(si); // <1>
SocketCleanable.register(si.fd); // raw fd has been set
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkAccept(si.getInetAddress().getHostAddress(),
si.getPort());
}
} catch (IOException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
} catch (SecurityException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
}
s.impl = si;
s.postAccept();
}
複製代碼
上面執行了<1>處getImpl的accept方法以後,咱們在AbstractPlainSocketImpl找到accept方法:
//java.net.AbstractPlainSocketImpl#accept
/** * Accepts connections. * @param s the connection */
protected void accept(SocketImpl s) throws IOException {
acquireFD();
try {
socketAccept(s);
} finally {
releaseFD();
}
}
複製代碼
能夠看到他調用了socketAccept方法,由於每一個操做系統的Socket地實現都不一樣,因此這裏Windows下就執行了咱們PlainSocketImpl裏面的socketAccept方法:
// java.net.PlainSocketImpl#socketAccept
@Override
void socketAccept(SocketImpl s) throws IOException {
int nativefd = checkAndReturnNativeFD();
if (s == null)
throw new NullPointerException("socket is null");
int newfd = -1;
InetSocketAddress[] isaa = new InetSocketAddress[1];
if (timeout <= 0) { //<1>
newfd = accept0(nativefd, isaa); // <2>
} else {
configureBlocking(nativefd, false);
try {
waitForNewConnection(nativefd, timeout);
newfd = accept0(nativefd, isaa); // <3>
if (newfd != -1) {
configureBlocking(newfd, true);
}
} finally {
configureBlocking(nativefd, true);
}
} // <4>
/* Update (SocketImpl)s' fd */
fdAccess.set(s.fd, newfd);
/* Update socketImpls remote port, address and localport */
InetSocketAddress isa = isaa[0];
s.port = isa.getPort();
s.address = isa.getAddress();
s.localport = localport;
if (preferIPv4Stack && !(s.address instanceof Inet4Address))
throw new SocketException("Protocol family not supported");
}
//java.net.PlainSocketImpl#accept0
static native int accept0(int fd, InetSocketAddress[] isaa) throws IOException;
複製代碼
這裏<1>到<4>之間是咱們關注的代碼,<2>和<3>執行了accept0方法,這個是native方法,具體來講就是與操做系統交互來實現監聽指定端口上是否有客戶端接入,正是由於accept0在沒有客戶端接入的時候會一直處於阻塞狀態,因此形成了咱們程序級別的accept方法阻塞,固然對於程序級別的阻塞,咱們是能夠避免的,也就是咱們能夠將accept方法修改爲非阻塞式,可是對於accept0形成的阻塞咱們暫時是無法改變的,操做系統級別的阻塞其實就是咱們一般所說的同步異步中的同步了。 前面說到咱們能夠在程序級別改變accept的阻塞,具體怎麼實現?其實就是經過咱們上面socketAccept方法中判斷timeout的值來實現,在第<1>處判斷timeout的值若是小於等於0,那麼直接執行accept0方法,這時候將一直處於阻塞狀態,可是若是咱們設置了timeout的話,即timeout值大於0的話,則程序會在等到咱們設置的時間後返回,注意這裏的newfd若是等於-1的話,表示此次accept沒有發現有數據從底層返回;那麼到底timeout的值是在哪設置?咱們能夠經過ServerSocket的setSoTimeout方法進行設置,來看看這個方法:
/** * Enable/disable {@link SocketOptions#SO_TIMEOUT SO_TIMEOUT} with the * specified timeout, in milliseconds. With this option set to a non-zero * timeout, a call to accept() for this ServerSocket * will block for only this amount of time. If the timeout expires, * a <B>java.net.SocketTimeoutException</B> is raised, though the * ServerSocket is still valid. The option <B>must</B> be enabled * prior to entering the blocking operation to have effect. The * timeout must be {@code > 0}. * A timeout of zero is interpreted as an infinite timeout. * @param timeout the specified timeout, in milliseconds * @exception SocketException if there is an error in * the underlying protocol, such as a TCP error. * @since 1.1 * @see #getSoTimeout() */
public synchronized void setSoTimeout(int timeout) throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
getImpl().setOption(SocketOptions.SO_TIMEOUT, timeout);
}
複製代碼
其執行了getImpl的setOption方法,而且設置了timeout時間,這裏,咱們從AbstractPlainSocketImpl中查看:
//java.net.AbstractPlainSocketImpl#setOption
public void setOption(int opt, Object val) throws SocketException {
if (isClosedOrPending()) {
throw new SocketException("Socket Closed");
}
boolean on = true;
switch (opt) {
/* check type safety b4 going native. These should never * fail, since only java.Socket* has access to * PlainSocketImpl.setOption(). */
case SO_LINGER:
if (val == null || (!(val instanceof Integer) && !(val instanceof Boolean)))
throw new SocketException("Bad parameter for option");
if (val instanceof Boolean) {
/* true only if disabling - enabling should be Integer */
on = false;
}
break;
case SO_TIMEOUT: //<1>
if (val == null || (!(val instanceof Integer)))
throw new SocketException("Bad parameter for SO_TIMEOUT");
int tmp = ((Integer) val).intValue();
if (tmp < 0)
throw new IllegalArgumentException("timeout < 0");
timeout = tmp;
break;
case IP_TOS:
if (val == null || !(val instanceof Integer)) {
throw new SocketException("bad argument for IP_TOS");
}
trafficClass = ((Integer)val).intValue();
break;
case SO_BINDADDR:
throw new SocketException("Cannot re-bind socket");
case TCP_NODELAY:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for TCP_NODELAY");
on = ((Boolean)val).booleanValue();
break;
case SO_SNDBUF:
case SO_RCVBUF:
if (val == null || !(val instanceof Integer) ||
!(((Integer)val).intValue() > 0)) {
throw new SocketException("bad parameter for SO_SNDBUF " +
"or SO_RCVBUF");
}
break;
case SO_KEEPALIVE:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_KEEPALIVE");
on = ((Boolean)val).booleanValue();
break;
case SO_OOBINLINE:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_OOBINLINE");
on = ((Boolean)val).booleanValue();
break;
case SO_REUSEADDR:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_REUSEADDR");
on = ((Boolean)val).booleanValue();
break;
case SO_REUSEPORT:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_REUSEPORT");
if (!supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT))
throw new UnsupportedOperationException("unsupported option");
on = ((Boolean)val).booleanValue();
break;
default:
throw new SocketException("unrecognized TCP option: " + opt);
}
socketSetOption(opt, on, val);
}
複製代碼
這個方法比較長,咱們僅看與timeout
有關的代碼,即<1>處的代碼。其實這裏僅僅就是將咱們setOption裏面傳入的timeout值設置到了AbstractPlainSocketImpl的全局變量timeout裏而已。
這樣,咱們就能夠在程序級別將accept方法設置成爲非阻塞式的了,可是read方法如今仍是阻塞式的,即後面咱們還須要改造read方法,一樣將它在程序級別上變成非阻塞式。
在正式改造前,咱們有必要來解釋下Socket下同步/異步和阻塞/非阻塞:
同步/異步是屬於操做系統級別的,指的是操做系統在收到程序請求的IO以後,若是IO資源沒有準備好的話,該如何響應程序的問題,同步的話就是不響應,直到IO資源準備好;而異步的話則會返回給程序一個標誌,這個標誌用於當IO資源準備好後經過事件機制發送的內容應該發到什麼地方。
阻塞/非阻塞是屬於程序級別的,指的是程序在請求操做系統進行IO操做時,若是IO資源沒有準備好的話,程序該怎麼處理的問題,阻塞的話就是程序什麼都不作,一直等到IO資源準備好,非阻塞的話程序則繼續運行,可是會時不時的去查看下IO到底準備好沒有呢;
咱們一般見到的BIO是同步阻塞式的,同步的話說明操做系統底層是一直等待IO資源準備直到ok的,阻塞的話是程序自己也在一直等待IO資源準備直到ok,具體來說程序級別的阻塞就是accept和read形成的,咱們能夠經過改造將其變成非阻塞式,可是操做系統層次的阻塞咱們無法改變。
咱們的NIO是同步非阻塞式的,其實它的非阻塞實現原理和咱們上面的講解差很少的,就是爲了改善accept和read方法帶來的阻塞現象,因此引入了Channel
和Buffer
的概念。 好了,咱們對咱們的Demo進行改進,解決accept帶來的阻塞問題(爲多個客戶端鏈接作的異步處理,這裏就很少解釋了,讀者可自行思考,實在不行可到本人相關視頻中找到對應解讀):
public class BIOProNotB {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;//服務端Socket
Socket socket = null;//客戶端socket
ExecutorService threadPool = Executors.newCachedThreadPool();
ClientSocketThread thread = null;
try {
serverSocket = new ServerSocket(port);
serverSocket.setSoTimeout(1000);
System.out.println(stringNowTime() + ": serverSocket started");
while (true) {
try {
socket = serverSocket.accept();
} catch (SocketTimeoutException e) {
//運行到這裏表示本次accept是沒有收到任何數據的,服務端的主線程在這裏能夠作一些其餘事情
System.out.println("now time is: " + stringNowTime());
continue;
}
System.out.println(stringNowTime() + ": id爲" + socket.hashCode() + "的Clientsocket connected");
thread = new ClientSocketThread(socket);
threadPool.execute(thread);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
return format.format(new Date());
}
class ClientSocketThread extends Thread {
public Socket socket;
public ClientSocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id爲" + socket.hashCode() + " " + inputContent);
count++;
}
System.out.println("id爲" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "讀取結束");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
BIOProNotB server = new BIOProNotB();
server.initBIOServer(8888);
}
}
複製代碼
爲咱們的ServerSocket設置了timeout時間,這樣的話調用accept方法的時候每隔1s他就會被喚醒一次,而再也不是一直在那裏,只有有客戶端接入纔會返回信息;咱們運行一下看看結果:
2019-01-02 17:28:43:362: serverSocket started
now time is: 2019-01-02 17:28:44:363
now time is: 2019-01-02 17:28:45:363
now time is: 2019-01-02 17:28:46:363
now time is: 2019-01-02 17:28:47:363
now time is: 2019-01-02 17:28:48:363
now time is: 2019-01-02 17:28:49:363
now time is: 2019-01-02 17:28:50:363
now time is: 2019-01-02 17:28:51:364
now time is: 2019-01-02 17:28:52:365
now time is: 2019-01-02 17:28:53:365
now time is: 2019-01-02 17:28:54:365
now time is: 2019-01-02 17:28:55:365
now time is: 2019-01-02 17:28:56:365 // <1>
2019-01-02 17:28:56:911: id爲1308927845的Clientsocket connected
now time is: 2019-01-02 17:28:57:913 // <2>
now time is: 2019-01-02 17:28:58:913
複製代碼
能夠看到,咱們剛開始並無客戶端接入的時候,是會執行System.out.println("now time is: " + stringNowTime());
的輸出,還有一點須要注意的就是,仔細看看上面的輸出結果的標記<1>與<2>,你會發現<2>處時間值不是17:28:57:365,緣由就在於若是accept正常返回值的話,是不會執行catch語句部分的。
這樣的話,咱們就把accept部分改形成了非阻塞式了,那麼read部分能夠改造麼?固然能夠,改造方法和accept很相似,咱們在read的時候,會調用 java.net.AbstractPlainSocketImpl#getInputStream
:
/** * Gets an InputStream for this socket. */
protected synchronized InputStream getInputStream() throws IOException {
synchronized (fdLock) {
if (isClosedOrPending())
throw new IOException("Socket Closed");
if (shut_rd)
throw new IOException("Socket input is shutdown");
if (socketInputStream == null)
socketInputStream = new SocketInputStream(this);
}
return socketInputStream;
}
複製代碼
這裏面建立了一個SocketInputStream
對象,會將當前AbstractPlainSocketImpl
對象傳進去,因而,在讀數據的時候,咱們會調用以下方法:
public int read(byte b[], int off, int length) throws IOException {
return read(b, off, length, impl.getTimeout());
}
int read(byte b[], int off, int length, int timeout) throws IOException {
int n;
// EOF already encountered
if (eof) {
return -1;
}
// connection reset
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
// bounds check
if (length <= 0 || off < 0 || length > b.length - off) {
if (length == 0) {
return 0;
}
throw new ArrayIndexOutOfBoundsException("length == " + length
+ " off == " + off + " buffer length == " + b.length);
}
// acquire file descriptor and do the read
FileDescriptor fd = impl.acquireFD();
try {
n = socketRead(fd, b, off, length, timeout);
if (n > 0) {
return n;
}
} catch (ConnectionResetException rstExc) {
impl.setConnectionReset();
} finally {
impl.releaseFD();
}
/* * If we get here we are at EOF, the socket has been closed, * or the connection has been reset. */
if (impl.isClosedOrPending()) {
throw new SocketException("Socket closed");
}
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
eof = true;
return -1;
}
private int socketRead(FileDescriptor fd, byte b[], int off, int len, int timeout) throws IOException {
return socketRead0(fd, b, off, len, timeout);
}
複製代碼
這裏,咱們看到了socketRead一樣設定了timeout,並且這個timeout就是咱們建立這個SocketInputStream
對象時傳入的AbstractPlainSocketImpl
對象來控制的,因此,咱們只須要設定serverSocket.setSoTimeout(1000)
便可。 咱們再次修改服務端代碼(代碼總共兩次設定,第一次是設定的是ServerSocket級別的,第二次設定的客戶端鏈接返回的那個Socket,二者不同):
public class BIOProNotBR {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;//服務端Socket
Socket socket = null;//客戶端socket
ExecutorService threadPool = Executors.newCachedThreadPool();
ClientSocketThread thread = null;
try {
serverSocket = new ServerSocket(port);
serverSocket.setSoTimeout(1000);
System.out.println(stringNowTime() + ": serverSocket started");
while (true) {
try {
socket = serverSocket.accept();
} catch (SocketTimeoutException e) {
//運行到這裏表示本次accept是沒有收到任何數據的,服務端的主線程在這裏能夠作一些其餘事情
System.out.println("now time is: " + stringNowTime());
continue;
}
System.out.println(stringNowTime() + ": id爲" + socket.hashCode() + "的Clientsocket connected");
thread = new ClientSocketThread(socket);
threadPool.execute(thread);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
return format.format(new Date());
}
class ClientSocketThread extends Thread {
public Socket socket;
public ClientSocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
socket.setSoTimeout(1000);
} catch (SocketException e1) {
e1.printStackTrace();
}
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while (true) {
try {
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id爲" + socket.hashCode() + " " + inputContent);
count++;
}
} catch (Exception e) {
//執行到這裏表示read方法沒有獲取到任何數據,線程能夠執行一些其餘的操做
System.out.println("Not read data: " + stringNowTime());
continue;
}
//執行到這裏表示讀取到了數據,咱們能夠在這裏進行回覆客戶端的工做
System.out.println("id爲" + socket.hashCode() + "的Clientsocket " + stringNowTime() + "讀取結束");
sleep(1000);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
BIOProNotBR server = new BIOProNotBR();
server.initBIOServer(8888);
}
}
複製代碼
執行以下:
2019-01-02 17:59:03:713: serverSocket started
now time is: 2019-01-02 17:59:04:714
now time is: 2019-01-02 17:59:05:714
now time is: 2019-01-02 17:59:06:714
2019-01-02 17:59:06:932: id爲1810132623的Clientsocket connected
now time is: 2019-01-02 17:59:07:934
Not read data: 2019-01-02 17:59:07:935
now time is: 2019-01-02 17:59:08:934
Not read data: 2019-01-02 17:59:08:935
now time is: 2019-01-02 17:59:09:935
Not read data: 2019-01-02 17:59:09:936
收到id爲1810132623 2019-01-02 17:59:09: 第0條消息: ccc // <1>
now time is: 2019-01-02 17:59:10:935
Not read data: 2019-01-02 17:59:10:981 // <2>
收到id爲1810132623 2019-01-02 17:59:11: 第1條消息: bbb
now time is: 2019-01-02 17:59:11:935
Not read data: 2019-01-02 17:59:12:470
now time is: 2019-01-02 17:59:12:935
id爲1810132623的Clientsocket 2019-01-02 17:59:13:191讀取結束
now time is: 2019-01-02 17:59:13:935
id爲1810132623的Clientsocket 2019-01-02 17:59:14:192讀取結束
複製代碼
其中,Not read data輸出部分解決了咱們的read阻塞問題,每隔1s會去喚醒咱們的read操做,若是在1s內沒有讀到數據的話就會執行System.out.println("Not read data: " + stringNowTime())
,在這裏咱們就能夠進行一些其餘操做了,避免了阻塞中當前線程的現象,當咱們有數據發送以後,就有了<1>處的輸出了,由於read獲得輸出,因此再也不執行catch語句部分,所以你會發現<2>處輸出時間是和<1>處的時間相差1s而不是和以前的17:59:09:936相差一秒;
這樣的話,咱們就解決了accept以及read帶來的阻塞問題了,同時在服務端爲每個客戶端都建立了一個線程來處理各自的業務邏輯,這點其實基本上已經解決了阻塞問題了,咱們能夠理解成是最第一版的NIO,可是,爲每一個客戶端都建立一個線程這點確實讓人頭疼的,特別是客戶端多了的話,很浪費服務器資源,再加上線程之間的切換開銷,更是雪上加霜,即便你引入了線程池技術來控制線程的個數,可是當客戶端多起來的時候會致使線程池的BlockingQueue隊列愈來愈大,那麼,這時候的NIO就能夠爲咱們解決這個問題,它並不會爲每一個客戶端都建立一個線程,在服務端只有一個線程,會爲每一個客戶端建立一個通道。
accept()本地方法,咱們能夠來試着看一看Linux這塊的相關解讀:
#include <sys/types.h>
#include <sys/socket.h>
int accept(int sockfd,struct sockaddr *addr,socklen_t *addrlen);
複製代碼
accept()系統調用主要用在基於鏈接的套接字類型,好比SOCK_STREAM和SOCK_SEQPACKET。它提取出所監聽套接字的等待鏈接隊列中第一個鏈接請求,建立一個新的套接字,並返回指向該套接字的文件描述符。新創建的套接字不在監聽狀態,原來所監聽的套接字也不受該系統調用的影響。
備註:新創建的套接字準備發送send()和接收數據recv()。
參數:
sockfd, 利用系統調用socket()創建的套接字描述符,經過bind()綁定到一個本地地址(通常爲服務器的套接字),而且經過listen()一直在監聽鏈接;
addr, 指向struct sockaddr的指針,該結構用通信層服務器對等套接字的地址(通常爲客戶端地址)填寫,返回地址addr的確切格式由套接字的地址類別(好比TCP或UDP)決定;若addr爲NULL,沒有有效地址填寫,這種狀況下,addrlen也不使用,應該置爲NULL;
備註:addr是個指向局部數據結構sockaddr_in的指針,這就是要求接入的信息本地的套接字(地址和指針)。
addrlen, 一個值結果參數,調用函數必須初始化爲包含addr所指向結構大小的數值,函數返回時包含對等地址(通常爲服務器地址)的實際數值;
備註:addrlen是個局部整形變量,設置爲sizeof(struct sockaddr_in)。
若是隊列中沒有等待的鏈接,套接字也沒有被標記爲Non-blocking,accept()會阻塞調用函數直到鏈接出現;若是套接字被標記爲Non-blocking,隊列中也沒有等待的鏈接,accept()返回錯誤EAGAIN或EWOULDBLOCK。
備註:通常來講,實現時accept()爲阻塞函數,當監聽socket調用accept()時,它先到本身的receive_buf中查看是否有鏈接數據包;如有,把數據拷貝出來,刪掉接收到的數據包,建立新的socket與客戶發來的地址創建鏈接;若沒有,就阻塞等待;
爲了在套接字中有到來的鏈接時獲得通知,可使用select()或poll()。當嘗試創建新鏈接時,系統發送一個可讀事件,而後調用accept()爲該鏈接獲取套接字。另外一種方法是,當套接字中有鏈接到來時設定套接字發送SIGIO信號。
返回值 成功時,返回非負整數,該整數是接收到套接字的描述符;出錯時,返回-1,相應地設定全局變量errno。
因此,咱們在咱們的Java部分的源碼裏(java.net.ServerSocket#accept)會new 一個Socket出來,方便鏈接後拿到的新Socket的文件描述符的信息給設定到咱們new出來的這個Socket上來,這點在java.net.PlainSocketImpl#socketAccept
中看到的尤其明顯,讀者能夠回顧相關源碼。