【Java併發.6】結構化併發應用程序

   

6.1  在線程中執行任務java

  應用程序提供商但願程序支持儘量多的用戶,從而下降每一個用戶的服務成本,而用戶則但願得到儘量快的響應。大多數服務器應用程序都提供了一種天然的任務邊界選擇方式:以獨立的客戶請求爲邊界。數據庫

 

6.1.1  串行地執行任務瀏覽器

  在應用程序中能夠經過多種策略來調度任務,而其中一些策略可以更好地利用潛在的併發性。最簡單的策略就是在單個線程中串行地執行各項任務。緩存

  程序清單 6-1 :串行的 Web 服務器安全

public class SingleThreadWebServer {
    public static void main(String[] args) throws IOException{
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            Socket connection = socket.accept();
            handleRequest(connection);
        }
    }
}

  SingleThreadWebServer  很簡單,且在理論上是正確的,但在實際生產環境中的執行性能卻很糟糕,由於它每次只能處理一個請求。服務器

  在服務器應用程序中,串行處理機制一般都沒法提供高吞吐率或快速響應性。也有一些例外,例如,當任務數量不多且執行時間長時,或者當服務器只爲單個用戶提供服務,而且該客戶每次只發出一種請求。網絡

 

6.1.2  顯示地爲任務建立線程併發

  經過爲每個請求建立一個新的線程來提供服務,從而實現更高的響應性,如程序清單 6-2 中的 ThreadTaskWebWebServer 所示。框架

  程序清單 6-2:在 Web 服務器中爲每一個請求啓動一個型的線程。socket

public class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable bleck = new Runnable() {
                public void run() {
                    //handleRequest(connection);
         }
       };
     }
  }
}

  對比 ThreadPerTaskWebServer 和 SingleThreadWebServer 區別在於,對於每一個鏈接,主循環都將建立一個新線程來處理請求,而不是在主循環中進行處理。所以可得出三個結論:

  • 任務處理過程從主線程中分離出來,使得主循環可以更快地從新等待下一個到來的鏈接。這使得程序在完成前面的請求以前能夠接受新的請求,從而提升響應性。
  • 任務能夠並行處理,從而能同時服務多個請求。若是有多個處理器,或者任務因爲某種緣由被阻塞,例如等待 I/O 完成、獲取鎖或者資源可用性等,程序的吞吐量將獲得提升。
  • 任務處理代碼必須是線程安全的,由於當有多個任務時會併發地調用這段代碼。

 

6.1.3  無限制建立線程的不足

  在生產環境中,「爲每一個任務分配一個線程」 這種方法存在一些缺陷,尤爲是當須要建立大量的線程時:

  • 線程生命週期的開銷很是高:線程的建立與銷燬並非沒有代價的。根據平臺的不一樣,實際的開銷也有所不一樣,但線程的建立過程都會須要時間,延遲處理的請求,而且須要 JVM 和操做系統提供一些輔助操做。若是請求的到達率很是高且請求的處理過程是輕量級的,例如大多數服務器應用程序就是這種狀況,那麼爲每一個請求建立一個新線程將消耗大量的計算資源。
  • 資源消耗:活躍的線程會消耗系統資源,尤爲是內存。若是你已經擁有足夠多的線程使 CPU 保持忙碌狀態,那麼再建立更多的線程反而會下降性能。
  • 穩定性:在可建立線程的數量上存在一個限制。這個限制隨着平臺的不一樣而不一樣,而且受到多個限制約束,包括 SVM 的啓動參數、Thread 構造函數中請求的棧大小,以及底層操做系統對線程的限制等。若是破壞這些限制,則拋出 OutOfMemoryError 異常。

 

6.2  Executor 框架

   咱們已經分析了兩種經過線程來執行任務的策略,即把全部任務放在單個線程中串行執行,以及將每一個任務放在各自的線程中執行。這兩種方式都存在一些嚴格的限制:串行執行的問題在於其糟糕的響應性和吞吐量,而 「爲每一個任務分配一個線程」 的問題在於資源管理的複雜性。在第五章中,咱們介紹瞭如何經過有界隊列來防止高負荷的應用程序耗盡內存。線程池簡化了線程的管理工做,而且 java.util.concurrent 提供了一種靈活的線程池實現做爲 Executor 框架的一部分。在Java 類庫中,任務執行的主要抽象不是 Thread,而是 Executor,如程序清單6-3:Executor 接口

public interface Executor {
    void execute(Runnable command);
}

 

