PriorityQueue和Queue的一種變體的實現

隊列和優先隊列是咱們十分熟悉的數據結構。提供了所謂的「先進先出」功能,優先隊列則按照某種規則「先進先出」。可是他們都沒有提供:「固定大小的隊列」和「固定大小的優先隊列」的功能。java

好比咱們要實現:記錄按照時間排序的最近的登陸網站的20我的;按照分數排序的最高的30我的;好比在遊戲中一場兩兩PK的戰鬥,得分最高的6我的;要實現這些功能時,須要的數據結構,在java類庫中沒有現成的類。咱們須要利用現有的類庫來實現它們。node

1. 固定大小的「先進先出」隊列json

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class TopQueue<E> {
    private final LinkedBlockingQueue<E> blockQueue;
    
    public TopQueue(int size){
        this.blockQueue = new LinkedBlockingQueue<E>(size);
    }
    
    public synchronized void put(E e) throws InterruptedException{
        if(blockQueue.offer(e)){
            return;
        }else{
            blockQueue.take();
            blockQueue.offer(e);
        }
    }
    
    public List<E> getAll(){
        return new ArrayList<E>(blockQueue);
    }
    
    public static void main(String[] args) throws InterruptedException{
        TopQueue<Integer> tq = new TopQueue<Integer>(3);
        tq.put(1);
        tq.put(2);
        tq.put(3);
        System.out.println(Arrays.toString(tq.getAll().toArray()));
        
        tq.put(4);
        System.out.println(Arrays.toString(tq.getAll().toArray()));
        
        tq.put(5);
        System.out.println(Arrays.toString(tq.getAll().toArray()));
        
        tq.put(6);
        System.out.println(Arrays.toString(tq.getAll().toArray()));
    }    
}

輸出的結果爲:安全

[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]

上面的TopQueue實現了「固定大小的線程安全的」隊列。不管有多少個線程,向TopQueue中放入了多少個元素,在TopQueue中只保留最後放進去的n個元素。數據結構

2. 固定大小的優先隊列(實現一)ide

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import com.alibaba.fastjson.JSON;

public class TopPriorityQueue<E> {
    private final PriorityBlockingQueue<E> blockQueue;
    private final int size;

    public TopPriorityQueue(int size){
        this.blockQueue = new PriorityBlockingQueue<E>(size + 1);
        this.size = size + 1;    // 這裏多加1的緣由是防止put方法中將大的刪除了,反而降小的插入了,因此多加1個用作"哨卡"
    }
    
    public synchronized void put(E e) throws InterruptedException{
        if(blockQueue.size() >= size)
            blockQueue.take();
        blockQueue.offer(e);
    }
    
