JAVA基礎-多線程

 

程序、進程、線程java

程序:由高級語言編寫,而後在編譯的過程當中,被編譯器/解釋器轉譯爲機器語言,從而得以執行程序員

順序執行時的主要特徵包括順序行,封閉性,可再現性算法

進程:進程是程序的一次執行,該程序能夠和其餘程序併發執行。是系統進行資源分配和調用的獨立單位。每個進程都有它本身的內存空間和系統資源。數據庫

進程是由程序、數據和進程控制塊組成。編程

線程:是程序的執行單元,執行路徑。是程序使用CPU的最基本單位。數組

是進程中的單個順序控制流,是一條執行路徑緩存

一個進程若是隻有一條執行路徑,則稱爲單線程程序。安全

一個進程若是有多條執行路徑,則稱爲多線程程序。服務器

線程生命週期圖解網絡

線程的生命週期有五種狀態:

新建狀態(new):當一個線程類實例被建立時,線程處於新建狀態,此時的線程已經被初始化,並分配了資源

就緒狀態(Runnable):已具有運行條件,進入線程隊列,排隊等待CPU。一旦得到CPU使用權,就可進入運行狀態

運行狀態(Running):當處於就緒狀態的線程被調度得到CPU資源時,就進入了運行狀態。定義在線程體中的run()方法被調用,從方法體的第一條語句開始順序執行;

阻塞狀態(Blocked):處於運行狀態的線程因事件的發生,而致使讓出CPU使用權,並終止當前執行,進行阻塞狀態。

例如:Thread myThread = new MyThreadClass( );

例如:myThread.start( );

 

 

多線程有什麼意義

多線程的存在,不是提升程序的執行速度。實際上是爲了提升應用程序的使用率。

程序的執行其實都是在搶CPU的資源,CPU的執行權。

多個進程是在搶這個資源,而其中的某一個進程若是執行路徑比較多,就會有更高的概率搶到CPU的執行權。

咱們是不敢保證哪個線程可以在哪一個時刻搶到,因此線程的執行有隨機性。

 

多線程的代價

設計更復雜

雖然有一些多線程應用程序比單線程的應用程序要簡單,但其餘的通常都更復雜。在多線程訪問共享數據的時候,這部分代碼須要特別的注意。線程之間的交互每每很是複雜。不正確的線程同步產生的錯誤很是難以被發現,而且重現以修復。

上下文切換的開銷

當CPU從執行一個線程切換到執行另一個線程的時候,它須要先存儲當前線程的本地的數據,程序指針等,而後載入另外一個線程的本地數據,程序指針 等,最後纔開始執行。這種切換稱爲「上下文切換」(「context switch」)。CPU會在一個上下文中執行一個線程,而後切換到另一個上下文中執行另一個線程。

增長資源消耗

線程在運行的時候須要從計算機裏面獲得一些資源。除了CPU,線程還須要一些內存來維持它本地的堆棧。它也須要佔用操做系統中一些資源來管理線程。

線程的優先級

Java中Thread對象有一個優先級的概念,優先級被劃分10個級別,建立線程的時候,若是沒有指定優先級,默認是5。主線程的優先級也是5。優先級高的線程會比優先級低的線程得到更多的運行機會。

Thread類定義了3個整形常量MAX_PRIORITY、NORM_PRIORITY、MIN_PRIORITY分別用於表示支持的最高優先級,正常優先級和最低優先級。同時提供了一個getPriority()方法來獲取當前線程優先級。

 

public class PriorityDemo {

public static void main(String[] args) {

System.out.println("最大優先級;"+Thread.MAX_PRIORITY);

System.out.println("正常優先級;"+Thread.NORM_PRIORITY);

System.out.println("最小優先級;"+Thread.MIN_PRIORITY);

System.out.println("主線程優先級;"+Thread.currentThread().getPriority());

Thread t=new Thread();

System.out.println("建立一個線程默認的優先級:"+t.getPriority());

}

}

 

類 Thread

public class Thread extends Object implements Runnable

 

線程 是程序中的執行線程。Java 虛擬機容許應用程序併發地運行多個執行線程。

每一個線程都有一個優先級,高優先級線程的執行優先於低優先級線程。每一個線程均可以或不能夠標記爲一個守護程序。當某個線程中運行的代碼建立一個新 Thread 對象時,該新線程的初始優先級被設定爲建立線程的優先級,而且當且僅當建立線程是守護線程時,新線程纔是守護程序。

