目錄
NIO 源碼分析(02-2) BIO 源碼分析 Socket
Netty 系列目錄(http://www.javashuo.com/article/p-hskusway-em.html)
在上一篇文章中詳細分析了 ServerSocket 的源碼。Socket 和 ServerSocket 一樣,也只是一個門面(Facade),真正的實現同樣是 SocksSocketImpl,因此 setImpl、createImpl、new、bind、listen 等方法都是類似的。本文重點關注其 connect 方法和 IO 流的讀取方法。
//1. 鏈接服務器 Socket socket = new Socket(); socket.connect(new InetSocketAddress(HOST, PORT), 0); BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintWriterout = new PrintWriter(socket.getOutputStream(), true); //2. 發送請求數據 out.println("客戶端發送請求數據..."); //3. 接收服務端數據 String response = in.readLine(); System.out.println("Client: " + response);
ok,代碼已經完成!!!下面的源碼分析都會基於這個 demo。
// timeout=0 表示永久阻塞,timeout>0 則指定超時時間 public void connect(SocketAddress endpoint, int timeout) throws IOException { InetSocketAddress epoint = (InetSocketAddress) endpoint; InetAddress addr = epoint.getAddress (); int port = epoint.getPort(); // 1. 建立底層 socket 套接字 if (!created) createImpl(true); // 2. oldImpl 默認爲 false,也就是進入第一個 if 條件 // checkOldImpl 會判斷 impl 中有沒有 connect(SocketAddress address, int port) 方法 // 來設置 oldImpl 的值 if (!oldImpl) impl.connect(epoint, timeout); else if (timeout == 0) { if (epoint.isUnresolved()) impl.connect(addr.getHostName(), port); else impl.connect(addr, port); } else throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)"); connected = true; bound = true; }
總結: Socket 首先和 ServerSocket 一樣調用 createImpl 創建底層 socket 對象,然後委託給 impl 完成連接操作
/**
 * Stores the port and address taken from {@code address} in this impl and
 * connects to them. If anything in that sequence throws, the impl is closed
 * before the failure is propagated.
 */
protected void connect(SocketAddress address, int timeout) throws IOException {
    try {
        InetSocketAddress target = (InetSocketAddress) address;
        this.port = target.getPort();
        this.address = target.getAddress();
        connectToAddress(this.address, this.port, timeout);
    } catch (Throwable failure) {
        // Any failure (checked or unchecked) closes the impl, then is rethrown as-is.
        close();
        throw failure;
    }
}

/**
 * Hands the connect over to {@code doConnect}, substituting the local host
 * address when {@code address} is the wildcard (any-local) address.
 */
private void connectToAddress(InetAddress address, int port, int timeout) throws IOException {
    InetAddress destination = address.isAnyLocalAddress() ? InetAddress.getLocalHost() : address;
    doConnect(destination, port, timeout);
}
總結: connect 的連接操作具體由 doConnect 完成
/**
 * Connects to the given address and port via {@code socketConnect}. If an
 * IOException is raised along the way, the impl is closed and the exception
 * is rethrown to the caller.
 */
synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
    // Under fdLock: run the pre-connect hook only when no close is pending and
    // the owning socket is either absent or not yet bound.
    synchronized (fdLock) {
        if (!closePending && (socket == null || !socket.isBound())) {
            NetHooks.beforeTcpConnect(fd, address, port);
        }
    }
    try {
        acquireFD();
        try {
            socketConnect(address, port, timeout);
            /* socket may have been closed during poll/select */
            synchronized (fdLock) {
                if (closePending) {
                    throw new SocketException ("Socket closed");
                }
            }
            // Mirror the successful connect on the owning socket object, if any.
            if (socket != null) {
                socket.setBound();
                socket.setConnected();
            }
        } finally {
            // Paired with the acquireFD() above; runs on success and on failure.
            releaseFD();
        }
    } catch (IOException e) {
        // A failed connect closes the impl before the exception is propagated.
        close();
        throw e;
    }
}
/**
 * Connects the native socket to {@code address}:{@code port}.
 *
 * With a positive timeout the descriptor is switched to non-blocking mode for
 * the duration of the attempt and {@code waitForConnect} is used when the
 * native connect reports WOULDBLOCK; otherwise a plain connect is issued.
 * Afterwards the local port is looked up if it is still 0.
 */
void socketConnect(InetAddress address, int port, int timeout) throws IOException {
    final int handle = checkAndReturnNativeFD();

    if (address == null) {
        throw new NullPointerException("inet address argument is null.");
    }

    if (timeout > 0) {
        configureBlocking(handle, false);
        try {
            if (connect0(handle, address, port) == WOULDBLOCK) {
                waitForConnect(handle, timeout);
            }
        } finally {
            // Blocking mode is restored whether or not the connect succeeded.
            configureBlocking(handle, true);
        }
    } else {
        connect0(handle, address, port);
    }

    if (localport == 0) {
        localport = localPort0(handle);
    }
}
/*
 * Native side of DualStackPlainSocketImpl.connect0: converts the Java address
 * to a sockaddr and calls connect() on fd. Returns connect()'s result on
 * success, the WOULDBLOCK constant when Winsock reports WSAEWOULDBLOCK, and
 * -1 (with a Java exception pending for connect errors) otherwise.
 */
JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_connect0
  (JNIEnv *env, jclass clazz, jint fd, jobject iaObj, jint port)
{
    SOCKETADDRESS sa;
    int sa_len = sizeof(sa);
    int rv;
    int err;

    if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&sa,
                                  &sa_len, JNI_TRUE) != 0) {
        return -1;
    }

    rv = connect(fd, (struct sockaddr *)&sa, sa_len);
    if (rv != SOCKET_ERROR) {
        return rv;
    }

    /* connect() failed: map the Winsock error to a return code or exception. */
    err = WSAGetLastError();
    switch (err) {
    case WSAEWOULDBLOCK:
        return java_net_DualStackPlainSocketImpl_WOULDBLOCK;
    case WSAEADDRNOTAVAIL:
        JNU_ThrowByName(env, JNU_JAVANETPKG "ConnectException",
            "connect: Address is invalid on local machine, or port is not valid on remote machine");
        break;
    default:
        NET_ThrowNew(env, err, "connect");
        break;
    }
    return -1;  /* return value not important. */
}
總結: rv = connect(fd, (struct sockaddr *)&sa, sa_len)
建立遠程連接。
和 ServerSocket.waitForNewConnection 一樣,也是通過 Winsock 庫的 select 函數來實現超時的功能。
/*
 * Native side of DualStackPlainSocketImpl.waitForConnect: waits with select()
 * for a pending connect on fd to finish, for at most `timeout` (converted
 * below as milliseconds). Returns normally once the connection is
 * established; otherwise a Java exception is left pending.
 */
JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_waitForConnect
  (JNIEnv *env, jclass clazz, jint fd, jint timeout) {
    int rv, retry;
    int optlen = sizeof(rv);
    fd_set wr, ex;
    struct timeval t;

    /* Watch fd in both the write set and the exception set. */
    FD_ZERO(&wr);
    FD_ZERO(&ex);
    FD_SET(fd, &wr);
    FD_SET(fd, &ex);
    /* Split the timeout into whole seconds and remaining microseconds. */
    t.tv_sec = timeout / 1000;
    t.tv_usec = (timeout % 1000) * 1000;

    rv = select(fd+1, 0, &wr, &ex, &t);
    if (rv == 0) {
        /* select() returned 0: the wait timed out. */
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                        "connect timed out");
        shutdown( fd, SD_BOTH );
        return;
    }

    if (!FD_ISSET(fd, &ex)) {
        return; /* connection established */
    }

    /*
     * fd is in the exception set: query SO_ERROR up to three times, yielding
     * with Sleep(0) between attempts, until a non-zero error code shows up.
     */
    for (retry=0; retry<3; retry++) {
        NET_GetSockOpt(fd, SOL_SOCKET, SO_ERROR, (char*)&rv, &optlen);
        if (rv) {
            break;
        }
        Sleep(0);
    }

    if (rv == 0) {
        /* No error code was obtained; report a generic failure. */
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                        "Unable to establish connection");
    } else {
        NET_ThrowNew(env, rv, "connect");
    }
}
總結: rv = select(fd+1, 0, &wr, &ex, &t)
會阻塞當前線程,直到連接建立(fd 可寫)、發生異常或超時為止。
/**
 * Creates an input stream over the file descriptor reported by {@code impl},
 * remembering both the impl and the socket it returns from {@code getSocket()}.
 */
