public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
final transient ReentrantLock lock = new ReentrantLock();
private transient volatile Object[] array;
// 添加元素,有鎖
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 修改時加鎖,保證併發安全
try {
Object[] elements = getArray(); // 當前數組
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 建立一個新數組,比老的大一個空間
newElements[len] = e; // 要添加的元素放進新數組
setArray(newElements); // 用新數組替換原來的數組
return true;
} finally {
lock.unlock(); // 解鎖
}
}
// 讀元素,不加鎖,所以可能讀取到舊數據
public E get(int index) {
return get(getArray(), index);
}
}
複製代碼
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 讀寫共用此鎖,線程間經過下面兩個Condition通訊
* 這兩個Condition和lock有緊密聯繫(就是lock的方法生成的)
* 相似Object的wait/notify
*/
final ReentrantLock lock;
/** 隊列不爲空的信號,取數據的線程須要關注 */
private final Condition notEmpty;
/** 隊列沒滿的信號,寫數據的線程須要關注 */
private final Condition notFull;
// 一直阻塞直到有東西能夠拿出來
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
// 在尾部插入一個元素,隊列已滿時等待指定時間,若是仍是不能插入則返回
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 鎖住
try {
// 循環等待直到隊列有空閒
while (count == items.length) {
if (nanos <= 0)
return false;// 等待超時,返回
// 暫時放出鎖,等待一段時間(可能被提早喚醒並搶到鎖,因此須要循環判斷條件)
// 這段時間可能其餘線程取走了元素,這樣就有機會插入了
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);//插入一個元素
return true;
} finally {
lock.unlock(); //解鎖
}
}
複製代碼
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
// 沒有休息,瘋狂寫入
for (int i = 0; ; i++) {
System.out.println("放入: " + i);
queue.put(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
// 鹹魚模式取數據
while (true) {
System.out.println("取出: " + queue.take());
Thread.sleep((long) (Math.random() * 2000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
/* 輸出:
放入: 0
取出: 0
放入: 1
取出: 1
放入: 2
取出: 2
放入: 3
取出: 3
*/
複製代碼
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0, // 核心線程爲0,沒用的線程都被無情拋棄
Integer.MAX_VALUE, // 最大線程數理論上是無限了,還沒到這個值機器資源就被掏空了
60L, TimeUnit.SECONDS, // 閒置線程60秒後銷燬
new SynchronousQueue<Runnable>()); // offer時若是沒有空閒線程取出任務,則會失敗,線程池就會新建一個線程
}
複製代碼
歡迎你們關注個人公衆號【程序員追風】,文章都會在裏面更新,整理的資料也會放在裏面。
java