getState()
、setState(int newState)
、compareAndSetState(int expect,int update)
)來對同步狀態state進行操做,AQS確保對state操做時線程安全的。acquire(int arg)
以獨佔模式獲取對象,忽略中斷。acquireInterruptibly(int arg)
以獨佔模式獲取對象,若是被中斷則停止。acquire(int arg)
以獨佔模式獲取對象,忽略中斷。acquireShared(int arg)
以共享模式獲取對象,忽略中斷。acquireSharedInterruptibly(int arg)
以共享模式獲取對象,若是被中斷則停止。compareAndSetState(int expect, int update)
若是當前狀態值等於預期值,則以原子方式將同步狀態設置爲給定的更新值。getState()
返回同步狀態的當前值。release(int arg)
以獨佔模式釋放對象。releaseShared(int arg)
以共享模式釋放對象。setState(int newState)
設置同步狀態的值。tryAcquire(int arg)
試圖在獨佔模式下獲取對象狀態。tryAcquireNanos(int arg, long nanosTimeout)
試圖以獨佔模式獲取對象,若是被中斷則停止,若是到了給定超時時間,則會失敗。tryAcquireShared(int arg)
試圖在共享模式下獲取對象狀態。tryAcquireSharedNanos(int arg, long nanosTimeout)
試圖以共享模式獲取對象,若是被中斷則停止,若是到了給定超時時間,則會失敗。tryReleaseShared(int arg)
試圖設置狀態來反映共享模式下的一個釋放。static final class Node {
/** 表示節點正在共享模式中等待 */
static final Node SHARED = new Node();
/** 表示節點正在獨佔模式下等待 */
static final Node EXCLUSIVE = null;
/** 表示取消狀態,同步隊列中等待的線程等待超時或中斷,須要從同步隊列中取消等待,節點進入該值不會發生變化 */
static final int CANCELLED = 1;
/** 後續節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者取消,將會通知後續節點運行*/
static final int SIGNAL = -1;
/** 節點在等待中,節點線程等待在Conditions上。當其餘線程對Condition調用了signal()後,該節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 */
static final int CONDITION = -2;
/** * 表示下一次共享式同步狀態獲取將會無條件傳播下去 */
static final int PROPAGATE = -3;
volatile int waitStatus;
/**前驅節點**/
volatile Node prev;
/**後繼節點**/
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
複製代碼
節點是構成同步隊列的基礎,同步器擁有首節點(head)和尾節點(tail),沒有成功獲取同步狀態的線程將會成爲節點加入該隊列的尾部,同步隊列的java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
package com.example.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/** * 基於AQS實現一個簡單的鎖 * * @author qinxuewu * @create 19/3/18下午11:44 * @since 1.0.0 */
public class MyAQSLock implements Lock {
private final MySync sync=new MySync();
/** * 構建一個內部幫助器 集成AQS */
private static class MySync extends AbstractQueuedSynchronizer{
//狀態爲0時獲取鎖,
/*** * 一個線程進來時,若是狀態爲0,就更改state變量,返回true表示拿到鎖 * * 當state大於0說明當前鎖已經被持有,直接返回false,若是重複進來,就累加state,返回true * @param arg * @return */
@Override
protected boolean tryAcquire(int arg) {
//獲取同步狀態狀態的成員變量的值
int state=getState();
Thread cru=Thread.currentThread();
if(state==0){
//CAS方式更新state,保證原子性,指望值,更新的值
if( compareAndSetState(0,arg)){
//設置成功
//設置當前線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}else if(Thread.currentThread()==getExclusiveOwnerThread()){
//若是仍是當前線程進來,累加state,返回true 可重入
setState(state+1);
return true;
}
return false;
}
/** * 釋放同步狀態 * @param arg * @return */
@Override
protected boolean tryRelease(int arg) {
boolean flag=false;
//判斷釋放操做是不是當前線程,
if(Thread.currentThread()==getExclusiveOwnerThread()){
//獲取同步狀態成員變量,若是大於0 才釋放
int state=getState();
if(getState()==0){
//當前線程置爲null
setExclusiveOwnerThread(null);
flag=true;
}
setState(arg);
}else{
//不是當線程拋出異常
throw new RuntimeException();
}
return flag;
}
Condition newCondition(){
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/** * 加鎖 * @return */
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
/** * 釋放鎖 */
@Override
public void unlock() {
sync.tryRelease(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
複製代碼
測試node
public class MyAQSLockTest {
MyAQSLock lock=new MyAQSLock();
private int i=0;
public int next() {
try {
lock.lock();
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i++;
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return 0;
}
public void test1(){
System.out.println("test1");
test2();
}
public void test2(){
System.out.println("test2");
}
public static void main(String[] args){
MyAQSLockTest test=new MyAQSLockTest();
// Thread thread = new Thread(new Runnable() {
// @Override
// public void run() {
// while (true) {
//
// System.out.println(Thread.currentThread().getName() + "-" + test.next());
//
// }
//
// }
// });
// thread.start();
//
// Thread thread2 = new Thread(new Runnable() {
// @Override
// public void run() {
// while (true) {
//
// System.out.println(Thread.currentThread().getName() + "-" + test.next());
//
// }
//
// }
// });
// thread2.start();
//可重複鎖演示
Thread thread3 = new Thread(new Runnable() {
@Override
public void run() {
test.test1();
}
});
thread3.start();
}
}
複製代碼