咱們都知道線程間的通訊可使用BlockingQueue,那麼爲何BlockingQueue能夠進行線程間的通訊呢?其實就在於BlockingQueue的鎖機制。重入鎖ReentrantLock帶有一個Condition的條件,能夠進行線程等待和喚醒的功能,而BlockingQueue正是使用了這一機制來進行線程間的通訊的。從LinkedBlockingQueue的jdk的源碼能夠看到安全
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
咱們來看一組示例代碼併發
public final class PCData { private final int intData; public PCData(int intData) { this.intData = intData; } public PCData(String d) { this.intData = Integer.valueOf(d); } public int getIntData() { return intData; } @Override public String toString() { return "data:" + intData; } }
保存數據的對象類框架
public class Consumer implements Runnable { private BlockingQueue<PCData> queue; private static final int SLEEPTIME = 1000; public Consumer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { System.out.println("start Consumer id=" + Thread.currentThread().getId()); Random r = new Random(); try { while (true) { PCData data = queue.take(); if (null != data) { int re = data.getIntData() * data.getIntData(); System.out.println(MessageFormat.format("{0}*{1}={2}",data.getIntData(),data.getIntData(),re)); Thread.sleep(r.nextInt(SLEEPTIME)); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }
消費者線程,這裏面有一個PCData data = queue.take();這個take()方法就是線程通訊的關鍵。從jdk源碼來看dom
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //優先處理中斷信號 takeLock.lockInterruptibly(); try { //從隊列內取數,當隊列內沒數的時候進行等待中斷 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
當隊列內沒有數據可取的時候,當前線程被中斷,等待隊列內有數據爲止繼續執行。而不是簡單取個數出來那麼簡單。ide
public class Producer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue<PCData> queue; private static AtomicInteger count = new AtomicInteger(); private static final int SLEEPTIME = 1000; public Producer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { PCData data = null; Random r = new Random(); System.out.println("start producer id=" + Thread.currentThread().getId()); try { while (isRunning) { Thread.sleep(r.nextInt(SLEEPTIME)); data = new PCData(count.incrementAndGet()); System.out.println(data + " is put into queue"); if (!queue.offer(data,2, TimeUnit.SECONDS)) { System.out.println("failed to put data: " + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } public void stop() { isRunning = false; } }
生產者,這裏有一個方法queue.offer(data,2, TimeUnit.SECONDS),來看看他的jdk源碼高併發
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } //插入一個數據後,c從-1加到0 if (c == 0) //喚醒take()方法的等待,繼續執行 signalNotEmpty(); return true; }
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
由上面的代碼能夠看到,若是隊列中一旦有數據就會喚醒取數的線程繼續取數,取完繼續等待中斷。性能
public class Main { public static void main(String[] args) throws InterruptedException { BlockingQueue<PCData> queue = new LinkedBlockingQueue<>(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue); Consumer consumer3 = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer1); service.execute(consumer2); service.execute(consumer3); Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(3000); service.shutdown(); } }
由此執行能夠看到消費者會一直等待生產者放入數據進行消費。this
運行結果:線程
start producer id=11
start producer id=13
start producer id=12
start Consumer id=14
start Consumer id=15
start Consumer id=16
data:1 is put into queue
1*1=1
data:2 is put into queue
2*2=4
data:3 is put into queue
3*3=9
data:4 is put into queue
4*4=16
data:5 is put into queue
data:6 is put into queue
5*5=25
6*6=36
data:7 is put into queue
7*7=49
data:8 is put into queue
8*8=64
data:9 is put into queue
9*9=81
data:10 is put into queue
10*10=100
data:11 is put into queue
11*11=121
data:12 is put into queue
12*12=144
data:13 is put into queue
13*13=169
data:14 is put into queue
14*14=196
data:15 is put into queue
15*15=225
data:16 is put into queue
16*16=256
data:17 is put into queue
17*17=289
data:18 is put into queue
18*18=324
data:19 is put into queue
data:20 is put into queue
19*19=361
data:21 is put into queue
20*20=400
data:22 is put into queue
21*21=441
22*22=484
data:23 is put into queue
data:24 is put into queue
data:25 is put into queue
data:26 is put into queue
data:27 is put into queue
data:28 is put into queue
23*23=529
24*24=576
data:29 is put into queue
data:30 is put into queue
data:31 is put into queue
25*25=625
26*26=676
27*27=729
28*28=784
data:32 is put into queue
data:33 is put into queue
data:34 is put into queue
29*29=841
data:35 is put into queue
30*30=900
31*31=961
data:36 is put into queue
32*32=1,024
33*33=1,089
data:37 is put into queue
data:38 is put into queue
data:39 is put into queue
34*34=1,156
data:40 is put into queue
data:41 is put into queue
data:42 is put into queue
data:43 is put into queue
35*35=1,225
36*36=1,296
data:44 is put into queue
37*37=1,369
38*38=1,444
39*39=1,521
data:45 is put into queue
data:46 is put into queue
data:47 is put into queue
40*40=1,600
41*41=1,681
data:48 is put into queue
data:49 is put into queue
data:50 is put into queue
42*42=1,764
data:51 is put into queue
data:52 is put into queue
43*43=1,849
44*44=1,936
45*45=2,025
data:53 is put into queue
data:54 is put into queue
data:55 is put into queue
data:56 is put into queue
data:57 is put into queue
46*46=2,116
47*47=2,209
48*48=2,304
49*49=2,401
data:58 is put into queue
50*50=2,500
data:59 is put into queue
51*51=2,601
52*52=2,704
53*53=2,809
data:60 is put into queue
54*54=2,916
55*55=3,025
data:61 is put into queue
data:62 is put into queue
56*56=3,136
57*57=3,249
58*58=3,364
59*59=3,481
60*60=3,600
data:63 is put into queue
61*61=3,721
62*62=3,844
63*63=3,969orm
但這個程序的弊端也是很是明顯的,在高併發的場合,徹底使用鎖和阻塞來實現線程同步,性能並不優越。
固然還有一種特殊的隊列——延遲隊列,也是BlockingQueue接口實現類的一種——DelayQueue
源碼開頭
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); //優先隊列 private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();
private Thread leader = null;
它跟LinkedBlockingQueue不一樣,它只有一個鎖跟一個條件跟一個優先隊列。
先簡單介紹一下優先隊列PriorityQueue
@Data @AllArgsConstructor public class Student implements Comparable { private int score; private String name; /** * 對其以分數進行比較 * @param o * @return */ public int compareTo(Object o) { Student current = (Student)o; if (current.getScore() > this.getScore()) { return 1; }else if (current.getScore() == this.getScore()) { return 0; } return -1; } }
public class PriorityMain { public static void main(String[] args) { final PriorityQueue<Student> queue = new PriorityQueue(); Student p1=new Student(95,"張三"); Student p2=new Student(89,"李四"); Student p3=new Student(69,"孫七"); Student p4=new Student(67,"王五"); Student p5=new Student(92,"趙六"); queue.add(p1); queue.add(p2); queue.add(p3);//add 和offer效果同樣。 queue.offer(p4);//add 方法實現,其實就是調用了offer queue.offer(p5); for (Student student:queue) { System.out.println(student.toString()); } System.out.println("-----------------------"); while (!queue.isEmpty()) { System.out.println(queue.poll()); } } }
運行結果:
Student(score=95, name=張三)
Student(score=92, name=趙六)
Student(score=69, name=孫七)
Student(score=67, name=王五)
Student(score=89, name=李四)
-----------------------
Student(score=95, name=張三)
Student(score=92, name=趙六)
Student(score=89, name=李四)
Student(score=69, name=孫七)
Student(score=67, name=王五)
從下盤的結果能夠看出,優先隊列的取出(poll)出來的老是分數最高的。不過這個隊列並非線程安全的。
再回到延遲隊列DelayQueue,它的數據存儲對象須要實現一個接口——Delayed,並具體實現兩個方法compairTo和GetDelay
@Data public class Message implements Delayed { //消息id private int id; // 消息內容 private String body; // 延遲時長,這個是必須的屬性由於要按照這個判斷延時時長 private long excuteTime; public Message(int id,String body,long delayTime) { this.id = id; this.body = body; //將延遲時長(單位毫秒)轉化成納秒 this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime,TimeUnit.MILLISECONDS) + System.nanoTime(); } /** * 延遲任務是否到時就是按照這個方法判斷若是返回的是負數則說明到期不然還沒到期 * @param unit * @return */ public long getDelay(TimeUnit unit) { return unit.convert(this.excuteTime - System.nanoTime(),TimeUnit.NANOSECONDS); } /** * 自定義實現比較方法返回 1 0 -1三個參數 * @param o * @return */ public int compareTo(Delayed o) { Message msg = (Message)o; return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1:(Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0); } }
消費者線程
@AllArgsConstructor public class Cosumer implements Runnable { private DelayQueue<Message> queue; public void run() { while (true) { try { long time = System.currentTimeMillis(); Message take = queue.take(); System.out.println("消費消息id:" + take.getId() + "消息體:" + take.getBody()); System.out.println("共使用" + (System.currentTimeMillis() - time)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
這裏一樣有一個從延遲隊列取出queue.take(),咱們來看一下JDK的源碼(加了註釋),由源碼能夠看出,Message take = queue.take()要在延遲時間事後且隊列中有消息,才能夠拿到該消息,不然就一直等待和中斷。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //優先處理中斷 lock.lockInterruptibly(); try { for (;;) { //獲取優先隊列的隊首元素 E first = q.peek(); //若是隊首元素爲空,等待中斷 if (first == null) available.await(); else { //取得延期時間 long delay = first.getDelay(NANOSECONDS); //若是到期,從隊列中彈出消息 if (delay <= 0) return q.poll(); //未到期 first = null; // don't retain ref while waiting //該線程依然在運行,中斷等待 if (leader != null) available.await(); else { //該線程未運行,獲取當前線程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { //在延遲時間以前一直等待 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { //若是該線程未運行且優先隊列首元素不爲空,喚醒 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
最後是main方法
public class DelayMain { public static void main(String[] args) { DelayQueue<Message> queue = new DelayQueue(); Message m1 = new Message(1,"world",3000); Message m2 = new Message(2,"hello",10000); queue.offer(m2); queue.offer(m1); ExecutorService exec = Executors.newFixedThreadPool(1); exec.execute(new Cosumer(queue)); exec.shutdown(); } }
運行結果
消費消息id:1消息體:world
共使用2999
消費消息id:2消息體:hello
共使用7000
不過程序並無執行完成,由於queue一直在中斷等待。
無鎖框架:Disruptor
在Disruptor中,使用環形隊列(RingBuffer)來代替普通線性隊列。
要使用Disruptor,須要先引用他的jar,pom中配置以下
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.2</version> </dependency>
如今咱們一樣來實現上面的業務邏輯
public class PCData { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
數據對象類
public class PCDataFactory implements EventFactory<PCData> { @Override public PCData newInstance() { return new PCData(); } }
數據工廠
public class Consumer implements WorkHandler<PCData> { @Override public void onEvent(PCData pcData) throws Exception { System.out.println(Thread.currentThread().getId() + ":Event: --" + pcData.getValue() * pcData.getValue() + "--"); } }
消費者
public class Producer { private final RingBuffer<PCData> ringBuffer; public Producer(RingBuffer<PCData> ringBuffer) { this.ringBuffer = ringBuffer; } public void pushData(ByteBuffer bb) { long sequence = ringBuffer.next(); try { PCData event = ringBuffer.get(sequence); event.setValue(bb.getLong(0)); } finally { ringBuffer.publish(sequence); } } }
生產者
public class Main { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); PCDataFactory factory = new PCDataFactory(); int bufferSize = 1024; Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,bufferSize,executor, ProducerType.MULTI,new YieldingWaitStrategy()); //這裏起了4個消費者線程 disruptor.handleEventsWithWorkerPool(new Consumer(),new Consumer(),new Consumer(),new Consumer()); disruptor.start(); RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0;true;l++) { bb.putLong(0,l); producer.pushData(bb); Thread.sleep(100); System.out.println("add data " + l); } } }
主方法中YieldingWaitStrategy()有4種策略
一、BlockingWaitStrategy:相似於BlockingQueue,性能不佳
二、SleepingWaitStrategy:比較保守,適合對延時要求不是特別高的場合。
三、YieldingWaitStrategy():低延時場合,對延時有較爲嚴格的要求,可使用這個策略,最好計算機實際線程大於程序使用線程。
四、BusySpinWaitStrategy():最瘋狂的等待策略。就是一個死循環。對延遲很是苛刻的場合能夠考慮使用它。物理CPU數大於程序使用線程。
運行結果:
11:Event: --0--
add data 0
12:Event: --1--
add data 1
13:Event: --4--
add data 2
14:Event: --9--
add data 3
11:Event: --16--
add data 4
12:Event: --25--
add data 5
13:Event: --36--
add data 6
14:Event: --49--
add data 7
11:Event: --64--
add data 8
12:Event: --81--
add data 9
13:Event: --100--
add data 10
14:Event: --121--
add data 11
11:Event: --144--
add data 12
12:Event: --169--
add data 13
13:Event: --196--
add data 14
14:Event: --225--
add data 15
11:Event: --256--
add data 16
12:Event: --289--
add data 17
13:Event: --324--
add data 18
14:Event: --361--
add data 19
11:Event: --400--
add data 20
12:Event: --441--
add data 21
13:Event: --484--
add data 22
14:Event: --529--
add data 23
11:Event: --576--
add data 24
12:Event: --625--
add data 25
13:Event: --676--
add data 26
14:Event: --729--
add data 27
11:Event: --784--
add data 28
12:Event: --841--
add data 29
13:Event: --900--
add data 30
14:Event: --961--
add data 31
11:Event: --1024--
add data 32
12:Event: --1089--
add data 33
13:Event: --1156--
add data 34
14:Event: --1225--
add data 35
11:Event: --1296--
add data 36
12:Event: --1369--
add data 37
13:Event: --1444--
add data 38
14:Event: --1521--
add data 39
11:Event: --1600--
add data 40
12:Event: --1681--
add data 41
13:Event: --1764--
add data 42
14:Event: --1849--
add data 43
11:Event: --1936--
add data 44
12:Event: --2025--
add data 45
13:Event: --2116--
add data 46
14:Event: --2209--
add data 47
11:Event: --2304--
add data 48
12:Event: --2401--
add data 49
13:Event: --2500--
add data 50
14:Event: --2601--
add data 51
11:Event: --2704--
add data 52
12:Event: --2809--
add data 53
13:Event: --2916--
add data 54
14:Event: --3025--
add data 55
11:Event: --3136--
add data 56
12:Event: --3249--
add data 57
13:Event: --3364--
add data 58
14:Event: --3481--
add data 59
11:Event: --3600--
add data 60
12:Event: --3721--
add data 61
13:Event: --3844--
add data 62
14:Event: --3969--
add data 63
11:Event: --4096--
add data 64
12:Event: --4225--
add data 65
13:Event: --4356--
add data 66
14:Event: --4489--
add data 67
11:Event: --4624--
add data 68
12:Event: --4761--
add data 69
13:Event: --4900--
add data 70
14:Event: --5041--
add data 71
11:Event: --5184--
add data 72
12:Event: --5329--
add data 73
13:Event: --5476--
add data 74
14:Event: --5625--
add data 75
11:Event: --5776--
add data 76
12:Event: --5929--
add data 77
13:Event: --6084--
add data 78
14:Event: --6241--
add data 79
11:Event: --6400--
add data 80
12:Event: --6561--
add data 81
13:Event: --6724--
add data 82
14:Event: --6889--
add data 83
11:Event: --7056--
add data 84
12:Event: --7225--
add data 85
13:Event: --7396--
最後說明的是Disruptor使用的是無鎖技術,性能比BlockingQueue至少高一個數量級以上。