當 Java 虛擬機啓動時,一般都會有單個非守護線程(它一般會調用某個指定類的 main 方法)。Java

虛擬機會繼續執行線程,直到下列任一狀況出現時爲止:

  • 調用了 Runtime 類的 exit 方法,而且安全管理器容許退出操做發生。
  • 非守護線程的全部線程都已中止運行,不管是經過從對 run 方法的調用中返回,仍是經過拋出一個傳播到 run 方法以外的異常。

建立新執行線程有兩種方法。一種方法是將類聲明爲 Thread 的子類。該子類應重寫 Thread 類的 run 方法。接下來能夠分配並啓動該子類的實例。例如,計算大於某一規定值的質數的線程能夠寫成:

class PrimeThread extends Thread {

long minPrime;

PrimeThread(long minPrime) {

this.minPrime = minPrime;

}

public void run() {

// compute primes larger than minPrime

. . .

}

}

而後,下列代碼會建立並啓動一個線程:

PrimeThread p = new PrimeThread(143);

p.start();

方法摘要

static int

activeCount()

返回當前線程的線程組中活動線程的數目。

void

checkAccess()

斷定當前運行的線程是否有權修改該線程。

int

countStackFrames()

已過期。 該調用的定義依賴於 suspend(),但它遭到了反對。此外,該調用的結果歷來都不是意義明確的。

static Thread

currentThread()

返回對當前正在執行的線程對象的引用。

void

destroy()

已過期。 該方法最初用於破壞該線程,但不做任何清除。它所保持的任何監視器都會保持鎖定狀態。不過,該方法決不會被實現。即便要實現,它也極有可能以 suspend()

方式被死鎖。若是目標線程被破壞時保持一個保護關鍵系統資源的鎖,則任何線程在任什麼時候候都沒法再次訪問該資源。若是另外一個線程曾試圖鎖定該資源,則會出現死鎖。這類死鎖一般會證實它們本身是「凍結」的進程。有關更多信息,請參閱爲什麼不同意使用

Thread.stop、Thread.suspend 和 Thread.resume?。

static void

dumpStack()

將當前線程的堆棧跟蹤打印至標準錯誤流。

static int

enumerate(Thread[] tarray)

將當前線程的線程組及其子組中的每個活動線程複製到指定的數組中。

static Map<Thread,StackTraceElement[]>

getAllStackTraces()

返回全部活動線程的堆棧跟蹤的一個映射。

ClassLoader

getContextClassLoader()

返回該線程的上下文 ClassLoader。

static Thread.UncaughtExceptionHandler

getDefaultUncaughtExceptionHandler()

返回線程因爲未捕獲到異常而忽然終止時調用的默認處理程序。

long

getId()

返回該線程的標識符。

String

getName()

返回該線程的名稱。

int

getPriority()

返回線程的優先級。

StackTraceElement[]

getStackTrace()

返回一個表示該線程堆棧轉儲的堆棧跟蹤元素數組。

Thread.State

getState()

返回該線程的狀態。

ThreadGroup

getThreadGroup()

返回該線程所屬的線程組。

Thread.UncaughtExceptionHandler

getUncaughtExceptionHandler()

返回該線程因爲未捕獲到異常而忽然終止時調用的處理程序。

static boolean

holdsLock(Object obj)

當且僅當當前線程在指定的對象上保持監視器鎖時,才返回 true。

void

interrupt()

中斷線程。

static boolean

interrupted()

測試當前線程是否已經中斷。

boolean

isAlive()

測試線程是否處於活動狀態。

boolean

isDaemon()

測試該線程是否爲守護線程。

boolean

isInterrupted()

測試線程是否已經中斷。

void

join()

等待該線程終止。

void

join(long millis)

等待該線程終止的時間最長爲 millis 毫秒。

void

join(long millis,

int nanos)

等待該線程終止的時間最長爲 millis 毫秒 + nanos 納秒。

void

resume()

已過期。 該方法只與 suspend() 一塊兒使用,但 suspend()

已經遭到反對,由於它具備死鎖傾向。有關更多信息,請參閱爲什麼不同意使用

Thread.stop、Thread.suspend 和 Thread.resume?。

void

run()

若是該線程是使用獨立的 Runnable 運行對象構造的,則調用該 Runnable 對象的 run 方法;不然,該方法不執行任何操做並返回。

void

setContextClassLoader(ClassLoader cl)

設置該線程的上下文 ClassLoader。

void

setDaemon(boolean on)

