Object 的 wait()
和notify()
方法html
下圖爲線程狀態的圖:
Object 對象中的wait()
和notify()
是用來實現實現等待 / 通知模式。其中等待狀態和阻塞狀態是不一樣的。等待狀態的線程能夠經過notify()
方法喚醒並繼續執行,而阻塞狀態的線程則是等待獲取新的鎖。
調用wait()
方法後,當前線程會進入等待狀態,直到其餘線程調用notify()
或notifyAll()
來喚醒。
調用notify()
方法後,能夠喚醒正在等待的單一線程。java
原子性:即一個操做或者多個操做 要麼所有執行而且執行的過程不會被任何因素打斷,要麼就都不執行。
可見性:指當多個線程訪問同一個變量時,一個線程修改了這個變量的值,其餘線程可以當即看獲得修改的值。
有序性:即程序執行的順序按照代碼的前後順序執行,不進行指令重排列。程序員
synchronized 實現原理?編程
synchronized
能夠保證方法或者代碼塊在運行時,同一時刻只有一個進程能夠訪問,同時它還能夠保證共享變量的內存可見性。數組Java 中每個對象均可以做爲鎖,這是
synchronized
實現同步的基礎:緩存
- 普通同步方法,鎖是當前實例對象
- 靜態同步方法,鎖是當前類的 class 對象
- 同步方法塊,鎖是括號裏面的對象
- 同步代碼塊:
monitorenter
指令插入到同步代碼塊的開始位置,monitorexit
指令插入到同步代碼塊的結束位置,JVM 須要保證每個monitorenter
都有一個monitorexit
與之相對應。任何對象都有一個 Monitor 與之相關聯,當且一個 Monitor 被持有以後,他將處於鎖定狀態。線程執行到monitorenter
指令時,將會嘗試獲取對象所對應的 Monitor 全部權,即嘗試獲取對象的鎖。- 同步方法:
synchronized
方法則會被翻譯成普通的方法調用和返回指令如:invokevirtual
、areturn
指令,在 VM 字節碼層面並無任何特別的指令來實現被synchronized
修飾的方法,而是在 Class 文件的方法表中將該方法的access_flags
字段中的synchronized
標誌位置設置爲 1,表示該方法是同步方法,並使用調用該方法的對象或該方法所屬的 Class 在 JVM 的內部對象表示 Klass 做爲鎖對象。
synchronized
是重量級鎖,在 JDK1.6 中進行優化,如自旋鎖、適應性自旋鎖、鎖消除、鎖粗化、偏向鎖、輕量級鎖等技術來減小鎖操做的開銷。
volatile 的實現原理?安全
volatile 是輕量級的鎖,它不會引發線程上下文的切換和調度。數據結構
volatile
可見性:對一個volatile
的讀,總能夠看到對這個變量最終的寫。volatile
原子性:volatile
對單個讀 / 寫具備原子性(32 位 Long、Double),可是複合操做除外,例如i++
。- JVM 底層採用「內存屏障」來實現
volatile
語義,防止指令重排序。
volatile
常常用於兩個兩個場景:狀態標記變量、Double Check 。多線程
Java 內存模型(JMM)
JMM 規定了線程的工做內存和主內存的交互關係,以及線程之間的可見性和程序的執行順序。
- 一方面,要爲程序員提供足夠強的內存可見性保證。
- 另外一方面,對編譯器和處理器的限制要儘量地放鬆。JMM 對程序員屏蔽了 CPU 以及 OS 內存的使用問題,可以使程序在不一樣的 CPU 和 OS 內存上都可以達到預期的效果。
Java 採用內存共享的模式來實現線程之間的通訊。編譯器和處理器能夠對程序進行重排序優化處理,可是須要遵照一些規則,不能隨意重排序。
在併發編程模式中,勢必會遇到上面三個概念:
- 原子性:一個操做或者多個操做要麼所有執行要麼所有不執行。
- 可見性:當多個線程同時訪問一個共享變量時,若是其中某個線程更改了該共享變量,其餘線程應該能夠馬上看到這個改變。
- 有序性:程序的執行要按照代碼的前後順序執行。
經過
volatile
、synchronized
、final
、concurrent
包等 實現。
有關隊列 AQS 隊列同步器
AQS 是構建鎖或者其餘同步組件的基礎框架(如 ReentrantLock、ReentrantReadWriteLock、Semaphore 等), 包含了實現同步器的細節(獲取同步狀態、FIFO 同步隊列)。AQS 的主要使用方式是繼承,子類經過繼承同步器,並實現它的抽象方法來管理同步狀態。
- 維護一個同步狀態
state
。當state > 0
時,表示已經獲取了鎖;當state = 0
時,表示釋放了鎖。- AQS 經過內置的 FIFO 同步隊列來完成資源獲取線程的排隊工做:
- 若是當前線程獲取同步狀態失敗(鎖)時,AQS 則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程
- 當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。
AQS 內部維護的是** CLH 雙向同步隊列**
鎖的特性
可重入鎖:指的是在一個線程中能夠屢次獲取同一把鎖。 ReentrantLock 和 synchronized 都是可重入鎖。
可中斷鎖:顧名思義,就是能夠相應中斷的鎖。synchronized 就不是可中斷鎖,而 Lock 是可中斷鎖。
公平鎖:即儘可能以請求鎖的順序來獲取鎖。synchronized 是非公平鎖,ReentrantLock 和 ReentrantReadWriteLock,它默認狀況下是非公平鎖,可是能夠設置爲公平鎖。
ReentrantLock 鎖
ReentrantLock,可重入鎖,是一種遞歸無阻塞的同步機制。它能夠等同於
synchronized
的使用,可是 ReentrantLock 提供了比synchronized
更強大、靈活的鎖機制,能夠減小死鎖發生的機率。
- ReentrantLock 實現 Lock 接口,基於內部的 Sync 實現。
- Sync 實現 AQS ,提供了 FairSync 和 NonFairSync 兩種實現。
Condition
Condition 和 Lock 一塊兒使用以實現等待/通知模式,經過
await()
和singnal()
來阻塞和喚醒線程。Condition 是一種廣義上的條件隊列。他爲線程提供了一種更爲靈活的等待 / 通知模式,線程在調用 await 方法後執行掛起操做,直到線程等待的某個條件爲真時纔會被喚醒。Condition 必需要配合 Lock 一塊兒使用,由於對共享狀態變量的訪問發生在多線程環境下。一個 Condition 的實例必須與一個 Lock 綁定,所以 Condition 通常都是做爲 Lock 的內部實現。
ReentrantReadWriteLock
讀寫鎖維護着一對鎖,一個讀鎖和一個寫鎖。經過分離讀鎖和寫鎖,使得併發性比通常的排他鎖有了較大的提高:
- 在同一時間,能夠容許多個讀線程同時訪問。
- 可是,在寫線程訪問時,全部讀線程和寫線程都會被阻塞。
讀寫鎖的主要特性:
- 公平性:支持公平性和非公平性。
- 重入性:支持重入。讀寫鎖最多支持 65535 個遞歸寫入鎖和 65535 個遞歸讀取鎖。
- 鎖降級:遵循獲取寫鎖,再獲取讀鎖,最後釋放寫鎖的次序,如此寫鎖可以降級成爲讀鎖。
ReentrantReadWriteLock 實現 ReadWriteLock 接口,可重入的讀寫鎖實現類。
在同步狀態上,爲了表示兩把鎖,將一個 32 位整型分爲高 16 位和低 16 位,分別表示讀和寫的狀態
Synchronized 和 Lock 的區別
- Lock 是一個接口,而 synchronized 是 Java 中的關鍵字,synchronized 是內置的語言實現;
- synchronized 在發生異常時,會自動釋放線程佔有的鎖,所以不會致使死鎖現象發生;而 Lock 在發生異常時,若是沒有主動經過 unLock() 去釋放鎖,則極可能形成死鎖現象,所以使用 Lock 時須要在 finally 塊中釋放鎖;
- Lock 可讓等待鎖的線程響應中斷,而 synchronized 卻不行,使用 synchronized 時,等待的線程會一直等待下去,不可以響應中斷;
- 經過 Lock 能夠知道有沒有成功獲取鎖,而 synchronized 卻沒法辦到。
- Lock 能夠提升多個線程進行讀操做的效率。
更深的:
- 與
synchronized
相比,ReentrantLock 提供了更多,更加全面的功能,具有更強的擴展性。例如:時間鎖等候,可中斷鎖等候,鎖投票。- ReentrantLock 還提供了條件 Condition ,對線程的等待、喚醒操做更加詳細和靈活,因此在多個條件變量和高度競爭鎖的地方,ReentrantLock 更加適合(之後會闡述 Condition)。
- ReentrantLock 提供了可輪詢的鎖請求。它會嘗試着去獲取鎖,若是成功則繼續,不然能夠等到下次運行時處理,而
synchronized
則一旦進入鎖請求要麼成功要麼阻塞,因此相比synchronized
而言,ReentrantLock 會不容易產生死鎖些。- ReentrantLock 支持更加靈活的同步代碼塊,可是使用
synchronized
時,只能在同一個synchronized
塊結構中獲取和釋放。注意,ReentrantLock 的鎖釋放必定要在finally
中處理,不然可能會產生嚴重的後果。- ReentrantLock 支持中斷處理,且性能較
synchronized
會好些。
Java 中線程同步的方式
- sychronized 同步方法或代碼塊
- volatile
- Lock
- ThreadLocal
- 阻塞隊列(LinkedBlockingQueue)
- 使用原子變量(java.util.concurrent.atomic)
- 變量的不可變性
CAS 是一種什麼樣的同步機制?多線程下爲何不使用 int 而使用 AtomicInteger
?
Compare And Swap,比較交換。能夠看到
synchronized
能夠保證代碼塊原子性,不少時候會引發性能問題,volatile
也是個不錯的選擇,可是volatile
不能保證原子性,只能在某些場合下使用。因此能夠經過 CAS 來進行同步,保證原子性。們在讀 Concurrent 包下的類的源碼時,發現不管是 ReentrantLock 內部的 AQS,仍是各類 Atomic 開頭的原子類,內部都應用到了
CAS
。在 CAS 中有三個參數:內存值 V、舊的預期值 A、要更新的值 B ,當且僅當內存值 V 的值等於舊的預期值 A 時,纔會將內存值 V 的值修改成 B,不然什麼都不幹。其僞代碼以下:
if (this.value == A) { this.value = B return true; } else { return false; }CAS 能夠保證一次的讀-改-寫操做是原子操做。
在多線程環境下,int 類型的自增操做不是原子的,線程不安全,可使用
AtomicInteger
代替。// AtomicInteger.java private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value;
- Unsafe 是 CAS 的核心類,Java 沒法直接訪問底層操做系統,而是經過本地 native` 方法來訪問。不過儘管如此,JVM 仍是開了一個後門:Unsafe ,它提供了硬件級別的原子操做。
valueOffset
爲變量值在內存中的偏移地址,Unsafe 就是經過偏移地址來獲得數據的原值的。value
當前值,使用volatile
修飾,保證多線程環境下看見的是同一個。// AtomicInteger.java public final int addAndGet(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta) + delta; } // Unsafe.java // compareAndSwapInt(var1, var2, var5, var5 + var4)其實換成 compareAndSwapInt(obj, offset, expect, update)比較清楚,意思就是若是 obj 內的 value 和 expect 相等,就證實沒有其餘線程改變過這個變量,那麼就更新它爲 update,若是這一步的 CAS 沒有成功,那就採用自旋的方式繼續進行 CAS 操做,取出乍一看這也是兩個步驟了啊,其實在 JNI 裏是藉助於一個 CPU 指令完成的。因此仍是原子操做。 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } // 該方法爲本地方法,有四個參數,分別表明:對象、對象的地址、預期值、修改值 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
HashMap 是否是線程安全?如何體現?如何變得安全?
因爲添加元素到 map 中去時,數據量大產生擴容操做,多線程會致使 HashMap 的 node 鏈表造成環狀的數據結構產生死循環。因此 HashMap 是線程不安全的。
如何變得安全:
- Hashtable:經過 synchronized 來保證線程安全的,獨佔鎖,悲觀策略。吞吐量較低,性能較爲低下
- SynchronizedHashMap :經過
Collections.synchronizedMap()
方法對 HashMap 進行包裝,返回一個 SynchronizedHashMap 對象,在源碼中 SynchronizedHashMap 也是用過 synchronized 來保證線程安全的。可是實現方式和 Hashtable 略有不一樣(前者是 synchronized 方法,後者是經過 synchronized 對互斥變量加鎖實現)- ConcurrentHashMap:JUC 中的線程安全容器,高效併發。ConcurrentHashMap 的 key、value 都不容許爲 null。
ConcurrentHashMap 的實現方式?
ConcurrentHashMap 的實現方式和 Hashtable 不一樣,不採用獨佔鎖的形式,更高效,其中在 jdk1.7 和 jdk1.8 中實現的方式也略有不一樣。
Jdk1.7 中採用分段鎖和 HashEntry 使鎖更加細化。ConcurrentHashMap 採用了分段鎖技術,其中 Segment 繼承於 ReentrantLock。不會像 HashTable 那樣無論是 put 仍是 get 操做都須要作同步處理,理論上 ConcurrentHashMap 支持 CurrencyLevel (Segment 數組數量)的線程併發。
Jdk1.8 利用 CAS+Synchronized 來保證併發更新的安全,固然底層採用數組+鏈表+紅黑樹的存儲結構。
- table 中存放 Node 節點數據,默認 Node 數據大小爲 16,擴容大小老是 2^N。
- 爲了保證可見性,Node 節點中的 val 和 next 節點都用
volatile
修飾。- 當鏈表長度大於 8 時,會轉換成紅黑樹,節點會被包裝成
TreeNode
放在TreeBin
中。put()
:1. 計算鍵所對應的 hash 值;2. 若是哈希表還未初始化,調用 initTable() 初始化,不然在 table 中找到 index 位置,並經過 CAS 添加節點。若是鏈表節點數目超過 8,則將鏈表轉換爲紅黑樹。若是節點總數超過,則進行擴容操做。get()
:無需加鎖,直接根據 key 的 hash 值遍歷 node。
CountDownLatch 和 CyclicBarrier 的區別? 併發工具類
CyclicBarrier 它容許一組線程互相等待,直到到達某個公共屏障點 (Common Barrier Point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 Barrier 在釋放等待線程後能夠重用,因此稱它爲循環 ( Cyclic ) 的 屏障 ( Barrier ) 。
每一個線程調用#await()
方法,告訴 CyclicBarrier 我已經到達了屏障,而後當前線程被阻塞。當全部線程都到達了屏障,結束阻塞,全部線程可繼續執行後續邏輯。CountDownLatch 可以使一個線程在等待另一些線程完成各自工做以後,再繼續執行。使用一個計數器進行實現。計數器初始值爲線程的數量。當每個線程完成本身任務後,計數器的值就會減一。當計數器的值爲 0 時,表示全部的線程都已經完成了任務,而後在 CountDownLatch 上等待的線程就能夠恢復執行任務。
二者區別:
- CountDownLatch 的做用是容許 1 或 N 個線程等待其餘線程完成執行;而 CyclicBarrier 則是容許 N 個線程相互等待。
- CountDownLatch 的計數器沒法被重置;CyclicBarrier 的計數器能夠被重置後使用,所以它被稱爲是循環的 barrier 。
Semaphore 是一個控制訪問多個共享資源的計數器,和 CountDownLatch 同樣,其本質上是一個「共享鎖」。一個計數信號量。從概念上講,信號量維護了一個許可集。
- 若有必要,在許可可用前會阻塞每個 acquire,而後再獲取該許可。
- 每一個 release 添加一個許可,從而可能釋放一個正在阻塞的獲取者。
怎麼控制線程,儘量減小上下文切換?
什麼是樂觀鎖和悲觀鎖?
像
synchronized
這種獨佔鎖屬於悲觀鎖,它是在假設必定會發生衝突的,那麼加鎖剛好有用,除此以外,還有樂觀鎖,樂觀鎖的含義就是假設沒有發生衝突,那麼我正好能夠進行某項操做,若是要是發生衝突呢,那我就重試直到成功,樂觀鎖最多見的就是CAS
。
阻塞隊列
阻塞隊列實現了 BlockingQueue 接口,而且有多組處理方法。
拋出異常:add(e)
、remove()
、element()
返回特殊值:offer(e)
、pool()
、peek()
阻塞:put(e)
、take()
JDK 8 中提供了七個阻塞隊列可供使用:
- ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue :一個由鏈表結構組成的無界阻塞隊列。
- PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
- DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
ArrayBlockingQueue,一個由數組實現的有界阻塞隊列。該隊列採用 FIFO 的原則對元素進行排序添加的。內部使用可重入鎖 ReentrantLock + Condition 來完成多線程環境的併發操做。
線程池
線程池有五種狀態:RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED。
- RUNNING:接收並處理任務。
- SHUTDOWN:不接收但處理現有任務。
- STOP:不接收也不處理任務,同時終端當前處理的任務。
- TIDYING:全部任務終止,線程池會變爲 TIDYING 狀態。當線程池變爲 TIDYING 狀態時,會執行鉤子函數 terminated()。
- TERMINATED:線程池完全終止的狀態。
內部變量** ctl **定義爲 AtomicInteger ,記錄了「線程池中的任務數量」和「線程池的狀態」兩個信息。共 32 位,其中高 3 位表示」線程池狀態」,低 29 位表示」線程池中的任務數量」。
線程池建立參數
corePoolSize
線程池中核心線程的數量。當提交一個任務時,線程池會新建一個線程來執行任務,直到當前線程數等於 corePoolSize。若是調用了線程池的 prestartAllCoreThreads() 方法,線程池會提早建立並啓動全部基本線程。
maximumPoolSize
線程池中容許的最大線程數。線程池的阻塞隊列滿了以後,若是還有任務提交,若是當前的線程數小於 maximumPoolSize,則會新建線程來執行任務。注意,若是使用的是無界隊列,該參數也就沒有什麼效果了。
keepAliveTime
線程空閒的時間。線程的建立和銷燬是須要代價的。線程執行完任務後不會當即銷燬,而是繼續存活一段時間:keepAliveTime。默認狀況下,該參數只有在線程數大於 corePoolSize 時纔會生效。
unit
keepAliveTime 的單位。TimeUnit
workQueue
用來保存等待執行的任務的阻塞隊列,等待的任務必須實現 Runnable 接口。咱們能夠選擇以下幾種:
- ArrayBlockingQueue:基於數組結構的有界阻塞隊列,FIFO。
- LinkedBlockingQueue:基於鏈表結構的有界阻塞隊列,FIFO。
- SynchronousQueue:不存儲元素的阻塞隊列,每一個插入操做都必須等待一個移出操做,反之亦然。
- PriorityBlockingQueue:具備優先界別的阻塞隊列。
threadFactory
用於設置建立線程的工廠。該對象能夠經過 Executors.defaultThreadFactory()。他是經過 newThread() 方法提供建立線程的功能,newThread() 方法建立的線程都是「非守護線程」並且「線程優先級都是 Thread.NORM_PRIORITY」。
handler
RejectedExecutionHandler,線程池的拒絕策略。所謂拒絕策略,是指將任務添加到線程池中時,線程池拒絕該任務所採起的相應策略。當向線程池中提交任務時,若是此時線程池中的線程已經飽和了,並且阻塞隊列也已經滿了,則線程池會選擇一種拒絕策略來處理該任務。
線程池提供了四種拒絕策略:
固然咱們也能夠實現本身的拒絕策略,例如記錄日誌等等,實現 RejectedExecutionHandler 接口便可。
- AbortPolicy:直接拋出異常,默認策略;
- CallerRunsPolicy:用調用者所在的線程來執行任務;
- DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
- DiscardPolicy:直接丟棄任務;
當添加新的任務到線程池時:
- 線程數量未達到 corePoolSize,則新建一個線程(核心線程)執行任務
- 線程數量達到了 corePoolSize,則將任務移入隊列等待
- 隊列已滿,新建線程(非核心線程)執行任務
- 隊列已滿,總線程數又達到了 maximumPoolSize,就會由 handler 的拒絕策略來處理
線程池可經過
Executor
框架來進行建立:FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }corePoolSize 和 maximumPoolSize 都設置爲建立 FixedThreadPool 時指定的參數 nThreads,意味着當線程池滿時且阻塞隊列也已經滿時,若是繼續提交任務,則會直接走拒絕策略,該線程池不會再新建線程來執行任務,而是直接走拒絕策略。FixedThreadPool 使用的是默認的拒絕策略,即 AbortPolicy,則直接拋出異常。
可是
workQueue
使用了無界的 LinkedBlockingQueue, 那麼當任務數量超過 corePoolSize 後,全都會添加到隊列中而不執行拒絕策略。SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }做爲單一 worker 線程的線程池,SingleThreadExecutor 把 corePool 和 maximumPoolSize 均被設置爲 1,和 FixedThreadPool 同樣使用的是無界隊列 LinkedBlockingQueue, 因此帶來的影響和 FixedThreadPool 同樣。
CachedThreadPool
CachedThreadPool是一個會根據須要建立新線程的線程池 ,他定義以下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }這個線程池,當任務提交是就會建立線程去執行,執行完成後線程會空閒60s,以後就會銷燬。可是若是主線程提交任務的速度遠遠大於 CachedThreadPool 的處理速度,則 CachedThreadPool 會不斷地建立新線程來執行任務,這樣有可能會致使系統耗盡 CPU 和內存資源,因此在使用該線程池是,必定要注意控制併發的任務數,不然建立大量的線程可能致使嚴重的性能問題。
爲何要使用線程池?
- 建立/銷燬線程伴隨着系統開銷,過於頻繁的建立/銷燬線程,會很大程度上影響處理效率。線程池緩存線程,可用已有的閒置線程來執行新任務(keepAliveTime)
- 線程併發數量過多,搶佔系統資源從而致使阻塞。運用線程池能有效的控制線程最大併發數,避免以上的問題。
- 對線程進行一些簡單的管理(延時執行、定時循環執行的策略等)
生產者消費者問題
實例代碼用 Object 的
wait()
和notify()
實現,也可用 ReentrantLock 和 Condition 來完成。或者直接使用阻塞隊列。
public class ProducerConsumer { public static void main(String[] args) { ProducerConsumer main = new ProducerConsumer(); Queue<Integer> buffer = new LinkedList<>(); int maxSize = 5; new Thread(main.new Producer(buffer, maxSize), "Producer1").start(); new Thread(main.new Consumer(buffer, maxSize), "Comsumer1").start(); new Thread(main.new Consumer(buffer, maxSize), "Comsumer2").start(); } class Producer implements Runnable { private Queue<Integer> queue; private int maxSize; Producer(Queue<Integer> queue, int maxSize) { this.queue = queue; this.maxSize = maxSize; } @Override public void run() { while (true) { synchronized (queue) { while (queue.size() == maxSize) { try { System.out.println("Queue is full"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Random random = new Random(); int i = random.nextInt(); System.out.println(Thread.currentThread().getName() + " Producing value : " + i); queue.add(i); queue.notifyAll(); } } } } class Consumer implements Runnable { private Queue<Integer> queue; private int maxSize; public Consumer(Queue<Integer> queue, int maxSize) { super(); this.queue = queue; this.maxSize = maxSize; } @Override public void run() { while (true) { synchronized (queue) { while (queue.isEmpty()) { try { System.out.println("Queue is empty"); queue.wait(); } catch (Exception ex) { ex.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " Consuming value : " + queue.remove()); queue.notifyAll(); } } } } }