ConcurrentLinkedQueue使用實例

ConcurrentLinkedQueue是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部,當咱們獲取一個元素時,它會返回隊列頭部的元素。html

對比

queue 阻塞與否 是否有界 線程安全保障 適用場景 注意事項
ArrayBlockingQueue 阻塞 有界 一把全局鎖 生產消費模型,平衡兩邊處理速度 --
LinkedBlockingQueue 阻塞 可配置 存取採用2把鎖 生產消費模型,平衡兩邊處理速度 無界的時候注意內存溢出問題
ConcurrentLinkedQueue 非阻塞 無界 CAS 對全局的集合進行操做的場景 size() 是要遍歷一遍集合,慎用

實例

因爲ConcurrentLinkedQueue是無界的,因此使用的時候要特別注意內存溢出問題。akka的actor模型,默認的mailbox是用這個來實現的。java

object UnboundedMailbox {
  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {
    final def queue: Queue[Envelope] = this
  }
}

MyUnboundedMailboxgit

public class MyUnboundedMailbox implements MailboxType,
  ProducesMessageQueue<MyUnboundedMailbox.MyMessageQueue> {

  // This is the MessageQueue implementation
  public static class MyMessageQueue implements MessageQueue,
      MyUnboundedMessageQueueSemantics {
    private final Queue<Envelope> queue =
      new ConcurrentLinkedQueue<Envelope>();

    // these must be implemented; queue used as example
    public void enqueue(ActorRef receiver, Envelope handle) {
      queue.offer(handle);
    }
    public Envelope dequeue() { return queue.poll(); }
    public int numberOfMessages() { return queue.size(); }
    public boolean hasMessages() { return !queue.isEmpty(); }
    public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
      for (Envelope handle: queue) {
        deadLetters.enqueue(owner, handle);
      }
    }
  }

  // This constructor signature must exist, it will be called by Akka
  public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) {
    // put your initialization code here
  }

  // The create method is called to create the MessageQueue
  public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    return new MyMessageQueue();
  }
}

doc

相關文章
相關標籤/搜索