:https://segmentfault.com/blog...java
前陣子休息天平常在尋找項目裏很差的代碼,看到了這樣的一段代碼:shell
private Result sshSameExec(Session session, String cmd) { if (log.isDebugEnabled()) { log.debug("shell command: {}", cmd); } UserInfo ui = getUserInfo(); session.setUserInfo(ui); int exitStatus = 0; StringBuilder builder = new StringBuilder(); ChannelExec channel; InputStream in; InputStream err; try { session.connect(connectTimeout); channel = (ChannelExec) session.openChannel("exec"); channel.setCommand(cmd); in = channel.getInputStream(); err = channel.getErrStream(); channel.connect(); } catch (Exception e) { throw new CloudRuntimeException(e); } try { long lastRead = Long.MAX_VALUE; byte[] tmp = new byte[1024]; while (true) { while (in.available() > 0 || err.available() > 0) { int i = 0; if (in.available() > 0) { i = in.read(tmp, 0, 1024); } else if (err.available() > 0) { i = err.read(tmp, 0, 1024); } if (i < 0) { break; } lastRead = System.currentTimeMillis(); builder.append(new String(tmp, 0, i)); } if (channel.isClosed()) { if (in.available() > 0) { continue; } exitStatus = channel.getExitStatus(); break; } if (System.currentTimeMillis() - lastRead > exeTimeout) { break; } } } catch (IOException e) { throw new CloudRuntimeException(e); } finally { channel.disconnect(); session.disconnect(); } if (0 != exitStatus) { return Result.createByError(ErrorData.builder() .errorCode(ResultCode.EXECUTE_SSH_FAIL.getCode()) .detail(builder.toString()) .title(ResultCode.EXECUTE_SSH_FAIL.toString()) .build()); } else { return Result.createBySuccess(builder.toString()); } }
簡單解釋一下這段代碼——即經過ssh到一臺機器上,而後執行一些命令.對命令輸出的東西,開了一個循環,每一次讀必定的位置,而後以字節流的形式讀回來.segmentfault
這段代碼有點醜,因而我聞到了學習的味道.緩存
首先是對兩個Stream的消費,很顯然,在多核環境下,咱們同時也只可以消費其中一個Stream.其次,這代碼太挫了,本身定義一個tmp,而後102四、1024這樣的去取出來.bash
在改良以前,咱們先來回顧一下JavaIO的接口定義.網絡
這裏有同窗可能問了,爲啥叫它低抽象接口呢?由於它離底層太近了,計算機原本就是處理二進制的,而這兩個接口正是用來處理二進制數據流的.session
先簡單看一眼這兩個接口:app
** * This abstract class is the superclass of all classes representing * an input stream of bytes. * * <p> Applications that need to define a subclass of <code>InputStream</code> * must always provide a method that returns the next byte of input. * * @author Arthur van Hoff * @see java.io.BufferedInputStream * @see java.io.ByteArrayInputStream * @see java.io.DataInputStream * @see java.io.FilterInputStream * @see java.io.InputStream#read() * @see java.io.OutputStream * @see java.io.PushbackInputStream * @since JDK1.0 */ public abstract class InputStream implements Closeable {.....}
/** * This abstract class is the superclass of all classes representing * an output stream of bytes. An output stream accepts output bytes * and sends them to some sink. * <p> * Applications that need to define a subclass of * <code>OutputStream</code> must always provide at least a method * that writes one byte of output. * * @author Arthur van Hoff * @see java.io.BufferedOutputStream * @see java.io.ByteArrayOutputStream * @see java.io.DataOutputStream * @see java.io.FilterOutputStream * @see java.io.InputStream * @see java.io.OutputStream#write(int) * @since JDK1.0 */ public abstract class OutputStream implements Closeable, Flushable {...}
咱們能夠發現,它們都實現了Closeable的接口.所以你們在使用這些原生類時,要注意在結束時調用Close方法哦.ssh
這兩個接口的經常使用實現類有:
- FileInputStream
和FileOutputStream
async
DataInputStream
和DataOutputStream
ObjectInputStream
和ObjectOutputStream
爲啥說它是高級抽象接口呢?咱們先來看看它們的註釋:
/** * Abstract class for writing to character streams. The only methods that a * subclass must implement are write(char[], int, int), flush(), and close(). * Most subclasses, however, will override some of the methods defined here in * order to provide higher efficiency, additional functionality, or both. * * @see Writer * @see BufferedWriter * @see CharArrayWriter * @see FilterWriter * @see OutputStreamWriter * @see FileWriter * @see PipedWriter * @see PrintWriter * @see StringWriter * @see Reader * * @author Mark Reinhold * @since JDK1.1 */ public abstract class Writer implements Appendable, Closeable, Flushable {
/** * Abstract class for reading character streams. The only methods that a * subclass must implement are read(char[], int, int) and close(). Most * subclasses, however, will override some of the methods defined here in order * to provide higher efficiency, additional functionality, or both. * * * @see BufferedReader * @see LineNumberReader * @see CharArrayReader * @see InputStreamReader * @see FileReader * @see FilterReader * @see PushbackReader * @see PipedReader * @see StringReader * @see Writer * * @author Mark Reinhold * @since JDK1.1 */ public abstract class Reader implements Readable, Closeable {
咱們能夠看到,這個抽象類是用來面向character
的,也就是字符.字符的抽象等級必然比字節高,由於字符靠近上層,即人類.
若是咱們直接使用上述實現類去打開一個文件(如FileWriter
、FileReader
、FileInputStream
、FileOutputStream
),對其對象調用read
、write
、readLine
等,每一個請求都是由基礎OS直接處理的,這會使一個程序效率低得多——由於它們都會引起磁盤訪問or網絡請求等.
爲了減小這種開銷,Java 平臺實現緩衝 I/O 流。緩衝輸入流從被稱爲緩衝區(buffer)的存儲器區域讀出數據;僅當緩衝區是空時,本地輸入 API 才被調用。一樣,緩衝輸出流,將數據寫入到緩存區,只有當緩衝區已滿才調用本機輸出 API。
用於包裝非緩存流的緩衝流類有4個:BufferedInputStream
和BufferedOutputStream·用於建立字節緩衝字節流,
BufferedReader和
BufferedWriter`用於建立字符緩衝字節流.
以前,咱們提到了這段代碼寫得搓的地方:
故此,咱們能夠考慮對每一個Stream都進行包裝,支持用線程去消費,其次咱們能夠用高級抽象分接口去適配Byte,而後去裝飾成Buffer.
接下來,咱們來看一段ZStack裏的工具類ShellUtils
,爲了節省篇幅,咱們僅僅截出它在IDE裏的
概覽:
run方法的核心:
public ShellResult run() { StopWatch watch = new StopWatch(); watch.start(); try { if (withSudo) { command = String.format("sudo %s", command); } ProcessBuilder pb = new ProcessBuilder(Arrays.asList("/bin/bash", "-c", command)); if (baseDir == null) { baseDir = System.getProperty("user.home"); } pb.directory(new File(baseDir)); process = pb.start(); if (!suppressTraceLog && logger.isTraceEnabled()) { logger.debug(String.format("exec shell command[%s]", command)); } Writer stdout; int stdoutLog = stdoutLogStrategy(); if (stdoutLog == LOG_TO_FILE) { stdout = new BufferedWriter(new FileWriter(stdoutFile)); } else if (stdoutLog == LOG_TO_SCREEN) { stdout = new BufferedWriter(new OutputStreamWriter(System.out)); } else { stdout = new StringWriter(); } Writer stderr; int stderrLog = stderrLogStrategy(); if (stderrLog == LOG_TO_FILE) { stderr = new BufferedWriter(new FileWriter(stderrFile)); } else if (stderrLog == LOG_TO_SCREEN) { stderr = new BufferedWriter(new OutputStreamWriter(System.err)); } else { stderr = new StringWriter(); } StreamConsumer stdoutConsumer = new StreamConsumer(process.getInputStream(), new PrintWriter(stdout, true), stdoutLog != LOG_TO_FILE); StreamConsumer stderrConsumer = new StreamConsumer(process.getErrorStream(), new PrintWriter(stderr, true), stderrLog != LOG_TO_FILE); stderrConsumer.start(); stdoutConsumer.start(); process.waitFor(); stderrConsumer.join(TimeUnit.SECONDS.toMillis(30)); stdoutConsumer.join(TimeUnit.SECONDS.toMillis(30)); ShellResult ret = new ShellResult(); ret.setCommand(command); ret.setRetCode(process.exitValue()); if (stderrLog == LOG_TO_STRING) { ret.setStderr(stderr.toString()); } else if (stderrLog == LOG_TO_FILE) { stderr.close(); } if (stdoutLog == LOG_TO_STRING) { ret.setStdout(stdout.toString()); } else if (stdoutLog == LOG_TO_FILE) { stdout.close(); } return ret; } catch (Exception e) { StringBuilder sb = new StringBuilder(); sb.append("Shell command failed:\n"); sb.append(command); throw new ShellException(sb.toString(), e); } finally { if (process != null) { process.destroy(); } watch.stop(); if (!suppressTraceLog && logger.isTraceEnabled()) { logger.trace(String.format("shell command[%s] costs %sms to finish", command, watch.getTime())); } } } }
咱們能夠看到StreamConsumer
這個類,咱們來看一下它的代碼:
private static class StreamConsumer extends Thread { final InputStream in; final PrintWriter out; final boolean flush; StreamConsumer(InputStream in, PrintWriter out, boolean flushEveryWrite) { this.in = in; this.out = out; flush = flushEveryWrite; } @Override public void run() { BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader(in)); String line; while ( (line = br.readLine()) != null) { out.println(line); if (flush) { out.flush(); } } } catch (Exception e) { logger.warn(e.getMessage(), e); } finally { try { if (br != null) { br.close(); } } catch (IOException e) { logger.warn(e.getMessage(), e); } } } }
這段代碼已經達到了咱們的理想狀態:線程消費,高級抽象.
閒話很少說,先貼代碼爲敬:
import java.io.InputStream import java.io.InputStreamReader class StreamGobbler(private val inputStream: InputStream, private var result: StringBuilder) : Runnable { override fun run() { InputStreamReader(inputStream).buffered().use { it.lines().forEach { r -> result.append(r) } } } }
仍是同樣熟悉的配方,咱們逐行來解讀:
InputStreamReader
,這意味着它能夠輸出字符流了,而後咱們使用了Kotlin的接口將其裝飾成了Buffer.
先看一下上面的圖,咱們都知道內核態線程是由OS調度的,但當一個線程拿到時間片時,卻調到了阻塞IO,那麼只能等在那邊,浪費時間.
而協程則能夠解決這個問題,當一個Job
hang住的時候,能夠去作別的事情,繞開阻塞.更好的利用時間片.
最後,咱們來看一下成品代碼:
override fun sshExecWithCoroutine(session: Session, cmd: String): SimpleResult<out String> { val ui = InnerUserInfo() session.userInfo = ui val exitStatus: Int var channel = ChannelExec() val inputBuilder = StringBuilder() val errorBuilder = StringBuilder() try { session.connect(connectTimeout) channel = session.openChannel("exec") as ChannelExec channel.setCommand(cmd) channel.connect() val inputStream = StreamGobbler(channel.inputStream, inputBuilder) val errStream = StreamGobbler(channel.errStream, errorBuilder) val customJob = GlobalScope.launch { customStream(inputStream, errStream) } while (!customJob.isCompleted) { // wait job be done } exitStatus = channel.exitStatus } catch (e: IOException) { throw java.lang.RuntimeException(e) } finally { if (channel.isConnected) { channel.disconnect() } if (session.isConnected) { session.disconnect() } } return if (0 != exitStatus) { return SimpleResult.createByError(ErrorData.Builder() .errorCode(ResultCode.EXECUTE_SSH_FAIL.value) .detail(errorBuilder.toString()) .title(ResultCode.EXECUTE_SSH_FAIL.toString()) .build()) } else { SimpleResult.createBySuccess(inputBuilder.toString()) } } private suspend fun customStream(inputStream: StreamGobbler, errorStream: StreamGobbler) { val inputDeferred = GlobalScope.async { inputStream.run() } val errorDeferred = GlobalScope.async { errorStream.run() } inputDeferred.join() errorDeferred.join() }