NIO Source Code Analysis (02-2): BIO Source Code Analysis, Socket

Netty series index (http://www.javashuo.com/article/p-hskusway-em.html)

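The previous article analyzed the ServerSocket source code in detail. Like ServerSocket, Socket is only a facade; the real implementation is again SocksSocketImpl. As a result, setImpl, createImpl, new, bind and listen all work in much the same way, so this article focuses on connect and on reading from the IO stream.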

1. Minimal BIO usage

// 1. Connect to the server
Socket socket = new Socket();
socket.connect(new InetSocketAddress(HOST, PORT), 0);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);

// 2. Send the request data
out.println("Client sending request data...");

// 3. Receive the server's data
String response = in.readLine();
System.out.println("Client: " + response);

That is all the code. The source analysis below is based on this demo.
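To try the demo without an external server, it can be paired with a throwaway echo server on the loopback interface. The following is a minimal sketch under stated assumptions: the class name LoopbackBioDemo, the roundTrip, reader and writer helpers, and the "echo: " reply prefix are all hypothetical and not part of the original demo.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class LoopbackBioDemo {

    private static BufferedReader reader(Socket s) throws IOException {
        return new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
    }

    private static PrintWriter writer(Socket s) throws IOException {
        // autoFlush = true so println() pushes the line out immediately
        return new PrintWriter(new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    // Hypothetical helper: sends one line to a one-shot echo server and returns the reply.
    static String roundTrip(String request) throws Exception {
        // Port 0 lets the OS pick a free port; the server listens on loopback only.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread serverThread = new Thread(() -> {
                try (Socket peer = server.accept();
                     BufferedReader peerIn = reader(peer);
                     PrintWriter peerOut = writer(peer)) {
                    peerOut.println("echo: " + peerIn.readLine());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            serverThread.start();

            // Same three steps as the demo: connect, send a line, read a line.
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(server.getInetAddress(), server.getLocalPort()), 0);
                BufferedReader in = reader(socket);
                PrintWriter out = writer(socket);
                out.println(request);
                String response = in.readLine();
                serverThread.join();
                return response;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Client: " + roundTrip("hello")); // prints "Client: echo: hello"
    }
}
```

Binding to port 0 avoids hard-coding HOST and PORT, which the original demo leaves undefined.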

2. The connect method

2.1 Socket.connect

// timeout == 0 blocks indefinitely; timeout > 0 is the connect timeout in milliseconds
public void connect(SocketAddress endpoint, int timeout) throws IOException {

    InetSocketAddress epoint = (InetSocketAddress) endpoint;
    InetAddress addr = epoint.getAddress();
    int port = epoint.getPort();

    // 1. Create the underlying socket
    if (!created)
        createImpl(true);

    // 2. oldImpl defaults to false, so the first branch is taken.
    //    checkOldImpl sets oldImpl by checking whether impl declares
    //    connect(SocketAddress address, int timeout)
    if (!oldImpl)
        impl.connect(epoint, timeout);
    else if (timeout == 0) {
        if (epoint.isUnresolved())
            impl.connect(addr.getHostName(), port);
        else
            impl.connect(addr, port);
    } else
        throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
    connected = true;
    bound = true;
}

Summary: like ServerSocket, Socket first calls createImpl to create the underlying socket object, then delegates the connection to impl.
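The two assignments at the end of Socket.connect (connected = true and bound = true) are visible through the public API. A minimal sketch, assuming a local listener on an OS-chosen port; the class name ConnectStateSketch, the helper flagsAroundConnect and the 1000 ms timeout are illustrative choices.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;

final class ConnectStateSketch {

    // Returns {connected before, bound before, connected after, bound after}.
    static boolean[] flagsAroundConnect() throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket socket = new Socket()) {
            boolean connectedBefore = socket.isConnected();
            boolean boundBefore = socket.isBound();

            // The listen backlog completes the handshake even though accept() is never called.
            socket.connect(new InetSocketAddress(server.getInetAddress(), server.getLocalPort()), 1000);

            return new boolean[] {connectedBefore, boundBefore, socket.isConnected(), socket.isBound()};
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(flagsAroundConnect())); // [false, false, true, true]
    }
}
```

The socket is never bound explicitly here; connect binds it to an ephemeral local port as a side effect, which is why isBound() flips together with isConnected().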

2.2 AbstractPlainSocketImpl.connect

protected void connect(SocketAddress address, int timeout) throws IOException {
    boolean connected = false;
    try {
        InetSocketAddress addr = (InetSocketAddress) address;
        this.port = addr.getPort();
        this.address = addr.getAddress();

        connectToAddress(this.address, port, timeout);
        connected = true;
    } finally {
        if (!connected) {
            close();
        }
    }
}

private void connectToAddress(InetAddress address, int port, int timeout) throws IOException {
    if (address.isAnyLocalAddress()) {
        doConnect(InetAddress.getLocalHost(), port, timeout);
    } else {
        doConnect(address, port, timeout);
    }
}

Summary: connect delegates the actual connection to doConnect.

synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
    synchronized (fdLock) {
        if (!closePending && (socket == null || !socket.isBound())) {
            NetHooks.beforeTcpConnect(fd, address, port);
        }
    }
    try {
        acquireFD();
        try {
            socketConnect(address, port, timeout);
            /* socket may have been closed during poll/select */
            synchronized (fdLock) {
                if (closePending) {
                    throw new SocketException ("Socket closed");
                }
            }
            if (socket != null) {
                socket.setBound();
                socket.setConnected();
            }
        } finally {
            releaseFD();
        }
    } catch (IOException e) {
        close();
        throw e;
    }
}
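
The wildcard branch in connectToAddress (isAnyLocalAddress, then getLocalHost) can be reproduced with plain InetAddress calls. A minimal sketch: the class name WildcardAddressSketch and the helper effectiveTarget are hypothetical and only mirror the branch shown earlier; port 8080 is an arbitrary value.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

final class WildcardAddressSketch {

    // Hypothetical helper mirroring connectToAddress: a wildcard target is
    // replaced by the local host address, anything else is used as-is.
    static InetAddress effectiveTarget(InetAddress address) throws UnknownHostException {
        return address.isAnyLocalAddress() ? InetAddress.getLocalHost() : address;
    }

    public static void main(String[] args) throws Exception {
        // An InetSocketAddress built from a port alone carries the wildcard address.
        InetAddress wildcard = new InetSocketAddress(8080).getAddress();
        InetAddress loopback = InetAddress.getLoopbackAddress();

        System.out.println(wildcard.isAnyLocalAddress()); // true
        System.out.println(loopback.isAnyLocalAddress()); // false

        // A non-wildcard address passes through unchanged.
        System.out.println(effectiveTarget(loopback) == loopback); // true
    }
}
```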

2.3 DualStackPlainSocketImpl.socketConnect

void socketConnect(InetAddress address, int port, int timeout)
    throws IOException {
    int nativefd = checkAndReturnNativeFD();

    if (address == null)
        throw new NullPointerException("inet address argument is null.");

    int connectResult;
    if (timeout <= 0) {
        connectResult = connect0(nativefd, address, port);
    } else {
        configureBlocking(nativefd, false);
        try {
            connectResult = connect0(nativefd, address, port);
            if (connectResult == WOULDBLOCK) {
                waitForConnect(nativefd, timeout);
            }
        } finally {
            configureBlocking(nativefd, true);
        }
    }
   
    if (localport == 0)
        localport = localPort0(nativefd);
}

Supplement 1: the JVM native implementation of connect0

JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_connect0
  (JNIEnv *env, jclass clazz, jint fd, jobject iaObj, jint port) {
    SOCKETADDRESS sa;
    int rv;
    int sa_len = sizeof(sa);

    if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&sa,
                                 &sa_len, JNI_TRUE) != 0) {
      return -1;
    }

    rv = connect(fd, (struct sockaddr *)&sa, sa_len);
    if (rv == SOCKET_ERROR) {
        int err = WSAGetLastError();
        if (err == WSAEWOULDBLOCK) {
            return java_net_DualStackPlainSocketImpl_WOULDBLOCK;
        } else if (err == WSAEADDRNOTAVAIL) {
            JNU_ThrowByName(env, JNU_JAVANETPKG "ConnectException",
                "connect: Address is invalid on local machine, or port is not valid on remote machine");
        } else {
            NET_ThrowNew(env, err, "connect");
        }
        return -1;  // return value not important.
    }
    return rv;
}

Summary: rv = connect(fd, (struct sockaddr *)&sa, sa_len) establishes the remote connection.

Supplement 2: the JVM native implementation of waitForConnect

As with ServerSocket.waitForNewConnection, the timeout is implemented with the select function from the Winsock library.

JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_waitForConnect
  (JNIEnv *env, jclass clazz, jint fd, jint timeout) {
    int rv, retry;
    int optlen = sizeof(rv);
    fd_set wr, ex;
    struct timeval t;

    FD_ZERO(&wr);
    FD_ZERO(&ex);
    FD_SET(fd, &wr);
    FD_SET(fd, &ex);
    t.tv_sec = timeout / 1000;
    t.tv_usec = (timeout % 1000) * 1000;

    rv = select(fd+1, 0, &wr, &ex, &t);
    if (rv == 0) {
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                        "connect timed out");
        shutdown( fd, SD_BOTH );
        return;
    }
    if (!FD_ISSET(fd, &ex)) {
        return;         /* connection established */
    }

    for (retry=0; retry<3; retry++) {
        NET_GetSockOpt(fd, SOL_SOCKET, SO_ERROR,
                       (char*)&rv, &optlen);
        if (rv) {
            break;
        }
        Sleep(0);
    }

    if (rv == 0) {
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                        "Unable to establish connection");
    } else {
        NET_ThrowNew(env, rv, "connect");
    }
}

Summary: rv = select(fd+1, 0, &wr, &ex, &t) blocks the calling thread until the connection completes, fails, or the timeout expires.
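The timed-connect flow in socketConnect (switch to non-blocking, start the connect, wait with select, restore blocking mode) has a close analogue in the public NIO API. The following is a sketch of that analogue, not the JDK's own code path: the class name TimedConnectSketch and the helper connectWithTimeout are hypothetical, and a Selector stands in for the native select call.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class TimedConnectSketch {

    // Hypothetical helper: non-blocking connect, bounded wait, then back to blocking mode.
    static SocketChannel connectWithTimeout(InetSocketAddress remote, int timeoutMillis) throws IOException {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeoutMillis must be positive");
        }
        SocketChannel channel = SocketChannel.open();
        boolean connected = false;
        try {
            channel.configureBlocking(false);            // like configureBlocking(nativefd, false)
            if (!channel.connect(remote)) {              // false plays the role of WOULDBLOCK
                try (Selector selector = Selector.open()) {
                    channel.register(selector, SelectionKey.OP_CONNECT);
                    if (selector.select(timeoutMillis) == 0) {   // like select(...) returning 0
                        throw new SocketTimeoutException("connect timed out");
                    }
                    if (!channel.finishConnect()) {      // throws if the attempt failed
                        throw new IOException("connection not established");
                    }
                }                                        // closing the selector deregisters the channel
            }
            channel.configureBlocking(true);             // like the finally block in socketConnect
            connected = true;
            return channel;
        } finally {
            if (!connected) {
                channel.close();                         // same cleanup idea as doConnect's catch block
            }
        }
    }

    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             SocketChannel ch = connectWithTimeout(
                     new InetSocketAddress(server.getInetAddress(), server.getLocalPort()), 2000)) {
            System.out.println(ch.isConnected() + " " + ch.isBlocking()); // true true
        }
    }
}
```

The selector is closed before blocking mode is restored because a channel cannot be switched to blocking mode while it still has a valid registration.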

3. SocketInputStream

3.1 Constructor

SocketInputStream(AbstractPlainSocketImpl impl) throws IOException {
    super(impl.getFileDescriptor());
    this.impl = impl;
    socket = impl.getSocket();
}

Summary: internally, SocketInputStream is also a wrapper around impl. SocketInputStream.read ultimately reads from the underlying socket.

3.2 The read method

int read(byte b[], int off, int length, int timeout) throws IOException {
    int n;

    // EOF already encountered
    if (eof) {
        return -1;
    }

    // connection reset
    if (impl.isConnectionReset()) {
        throw new SocketException("Connection reset");
    }

    // bounds check
    if (length <= 0 || off < 0 || off + length > b.length) {
        if (length == 0) {
            return 0;
        }
        throw new ArrayIndexOutOfBoundsException();
    }

    boolean gotReset = false;

    // acquire file descriptor and do the read
    FileDescriptor fd = impl.acquireFD();
    try {
        n = socketRead(fd, b, off, length, timeout);
        if (n > 0) {
            return n;
        }
    } catch (ConnectionResetException rstExc) {
        gotReset = true;
    } finally {
        impl.releaseFD();
    }

    /*
     * We receive a "connection reset" but there may be bytes still
     * buffered on the socket
     */
    if (gotReset) {
        impl.setConnectionResetPending();
        impl.acquireFD();
        try {
            n = socketRead(fd, b, off, length, timeout);
            if (n > 0) {
                return n;
            }
        } catch (ConnectionResetException rstExc) {
        } finally {
            impl.releaseFD();
        }
    }

    /*
     * If we get here we are at EOF, the socket has been closed,
     * or the connection has been reset.
     */
    if (impl.isClosedOrPending()) {
        throw new SocketException("Socket closed");
    }
    if (impl.isConnectionResetPending()) {
        impl.setConnectionReset();
    }
    if (impl.isConnectionReset()) {
        throw new SocketException("Connection reset");
    }
    eof = true;
    return -1;
}

private int socketRead(FileDescriptor fd, byte b[], int off, int len,
        int timeout) throws IOException {
    return socketRead0(fd, b, off, len, timeout);
}
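
The eof flag checked at the top of read means that once the peer has closed the connection, every later read keeps returning -1. A minimal sketch of that behaviour over loopback; the class name EofSketch, the helper readUntilEof and the byte value 42 are illustrative.

```java
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;

final class EofSketch {

    // Returns the results of three consecutive reads after the peer sent one byte and closed.
    static int[] readUntilEof() throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {

            // The peer writes a single byte and closes its end of the connection.
            try (Socket peer = server.accept()) {
                peer.getOutputStream().write(42);
            }

            InputStream in = client.getInputStream();
            // First read delivers the byte, second hits end of stream, third repeats -1.
            return new int[] {in.read(), in.read(), in.read()};
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(readUntilEof())); // [42, -1, -1]
    }
}
```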

Supplement 3: the JVM native implementation of socketRead0

// src/windows/native/java/net/SocketInputStream.c
JNIEXPORT jint JNICALL Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this,
        jobject fdObj, jbyteArray data, jint off, jint len, jint timeout) {
    char *bufP;
    char BUF[MAX_BUFFER_LEN];
    jint fd, newfd;
    jint nread;

    if (IS_NULL(fdObj)) {
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
        return -1;
    }
    fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
    if (fd == -1) {
        NET_ThrowSocketException(env, "Socket closed");
        return -1;
    }

    /*
     * If the caller buffer is large than our stack buffer then we allocate
     * from the heap (up to a limit). If memory is exhausted we always use
     * the stack buffer.
     */
    if (len <= MAX_BUFFER_LEN) {
        bufP = BUF;
    } else {
        if (len > MAX_HEAP_BUFFER_LEN) {
            len = MAX_HEAP_BUFFER_LEN;
        }
        bufP = (char *)malloc((size_t)len);
        if (bufP == NULL) {
            /* allocation failed so use stack buffer */
            bufP = BUF;
            len = MAX_BUFFER_LEN;
        }
    }

    if (timeout) {
        if (timeout <= 5000 || !isRcvTimeoutSupported) {
            int ret = NET_Timeout (fd, timeout);

            if (ret <= 0) {
                if (ret == 0) {
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                                    "Read timed out");
                } else if (ret == JVM_IO_ERR) {
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
                } else if (ret == JVM_IO_INTR) {
                    JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
                                    "Operation interrupted");
                }
                if (bufP != BUF) {
                    free(bufP);
                }
                return -1;
            }

            /*check if the socket has been closed while we were in timeout*/
            newfd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
            if (newfd == -1) {
                NET_ThrowSocketException(env, "Socket Closed");
                if (bufP != BUF) {
                    free(bufP);
                }
                return -1;
            }
        }
    }

    // The key call: recv reads data from the socket fd
    nread = recv(fd, bufP, len, 0);

    if (nread > 0) {
        (*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);
    } else {
        if (nread < 0) {
            // Check if the socket has been closed since we last checked.
            // This could be a reason for recv failing.
            if ((*env)->GetIntField(env, fdObj, IO_fd_fdID) == -1) {
                NET_ThrowSocketException(env, "Socket closed");
            } else {
                switch (WSAGetLastError()) {
                    case WSAEINTR:
                        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                            "socket closed");
                        break;
                    case WSAECONNRESET:
                    case WSAESHUTDOWN:
                        /*
                         * Connection has been reset - Windows sometimes reports
                         * the reset as a shutdown error.
                         */
                        JNU_ThrowByName(env, "sun/net/ConnectionResetException", "");
                        break;
                    case WSAETIMEDOUT :
                        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException", "Read timed out");
                        break;
                    default:
                        NET_ThrowCurrent(env, "recv failed");
                }
            }
        }
    }
    if (bufP != BUF) {
        free(bufP);
    }
    return nread;
}

Summary: socketRead0 is long, but we only need to focus on the core call, nread = recv(fd, bufP, len, 0); after all, we do not specialize in C/C++.
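The timeout branch of socketRead0 is what a Java caller observes as a SocketTimeoutException once a read timeout has been set with Socket.setSoTimeout. A minimal sketch, assuming a local peer that accepts the connection but never writes; the class name ReadTimeoutSketch, the helper readTimesOut and the 200 ms value are illustrative.

```java
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

final class ReadTimeoutSketch {

    // Returns true if a blocking read gives up with SocketTimeoutException.
    static boolean readTimesOut(int timeoutMillis) throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept()) {           // the peer stays open but silent

            client.setSoTimeout(timeoutMillis);         // 0 would mean "block forever"
            try {
                client.getInputStream().read();         // nothing arrives, so this must time out
                return false;
            } catch (SocketTimeoutException e) {
                return true;                            // the socket itself remains open
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readTimesOut(200)); // true
    }
}
```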

4. SocketOutputStream

It is similar to SocketInputStream, so we will not analyze it further.


Record a little every day, with care. The content may not matter much, but the habit does!
