ConcurrentLinkedQueue是一個基於連接節點的無界線程安全隊列,它採用先進先出的規則對節點進行排序,當咱們添加一個元素的時候,它會添加到隊列的尾部,當咱們獲取一個元素時,它會返回隊列頭部的元素。html
queue | 阻塞與否 | 是否有界 | 線程安全保障 | 適用場景 | 注意事項 |
---|---|---|---|---|---|
ArrayBlockingQueue | 阻塞 | 有界 | 一把全局鎖 | 生產消費模型,平衡兩邊處理速度 | -- |
LinkedBlockingQueue | 阻塞 | 可配置 | 存取採用2把鎖 | 生產消費模型,平衡兩邊處理速度 | 無界的時候注意內存溢出問題 |
ConcurrentLinkedQueue | 非阻塞 | 無界 | CAS | 對全局的集合進行操做的場景 | size() 是要遍歷一遍集合,慎用 |
因爲ConcurrentLinkedQueue是無界的,因此使用的時候要特別注意內存溢出問題。akka的actor模型,默認的mailbox是用這個來實現的。java
object UnboundedMailbox { class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue { final def queue: Queue[Envelope] = this } }
MyUnboundedMailboxgit
public class MyUnboundedMailbox implements MailboxType, ProducesMessageQueue<MyUnboundedMailbox.MyMessageQueue> { // This is the MessageQueue implementation public static class MyMessageQueue implements MessageQueue, MyUnboundedMessageQueueSemantics { private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>(); // these must be implemented; queue used as example public void enqueue(ActorRef receiver, Envelope handle) { queue.offer(handle); } public Envelope dequeue() { return queue.poll(); } public int numberOfMessages() { return queue.size(); } public boolean hasMessages() { return !queue.isEmpty(); } public void cleanUp(ActorRef owner, MessageQueue deadLetters) { for (Envelope handle: queue) { deadLetters.enqueue(owner, handle); } } } // This constructor signature must exist, it will be called by Akka public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) { // put your initialization code here } // The create method is called to create the MessageQueue public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) { return new MyMessageQueue(); } }