    public List<E> getAll() throws InterruptedException{
synchronized(this){
if(blockQueue.size() >= size) blockQueue.take(); // 前面構造函數中多加了1,這裏減掉一個 } return new ArrayList<E>(blockQueue); } public static void main(String[] args) throws InterruptedException{ final TopPriorityQueue<User> tq = new TopPriorityQueue<User>(3); User u1 = new User(1, "bbb", 10); User u2 = new User(2, "ccc", 20); User u3 = new User(3, "ddd", 30); User u4 = new User(4, "fff", 40); User u5 = new User(5, "fff", 50); User u6 = new User(6, "ddd", 60); User u7 = new User(7, "ggg", 70); User u8 = new User(8, "hhh", 80); tq.put(u4); //4 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u8); //4,8 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u7); //4,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u5); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u2); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u3); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u1); //5,8,7 System.out.println(JSON.toJSONString(tq.getAll())); tq.put(u6); //6,8,7 System.out.println(JSON.toJSONString(tq.getAll())); } }

User類:函數

import java.util.Comparator;

public class User implements Comparable<User>{
    private int id;
    private String name;
    private long score;    // 得分
    // ... ...
    
    public User(int id, String name, long score){
        this.id = id;
        this.name = name;
        this.score = score;
    }
    
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public long getScore() {
        return score;
    }
    public void setScore(long score) {
        this.score = score;
    }

    @Override
    public int compareTo(User o) {
        return this.getScore() > o.getScore() ? 1 : this.getScore() < o.getScore() ? -1 : 0;
    }
}

輸入的結果爲:網站

[{"id":4,"name":"fff","score":40}]
[{"id":4,"name":"fff","score":40},{"id":8,"name":"hhh","score":80}]
[{"id":4,"name":"fff","score":40},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
[{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
[{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
[{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
[{"id":5,"name":"fff","score":50},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]
[{"id":6,"name":"ddd","score":60},{"id":8,"name":"hhh","score":80},{"id":7,"name":"ggg","score":70}]

TopPriorityQueue實現了「固定大小的優先隊列」,的實現原理是:this

public synchronized void put(E e) throws InterruptedException{
        if(blockQueue.size() >= size)
            blockQueue.take();
        blockQueue.offer(e);
 }spa

當隊列滿了,還要插入時,就刪除隊列中最小的一個,而後再插入。可是這裏涉及到一個問題,若是這個要被插入的元素優先級要比那個被刪除的元素優先級低呢?那豈不是將大的刪除了,反而將小的插入了。因此這裏咱們採起的辦法是,比實際要求的size的基礎上多保留一個,用作「哨卡」。當隊列滿了時,咱們將「哨卡」刪掉,而後再插入咱們的元素,而後隊列中新的最小的元素就成爲了新的「哨卡」。而「哨卡」由於是最小的一個,不是咱們須要的,返回最終結果時會被刪除掉。因此不會出現刪除了大的,插入了小的問題。這裏有點小技巧。

 

3. 固定大小的優先隊列(實現二)

上面的實現,須要咱們插入隊列的元素Comparable這個接口,可是實際環境中,咱們不太可能去進行這樣的修改,因此咱們還有另一種方法——使用Comparator來搞定,看代碼:

import java.util.Comparator;
import com.coin.User;

public class MyComparator implements Comparator<User> {
    @Override
    public int compare(User u1, User u2) {
        if(u1.getScore() > u2.getScore())
            return 1;
        if(u1.getScore() < u2.getScore())
            return -1;
        return 0;
    }
}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

import com.alibaba.fastjson.JSON;

public class TopPriorityQueue<E> {
    private final PriorityBlockingQueue<E> blockQueue;
    private final int size;
    
    public TopPriorityQueue(int size, Comparator<E> comparator){
        this.blockQueue = new PriorityBlockingQueue<E>(size + 1, comparator);
        this.size = size + 1;    // 這裏多加1的緣由是防止put方法中將大的刪除了,反而降小的插入了,因此多加1個用作"哨卡"
    }
    
    public synchronized void put(E e) throws InterruptedException{
        if(blockQueue.size() >= size)
            blockQueue.take();
        blockQueue.offer(e);
    }
    
    public List<E> getAll() throws InterruptedException{
        synchronized(this){
            if(blockQueue.size() >= size)
                blockQueue.take();    // 前面構造函數中多加了1,這裏減掉一個
        }
        
        return new ArrayList<E>(blockQueue);
    }
    
    public static void main(String[] args) throws InterruptedException{
        MyComparator myComparator = new MyComparator();
        final TopPriorityQueue<User> tq = new TopPriorityQueue<User>(3, myComparator);
        User u1 = new User(1, "bbb", 10);
        User u2 = new User(2, "ccc", 20);
        User u3 = new User(3, "ddd", 30);
        User u4 = new User(4, "fff", 40);
        User u5 = new User(5, "fff", 50);
        User u6 = new User(6, "ddd", 60);
        User u7 = new User(7, "ggg", 70);
        User u8 = new User(8, "hhh", 80);

        tq.put(u4);    //4
        System.out.println(JSON.toJSONString(tq.getAll()));
        tq.put(u8);    //4,8
        System.out.println(JSON.toJSONString(tq.getAll()));
        tq.put(u7);    //4,8,7
        System.out.println(JSON.toJSONString(tq.getAll()));
        tq.put(u5);    //5,8,7
        System.out.println(JSON.toJSONString(tq.getAll()));
        tq.put(u2);    //5,8,7
        System.out.println(JSON.toJSONString(tq.getAll()));
        tq.put(u3);    //5,8,7
        System.out.println(JSON.toJSONString(tq.getAll()));
        tq.put(u1);    //5,8,7
        System.out.println(JSON.toJSONString(tq.getAll()));
        tq.put(u6);    //6,8,7
        System.out.println(JSON.toJSONString(tq.getAll()));
    }
}

因此我們在使用PriorityBlockingQueue時,要麼咱們插入的元素實現了Comparable這個接口,要麼我定義一個Comparator,傳入到PriorityBlockingQueue的構造函數中,咱們能夠看下PriorityBlockingQueue.offer(e)方法的源碼,它會對這兩種狀況進行判斷:

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

其中的代碼:

            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);

就是判斷咱們是否在PriorityBlockingQueue的構造函數中是否傳入了Comparator。這樣User類就不須要實現Comparable接口了。

 

另外咱們要注意 LinkedBlockingQueue  和  PriorityBlockingQueue 有一點不一樣,BlockingQueue.offer(e)在隊列滿了時,會返回false,而PriorityBlockingQueue.offer()即便隊列滿了,它會進行擴展,永遠只返回true.

LinkedBlockingQueue .offer() 的源碼以下:

/**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

當滿了時返回false:

if (count.get() == capacity)
       return false;

 

PriorityBlockingQueue.offer() 的源碼以下:

/**
     * Inserts the specified element into this priority queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

當滿了時,會擴容:

while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);

 

As the queue is unbounded, this method will never return {@code false}.

另外TopQueue 和 TopPriorityQueue 都是線程安全的,可是並不保證插入隊列中的元素自身的線程安全性。

相關文章
相關標籤/搜索