1. 固定大小的「先進先出」隊列
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Fixed-capacity first-in-first-out queue: once the queue is full, inserting a
 * new element evicts the oldest one.
 *
 * <p>Both {@link #put(Object)} and {@link #getAll()} synchronize on this
 * instance, so a snapshot never observes the intermediate state between the
 * eviction of the oldest element and the insertion of the new one.
 *
 * @param <E> element type
 */
public class TopQueue<E> {

    private final LinkedBlockingQueue<E> blockQueue;

    /**
     * Creates a queue that retains at most {@code size} elements.
     *
     * @param size maximum number of retained elements; passed straight to the
     *             {@link LinkedBlockingQueue} constructor
     */
    public TopQueue(int size) {
        this.blockQueue = new LinkedBlockingQueue<E>(size);
    }

    /**
     * Appends {@code e}, evicting the oldest element first when the queue is full.
     *
     * @param e element to append
     * @throws InterruptedException never thrown by this implementation; the
     *         clause is kept so existing callers that catch it still compile
     */
    public synchronized void put(E e) throws InterruptedException {
        // All writers go through this synchronized method, so a failed offer
        // means the queue is full; poll() drops the head without blocking
        // while the monitor is held.
        while (!blockQueue.offer(e)) {
            blockQueue.poll();
        }
    }

    /**
     * Returns a snapshot of the current contents, oldest element first.
     *
     * @return a new list; modifying it does not affect this queue
     */
    public synchronized List<E> getAll() {
        return new ArrayList<E>(blockQueue);
    }

    /** Small demo: pushes 1..6 through a queue of capacity 3. */
    public static void main(String[] args) throws InterruptedException {
        TopQueue<Integer> tq = new TopQueue<Integer>(3);
        tq.put(1);
        tq.put(2);
        tq.put(3);
        System.out.println(Arrays.toString(tq.getAll().toArray()));
        tq.put(4);
        System.out.println(Arrays.toString(tq.getAll().toArray()));
        tq.put(5);
        System.out.println(Arrays.toString(tq.getAll().toArray()));
        tq.put(6);
        System.out.println(Arrays.toString(tq.getAll().toArray()));
    }
}
[1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 6]
2. 固定大小的優先隊列(實現一)
import java.util.ArrayList; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; import com.alibaba.fastjson.JSON; public class TopPriorityQueue<E> { private final PriorityBlockingQueue<E> blockQueue; private final int size; public TopPriorityQueue(int size){ this.blockQueue = new PriorityBlockingQueue<E>(size + 1); this.size = size + 1; // 這裏多加1的緣由是防止put方法中將大的刪除了,反而降小的插入了,因此多加1個用作"哨卡" } public synchronized void put(E e) throws InterruptedException{ if(blockQueue.size() >= size) blockQueue.take(); blockQueue.offer(e); } public List<E> getAll() throws InterruptedException{
synchronized(this){ if(blockQueue.size() >= size) blockQueue.take(); // 前面構造函數中多加了1,這裏減掉一個 } return new ArrayList<E>(blockQueue); } public static void main(String[] args) throws InterruptedException{ final TopPriorityQueue<User> tq = new TopPriorityQueue<User>(3); User u1 = new User(1, "bbb", 10); User u2 = new User(2, "ccc", 20); User u3 = new User(3, "ddd", 30); User u4 = new User(4, "fff", 40); User u5 = new User(5, "fff", 50); User u6 = new User(6, "ddd", 60); User u7 = new User(7, "ggg", 70); User u8 = new User(8, "hhh", 80); tq.put(u4); //4 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u8); //4,8 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u7); //4,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u5); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u2); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u3); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u1); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u6); //6,8,7 System.out.println(JSON.toJSONString(tq.getAll())); } }
import java.util.Comparator; public class User implements Comparable<User>{ private int id; private String name; private long score; // 得分 // ... ... public User(int id, String name, long score){ this.id = id; this.name = name; this.score = score; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public long getScore() { return score; } public void setScore(long score) { this.score = score; } @Override public int compareTo(User o) { return this.getScore() > o.getScore() ? 1 : this.getScore() < o.getScore() ? -1 : 0; } }
[{"id":4,"name":"fff","score":40}] [{"id":4,"name":"fff","score":40},{"id":8,"name":"hhh","score":80}] [{"id":4,"name":"fff","score":40},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}] [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}] [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}] [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}] [{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}] [{"id":6,"name":"ddd","score":60},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
public synchronized void put(E e) throws InterruptedException{
if(blockQueue.size() >= size)
3. 固定大小的優先隊列(實現二)
import java.util.Comparator;

import com.coin.User;

/**
 * Orders {@link User} instances by ascending score.
 */
public class MyComparator implements Comparator<User> {

    /**
     * @param u1 first user
     * @param u2 second user
     * @return {@code 1} if {@code u1} has the higher score, {@code -1} if it
     *         has the lower score, {@code 0} if the scores are equal
     */
    @Override
    public int compare(User u1, User u2) {
        final long first = u1.getScore();
        final long second = u2.getScore();
        return first > second ? 1 : (first < second ? -1 : 0);
    }
}
import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; import com.alibaba.fastjson.JSON; public class TopPriorityQueue<E> { private final PriorityBlockingQueue<E> blockQueue; private final int size; public TopPriorityQueue(int size, Comparator<E> comparator){ this.blockQueue = new PriorityBlockingQueue<E>(size + 1, comparator); this.size = size + 1; // 這裏多加1的緣由是防止put方法中將大的刪除了,反而降小的插入了,因此多加1個用作"哨卡" } public synchronized void put(E e) throws InterruptedException{ if(blockQueue.size() >= size) blockQueue.take(); blockQueue.offer(e); } public List<E> getAll() throws InterruptedException{ synchronized(this){ if(blockQueue.size() >= size) blockQueue.take(); // 前面構造函數中多加了1,這裏減掉一個 } return new ArrayList<E>(blockQueue); } public static void main(String[] args) throws InterruptedException{ MyComparator myComparator = new MyComparator(); final TopPriorityQueue<User> tq = new TopPriorityQueue<User>(3, myComparator); User u1 = new User(1, "bbb", 10); User u2 = new User(2, "ccc", 20); User u3 = new User(3, "ddd", 30); User u4 = new User(4, "fff", 40); User u5 = new User(5, "fff", 50); User u6 = new User(6, "ddd", 60); User u7 = new User(7, "ggg", 70); User u8 = new User(8, "hhh", 80); tq.put(u4); //4 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u8); //4,8 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u7); //4,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u5); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u2); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u3); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u1); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u6); //6,8,7 System.out.println(JSON.toJSONString(tq.getAll())); } }
/**
 * Inserts the given element into this priority queue.
 *
 * <p>Steps, in order: reject {@code null}; acquire {@code lock}; call
 * {@code tryGrow} repeatedly while the element count {@code size} is not
 * smaller than the backing array's length; sift the element up from index
 * {@code n} using the natural ordering when {@code comparator} is
 * {@code null}, otherwise using the comparator; store {@code n + 1} as the
 * new size; signal {@code notEmpty}; release the lock in {@code finally}.
 *
 * <p>There is no "queue full" exit: the method grows the array instead of
 * refusing the element, so the only return value is {@code true}.
 *
 * @param e the element to add
 * @return always {@code true}
 * @throws NullPointerException if {@code e} is null
 */
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp);
另外要注意 LinkedBlockingQueue 和 PriorityBlockingQueue 有一點不同:LinkedBlockingQueue.offer(e) 在隊列滿了時會返回 false;而 PriorityBlockingQueue 是無界隊列,offer() 在內部數組已滿時會自動擴容,因此永遠只返回 true。
LinkedBlockingQueue.offer() 的源碼如下:
/**
 * Inserts the specified element at the tail of this queue if it is
 * possible to do so immediately without exceeding the queue's capacity,
 * returning {@code true} upon success and {@code false} if this queue
 * is full.
 * When using a capacity-restricted queue, this method is generally
 * preferable to method {@link BlockingQueue#add add}, which can fail to
 * insert an element only by throwing an exception.
 *
 * <p>Implementation notes: the first {@code count.get() == capacity} test is
 * a fast path taken without holding {@code putLock}; the capacity is checked
 * again under the lock before {@code enqueue}. {@code c} starts at {@code -1}
 * and is overwritten with the count before the increment only when the node
 * was enqueued, so {@code c >= 0} reports whether the insert happened.
 * {@code notFull} is signalled when room remains after the insert, and
 * {@code signalNotEmpty()} is called when the queue was empty beforehand
 * ({@code c == 0}).
 *
 * @param e the element to add
 * @return {@code true} if the element was enqueued, {@code false} if the
 *         queue was full
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
if (count.get() == capacity)
return false;
PriorityBlockingQueue.offer() 的源碼如下:
/**
 * Inserts the specified element into this priority queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * <p>Implementation notes: while the element count {@code size} is not
 * smaller than the backing array's length, {@code tryGrow} is called, so
 * the insert is never refused for lack of space. The element is then sifted
 * up from index {@code n} using the natural ordering when {@code comparator}
 * is {@code null}, otherwise using the comparator; {@code size} becomes
 * {@code n + 1}, {@code notEmpty} is signalled, and the lock is released in
 * {@code finally}.
 *
 * @param e the element to add
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws ClassCastException if the specified element cannot be compared
 *         with elements currently in the priority queue according to the
 *         priority queue's ordering
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
As the queue is unbounded, this method will never return {@code false}.
另外,TopQueue 和 TopPriorityQueue 本身都是線程安全的,但並不保證插入隊列中的元素自身的線程安全性。