將該線程標記爲守護線程或用戶線程。

static void

setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)

設置當線程因爲未捕獲到異常而忽然終止,而且沒有爲該線程定義其餘處理程序時所調用的默認處理程序。

void

setName(String name)

改變線程名稱,使之與參數 name 相同。

void

setPriority(int newPriority)

更改線程的優先級。

void

setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)

設置該線程因爲未捕獲到異常而忽然終止時調用的處理程序。

static void

sleep(long millis)

在指定的毫秒數內讓當前正在執行的線程休眠(暫停執行),此操做受到系統計時器和調度程序精度和準確性的影響。

static void

sleep(long millis,

int nanos)

在指定的毫秒數加指定的納秒數內讓當前正在執行的線程休眠(暫停執行),此操做受到系統計時器和調度程序精度和準確性的影響。

void

start()

使該線程開始執行;Java 虛擬機調用該線程的 run 方法。

void

stop()

已過期。 該方法具備固有的不安全性。用 Thread.stop

來終止線程將釋放它已經鎖定的全部監視器(做爲沿堆棧向上傳播的未檢查 ThreadDeath

異常的一個天然後果)。若是之前受這些監視器保護的任何對象都處於一種不一致的狀態,則損壞的對象將對其餘線程可見,這有可能致使任意的行爲。stop

的許多使用都應由只修改某些變量以指示目標線程應該中止運行的代碼來取代。目標線程應按期檢查該變量,而且若是該變量指示它要中止運行,則從其運行方法依次返回。若是目標線程等待很長時間(例如基於一個條件變量),則應使用 interrupt 方法來中斷該等待。有關更多信息,請參閱爲什麼不同意使用

Thread.stop、Thread.suspend 和 Thread.resume?。

void

stop(Throwable obj)

已過期。 該方法具備固有的不安全性。有關詳細信息,請參閱 stop()。該方法的附加危險是它可用於生成目標線程未準備處理的異常(包括若沒有該方法該線程不太可能拋出的已檢查的異常)。有關更多信息,請參閱爲什麼不同意使用

Thread.stop、Thread.suspend 和 Thread.resume?。

void

suspend()

已過期。 該方法已經遭到反對,由於它具備固有的死鎖傾向。若是目標線程掛起時在保護關鍵系統資源的監視器上保持有鎖,則在目標線程從新開始之前任何線程都不能訪問該資源。若是從新開始目標線程的線程想在調用 resume 以前鎖定該監視器,則會發生死鎖。這類死鎖一般會證實本身是「凍結」的進程。有關更多信息,請參閱爲什麼不同意使用

Thread.stop、Thread.suspend 和 Thread.resume?。

String

toString()

返回該線程的字符串表示形式,包括線程名稱、優先級和線程組。

static void

yield()

暫停當前正在執行的線程對象,並執行其餘線程。

 

接口 Runnable

public interface Runnable

Runnable 接口應該由那些打算經過某一線程執行其實例的類來實現。類必須定義一個稱爲 run

的無參數方法。

設計該接口的目的是爲但願在活動時執行代碼的對象提供一個公共協議。例如,Thread 類實現了 Runnable。激活的意思是說某個線程已啓動而且還沒有中止。

此外,Runnable 爲非 Thread 子類的類提供了一種激活方式。經過實例化某個 Thread 實例並將自身做爲運行目標,就能夠運行實現 Runnable 的類而無需建立 Thread 的子類。大多數狀況下,若是隻想重寫 run() 方法,而不重寫其餘 Thread 方法,那麼應使用 Runnable

接口。這很重要,由於除非程序員打算修改或加強類的基本行爲,不然不該爲該類建立子類。

接口 Callable<V>

public interface Callable<V>

返回結果而且可能拋出異常的任務。實現者定義了一個不帶任何參數的叫作 call 的方法。

Callable 接口相似於 Runnable,二者都是爲那些其實例可能被另外一個線程執行的類設計的。可是 Runnable 不會返回結果,而且沒法拋出通過檢查的異常。

Executors

類包含一些從其餘普通形式轉換成 Callable 類的實用方法。

參數類型爲 Callable 的 java.util.concurrent

中的構造方法

FutureTask(Callable<V> callable)

建立一個 FutureTask,一旦運行就執行給定的 Callable。

 

接口 Future<V>

類型參數:V - 此 Future 的 get 方法所返回的結果類型全部已知子接口: Response<T>, RunnableFuture<V>, RunnableScheduledFuture<V>, ScheduledFuture<V> 全部已知實現類: FutureTask, SwingWorker

 

