最近作一個帶屏的冰箱項目,其中有個文件上傳的功能。基本的思路是在一個局域網中的設備端起一個服務,這樣局域網中的其餘設備就能夠經過Http訪問的方式,實現文件上傳的功能了。在設備端起一個服務,這裏使用了一個開源的微服務項目NanoHTTPD. 雖然只有一個java文件,可是裏面包含了不少網絡處理方面的細節。所謂麻雀雖小,五臟俱全。這篇文章會介紹NanoHTTPD的源碼,但又不只如此。我但願這篇文章會把socket和http方面的基礎概念介紹一下,由於我在閱讀NanoHTTPD源碼的時候發現,這些概念對於理解NanoHTTPD很是的重要。另外NanoHTTPD包含了網絡處理的一些細節,當你以前沒有深刻的研究過這些細節的時候,你就很難系統清晰的理解網絡傳輸。html
下面這段代碼是官網提供的Sample,其實挺簡單,指定一個端口,調用start方法就能夠了。serve方法的做用就是處理請求做出響應,sample中返回了一個html頁面。java
public class App extends NanoHTTPD {
public App() throws IOException {
super(8080);
start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);
System.out.println("\nRunning! Point your browsers to http://localhost:8080/ \n");
}
public static void main(String[] args) {
try {
new App();
} catch (IOException ioe) {
System.err.println("Couldn't start server:\n" + ioe);
}
}
@Override
public Response serve(IHTTPSession session) {
String msg = "<html><body><h1>Hello server</h1>\n";
Map<String, String> parms = session.getParms();
if (parms.get("username") == null) {
msg += "<form action='?' method='get'>\n <p>Your name: <input type='text' name='username'></p>\n" + "</form>\n";
} else {
msg += "<p>Hello, " + parms.get("username") + "!</p>";
}
return newFixedLengthResponse(msg + "</body></html>\n");//返回html頁面
}
}
複製代碼
調用start方法後,服務就起來了。下面咱們深刻其內部看看這個start作了那些的操做。android
/**
* Start the server.
*
* @param timeout timeout to use for socket connections.
* @param daemon start the thread daemon or not.
* @throws IOException if the socket is in use.
*/
public void start(final int timeout, boolean daemon) throws IOException {
this.myServerSocket = this.getServerSocketFactory().create();
this.myServerSocket.setReuseAddress(true);
ServerRunnable serverRunnable = createServerRunnable(timeout);
this.myThread = new Thread(serverRunnable);
this.myThread.setDaemon(daemon);//線程分爲User線程和Daemon線程,當用戶線程結束時,jvm也會推出,
//Daemon線程也就結束了,可是隻要有User線程在, Jvm就不會退出。
this.myThread.setName("NanoHttpd Main Listener");
this.myThread.start();
while (!serverRunnable.hasBinded && serverRunnable.bindException == null) {
try {
Thread.sleep(10L);
} catch (Throwable e) {
// on android this may not be allowed, that's why we // catch throwable the wait should be very short because we are // just waiting for the bind of the socket } } if (serverRunnable.bindException != null) { throw serverRunnable.bindException; } } 複製代碼
首先建立了一個ServerSocket實例,而後起了一個線程,在這個線程中進行操做,至於進行了什麼操做,它的邏輯在ServerRunnable中。git
/**
* The runnable that will be used for the main listening thread.
*/
public class ServerRunnable implements Runnable {
private final int timeout;
private IOException bindException;
private boolean hasBinded = false;
private ServerRunnable(int timeout) {
this.timeout = timeout;
}
@Override
public void run() {
try {
myServerSocket.bind(hostname != null ? new InetSocketAddress(hostname, myPort) : new InetSocketAddress(myPort));
hasBinded = true;
} catch (IOException e) {
this.bindException = e;
return;
}
do {
try {
final Socket finalAccept = NanoHTTPD.this.myServerSocket.accept();
if (this.timeout > 0) {
finalAccept.setSoTimeout(this.timeout);
}
final InputStream inputStream = finalAccept.getInputStream();
NanoHTTPD.this.asyncRunner.exec(createClientHandler(finalAccept, inputStream));
} catch (IOException e) {
NanoHTTPD.LOG.log(Level.FINE, "Communication with the client broken", e);
}
} while (!NanoHTTPD.this.myServerSocket.isClosed());
}
}
複製代碼
咱們看一下ServerRunnable的run方法,咱們建立了一個ServerSocket實例,如今咱們調用它的bind方法, 在Java層面只給我暴露了一個bind方法,可是咱們要知道,Java底層也是要調用系統提供的接口的,就是所謂的系統調用, Java中的bind方法其實對應的是系統調用的bind和listen兩個方法。那咱們看看系統調用的bind和listen是作什麼的。你們查看這些系統調用的api的文檔可使用下面的命令:github
man 2 bind
複製代碼
bind()函數把一個地址族中的特定地址賦給socket。例如對應AF_INET、AF_INET6就是把一個ipv4或ipv6地址和端口號組合賦給socket。至關於將ip地址和端口與socket創建了聯繫。編程
其實在真正的系統層級的Socket接口中還有一個listen方法。它的做用讓其餘進程的socket能夠訪問當前的socket.api
當ServerSocket調用accept方法的時候,就能夠獲取到客戶端的請求了,這個方法是個阻塞方法(所謂阻塞,就是進程或是線程執行到這個函數時必須等待某個事件的發生,若是這個事件沒有發生,則進程和線程就會阻塞在這個地方,不能繼續往下執行了。),當有客戶端connect時,這個方法就會被調用。它返回了一個Socket,這個Socket其實既包含了服務端的Socket描述符,也包含了客戶端返回的地址信息等。從這個Socket有getInputStream和getOutputStream兩個方法,分別表明從客戶端發送過來的數據流和咱們返回給客戶端的數據流。瀏覽器
上面的部分咱們已經在服務端建立了一個Socket而且調用了它的bind、listen等方法。可是到底什麼是Socket呢,咱們如今討論一下,在討論什麼是Socket以前,咱們先了解一下什麼是網絡協議,咱們人交談時,說出的語言要符合語法和用語規範。機器之間的通話也要符合必定的協議。不然,雞同鴨講,沒法相互理解。咱們平時所用的網絡是由:緩存
這四層協議組成。它們的順序爲由上到下,也就是說上層協議要依賴下層協議。好比HTTP協議它要依賴TCP協議。 瞭解了這些內容以後咱們講什麼是Socket,它是系統對TCP/IP協議的封裝的接口,也就是說Socket不是協議,它只是對TCP/IP協議的實現,方便開發者對網絡進行開發而已。在Unix中一切皆文件,其實Socket也是一種文件,換句話說網絡鏈接就是一個文件。由於它有對數據流有讀寫和關閉功能。當咱們創建一個網絡鏈接時,就是建立了一個socket文件,這樣咱們就能夠read從別的計算器傳輸過來的數據,write數據給別的計算機。bash
做爲服務端,面臨的一種狀況就是併發訪問。NanoHTTPD其實在AsyncRunner的exec方法中作的處理,在DefaultAsyncRunner的exec方法中,啓動了一個線程處理每個的訪問鏈接。鏈接默認的上限是50.而真正處理請求的方法的地方在ClientHandler中。
客戶端的請求處理都是在這個方法中完成的。
/**
* The runnable that will be used for every new client connection.
*/
public class ClientHandler implements Runnable {
...
@Override
public void run() {
OutputStream outputStream = null;
try {
outputStream = this.acceptSocket.getOutputStream();
TempFileManager tempFileManager = NanoHTTPD.this.tempFileManagerFactory.create();
HTTPSession session = new HTTPSession(tempFileManager, this.inputStream, outputStream, this.acceptSocket.getInetAddress());
while (!this.acceptSocket.isClosed()) {
session.execute();
}
} catch (Exception e) {
// When the socket is closed by the client,
// we throw our own SocketException
// to break the "keep alive" loop above. If
// the exception was anything other
// than the expected SocketException OR a
// SocketTimeoutException, print the
// stacktrace
if (!(e instanceof SocketException && "NanoHttpd Shutdown".equals(e.getMessage())) && !(e instanceof SocketTimeoutException)) {
NanoHTTPD.LOG.log(Level.FINE, "Communication with the client broken", e);
}
} finally {
safeClose(outputStream);
safeClose(this.inputStream);
safeClose(this.acceptSocket);
NanoHTTPD.this.asyncRunner.closed(this);
}
}
}
複製代碼
從上面的邏輯能夠發現,這裏有建立了一個HTTPSession用於處理每一次的http請求,那咱們就看看HTTPSession.execute這個方法。
在這個方法中獲取到InputStream,這個就是客戶端請求數據流,經過它咱們就能夠拿到客戶端的請求數據了。咱們常常據說的HTTP協議這個時候就有用處了,Socket封裝了TCP/IP協議,可是沒有封裝應用層協議。這一層的協議須要咱們本身處理。這個execute方法裏的邏輯就是咱們對Http協議的實現。咱們的服務在一個機器上開始運轉了,這個時候在另外一臺機器的瀏覽器裏輸入了前面機器的Ip和端口。我知道這就是一個Http請求了,那麼請求數據就到了inputStream這個輸入流中。那麼接下來就根據http協議規定的內容來解析數據了。
@Override
public void execute() throws IOException {
Response r = null;
try {
//讀取前8192字節的數據,其實就是header,Apache默認的header限制大小爲8KB
byte[] buf = new byte[HTTPSession.BUFSIZE];
this.splitbyte = 0;
this.rlen = 0;
int read = -1;
this.inputStream.mark(HTTPSession.BUFSIZE);
try {
read = this.inputStream.read(buf, 0, HTTPSession.BUFSIZE);
} catch (Exception e) {
safeClose(this.inputStream);
safeClose(this.outputStream);
throw new SocketException("NanoHttpd Shutdown");
}
if (read == -1) {
// socket was been closed
safeClose(this.inputStream);
safeClose(this.outputStream);
throw new SocketException("NanoHttpd Shutdown");
}
while (read > 0) {
this.rlen += read;
this.splitbyte = findHeaderEnd(buf, this.rlen);
if (this.splitbyte > 0) {
break;
}
read = this.inputStream.read(buf, this.rlen, HTTPSession.BUFSIZE - this.rlen);
}
if (this.splitbyte < this.rlen) {
this.inputStream.reset();
this.inputStream.skip(this.splitbyte);
}
this.parms = new HashMap<String, String>();
if (null == this.headers) {
this.headers = new HashMap<String, String>(); //建立header用於存儲咱們解析出來的請求頭
} else {
this.headers.clear();
}
// 建立BufferedReader用於解析header
BufferedReader hin = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buf, 0, this.rlen)));
// Decode the header into parms and header java properties
Map<String, String> pre = new HashMap<String, String>();
decodeHeader(hin, pre, this.parms, this.headers);//咱們將解析的header存入map中
//打印解析出來的header數據,筆者加的日誌
for (Map.Entry<String, String> entry : headers.entrySet()) {
Log.d(TAG, "header key = " + entry.getKey() + " value = " + entry.getValue());
}
if (null != this.remoteIp) {
this.headers.put("remote-addr", this.remoteIp);
this.headers.put("http-client-ip", this.remoteIp);
}
this.method = Method.lookup(pre.get("method"));//獲取請求方法 get、post、put
if (this.method == null) {
throw new ResponseException(Response.Status.BAD_REQUEST, "BAD REQUEST: Syntax error.");
}
this.uri = pre.get("uri");//獲取請求的uri
this.cookies = new CookieHandler(this.headers);//處理cookie
String connection = this.headers.get("connection");
boolean keepAlive = protocolVersion.equals("HTTP/1.1") && (connection == null || !connection.matches("(?i).*close.*"));//判斷是否支持keepAlive
// Ok, now do the serve()
// TODO: long body_size = getBodySize();
// TODO: long pos_before_serve = this.inputStream.totalRead()
// (requires implementaion for totalRead())
r = serve(this);//這個serve就是咱們在開篇的實例代碼中實現的方法,在這個方法中咱們要建立返回給客戶端的響應數據。
// TODO: this.inputStream.skip(body_size -
// (this.inputStream.totalRead() - pos_before_serve))
if (r == null) {
throw new ResponseException(Response.Status.INTERNAL_ERROR, "SERVER INTERNAL ERROR: Serve() returned a null response.");
} else {
//r爲響應數據,這裏是添加一些公共的響應頭
String acceptEncoding = this.headers.get("accept-encoding");
this.cookies.unloadQueue(r);
r.setRequestMethod(this.method);
r.setGzipEncoding(useGzipWhenAccepted(r) && acceptEncoding != null && acceptEncoding.contains("gzip"));
r.setKeepAlive(keepAlive);
r.send(this.outputStream);//將數據響應給客戶端
}
if (!keepAlive || "close".equalsIgnoreCase(r.getHeader("connection"))) {
throw new SocketException("NanoHttpd Shutdown");
}
} catch (SocketException e) {
// throw it out to close socket object (finalAccept)
throw e;
} catch (SocketTimeoutException ste) {
// treat socket timeouts the same way we treat socket exceptions
// i.e. close the stream & finalAccept object by throwing the
// exception up the call stack.
throw ste;
} catch (IOException ioe) {
Response resp = newFixedLengthResponse(Response.Status.INTERNAL_ERROR, NanoHTTPD.MIME_PLAINTEXT, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage());
resp.send(this.outputStream);
safeClose(this.outputStream);
} catch (ResponseException re) {
Response resp = newFixedLengthResponse(re.getStatus(), NanoHTTPD.MIME_PLAINTEXT, re.getMessage());
resp.send(this.outputStream);
safeClose(this.outputStream);
} finally {
safeClose(r);
this.tempFileManager.clear();
}
}
複製代碼
我介紹一下這段代碼的功能:
下面看看serve方法,這個方法是須要咱們實現的,可是其內部是有個默認實現,那就是處理文件上傳的一個實現
public Response serve(IHTTPSession session) {
Map<String, String> files = new HashMap<String, String>();
Method method = session.getMethod();
if (Method.PUT.equals(method) || Method.POST.equals(method)) {
try {
session.parseBody(files);//重要的邏輯在parseBody中
} catch (IOException ioe) {
return newFixedLengthResponse(Response.Status.INTERNAL_ERROR, NanoHTTPD.MIME_PLAINTEXT, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage());
} catch (ResponseException re) {
return newFixedLengthResponse(re.getStatus(), NanoHTTPD.MIME_PLAINTEXT, re.getMessage());
}
}
Map<String, String> parms = session.getParms();
parms.put(NanoHTTPD.QUERY_STRING_PARAMETER, session.getQueryParameterString());
return serve(session.getUri(), method, session.getHeaders(), parms, files);
}
複製代碼
對就是這個session.parseBody()方法。
@Override
public void parseBody(Map<String, String> files) throws IOException, ResponseException {
RandomAccessFile randomAccessFile = null;
try {
long size = getBodySize();
ByteArrayOutputStream baos = null;
DataOutput request_data_output = null;
// Store the request in memory or a file, depending on size
if (size < MEMORY_STORE_LIMIT) {
baos = new ByteArrayOutputStream();
request_data_output = new DataOutputStream(baos);
} else {
randomAccessFile = getTmpBucket();
request_data_output = randomAccessFile;
}
// Read all the body and write it to request_data_output
byte[] buf = new byte[REQUEST_BUFFER_LEN];
while (this.rlen >= 0 && size > 0) {
this.rlen = this.inputStream.read(buf, 0, (int) Math.min(size, REQUEST_BUFFER_LEN));
size -= this.rlen;
if (this.rlen > 0) {
request_data_output.write(buf, 0, this.rlen);
}
}
ByteBuffer fbuf = null;
if (baos != null) {
fbuf = ByteBuffer.wrap(baos.toByteArray(), 0, baos.size());
} else {
fbuf = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, randomAccessFile.length());
randomAccessFile.seek(0);
}
// If the method is POST, there may be parameters
// in data section, too, read it:
if (Method.POST.equals(this.method)) {
String contentType = "";
String contentTypeHeader = this.headers.get("content-type");
Log.d(TAG, "contentTypeHeader = " + contentTypeHeader);
StringTokenizer st = null;
if (contentTypeHeader != null) {
st = new StringTokenizer(contentTypeHeader, ",; ");
if (st.hasMoreTokens()) {
contentType = st.nextToken();
}
}
if ("multipart/form-data".equalsIgnoreCase(contentType)) {//文件上傳
// Handle multipart/form-data
if (!st.hasMoreTokens()) {
throw new ResponseException(Response.Status.BAD_REQUEST,
"BAD REQUEST: Content type is multipart/form-data but boundary missing. Usage: GET /example/file.html");
}
//處理文件上傳的方法
decodeMultipartFormData(getAttributeFromContentHeader(contentTypeHeader, BOUNDARY_PATTERN, null), //
getAttributeFromContentHeader(contentTypeHeader, CHARSET_PATTERN, "US-ASCII"), fbuf, this.parms, files);
} else {
byte[] postBytes = new byte[fbuf.remaining()];
fbuf.get(postBytes);
String postLine = new String(postBytes).trim();
// Handle application/x-www-form-urlencoded
if ("application/x-www-form-urlencoded".equalsIgnoreCase(contentType)) {
decodeParms(postLine, this.parms);
} else if (postLine.length() != 0) {
// Special case for raw POST data => create a
// special files entry "postData" with raw content
// data
files.put("postData", postLine);
}
}
} else if (Method.PUT.equals(this.method)) {
files.put("content", saveTmpFile(fbuf, 0, fbuf.limit(), null));
}
} finally {
safeClose(randomAccessFile);
}
}
複製代碼
這段代碼的功能:
這個方法主要處理文件上傳邏輯
/**
* Decodes the Multipart Body data and put it into Key/Value pairs.
*/
private void decodeMultipartFormData(String boundary, String encoding, ByteBuffer fbuf, Map<String, String> parms, Map<String, String> files) throws ResponseException {
try {
int[] boundary_idxs = getBoundaryPositions(fbuf, boundary.getBytes());
if (boundary_idxs.length < 2) {
throw new ResponseException(Response.Status.BAD_REQUEST, "BAD REQUEST: Content type is multipart/form-data but contains less than two boundary strings.");
}
byte[] part_header_buff = new byte[MAX_HEADER_SIZE];
for (int bi = 0; bi < boundary_idxs.length - 1; bi++) {
fbuf.position(boundary_idxs[bi]);
int len = (fbuf.remaining() < MAX_HEADER_SIZE) ? fbuf.remaining() : MAX_HEADER_SIZE;
fbuf.get(part_header_buff, 0, len);
BufferedReader in = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(part_header_buff, 0, len), Charset.forName(encoding)), len);
int headerLines = 0;
// First line is boundary string
String mpline = in.readLine();
headerLines++;
if (!mpline.contains(boundary)) {
throw new ResponseException(Response.Status.BAD_REQUEST, "BAD REQUEST: Content type is multipart/form-data but chunk does not start with boundary.");
}
String part_name = null, file_name = null, content_type = null;
// Parse the reset of the header lines
mpline = in.readLine();
headerLines++;
while (mpline != null && mpline.trim().length() > 0) {//經過正則的方式獲取文件名稱
Matcher matcher = CONTENT_DISPOSITION_PATTERN.matcher(mpline);
if (matcher.matches()) {
String attributeString = matcher.group(2);
matcher = CONTENT_DISPOSITION_ATTRIBUTE_PATTERN.matcher(attributeString);
while (matcher.find()) {
String key = matcher.group(1);
if (key.equalsIgnoreCase("name")) {
part_name = matcher.group(2);
} else if (key.equalsIgnoreCase("filename")) {
file_name = matcher.group(2);
}
}
}
matcher = CONTENT_TYPE_PATTERN.matcher(mpline);
if (matcher.matches()) {
content_type = matcher.group(2).trim();
}
mpline = in.readLine();
headerLines++;
}
int part_header_len = 0;
while (headerLines-- > 0) {
part_header_len = scipOverNewLine(part_header_buff, part_header_len);
}
// Read the part data
if (part_header_len >= len - 4) {
throw new ResponseException(Response.Status.INTERNAL_ERROR, "Multipart header size exceeds MAX_HEADER_SIZE.");
}
int part_data_start = boundary_idxs[bi] + part_header_len;
int part_data_end = boundary_idxs[bi + 1] - 4;
fbuf.position(part_data_start);
if (content_type == null) {
// Read the part into a string
byte[] data_bytes = new byte[part_data_end - part_data_start];
fbuf.get(data_bytes);
parms.put(part_name, new String(data_bytes, encoding));
} else {
// Read it into a file
String path = saveTmpFile(fbuf, part_data_start, part_data_end - part_data_start, file_name);//將文件存儲在一個臨時目錄
if (!files.containsKey(part_name)) {
files.put(part_name, path);
} else {
int count = 2;
while (files.containsKey(part_name + count)) {
count++;
}
files.put(part_name + count, path);
}
if (!parms.containsKey(part_name)) {
parms.put(part_name, file_name);
} else {
int count = 2;
while (parms.containsKey(part_name + count)) {
count++;
}
parms.put(part_name + count, file_name);
}
}
}
} catch (ResponseException re) {
throw re;
} catch (Exception e) {
throw new ResponseException(Response.Status.INTERNAL_ERROR, e.toString());
}
}
複製代碼
上面的代碼就是將客戶端上傳的文件解析出來,而後存儲在服務端。在解析數據的時候用了不少的正則匹配,這也從側面印證了Http協議,若是沒有Http協議也就無法使用正則去解析數據。
以上咱們已經把NanoHTTPD中最主要的邏輯介紹完了,NanoHTTPD經過Socket實現了一個服務端,而咱們的客戶端則是由瀏覽器實現的。下面放一張完整的Socket實現客戶端和服務端的流程圖,幫助你們理解。
好了,NanoHTTPD的源碼就解析完了,經過上面的介紹,咱們對Socket, HTTP協議,TCP/IP協議等有了一個更爲深入的認識。不在那麼迷茫。固然了,這之中涉及到了一些系統socket的api,經過這些我想說的是java層的api和系統層的api,原理是同樣的,可是咱們應該更關心底層的實現原理和本質。
在前面建立ServerSocket的時候,設計到了IPV6,下面就介紹一些IPV6的格式。
IPv4地址大小是32位的,好比192.168.0.1, 每一個小圓點之間爲8位。 IPv6的地址爲128位。使用:分割,分紅8個部分,每一個部分爲16位,因此大多數IPv6的地址使用16進製表示,好比ffff:ffff:ffff:ffff:ffff:ffff:fff.
IPv6 地址大小爲 128 位。首選 IPv6 地址表示法爲 x:x:x:x:x:x:x:x,其中每一個 x 是地址的 8 個 16 位部分的十六進制值。IPv6 地址範圍從 0000:0000:0000:0000:0000:0000:0000:0000 至 ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff。 這種是完整的表示格式,其實還有兩種咱們比較常見的簡寫格式: