QMQ Source Code Analysis: Actor

Preface

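One of QMQ's articles on actors describes the scenario that motivates them. When a client consumes messages, its pull requests first enter a RequestQueue, and this queue is usually shared by many subjects and many consumer groups. Different subjects have different numbers of consumer groups, and different groups have different numbers of consumers, so requests compete for resources. Take the example from that article: a subject has two consumer groups, A and B; A has 100 consumers and B has 200. Requests from B are then likely to outnumber those from A in the RequestQueue, consumption becomes unfair, and subjects and consumer groups need to be isolated from one another to keep things fair.

In addition, when consumers cannot keep up, messages pile up on the broker and the affected consumer group keeps consuming "old messages", which drags down overall consumption. Old messages are no longer in the page cache and most likely have to be loaded from disk, so requests from the group consuming them take a long time to process in the RequestQueue and slow down the other subjects and groups. A policy is needed here as well, to keep consumer groups from affecting each other. QMQ therefore introduced the actor mechanism, to eliminate the interference between consumer groups caused by differences in consumption capacity and consumer counts.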

PullMessageWorker

To understand how QMQ's actor model works, first look at how the broker handles a message pull request.

class PullMessageWorker implements ActorSystem.Processor<PullMessageProcessor.PullEntry> {
    // message storage layer
    private final MessageStoreWrapper store;
    // actor
    private final ActorSystem actorSystem;
  	
    private final ConcurrentMap<String, ConcurrentMap<String, Object>> subscribers;

    PullMessageWorker(MessageStoreWrapper store, ActorSystem actorSystem) {
        this.store = store;
        this.actorSystem = actorSystem;
        this.subscribers = new ConcurrentHashMap<>();
    }
    
    void pull(PullMessageProcessor.PullEntry pullEntry) {
        // subject + group is the granularity of actor scheduling
        final String actorPath = ConsumerGroupUtils.buildConsumerGroupKey(pullEntry.subject, pullEntry.group);
        // dispatch to the actor system
        actorSystem.dispatch(actorPath, pullEntry, this);
    }

    @Override
    public boolean process(PullMessageProcessor.PullEntry entry , ActorSystem.Actor<PullMessageProcessor.PullEntry> self) {
        QMon.pullQueueTime(entry.subject, entry.group, entry.pullBegin);

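        // If the request has already expired by the time processing starts, skip it and return nothing to the client; the client simply times out.
        // This usually means the server is badly backed up, and leaving the client waiting keeps the situation from getting worse.
        // Deadline mechanism: if QMQ decides a pull request can no longer be served in time, it returns right away to avoid an avalanche.
        if (entry.expired()) {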
            QMon.pullExpiredCountInc(entry.subject, entry.group);
            return true;
        }

        if (entry.isInValid()) {
            QMon.pullInValidCountInc(entry.subject, entry.group);
            return true;
        }

        // look up messages in the storage layer
        final PullMessageResult pullMessageResult = store.findMessages(entry.pullRequest);

        if (pullMessageResult == PullMessageResult.FILTER_EMPTY ||
                pullMessageResult.getMessageNum() > 0
                || entry.isPullOnce()
                || entry.isTimeout()) {
            entry.processMessageResult(pullMessageResult);
            return true;
        }

        // no messages were pulled, so suspend this actor
        self.suspend();
        // timer task that wakes the actor before the request times out
        if (entry.setTimerOnDemand()) {
            QMon.suspendRequestCountInc(entry.subject, entry.group);
            // subscribe, so the actor is woken as soon as a message arrives
            subscribe(entry.subject, entry.group);
            return false;
        }

        // already timed out: resume right away so the actor can be scheduled again
        self.resume();
        entry.processNoMessageResult();
        return true;
    }

    // subscribe
    private void subscribe(String subject, String group) {
        ConcurrentMap<String, Object> map = subscribers.get(subject);
        if (map == null) {
            map = new ConcurrentHashMap<>();
            map = ObjectUtils.defaultIfNull(subscribers.putIfAbsent(subject, map), map);
        }
        map.putIfAbsent(group, HOLDER);
    }

    // wake the subscribed groups when new messages arrive
    void remindNewMessages(final String subject) {
        final ConcurrentMap<String, Object> map = this.subscribers.get(subject);
        if (map == null) return;

        for (String group : map.keySet()) {
            map.remove(group);
            this.actorSystem.resume(ConsumerGroupUtils.buildConsumerGroupKey(subject, group));
            QMon.resumeActorCountInc(subject, group);
        }
    }
}

// Processing interface defined inside ActorSystem (a nested interface, referred to as ActorSystem.Processor)
public interface Processor<T> {
    boolean process(T message, Actor<T> self);
}

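As you can see, the component doing the work here is the actorSystem. PullMessageWorker implements ActorSystem.Processor, so a pull request is actually handled in that interface's process method. When a request reaches the PullMessageWorker, the worker hands it to the actorSystem for scheduling. Once the request is scheduled, the worker reacts to the pull result: if there are no messages for now, it suspends the actor and sets a timer task to resume it later; if a message arrives before the timer task runs, the actor is resumed immediately.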
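The subscribe and remind steps can be pictured in isolation. The following is a minimal sketch, not QMQ's code: the class name WaitingGroups, the drain method, and the "subject@group" key format are all assumptions made for illustration, and computeIfAbsent stands in for the putIfAbsent sequence used in the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper (not part of QMQ): remembers which consumer groups
// are waiting for new messages on a subject.
class WaitingGroups {
    private final ConcurrentMap<String, Set<String>> bySubject = new ConcurrentHashMap<>();

    // Called after a pull found nothing and the actor was suspended.
    void subscribe(String subject, String group) {
        bySubject.computeIfAbsent(subject, s -> ConcurrentHashMap.<String>newKeySet()).add(group);
    }

    // Called when a message arrives; returns the actor paths that should be resumed.
    List<String> drain(String subject) {
        List<String> toResume = new ArrayList<>();
        Set<String> groups = bySubject.get(subject);
        if (groups == null) return toResume;
        for (String group : groups) {
            // remove() succeeds for only one caller per entry, so a group is resumed once
            if (groups.remove(group)) {
                toResume.add(subject + "@" + group); // assumed key format
            }
        }
        return toResume;
    }
}
```

Subscribing the same group twice leaves a single entry, and a second drain for the same subject returns an empty list until some group subscribes again.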

ActorSystem

Next, let's see how ActorSystem implements fair scheduling.

public class ActorSystem {
    // a ConcurrentMap maintained internally; the key is the subject + group built in PullMessageWorker
    private final ConcurrentMap<String, Actor> actors;
    // the executor that runs the actors
    private final ThreadPoolExecutor executor;
    
    private final AtomicInteger actorsCount;
    private final String name;

    public ActorSystem(String name) {
        this(name, Runtime.getRuntime().availableProcessors() * 4, true);
    }

    public ActorSystem(String name, int threads, boolean fair) {
        this.name = name;
        this.actorsCount = new AtomicInteger();
        // when fair is true, a priority queue backs the executor; this handles the "old messages" case from the preface
        BlockingQueue<Runnable> queue = fair ? new PriorityBlockingQueue<>() : new LinkedBlockingQueue<>();
        this.executor = new ThreadPoolExecutor(threads, threads, 60, TimeUnit.MINUTES, queue, new NamedThreadFactory("actor-sys-" + name));
        this.actors = Maps.newConcurrentMap();
        QMon.dispatchersGauge(name, actorsCount::doubleValue);
        QMon.actorSystemQueueGauge(name, () -> (double) executor.getQueue().size());
    }
}

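As shown, a thread pool schedules and runs the actors, and when fair is true the pool's work queue is a priority queue whose elements are Actor instances. We will look at Actor shortly; first, here is ActorSystem's dispatch and scheduling flow.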
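How a ThreadPoolExecutor behaves when backed by a PriorityBlockingQueue can be seen in a small standalone sketch. Everything here (PriorityPoolDemo, Task, the cost field) is hypothetical and only illustrates the JDK mechanism; it is not QMQ code. A single worker is kept busy by a gate task so that the three queued tasks are reordered by priority before any of them runs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityPoolDemo {
    // Queue elements must be Comparable because the queue is built without a Comparator.
    static final class Task implements Runnable, Comparable<Task> {
        final int cost;
        final List<Integer> log;
        final CountDownLatch gate; // non-null only for the task that blocks the worker

        Task(int cost, List<Integer> log, CountDownLatch gate) {
            this.cost = cost;
            this.log = log;
            this.gate = gate;
        }

        @Override
        public void run() {
            if (gate != null) {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return;
            }
            log.add(cost);
        }

        @Override
        public int compareTo(Task o) {
            return Integer.compare(cost, o.cost); // lower cost runs first
        }
    }

    static List<Integer> runOrder() {
        List<Integer> log = Collections.synchronizedList(new ArrayList<Integer>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
        CountDownLatch gate = new CountDownLatch(1);
        pool.execute(new Task(0, log, gate));  // handed straight to the only worker
        pool.execute(new Task(30, log, null)); // the next three wait in the priority queue
        pool.execute(new Task(10, log, null));
        pool.execute(new Task(20, log, null));
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }
}
```

Calling PriorityPoolDemo.runOrder() returns [10, 20, 30] even though the tasks were submitted as 30, 10, 20. Note that tasks are passed with execute: submit would wrap each one in a FutureTask, which is not Comparable, and the priority queue would reject it with a ClassCastException.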

    // this is the method PullMessageWorker calls
    public <E> void dispatch(String actorPath, E msg, Processor<E> processor) {
        // get (or create) the actor
        Actor<E> actor = createOrGet(actorPath, processor);
        // as the Actor definition later shows, each actor keeps an internal queue; here the actor merely enqueues msg
        actor.dispatch(msg);
        // schedule it
        schedule(actor, true);
    }

    // suspend the actor when there are no messages
    public void suspend(String actorPath) {
        Actor actor = actors.get(actorPath);
        if (actor == null) return;

        actor.suspend();
    }

    // resume when messages arrive; comparable to a thread's "ready" state
    public void resume(String actorPath) {
        Actor actor = actors.get(actorPath);
        if (actor == null) return;

        actor.resume();
        // schedule right away; note the false argument
        // when the actor is schedulable, whether it actually gets scheduled depends on whether its queue holds messages
        schedule(actor, false);
    }

    private <E> Actor<E> createOrGet(String actorPath, Processor<E> processor) {
        Actor<E> actor = actors.get(actorPath);
        if (actor != null) return actor;

        Actor<E> add = new Actor<>(this.name, actorPath, this, processor, DEFAULT_QUEUE_SIZE);
        Actor<E> old = actors.putIfAbsent(actorPath, add);
        if (old == null) {
            LOG.info("create actorSystem: {}", actorPath);
            actorsCount.incrementAndGet();
            return add;
        }
        return old;
    }

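    // where an actor is put into the executor's queue
    private <E> boolean schedule(Actor<E> actor, boolean hasMessageHint) {
        // if the actor cannot be scheduled, return false
        if (!actor.canBeSchedule(hasMessageHint)) return false;
        // mark the actor as Scheduled
        if (actor.setAsScheduled()) {
            // submit time; together with the actor's total execution time it determines the priority in the queue
            actor.submitTs = System.currentTimeMillis();
            // enqueue into the thread pool's priority queue
            this.executor.execute(actor);
            return true;
        }
        // setAsScheduled() returned false: the actor is already Scheduled, so there is no need to enqueue it again
        return false;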
    }

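The actorSystem maintains a thread pool whose work queue is a priority queue holding actors. An actor's granularity is subject + group. Actor is a Runnable, and because it is stored in a priority queue that is constructed without a Comparator, it must also implement Comparable. An actor has four states: initial, scheduled, suspended, and running (the last does not exist as a distinct status value; it is named here only to aid understanding).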
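The state encoding can be tried out on its own. The sketch below mirrors the constant values and transition logic of the Actor class (Open = 0, Scheduled = 2, suspend unit = 4) but swaps Unsafe for an AtomicInteger; the class name ActorStatus and its method names are assumptions for illustration, not QMQ's API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the status field of Actor, using AtomicInteger instead of Unsafe.
class ActorStatus {
    static final int OPEN = 0;
    static final int SCHEDULED = 2;            // bit 1: the actor sits in (or runs from) the executor queue
    static final int SHOULD_SCHEDULE_MASK = 3; // binary 11
    static final int SUSPEND_UNIT = 4;         // each suspend() adds 4, each resume() subtracts 4

    private final AtomicInteger status = new AtomicInteger(OPEN);

    int get() {
        return status.get();
    }

    boolean setAsScheduled() {
        while (true) {
            int s = status.get();
            if ((s & SHOULD_SCHEDULE_MASK) != OPEN) return false; // Scheduled bit already set
            if (status.compareAndSet(s, s | SCHEDULED)) return true;
        }
    }

    void setAsIdle() {
        while (true) {
            int s = status.get();
            if (status.compareAndSet(s, s & ~SCHEDULED)) return; // clear only the Scheduled bit
        }
    }

    void suspend() {
        status.addAndGet(SUSPEND_UNIT);
    }

    // Returns true when the actor is no longer suspended after this call.
    boolean resume() {
        while (true) {
            int s = status.get();
            int next = s < SUSPEND_UNIT ? s : s - SUSPEND_UNIT;
            if (status.compareAndSet(s, next)) return next < SUSPEND_UNIT;
        }
    }

    // Messages may be processed only while no suspend unit is present.
    boolean shouldProcess() {
        return (status.get() & ~SCHEDULED) == 0;
    }
}
```

A scheduled actor that gets suspended has status 6 (2 + 4): it keeps its Scheduled bit but shouldProcess() is false until resume() brings the status back to 2.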

Next, let's look at the Actor class:

public static class Actor<E> implements Runnable, Comparable<Actor> {
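        // initial (idle) state
        private static final int Open = 0;
        // scheduled state
        private static final int Scheduled = 2;
        // mask, binary 11, ANDed with the current status
        // (shouldScheduleMask & currentStatus) != Open means the Scheduled bit is already set, so the actor
        // must not be scheduled again; suspended statuses are rejected earlier, in canBeSchedule
        private static final int shouldScheduleMask = 3;
        private static final int shouldNotProcessMask = ~2;
        // one unit of suspension; each suspend() adds it to the status
        private static final int suspendUnit = 4;
        // time slice, in milliseconds, for each run of an actor
        private static final int QUOTA = 5;
        // memory offset of the status field, used with Unsafe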
        private static long statusOffset;

        static {
            try {
                statusOffset = Unsafe.instance.objectFieldOffset(Actor.class.getDeclaredField("status"));
            } catch (Throwable t) {
                throw new ExceptionInInitializerError(t);
            }
        }

        final String systemName;
        final ActorSystem actorSystem;
        // the actor's internal queue, analyzed briefly later
        final BoundedNodeQueue<E> queue;
        // interface defined inside ActorSystem and implemented by PullMessageWorker; this is where the business logic runs
        final Processor<E> processor;
        private final String name;
        // total execution time of this actor
        private long total;
        // time the actor was submitted, i.e. when it entered the executor's queue
        private volatile long submitTs;
        // accessed through Unsafe
        private volatile int status;

        Actor(String systemName, String name, ActorSystem actorSystem, Processor<E> processor, final int queueSize) {
            this.systemName = systemName;
            this.name = name;
            this.actorSystem = actorSystem;
            this.processor = processor;
            this.queue = new BoundedNodeQueue<>(queueSize);

            QMon.actorQueueGauge(systemName, name, () -> (double) queue.count());
        }

        // enqueue into the actor's internal queue
        boolean dispatch(E message) {
            return queue.add(message);
        }

        // where the actor runs
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            String old = Thread.currentThread().getName();
            try {
                Thread.currentThread().setName(systemName + "-" + name);
                if (shouldProcessMessage()) {
                    processMessages();
                }
            } finally {
                long duration = System.currentTimeMillis() - start;
                // add the duration of this run to total
                total += duration;
                QMon.actorProcessTime(name, duration);

                Thread.currentThread().setName(old);
                // back to idle, i.e. clear the Scheduled bit (currentStatus & ~Scheduled)
                setAsIdle();
                // try to schedule the next run
                this.actorSystem.schedule(this, false);
            }
        }

        void processMessages() {
            long deadline = System.currentTimeMillis() + QUOTA;
            while (true) {
                E message = queue.peek();
                if (message == null) return;
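                // run the business logic
                boolean process = processor.process(message, this);
                // process returned false: the message stays in the queue and waits for the next run,
                // e.g. when PullMessageWorker finds no messages and suspends the actor
                if (!process) return;

                // dequeue
                queue.pollNode();
                // each run stops once QUOTA milliseconds have elapsed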
                if (System.currentTimeMillis() >= deadline) return;
            }
        }

        final boolean shouldProcessMessage() {
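            // decides whether the business logic may actually run
            // one case is the suspended state: the actor was suspended because no messages were pulled,
            // so there is no point in taking up a time slice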
            return (currentStatus() & shouldNotProcessMask) == 0;
        }

        // whether the actor can be scheduled
        private boolean canBeSchedule(boolean hasMessageHint) {
            int s = currentStatus();
            if (s == Open || s == Scheduled) return hasMessageHint || !queue.isEmpty();
            return false;
        }

        public final boolean resume() {
            while (true) {
                int s = currentStatus();
                int next = s < suspendUnit ? s : s - suspendUnit;
                if (updateStatus(s, next)) return next < suspendUnit;
            }
        }

        public final void suspend() {
            while (true) {
                int s = currentStatus();
                if (updateStatus(s, s + suspendUnit)) return;
            }
        }

        final boolean setAsScheduled() {
            while (true) {
                int s = currentStatus();
                // the Scheduled bit is already set: return false
                if ((s & shouldScheduleMask) != Open) return false;
                // set the Scheduled bit
                if (updateStatus(s, s | Scheduled)) return true;
            }
        }

        final void setAsIdle() {
            while (true) {
                int s = currentStatus();
                // clear the Scheduled bit (can be thought of as going back to the initial Open state)
                if (updateStatus(s, s & ~Scheduled)) return;
            }
        }

        final int currentStatus() {
            // read status through its memory offset
            return Unsafe.instance.getIntVolatile(this, statusOffset);
        }

        private boolean updateStatus(int oldStatus, int newStatus) {
            // atomic Unsafe CAS that drives the status transitions
            return Unsafe.instance.compareAndSwapInt(this, statusOffset, oldStatus, newStatus);
        }

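        // decides the actor's priority in the priority queue
        // total execution time first, for dynamic rate limiting: "slow" actors (pulls of already backlogged messages) run later
        // then submit time: the actor submitted earlier runs first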
        @Override
        public int compareTo(Actor o) {
            int result = Long.compare(total, o.total);
            return result == 0 ? Long.compare(submitTs, o.submitTs) : result;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Actor<?> actor = (Actor<?>) o;
            return Objects.equals(systemName, actor.systemName) &&
                    Objects.equals(name, actor.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(systemName, name);
        }
    }

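Actor implements Comparable, and its priority in the queue is determined jointly by total and submitTs. total is the actor's accumulated execution time, and submitTs is the time it was submitted for scheduling. An actor that is slow to process therefore ends up nearer the "tail" of the queue, which amounts to dynamic rate limiting based on each actor's execution time. Actor uses Unsafe to update its state transitions atomically, and each run of an actor is bounded by a time slice of roughly 5 milliseconds (QUOTA).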
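The effect of this ordering is easy to check with a plain PriorityQueue. The sketch below copies only the compareTo logic; the Entry class, the names, and the numbers are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class FairOrderDemo {
    // Hypothetical stand-in for Actor, keeping only the two fields used for ordering.
    static final class Entry implements Comparable<Entry> {
        final String name;
        final long total;    // accumulated execution time
        final long submitTs; // submit time

        Entry(String name, long total, long submitTs) {
            this.name = name;
            this.total = total;
            this.submitTs = submitTs;
        }

        @Override
        public int compareTo(Entry o) {
            int result = Long.compare(total, o.total);
            return result == 0 ? Long.compare(submitTs, o.submitTs) : result;
        }
    }

    static List<String> drainOrder() {
        PriorityQueue<Entry> queue = new PriorityQueue<>();
        queue.add(new Entry("slow-group", 900, 1)); // submitted first, but has used the most time
        queue.add(new Entry("fast-late", 10, 5));
        queue.add(new Entry("fast-early", 10, 2));
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().name);
        }
        return order;
    }
}
```

FairOrderDemo.drainOrder() yields fast-early, fast-late, slow-group: the group that has consumed the most execution time goes last regardless of when it was submitted, and ties are broken by submit time.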

The analysis could stop here, but in the spirit of digging deeper, let's keep reading.

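Actor maintains an internal queue of its own design: a lock-free, bounded, non-blocking, multiple-producer single-consumer queue. Many JDK queues, the blocking ones in particular, are lock-based; without locks, a reasonable guess is that this one is built on Unsafe atomic operations. Let's take a look: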

private static class BoundedNodeQueue<T> {

        // memory offsets of the tail (enq) and head (deq) node fields
        private final static long enqOffset, deqOffset;

        static {
            try {
                enqOffset = Unsafe.instance.objectFieldOffset(BoundedNodeQueue.class.getDeclaredField("_enqDoNotCallMeDirectly"));
                deqOffset = Unsafe.instance.objectFieldOffset(BoundedNodeQueue.class.getDeclaredField("_deqDoNotCallMeDirectly"));
            } catch (Throwable t) {
                throw new ExceptionInInitializerError(t);
            }
        }

        private final int capacity;
        // tail node, accessed through enqOffset
        private volatile Node<T> _enqDoNotCallMeDirectly;
        // head node, accessed through deqOffset
        private volatile Node<T> _deqDoNotCallMeDirectly;

        protected BoundedNodeQueue(final int capacity) {
            if (capacity < 0) throw new IllegalArgumentException("AbstractBoundedNodeQueue.capacity must be >= 0");
            this.capacity = capacity;
            final Node<T> n = new Node<T>();
            setDeq(n);
            setEnq(n);
        }

        // get the tail node
        private Node<T> getEnq() {
            // getObjectVolatile guarantees that the latest value is read
            return (Node<T>) Unsafe.instance.getObjectVolatile(this, enqOffset);
        }

        // set the tail node; used only during initialization
        private void setEnq(Node<T> n) {
            Unsafe.instance.putObjectVolatile(this, enqOffset, n);
        }

        private boolean casEnq(Node<T> old, Node<T> nju) {
            // a single CAS attempt; callers retry in a loop until it succeeds
            return Unsafe.instance.compareAndSwapObject(this, enqOffset, old, nju);
        }

        // get the head node
        private Node<T> getDeq() {
            return (Node<T>) Unsafe.instance.getObjectVolatile(this, deqOffset);
        }

        // used only during initialization
        private void setDeq(Node<T> n) {
            Unsafe.instance.putObjectVolatile(this, deqOffset, n);
        }

        // CAS the head node
        private boolean casDeq(Node<T> old, Node<T> nju) {
            return Unsafe.instance.compareAndSwapObject(this, deqOffset, old, nju);
        }

        // "index" might be a better name than "count"; but should overflow be considered?
        public final int count() {
            final Node<T> lastNode = getEnq();
            final int lastNodeCount = lastNode.count;
            return lastNodeCount - getDeq().count;
        }

        /** * @return the maximum capacity of this queue */
        public final int capacity() {
            return capacity;
        }

        public final boolean add(final T value) {
            for (Node<T> n = null; ; ) {
                final Node<T> lastNode = getEnq();
                final int lastNodeCount = lastNode.count;
                if (lastNodeCount - getDeq().count < capacity) {
                    // Trade a branch for avoiding to create a new node if full,
                    // and to avoid creating multiple nodes on write conflict á la Be Kind to Your GC
                    if (n == null) {
                        n = new Node<T>();
                        n.value = value;
                    }

                    n.count = lastNodeCount + 1; // Piggyback on the HB-edge between getEnq() and casEnq()

                    // Try to putPullLogs the node to the end, if we fail we continue loopin'
                    // roughly equivalent to
                    // enq -> next = new Node(value); enq = enq -> next;
                    if (casEnq(lastNode, n)) {
                        // note the Node.setNext method
                        lastNode.setNext(n);
                        return true;
                    }
                } else return false; // Over capacity—couldn't add the node
            }
        }

        public final boolean isEmpty() {
            // enq == deq means the queue is empty
            return getEnq() == getDeq();
        }

        /** * Removes the first element of this queue if any * * @return the value of the first element of the queue, null if empty */
        public final T poll() {
            final Node<T> n = pollNode();
            return (n != null) ? n.value : null;
        }

        public final T peek() {
            Node<T> n = peekNode();
            return (n != null) ? n.value : null;
        }

        protected final Node<T> peekNode() {
            for (; ; ) {
                final Node<T> deq = getDeq();
                final Node<T> next = deq.next();
                if (next != null || getEnq() == deq)
                    return next;
            }
        }

        /** * Removes the first element of this queue if any * * @return the `Node` of the first element of the queue, null if empty */
        public final Node<T> pollNode() {
            for (; ; ) {
                final Node<T> deq = getDeq();
                final Node<T> next = deq.next();
                if (next != null) {
                    if (casDeq(deq, next)) {
                        deq.value = next.value;
                        deq.setNext(null);
                        next.value = null;
                        return deq;
                    } // else we retry (concurrent consumers)
                    // a routine CAS retry pattern, nothing more to add
                } else if (getEnq() == deq) return null; // If we got a null and head meets tail, we are empty
            }
        }

        public static class Node<T> {
          
            private final static long nextOffset;

            static {
                try {
                    nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
                } catch (Throwable t) {
                    throw new ExceptionInInitializerError(t);
                }
            }

            protected T value;
            protected int count;
            // also accessed through its offset
            private volatile Node<T> _nextDoNotCallMeDirectly;

            public final Node<T> next() {
                return (Node<T>) Unsafe.instance.getObjectVolatile(this, nextOffset);
            }

            protected final void setNext(final Node<T> newNext) {
                // there is a subtlety here, analyzed below
                Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
            }
        }
    }

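As the code above shows, the queue combines field memory offsets with CAS atomic operations for its updates and assignments, which is a fairly standard way to achieve lock-freedom. What deserves a mention is Node's setNext method: it is called after the CAS on the tail succeeds, to assign the next pointer of the previous tail node. The method uses Unsafe.instance.putOrderedObject, and to discuss putOrderedObject one has to bring up MESI, the cache coherence protocol. A volatile write relies on a StoreLoad barrier to make the write visible to other threads. putOrderedObject also relies on a memory barrier, but only a StoreStore barrier, which is cheaper than StoreLoad. In a simplified view of the hardware, memory barriers come in two kinds: load barriers and store barriers. A load barrier invalidates data in the cache and forces it to be reloaded from main memory; a store barrier pushes data written to the cache out to main memory so that other threads can see it. The four barriers at the Java level are essentially combinations of these two, so a StoreStore barrier is naturally cheaper than a StoreLoad barrier. One question remains: could CAS be used here as well? It could, but there is no need. Think about why that is.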
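The "CAS the tail, then link with an ordered store" pattern can be reproduced with public JDK classes: AtomicReference.lazySet is the ordered-store counterpart of putOrderedObject. The sketch below is a simplified, unbounded variant written for illustration only; the class LinkSketch and its methods are hypothetical, and unlike pollNode above, poll here returns null instead of spinning when a producer has swapped the tail but not yet linked its node.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified multiple-producer single-consumer linked queue.
class LinkSketch<T> {
    static final class Node<T> {
        T value;
        final AtomicReference<Node<T>> next = new AtomicReference<>();
    }

    private final AtomicReference<Node<T>> tail;
    private Node<T> head; // read and written only by the single consumer thread

    LinkSketch() {
        Node<T> dummy = new Node<>();
        head = dummy;
        tail = new AtomicReference<>(dummy);
    }

    // May be called from many producer threads.
    void add(T value) {
        Node<T> n = new Node<>();
        n.value = value;
        for (;;) {
            Node<T> last = tail.get();
            if (tail.compareAndSet(last, n)) {
                // Only the producer that won the CAS ever writes last.next,
                // so an ordered store is enough to publish the link.
                last.next.lazySet(n);
                return;
            }
        }
    }

    // Must be called from a single consumer thread.
    T poll() {
        Node<T> next = head.next.get();
        if (next == null) return null; // empty, or a producer has not linked its node yet
        T value = next.value;
        next.value = null; // next becomes the new dummy head
        head = next;
        return value;
    }
}
```

Used from one thread, add(1), add(2), add(3) followed by four poll() calls returns 1, 2, 3 and then null.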