public interface Future<V>

Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。計算完成後只能使用 get 方法來獲取結果,若有必要,計算完成前能夠阻塞此方法。取消則由 cancel

方法來執行。還提供了其餘方法,以肯定任務是正常完成仍是被取消了。一旦計算完成,就不能再取消計算。若是爲了可取消性而使用 Future

但又不提供可用的結果,則能夠聲明 Future<?> 形式類型、並返回 null 做爲底層任務的結果。

用法示例(注意,下列各種都是構造好的。)

interface ArchiveSearcher { String search(String target); }

class App {

ExecutorService executor = ...

ArchiveSearcher searcher = ...

void showSearch(final String target)

throws InterruptedException {

Future<String> future

= executor.submit(new Callable<String>() {

public String call() {

return searcher.search(target);

}});

displayOtherThings(); // do other things while searching

try {

displayText(future.get()); // use future

} catch (ExecutionException ex) { cleanup(); return; }

}

}

FutureTask

類是 Future 的一個實現,Future 可實現 Runnable,因此可經過 Executor 來執行。例如,可用下列內容替換上面帶有 submit 的構造:

FutureTask<String> future =

new FutureTask<String>(new Callable<String>() {

public String call() {

return searcher.search(target);

}});

executor.execute(future);

第一個submit方法裏面的參數類型就是Callable。

暫時只須要知道Callable通常是和ExecutorService配合來使用的,具體的使用方法講在後面講述。

通常狀況下咱們使用第一個submit方法和第三個submit方法,第二個submit方法不多使用。

public class CallableAndFuture {

public static void main(String[] args) {

System.out.println("開始時間:" + new Date());

ExecutorService service = Executors.newSingleThreadExecutor();

//Future與Callable中的泛型,就是返回值的類型

Future<String> future = service.submit(new Callable<String>() {

public String call() throws Exception {

Thread.sleep(2000);

return "Hello";

}

});

try {

String result = future.get();// 該方法會進行阻塞,等待執行完成

System.out.println(result);

} catch (Exception e) {

e.printStackTrace();

}

System.out.println("結束時間:" + new Date());

service.shutdown();

}

}

線程安全的類

StringBuffer sb = new StringBuffer();

Vector<String> v = new Vector<String>();

Hashtable<String, String> h = new Hashtable<String, String>();

 

線程池的方法建立多線程

爲何要使用線程池?

在java中,若是每一個請求到達就建立一個新線程,開銷是至關大的。在實際使用中,建立和銷燬線程花費的時間和消耗的系統資源都至關大,甚至可能要比在處理實際的用戶請求的時間和資源要多的多。除了建立和銷燬線程的開銷以外,活動的線程也須要消耗系統資源。若是在一個jvm裏建立太多的線程,可能會使系統因爲過分消耗內存或「切換過分」而致使系統資源不足。爲了防止資源不足,須要採起一些辦法來限制任何給定時刻處理的請求數目,儘量減小建立和銷燬線程的次數,特別是一些資源耗費比較大的線程的建立和銷燬,儘可能利用已有對象來進行服務。

線程池主要用來解決線程生命週期開銷問題和資源不足問題。經過對多個任務重複使用線程,線程建立的開銷就被分攤到了多個任務上了,並且因爲在請求到達時線程已經存在,因此消除了線程建立所帶來的延遲。這樣,就能夠當即爲請求服務,使用應用程序響應更快。另外,經過適當的調整線程中的線程數目能夠防止出現資源不足的狀況。

線程池的任務處理策略

若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會分配一個線程去執行這個任務;

若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;

若是線程池中的線程數量大於 corePoolSize時,此時若某線程空閒時間超過keepAliveTime,該線程將被終止,直至線程池中的線程數目不大於corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。

使用線程池建立多線程的優勢

減小了建立新線程的時間,提升程序的響應速度

重複利用線程池中的線程,不須要每次都建立新的線程下降資源消耗

便於線程的管理:

corePoolSize:表示容許線程池中容許同時運行的最大線程數

maximumPoolSize:線程池容許的最大線程數,他表示最大能建立多少個線程。maximumPoolSize確定是大於等於corePoolSize

KeepAliveTime:表示線程沒有任務時最多保持多久而後中止

 

接口 ExecutorService

全部超級接口: Executor

全部已知子接口: ScheduledExecutorService

全部已知實現類: AbstractExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor

 