6.2.1  示例:基於 Executor  的 Web 服務器

  基於 Executor 來構建 Web 服務器是很是容易的。在程序清單 6-4 中用 Executor 代替了硬編碼的線程建立。在這種狀況下使用了一種標準的 Executor 實現,即一個固定長度的線程池,能夠容納 100 個線程。

public class TaskExecutingWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exe = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    //handleRequest(connection);
                }
            };
            exe.execute(task);
        }
    }
}

  咱們能夠很容易地將 TaskExecutionWebServer 修改成相似 ThreadPerTaskWebServer 的行爲,只需使用一個爲每一個請求都建立新線程的 Executor。程序清單 6-5:爲每一個請求啓動一個新線程的 Executor 

public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}

  一樣,咱們能夠編寫一個 Executor 使 TaskExecutionWebServer 的行爲相似於單線程的行爲,如程序清單 6-6:在調用線程中以同步方式執行全部任務的 Executor 

public class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}
每當看到下面這種形式的代碼時:
    new Thread(rennable).start();
而且你但願得到一種更靈活的執行策略時,請考慮使用 Executor 來代替 Thread

 

6.2.3  線程池

  「在線程池中執行任務」 比 「爲每一個任務分配一個線程」 優點更多。經過重用現有的線程而不是建立新線程,能夠在處理多個請求時分攤在線程建立和銷燬過程當中產生的巨大開銷。另外一個額外的好處是,當請求到達時,工做線程一般已經存在,所以不會因爲等待建立線程而延遲任務的執行,從而提升了響應性。

  類庫提供了一個靈活的線程池以及一些有用的默認配置。能夠經過調用 Executor 中的靜態工廠方法之一來建立一個線程池:

  • newFixedThreadPool:將建立一個固定長度的線程池。(若是某個線程因爲發生了未預期的 Exception 而結束,那麼線程池會補充一個新的線程)。
  • newCachedThreadPool:將建立一個可緩存的線程池,若是線程池當前規模超過了處理需求,那麼回收空閒的線程,而當需求增長時,則能夠添加新的線程,線程池規模不存在任何限制。
  • newSingleThreadExecutor:一個單線程的 Executor,它建立單個工做線程來執行任務,若是線程異常結束,會建立另外一個線程來替代。
  • newScheduledThreadPool:建立一個固定長度的線程池,並且延遲或定時的方式來執行任務,相似 Timer。

 

6.2.4  Executor 的生命週期

  咱們已經知道如何建立一個 Executor,但沒有討論如何關閉它。Executor 的實現一般會建立線程來執行任務。但 JVM 只有在全部線程所有終止後纔會退出。所以,若是沒法正確地關閉 Executor,那麼 JVM 將沒法關閉。

  當關閉應用程序時,可能採用最平緩的關閉形式(完成全部已經啓動的任務,而且再也不接受任何新的任務),也可能採用最粗暴的關閉形式(直接關閉電腦),以及其餘各類可能的形式。

  爲了解決執行服務的生命週期問題,Executor 擴展了 ExecutorService 接口,添加了一些用於生命週期管理的方法。

  程序清單 6-7:ExecutorService 中的生命週期管理方法 

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
    // ... 其餘用於任務提交的便利方法
}

  shutdown 方法將執行平緩的關閉過程:再也不接受新的任務,同時等待已經提交的任務執行完成 -- 包括那些還未開始執行的任務。shutdownNow 方法將執行粗暴的關閉過程:它將嘗試取消全部運行中的任務,而且再也不啓動隊列中還沒有開始執行的任務。

  那麼咱們嘗試吧生命週期管理擴展到 Web服務器的功能。 程序清單 6-8:支持關閉操做的 Web 服務器

public class LifecycleWebServer {
    private final ExecutorService exe = ...;
    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exe.isShutdown()) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    //handleRequest(connection);
                }
            };
            exe.execute(task);
        }
    }
    public void stop() {
        exe.shutdown();
    }
    void handleRequest(Socket connection) {
        Request req = readRequest(connection);
        if (isShutdownRequest(connection)) {
            stop();
        } else {
            dispatchrequest(熱情);
        }
    }
}

 

6.3  找出可利用的並行性

  本節咱們將開發一些不一樣版本的組件,該示例實現瀏覽器程序中的頁面渲染(Page-Rendering)功能,它的做用是將 HTML 頁面繪製到圖像緩存中。爲了簡便,假設 HTML 頁面只包含標籤文本,以及預約義大小的圖片和 URL。

 