SocketInputStream(AbstractPlainSocketImpl impl) throws IOException {
    super(impl.getFileDescriptor());
    this.socket = impl.getSocket();
    this.impl = impl;
}
總結: SocketInputStream 內部實現上也是對 impl 的封裝。SocketInputStream.read 其實也是調用底層 socket 的 read 方法。
int read(byte b[], int off, int length, int timeout) throws IOException { int n; // EOF already encountered if (eof) { return -1; } // connection reset if (impl.isConnectionReset()) { throw new SocketException("Connection reset"); } // bounds check if (length <= 0 || off < 0 || off + length > b.length) { if (length == 0) { return 0; } throw new ArrayIndexOutOfBoundsException(); } boolean gotReset = false; // acquire file descriptor and do the read FileDescriptor fd = impl.acquireFD(); try { n = socketRead(fd, b, off, length, timeout); if (n > 0) { return n; } } catch (ConnectionResetException rstExc) { gotReset = true; } finally { impl.releaseFD(); } /* * We receive a "connection reset" but there may be bytes still * buffered on the socket */ if (gotReset) { impl.setConnectionResetPending(); impl.acquireFD(); try { n = socketRead(fd, b, off, length, timeout); if (n > 0) { return n; } } catch (ConnectionResetException rstExc) { } finally { impl.releaseFD(); } } /* * If we get here we are at EOF, the socket has been closed, * or the connection has been reset. */ if (impl.isClosedOrPending()) { throw new SocketException("Socket closed"); } if (impl.isConnectionResetPending()) { impl.setConnectionReset(); } if (impl.isConnectionReset()) { throw new SocketException("Connection reset"); } eof = true; return -1; } private int socketRead(FileDescriptor fd, byte b[], int off, int len, int timeout) throws IOException { return socketRead0(fd, b, off, len, timeout); }
// src/windows/native/java/net/SocketInputStream.c
/*
 * Native side of SocketInputStream.socketRead0: receives up to len bytes from
 * the socket into a temporary C buffer and copies them into the Java array
 * `data` at offset `off`. Returns the value returned by recv(), or -1 when
 * the read is abandoned early; on the error paths below a Java exception is
 * left pending.
 */
JNIEXPORT jint JNICALL Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this, jobject fdObj, jbyteArray data, jint off, jint len, jint timeout) {
    char *bufP;
    char BUF[MAX_BUFFER_LEN];
    jint fd, newfd;
    jint nread;

    if (IS_NULL(fdObj)) {
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
        return -1;
    }
    fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
    if (fd == -1) {
        NET_ThrowSocketException(env, "Socket closed");
        return -1;
    }

    /*
     * If the caller buffer is larger than our stack buffer then we allocate
     * from the heap (up to a limit). If memory is exhausted we always use
     * the stack buffer.
     */
    if (len <= MAX_BUFFER_LEN) {
        bufP = BUF;
    } else {
        if (len > MAX_HEAP_BUFFER_LEN) {
            len = MAX_HEAP_BUFFER_LEN;
        }
        bufP = (char *)malloc((size_t)len);
        if (bufP == NULL) {
            /* allocation failed so use stack buffer */
            bufP = BUF;
            len = MAX_BUFFER_LEN;
        }
    }

    /*
     * With a non-zero timeout, wait via NET_Timeout first when the timeout is
     * at most 5000 or isRcvTimeoutSupported is false.
     */
    if (timeout) {
        if (timeout <= 5000 || !isRcvTimeoutSupported) {
            int ret = NET_Timeout (fd, timeout);
            if (ret <= 0) {
                if (ret == 0) {
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException", "Read timed out");
                } else if (ret == JVM_IO_ERR) {
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
                } else if (ret == JVM_IO_INTR) {
                    JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException", "Operation interrupted");
                }
                /* Release the heap buffer (if one was allocated) before bailing out. */
                if (bufP != BUF) {
                    free(bufP);
                }
                return -1;
            }
            /*check if the socket has been closed while we were in timeout*/
            newfd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
            if (newfd == -1) {
                NET_ThrowSocketException(env, "Socket Closed");
                if (bufP != BUF) {
                    free(bufP);
                }
                return -1;
            }
        }
    }

    // The key line: recv reads the data from the socket descriptor.
    nread = recv(fd, bufP, len, 0);
    if (nread > 0) {
        /* Copy the received bytes into the caller's Java byte array. */
        (*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);
    } else {
        if (nread < 0) {
            // Check if the socket has been closed since we last checked.
            // This could be a reason for recv failing.
            if ((*env)->GetIntField(env, fdObj, IO_fd_fdID) == -1) {
                NET_ThrowSocketException(env, "Socket closed");
            } else {
                /* Map the Winsock error code to the matching Java exception. */
                switch (WSAGetLastError()) {
                case WSAEINTR:
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
                    break;
                case WSAECONNRESET:
                case WSAESHUTDOWN:
                    /*
                     * Connection has been reset - Windows sometimes reports
                     * the reset as a shutdown error.
                     */
                    JNU_ThrowByName(env, "sun/net/ConnectionResetException", "");
                    break;
                case WSAETIMEDOUT :
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException", "Read timed out");
                    break;
                default:
                    NET_ThrowCurrent(env, "recv failed");
                }
            }
        }
    }

    /* Release the heap buffer if the stack buffer was not used. */
    if (bufP != BUF) {
        free(bufP);
    }
    return nread;
}
總結: socketRead0 實現很長,其實我們只需關注核心的實現 nread = recv(fd, bufP, len, 0);
即可,畢竟我們不是專門做 C++ 的。
SocketOutputStream 和 SocketInputStream 類似,就不繼續分析了。
每天用心記錄一點點。內容也許不重要,但習慣很重要!