public interface ExecutorServiceextends Executor

Executor

提供了管理終止的方法,以及可爲跟蹤一個或多個異步任務執行情況而生成 Future 的方法。

能夠關閉 ExecutorService,這將致使其拒絕新任務。提供兩個方法來關閉 ExecutorService。shutdown()

方法在終止前容許執行之前提交的任務,而 shutdownNow()

方法阻止等待任務啓動並試圖中止當前正在執行的任務。在終止時,執行程序沒有任務在執行,也沒有任務在等待執行,而且沒法提交新任務。應該關閉未使用的 ExecutorService 以容許回收其資源。

經過建立並返回一個可用於取消執行和/或等待完成的 Future,方法 submit 擴展了基本方法 Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量執行的最經常使用形式,它們執行任務

collection,而後等待至少一個,或所有任務完成(可以使用 ExecutorCompletionService

類來編寫這些方法的自定義變體)。

Executors

類提供了用於此包中所提供的執行程序服務的工廠方法。

 

//一、先建立一個線程池

ExecutorService executorService = Executors.newFixedThreadPool(10);

//三、爲線程池中線程分配任務並執行

ThreadPoolExecutor threadPoolExecutor=(ThreadPoolExecutor)executorService;

 

class NetworkService implements Runnable {

private final ServerSocket serverSocket;

private final ExecutorService pool;

 

public NetworkService(int port, int poolSize)

throws IOException {

serverSocket = new ServerSocket(port);

pool = Executors.newFixedThreadPool(poolSize);

}

public void run() { // run the service

try {

for (;;) {

pool.execute(new Handler(serverSocket.accept()));

}

} catch (IOException ex) {

pool.shutdown();

}

}

}

 

class Handler implements Runnable {

private final Socket socket;

Handler(Socket socket) { this.socket = socket; }

public void run() {

// read and service request on socket

}

}

使用線程池方式--Runnable接口

一般,線程池都是經過線程池工廠建立,再調用線程池中的方法獲取線程,再經過線程去執行任務方法。

l Executors:線程池建立工廠類

l public static ExecutorService newFixedThreadPool(int nThreads):返回線程池對象

l ExecutorService:線程池類

l Future<?> submit(Runnable task):獲取線程池中的某一個線程對象,並執行

l Future接口:用來記錄線程任務執行完畢後產生的結果。線程池建立與使用

//建立線程池對象

ExecutorService service = Executors.newFixedThreadPool(2);//包含2個線程對象

//建立Runnable實例對象

MyRunnable r = new MyRunnable();

//本身建立線程對象的方式

//Thread t = new Thread(r);

//t.start(); ---> 調用MyRunnable中的run()

//從線程池中獲取線程對象,而後調用MyRunnable中的run()

service.submit(r);

//再獲取個線程對象,調用MyRunnable中的run()

service.submit(r);

service.submit(r);

//注意:submit方法調用結束後,程序並不終止,是由於線程池控制了線程的關閉。將使用完的線程又歸還到了線程池中

//關閉線程池

//service.shutdown();

使用線程池方式—Callable接口

l Callable接口:與Runnable接口功能類似,用來指定線程的任務。其中的call()方法,用來返回線程任務執行完畢後的結果,call方法可拋出異常。

l ExecutorService:線程池類

l <T> Future<T> submit(Callable<T> task):獲取線程池中的某一個線程對象,並執行線程中的call()方法

l Future接口:用來記錄線程任務執行完畢後產生的結果。線程池建立與使用

l 使用線程池中線程對象的步驟:

l 建立線程池對象

l 建立Callable接口子類對象

l 提交Callable接口子類對象

l 關閉線程池

 

//建立線程池對象

ExecutorService service = Executors.newFixedThreadPool(2);//包含2個線程對象

//建立Callable對象

MyCallable c = new MyCallable();

//從線程池中獲取線程對象,而後調用MyRunnable中的run()

service.submit(c);

//注意:submit方法調用結束後,程序並不終止,是由於線程池控制了線程的關閉。將使用完的線程又歸還到了線程池中

//關閉線程池

//service.shutdown();