6.3.1  示例:串行的頁面渲染器

  最簡單的方式是對 HTML 文檔進行串行處理,但這種方法可能會令用戶感到煩惱,它們必須等待很長時間。另外一種串行執行方法更好一些,它先繪製文本元素,同時爲圖像預留出矩形的佔位空間,在處理完了第一遍文本後,程序再開始下載圖像,並將它們繪製到相應的佔位空間中。

  程序清單 6-10:串行地渲染頁面元素 

public class SingleThreadRender {
    void rederPage(CharSequence source) {
        renderText(source);
        List<ImageData> imageDataList = new ArrayList<ImageData>();
        for (ImageInfo imageInfo : scanFoeImageInfo(source)) {
            imageDataList.add(imageInfo.downloadImage());
        }
        for (ImageData image : imageDataList) {
            rederImage(image);
        }
    }
}

 

6.3.2  攜帶結果的任務 Callable 與 Future

  許多任務實際上都是存在延遲的計算----執行數據庫查詢,從網絡上獲取資源,或者計算某個複雜的功能。對於這些任務,Callable 是一種更好的抽象:它認爲主入口點(即 call)將返回一個值,並可能拋出異常。在Executor 中包含了一些輔助方法能將其餘類型的任務封裝爲一個 Callable ,例如 Runable 和 java.security.privilegedAction。

  程序清單 6-11:Callable 與 Future 接口

public interface Callable<V> {
    V call() throws Exception;
}
public interface Future<V> {
    boolean cancel(boolean var1);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}

  

6.3.3  示例:使用 Future 實現頁面渲染器

  爲了使頁面渲染器實現更高的併發性,首先將渲染過程分解爲兩個任務,一個是渲染全部的文本,另外一個是下載全部的圖像。(由於其中一個任務時 CPU 密集型,一個是 IO 密集型,所以即便在單 CPU 系統上也能提高性能)

  程序清單 6-13:使用 Future 等待圖像下載

public class FutureRender {
    private final ExecutorService executor = ...;
    void rederPage(CharSequence source) throws Exception{
        final List<ImageInfo> imageInfoList = scanFoeImageInfo(source);
        Callable<List<ImageData>> task = new Callable<List<ImageData>>() {
            public List<ImageData> call() {
                List<ImageData> imageDataList = new ArrayList<ImageData>();
                for (ImageInfo imageInfo : imageInfoList) {
                    imageDataList.add(imageInfo.downloadImage());
                }
                return imageDataList
            }
        };
        Future<List<ImageData>> future = executor.submit(task);
        renderText(source);
        
        List<ImageData> imagedata = future.get();
        for (ImageData image : imagedata) {
            rederImage(image);
        }
    }
}

 

6.3.6  示例:使用 CompletionService 實現頁面渲染器

  能夠經過 CompletionService 從兩個方面來提升頁面渲染器的性能:縮短總運行時間以及提升響應性。爲每一幅圖像的下載都建立一個獨立任務,並在線程池中實行它們。

  程序清單 6-15:使用 CompletionService ,使頁面元素在下載完成後當即顯示出來

public class Render {
    private final ExecutorService executor = ...;
    Render(ExecutorService exe) {
        this.executor = exe;
    }
    void rederPage(CharSequence source) throws Exception{
        final List<ImageInfo> imageInfoList = scanFoeImageInfo(source);
        CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageDara>(executor);
        for (final ImageInfo info: imageInfoList) {
            completionService.submit(new Callable<ImageData>() {
                public List<ImageData> call() {
                    return info.downloadImage();
                }
            });
        }
        renderText(source);
        
        for (int i = 0, n = imageInfoList.size(); i < n; i++) {
            Future<ImageData> f = completionService.take();
            ImageData imageData = f.get();
            rederImage(imageData);
        }
    }
}

 

6.3.7  爲任務設置時限

  程序清單 6-16:在指定時間內獲取廣告信息

Page RenderPageWithAd() throws Exception {
        long endNanos = System.nanoTime() + TIME_BUDGET;
        Future<Ad> f = exe.submit(new FetchAdTask());
        //在等待廣告的同時顯示頁面
        Page page = renderPageBody();
        Ad ad;
        //指等待指定的時間長度
        long timeLeft = endNanos - System.nanoTime();
        ad = f.get(timeLeft, NANOSECONDS);
    }

 

6.3.8  示例:批量 爲任務設置時限

List<Future<Integer>> futures = exec.invokeAll(tasks, time, unit);

  ExecutorService 中 invokeAll 方法參數爲一組任務,並返回一組 Future。

相關文章
相關標籤/搜索