Storm Source Code Analysis: Messaging (backtype.storm.messaging)

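First, two interfaces and one class are defined.
The TaskMessage class is easy to understand: it abstracts Storm's message format.
As for IContext, as its comment says, it defines a messaging plugin, that is, the channel through which messages are sent. Storm designs this part to be replaceable.
By default Storm implements two plugins, local and ZMQ, and you can of course implement more.
The local plugin should be the one used in local mode, while ZMQ is used in distributed mode.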

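The IContext interface is mainly used to create IConnection objects and reflects how sockets are managed: bind and connect define the server-side and client-side connections respectively.
The IConnection interface defines the logic that actually sends and receives messages.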

Finally, TransportFactory uses Java reflection to dynamically create the appropriate type of context, according to the Config.STORM_MESSAGING_TRANSPORT setting.

IContext interface

/**
 * This interface needs to be implemented for messaging plugin. 
 * 
 * Messaging plugin is specified via Storm config parameter, storm.messaging.transport.
 * 
 * A messaging plugin should have a default constructor and implements IContext interface.
 * Upon construction, we will invoke IContext::prepare(storm_conf) to enable context to be configured
 * according to storm configuration. 
 */
public interface IContext {
    /**
     * This method is invoked at the startup of messaging plugin
     * @param storm_conf storm configuration
     */
    public void prepare(Map storm_conf);
    
    /**
     * This method is invoked when a worker unloads a messaging plugin
     */
    public void term();

    /**
     * This method establishes a server side connection 
     * @param storm_id topology ID
     * @param port port #
     * @return server side connection
     */
    public IConnection bind(String storm_id, int port);
    
    /**
     * This method establishes a client side connection to a remote server
     * @param storm_id topology ID
     * @param host remote host
     * @param port remote port
     * @return client side connection
     */
    public IConnection connect(String storm_id, String host, int port);
}

IConnection interface

public interface IConnection {   
    /**
     * receive a message (consisting of taskId and payload)
     * @param flags 0: block, 1: non-block
     * @return
     */
    public TaskMessage recv(int flags);
    /**
     * send a message with taskId and payload
     * @param taskId task ID
     * @param payload
     */
    public void send(int taskId,  byte[] payload);
    
    /**
     * close this connection
     */
    public void close();
}

TaskMessage

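As its name suggests, TaskMessage contains task and message fields, indicating which task a message is sent to.
It also defines the serialization and deserialization functions.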

public class TaskMessage {
    private int _task;
    private byte[] _message;
    
    public TaskMessage(int task, byte[] message) {
        _task = task;
        _message = message;
    }
    
    public int task() {
        return _task;
    }

    public byte[] message() {
        return _message;
    }
    
    public ByteBuffer serialize() {
        ByteBuffer bb = ByteBuffer.allocate(_message.length+2);
        bb.putShort((short)_task);
        bb.put(_message);
        return bb;
    }
    
    public void deserialize(ByteBuffer packet) {
        if (packet==null) return;
        _task = packet.getShort();
        _message = new byte[packet.limit()-2];
        packet.get(_message);
    }
}
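
The wire layout used by serialize() and deserialize() above (a 2-byte task ID followed by the payload) can be illustrated with a small round trip. This is only a sketch: TaskMessageDemo, encode, decodeTask and decodePayload are hypothetical names that do not exist in Storm, and the flip() call is an assumption added here so that the buffer can be read back from position 0.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Storm.
class TaskMessageDemo {
    /** Encodes a task ID and payload with the same layout as TaskMessage.serialize(). */
    static ByteBuffer encode(int task, byte[] payload) {
        ByteBuffer bb = ByteBuffer.allocate(payload.length + 2);
        bb.putShort((short) task); // task ID is narrowed to 16 bits
        bb.put(payload);
        bb.flip();                 // assumption: rewind so a reader starts at position 0
        return bb;
    }

    /** Reads the task ID with an absolute get, leaving the buffer position untouched. */
    static int decodeTask(ByteBuffer packet) {
        return packet.getShort(0);
    }

    /** Copies out everything after the 2-byte task ID. */
    static byte[] decodePayload(ByteBuffer packet) {
        byte[] payload = new byte[packet.limit() - 2];
        ByteBuffer view = packet.duplicate(); // independent position, shared content
        view.position(2);
        view.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer packet = encode(7, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeTask(packet));
        System.out.println(new String(decodePayload(packet), StandardCharsets.UTF_8));
        // A task ID above Short.MAX_VALUE does not survive the (short) cast.
        System.out.println(decodeTask(encode(40000, new byte[0])));
    }
}
```

One consequence of this format is worth noting: because the task ID is cast to a short, only IDs that fit in 16 signed bits round-trip unchanged.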

TransportFactory

public class TransportFactory {    
    public static IContext makeContext(Map storm_conf) {
        //get factory class name
        String transport_plugin_klassName = (String)storm_conf.get(Config.STORM_MESSAGING_TRANSPORT);
        LOG.info("Storm peer transport plugin:"+transport_plugin_klassName);

        IContext transport = null;
        try {
            //create a factory class
            Class klass = Class.forName(transport_plugin_klassName);
            //obtain a context object
            Object obj = klass.newInstance();
            if (obj instanceof IContext) {
                //case 1: plugin is a IContext class
                transport = (IContext)obj;
                //initialize with storm configuration
                transport.prepare(storm_conf);
            } else {
                //case 2: Non-IContext plugin must have a makeContext(storm_conf) method that returns IContext object
                Method method = klass.getMethod("makeContext", Map.class);
                LOG.debug("object:"+obj+" method:"+method);
                transport = (IContext) method.invoke(obj, storm_conf);
            }
        } catch(Exception e) {
            throw new RuntimeException("Fail to construct messaging plugin from plugin "+transport_plugin_klassName, e);
        } 
        return transport;
    }
}
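
The first branch of makeContext (the configured class itself implements the plugin interface) can be sketched in isolation as follows. Transport, InMemoryTransport and PluginLoader are hypothetical stand-ins introduced for illustration, not Storm classes; only the config key storm.messaging.transport comes from the IContext comment above. The sketch also uses getDeclaredConstructor().newInstance() in place of the Class.newInstance() call in the original.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical loader, mirroring "case 1" of TransportFactory.makeContext.
class PluginLoader {
    static final String TRANSPORT_KEY = "storm.messaging.transport";

    static Transport load(Map<String, Object> conf) {
        String className = (String) conf.get(TRANSPORT_KEY);
        try {
            // Instantiate the configured class through its no-argument constructor.
            Object obj = Class.forName(className).getDeclaredConstructor().newInstance();
            if (!(obj instanceof Transport)) {
                throw new IllegalArgumentException(className + " does not implement Transport");
            }
            Transport transport = (Transport) obj;
            transport.prepare(conf); // configure the plugin from the same config map
            return transport;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Failed to construct messaging plugin " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(TRANSPORT_KEY, InMemoryTransport.class.getName());
        System.out.println(load(conf).getClass().getName());
    }
}

/** Hypothetical stand-in for IContext, reduced to the one method this sketch needs. */
interface Transport {
    void prepare(Map<String, Object> conf);
}

/** Hypothetical plugin; a real one would set up sockets in prepare(). */
class InMemoryTransport implements Transport {
    boolean prepared = false;

    @Override
    public void prepare(Map<String, Object> conf) {
        prepared = true;
    }
}
```

Because the class name is just a string in the configuration, a new transport can be swapped in without changing the code that calls the loader, which is the replaceability the plugin design is aiming for.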

 

Now let's look at the local and ZMQ plugin implementations in detail.

Local

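This is the message plugin used in local mode.
The implementation is fairly simple: everything is built on queues-map, and each queue is simply a LinkedBlockingQueue. Since local is used for testing, efficiency is not a concern.
Every receive queue or send queue is added to queues-map through add-queue!, with storm-id + port as the key.
All recv and send calls are therefore just operations on a queue.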

(defn add-queue! [queues-map lock storm-id port]
  (let [id (str storm-id "-" port)]
    (locking lock
      (when-not (contains? @queues-map id)
        (swap! queues-map assoc id (LinkedBlockingQueue.))))
    (@queues-map id)))

(deftype LocalConnection [storm-id port queues-map lock queue]
  IConnection
  (^TaskMessage recv [this ^int flags]
    (when-not queue
      (throw (IllegalArgumentException. "Cannot receive on this socket")))
    (if (= flags 1)
      (.poll queue)
      (.take queue)))
  (^void send [this ^int taskId ^bytes payload]
    (let [send-queue (add-queue! queues-map lock storm-id port)]
      (.put send-queue (TaskMessage. taskId payload))
      ))
  (^void close [this]
    ))

(deftype LocalContext [^{:unsynchronized-mutable true} queues-map
                       ^{:unsynchronized-mutable true} lock]
  IContext
  (^void prepare [this ^Map storm-conf]
    (set! queues-map (atom {}))
    (set! lock (Object.)))
  (^IConnection bind [this ^String storm-id ^int port]
    (LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port)))
  (^IConnection connect [this ^String storm-id ^String host ^int port]
    (LocalConnection. storm-id port queues-map lock nil))
  (^void term [this]
    ))

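deftype is used here rather than defrecord, which means the connection and context themselves do not need map-like support.
In addition, the IContext implementation uses mutable fields, which are said to be an advanced feature that is hard to use correctly.
My personal understanding is that deftype, like defrecord, has no closure effect; only fields (object members) can be accessed by the interface functions at any time, so some scenarios require mutable fields, such as queues-map here.
Similar scenarios were previously implemented with reify; this is a version implemented with deftype.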
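
For readers less familiar with Clojure, the queue-per-key idea behind the local plugin can be sketched in Java. This is not a port of the code above: LocalQueueDemo and its methods are hypothetical, the payload is a bare byte[] rather than a TaskMessage, and ConcurrentHashMap.computeIfAbsent is swapped in for the atom-plus-lock combination used by add-queue!.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of Storm.
class LocalQueueDemo {
    // Shared registry: one queue per "<stormId>-<port>" key, like queues-map.
    private final ConcurrentMap<String, BlockingQueue<byte[]>> queues = new ConcurrentHashMap<>();

    /** Returns the queue for the key, creating it atomically on first use. */
    BlockingQueue<byte[]> queueFor(String stormId, int port) {
        return queues.computeIfAbsent(stormId + "-" + port, k -> new LinkedBlockingQueue<>());
    }

    /** Enqueues a payload; an unbounded LinkedBlockingQueue always accepts the offer. */
    void send(String stormId, int port, byte[] payload) {
        queueFor(stormId, port).offer(payload);
    }

    /** flags == 1: non-blocking poll (may return null); any other value blocks. */
    byte[] recv(String stormId, int port, int flags) {
        BlockingQueue<byte[]> queue = queueFor(stormId, port);
        if (flags == 1) {
            return queue.poll();
        }
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        LocalQueueDemo local = new LocalQueueDemo();
        System.out.println(local.recv("topo-1", 6700, 1) == null); // nothing queued yet
        local.send("topo-1", 6700, new byte[] {42});
        System.out.println(local.recv("topo-1", 6700, 0)[0]);
    }
}
```

The sender and the receiver never hold a reference to each other; they only agree on the key, which is why a client-side connection in the Clojure code can be created with a nil queue and still deliver messages.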

ZMQ

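ZMQ is said to be the fastest message queue, with performance close to that of the socket API; see http://www.cnblogs.com/yjf512/archive/2012/03/03/2378024.html
In distributed mode, Storm uses ZMQ for communication between processes and between instances.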

(deftype ZMQConnection [socket]
  IConnection
  (^TaskMessage recv [this ^int flags]
    (require 'backtype.storm.messaging.zmq)
    (if-let [packet (mq/recv socket flags)]
      (parse-packet packet)))
  (^void send [this ^int taskId ^bytes payload]
    (require 'backtype.storm.messaging.zmq)
    (mq/send socket (mk-packet taskId payload) ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
  (^void close [this]
    (.close socket)))

(deftype ZMQContext [^{:unsynchronized-mutable true} context 
                     ^{:unsynchronized-mutable true} linger-ms 
                     ^{:unsynchronized-mutable true} hwm 
                     ^{:unsynchronized-mutable true} local?]
  IContext
  (^void prepare [this ^Map storm-conf]
    (let [num-threads (storm-conf ZMQ-THREADS)]
      (set! context (mq/context num-threads)) 
      (set! linger-ms (storm-conf ZMQ-LINGER-MILLIS))
      (set! hwm (storm-conf ZMQ-HWM))
      (set! local? (= (storm-conf STORM-CLUSTER-MODE) "local"))))
  (^IConnection bind [this ^String storm-id ^int port]
    (require 'backtype.storm.messaging.zmq)
    (-> context
      (mq/socket mq/pull)
      (mq/set-hwm hwm)
      (mq/bind (get-bind-zmq-url local? port))
      mk-connection
      ))
  (^IConnection connect [this ^String storm-id ^String host ^int port]
    (require 'backtype.storm.messaging.zmq)
    (-> context
      (mq/socket mq/push)
      (mq/set-hwm hwm)
      (mq/set-linger linger-ms)
      (mq/connect (get-connect-zmq-url local? host port))
      mk-connection))
  (^void term [this]
    (.term context))
  
  ZMQContextQuery
  (zmq-context [this]
    context))