public class ThreadPoolDemo {

public static void main(String[] args) throws InterruptedException, ExecutionException {

//建立線程池對象

ExecutorService threadPool = Executors.newFixedThreadPool(2);

//建立一個Callable接口子類對象

//MyCallable c = new MyCallable();

MyCallable c = new MyCallable(100, 200);

MyCallable c2 = new MyCallable(10, 20);

//獲取線程池中的線程,調用Callable接口子類對象中的call()方法, 完成求和操做

//<Integer> Future<Integer> submit(Callable<Integer> task)

// Future 結果對象

Future<Integer> result = threadPool.submit(c);

//此 Future 的 get 方法所返回的結果類型

Integer sum = result.get();

System.out.println("sum=" + sum);

result = threadPool.submit(c2);

sum = result.get();

System.out.println("sum=" + sum);

//關閉線程池(能夠不關閉)

}

}

併發編程模型

併發模型與分佈式系統之間的類似性

本文所描述的併發模型相似於分佈式系統中使用的不少體系結構。在併發系統中線程之間能夠相互通訊。在分佈式系統中進程之間也能夠相互通訊(進程有可能在不一樣的機器中)。線程和進程之間具備不少類似的特性。這也就是爲何不少併發模型一般相似於各類分佈式系統架構。

固然,分佈式系統在處理網絡失效、遠程主機或進程宕掉等方面也面臨着額外的挑戰。可是運行在巨型服務器上的併發系統也可能遇到相似的問題,好比一塊CPU失效、一塊網卡失效或一個磁盤損壞等狀況。雖然出現失效的機率可能很低,可是在理論上仍然有可能發生。

因爲併發模型相似於分佈式系統架構,所以它們一般能夠互相借鑑思想。例如,爲工做者們(線程)分配做業的模型通常與分佈式系統中的負載均衡系統比較類似。一樣,它們在日誌記錄、失效轉移、冪等性等錯誤處理技術上也具備類似性。

並行工做者模型

在並行工做者模型中,委派者(Delegator)將傳入的做業分配給不一樣的工做者。每一個工做者完成整個任務。工做者們並行運做在不一樣的線程上,甚至可能在不一樣的CPU上。

並行工做者模型的優勢

並行工做者模式的優勢是,它很容易理解。你只需添加更多的工做者來提升系統的並行度。

例如,若是你正在作一個網絡爬蟲,能夠試試使用不一樣數量的工做者抓取到必定數量的頁面,而後看看多少數量的工做者消耗的時間最短(意味着性能最 高)。因爲網絡爬蟲是一個IO密集型工做,最終結果頗有多是你電腦中的每一個CPU或核心分配了幾個線程。每一個CPU若只分配一個線程可能有點少,由於在 等待數據下載的過程當中CPU將會空閒大量時間。

並行工做者模型的缺點

並行工做者模型雖然看起來簡單,卻隱藏着一些缺點。接下來的章節中我會分析一些最明顯的弱點。

1)共享狀態可能會很複雜

在實際應用中,並行工做者模型可能比前面所描述的狀況要複雜得多。共享的工做者常常須要訪問一些共享數據,不管是內存中的或者共享的數據庫中的。下圖展現了並行工做者模型是如何變得複雜的:

2)無狀態的工做者

共享狀態可以被系統中得其餘線程修改。因此工做者在每次須要的時候必須重讀狀態,以確保每次都能訪問到最新的副本,無論共享狀態是保存在內存中的仍是在外部數據庫中。工做者沒法在內部保存這個狀態(可是每次須要的時候能夠重讀)稱爲無狀態的。每次都重讀須要的數據,將會致使速度變慢,特別是狀態保存在外部數據庫中的時候。

3)任務順序是不肯定的

並行工做者模式的另外一個缺點是,做業執行順序是不肯定的。沒法保證哪一個做業最早或者最後被執行。做業A可能在做業B以前就被分配工做者了,可是做業B反而有可能在做業A以前執行。

並行工做者模式的這種非肯定性的特性,使得很難在任何特定的時間點推斷系統的狀態。這也使得它也更難(若是不是不可能的話)保證一個做業在其餘做業以前被執行。

流水線模型

第二種併發模型咱們稱之爲流水線併發模型。我之因此選用這個名字,只是爲了配合「並行工做者」的隱喻。其餘開發者可能會根據平臺或社區選擇其餘稱呼(好比說反應器系統,或事件驅動系統)。下圖表示一個流水線併發模型:

相似於工廠中生產線上的工人們那樣組織工做者。每一個工做者只負責做業中的部分工做。當完成了本身的這部分工做時工做者會將做業轉發給下一個工做者。每一個工做者在本身的線程中運行,而且不會和其餘工做者共享狀態。有時也被稱爲無共享並行模型。

一般使用非阻塞的IO來設計使用流水線併發模型的系統。非阻塞IO意味着,一旦某個工做者開始一個IO操做的時候(好比讀取文件或從網絡鏈接中讀取 數據),這個工做者不會一直等待IO操做的結束。IO操做速度很慢,因此等待IO操做結束很浪費CPU時間。此時CPU能夠作一些其餘事情。當IO操做完 成的時候,IO操做的結果(好比讀出的數據或者數據寫完的狀態)被傳遞給下一個工做者。

有了非阻塞IO,就可使用IO操做肯定工做者之間的邊界。工做者會盡量多運行直到遇到並啓動一個IO操做。而後交出做業的控制權。當IO操做完成的時候,在流水線上的下一個工做者繼續進行操做,直到它也遇到並啓動一個IO操做。

在實際應用中,做業有可能不會沿着單一流水線進行。因爲大多數系統能夠執行多個做業,做業從一個工做者流向另外一個工做者取決於做業須要作的工做。在實際中可能會有多個不一樣的虛擬流水線同時運行。這是現實當中做業在流水線系統中可能的移動狀況:

做業甚至也有可能被轉發到超過一個工做者上併發處理。好比說,做業有可能被同時轉發到做業執行器和做業日誌器。下圖說明了三條流水線是如何經過將做業轉發給同一個工做者(中間流水線的最後一個工做者)來完成做業:

流水線有時候比這個狀況更加複雜。

反應器,事件驅動系統

採用流水線併發模型的系統有時候也稱爲反應器系統或事件驅動系統。系統內的工做者對系統內出現的事件作出反應,這些事件也有可能來自於外部世界或者 發自其餘工做者。事件能夠是傳入的HTTP請求,也能夠是某個文件成功加載到內存中等。在寫這篇文章的時候,已經有不少有趣的反應器/事件驅動平臺可使 用了,而且不久的未來會有更多。比較流行的彷佛是這幾個:

* Vert.x

* AKKa

* Node.JS(JavaScript)

我我的以爲Vert.x是至關有趣的(特別是對於我這樣使用Java/JVM的人來講)

Actors 和 Channels

Actors 和 channels 是兩種比較相似的流水線(或反應器/事件驅動)模型。

在Actor模型中每一個工做者被稱爲actor。Actor之間能夠直接異步地發送和處理消息。Actor能夠被用來實現一個或多個像前文描述的那樣的做業處理流水線。下圖給出了Actor模型:

而在Channel模型中,工做者之間不直接進行通訊。相反,它們在不一樣的通道中發佈本身的消息(事件)。其餘工做者們能夠在這些通道上監聽消息,發送者無需知道誰在監聽。下圖給出了Channel模型:

在寫這篇文章的時候,channel模型對於我來講彷佛更加靈活。一個工做者無需知道誰在後面的流水線上處理做業。只需知道做業(或消息等)須要轉 發給哪一個通道。通道上的監聽者能夠隨意訂閱或者取消訂閱,並不會影響向這個通道發送消息的工做者。這使得工做者之間具備鬆散的耦合。

流水線模型的優勢

相比並行工做者模型,流水線併發模型具備幾個優勢,在接下來的章節中我會介紹幾個最大的優勢。

1)無需共享的狀態

工做者之間無需共享狀態,意味着實現的時候無需考慮全部因併發訪問共享對象而產生的併發性問題。這使得在實現工做者的時候變得很是容易。在實現工做者的時候就好像是單個線程在處理工做-基本上是一個單線程的實現。

2)有狀態的工做者

當工做者知道了沒有其餘線程能夠修改它們的數據,工做者能夠變成有狀態的。對於有狀態,我是指,它們能夠在內存中保存它們須要操做的數據,只需在最後將更改寫回到外部存儲系統。所以,有狀態的工做者一般比無狀態的工做者具備更高的性能。

3)較好的硬件整合(Hardware Conformity)

單線程代碼在整合底層硬件的時候每每具備更好的優點。首先,當能肯定代碼只在單線程模式下執行的時候,一般可以建立更優化的數據結構和算法。

其次,像前文描述的那樣,單線程有狀態的工做者可以在內存中緩存數據。在內存中緩存數據的同時,也意味着數據頗有可能也緩存在執行這個線程的CPU的緩存中。這使得訪問緩存的數據變得更快。

我說的硬件整合是指,以某種方式編寫的代碼,使得可以天然地受益於底層硬件的工做原理。有些開發者稱之爲mechanical sympathy。 我更傾向於硬件整合這個術語,由於計算機只有不多的機械部件,而且可以隱喻「更好的匹配(match better)」,相比「同情(sympathy)」這個詞在上下文中的意思,我以爲「conform」這個詞表達的很是好。固然了,這裏有點吹毛求疵 了,用本身喜歡的術語就行。

4)合理的做業順序

基於流水線併發模型實現的併發系統,在某種程度上是有可能保證做業的順序的。做業的有序性使得它更容易地推出系統在某個特定時間點的狀態。更進一 步,你能夠將全部到達的做業寫入到日誌中去。一旦這個系統的某一部分掛掉了,該日誌就能夠用來重頭開始重建系統當時的狀態。按照特定的順序將做業寫入日 志,並按這個順序做爲有保障的做業順序。下圖展現了一種可能的設計:

實現一個有保障的做業順序是不容易的,但每每是可行的。若是能夠,它將大大簡化一些任務,例如備份、數據恢復、數據複製等,這些均可以經過日誌文件來完成。

流水線模型的缺點

流水線併發模型最大的缺點是做業的執行每每分佈到多個工做者上,並所以分佈到項目中的多個類上。這樣致使在追蹤某個做業到底被什麼代碼執行時變得困難。

一樣,這也加大了代碼編寫的難度。有時會將工做者的代碼寫成回調處理的形式。若在代碼中嵌入過多的回調處理,每每會出現所謂的回調地獄 (callback hell)現象。所謂回調地獄,就是意味着在追蹤代碼在回調過程當中到底作了什麼,以及確保每一個回調只訪問它須要的數據的時候,變得很是困難

使用並行工做者模型能夠簡化這個問題。你能夠打開工做者的代碼,從頭至尾優美的閱讀被執行的代碼。固然並行工做者模式的代碼也可能一樣分佈在不一樣的類中,但每每也可以很容易的從代碼中分析執行的順序。

同步代碼塊

實例同步方法

public synchronized void add(int value){

this.count += value;

}

 

靜態同步方法

public static synchronized void add( int value ){

count += value ;

}

 

實例方法中的同步塊

public void add(int value){

synchronized(this){

this.count += value;

}

}

 

靜態方法中的同步塊

public class MyClass {

public static synchronized void log1(String msg1, String msg2){

log.writeln(msg1);

log.writeln(msg2);

}

public static void log2(String msg1, String msg2){

synchronized(MyClass.class){

log.writeln(msg1);

log.writeln(msg2);

}

}

}

 

Java類鎖、對象鎖、私有鎖、隱式鎖

類鎖:在代碼中的方法上加了static和synchronized的鎖,或者synchronized(xxx.class)的代碼段,以下文中的increament();

對象鎖:在代碼中的方法上加了synchronized的鎖,或者synchronized(this)的代碼段,以下文中的synOnMethod()和synInMethod();

私有鎖:在類內部聲明一個私有屬性如private Object lock,在須要加鎖的代碼段synchronized(lock),以下文中的synMethodWithObj()。

public class LockTestClass {

//用於類鎖計數

private static int i = 0;

//私有鎖

private Object object = new Object();

/**

* &lt;p&gt;

* 無鎖方法

*

* @param threadID

* @param thread

*/

public void noSynMethod(long threadID, ObjThread thread) {

System.out.println("nosyn: class obj is " + thread + ", threadId is"

+ threadID);

}

/**

* 對象鎖方法1

*/

public synchronized void synOnMethod() {

System.out.println("synOnMethod begins" + ", time = "

+ System.currentTimeMillis() + "ms");

try {

Thread.sleep(2000L);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("synOnMethod ends");

}

/**

* 對象鎖方法2,採用synchronized (this)來加鎖

*/

public void synInMethod() {

synchronized (this) {

System.out.println("synInMethod begins" + ", time = "

+ System.currentTimeMillis() + "ms");

try {

Thread.sleep(2000L);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("synInMethod ends");

}

}

/**

* 對象鎖方法3

*/

public void synMethodWithObj() {

synchronized (object) {

System.out.println("synMethodWithObj begins" + ", time = "

+ System.currentTimeMillis() + "ms");

try {

Thread.sleep(2000L);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("synMethodWithObj ends");

}

}

/**

* 類鎖

*/

public static synchronized void increament() {

System.out.println("class synchronized. i = " + i + ", time = "

+ System.currentTimeMillis() + "ms");

i++;

try {

Thread.sleep(2000L);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("class synchronized ends.");

}

}

相關文章
相關標籤/搜索