聊聊storm的JoinBolt

本文主要研究一下storm的JoinBolt

實例

// Builds a topology that inner-joins two RandomWordSpout streams on "timestamp" and hands it to SubmitHelper.submitRemote.
@Test
    public void testJoinBolt() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        final String uuidSource = "uuid-spout";
        final String wordSource = "word-spout";
        final String joinKey = "timestamp";

        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout(uuidSource, new RandomWordSpout(new String[]{"uuid", joinKey}), 1);
        topology.setSpout(wordSource, new RandomWordSpout(new String[]{"word", joinKey}), 1);

        // SQL equivalent: from uuid-spout inner join word-spout on word-spout.timestamp = uuid-spout.timestamp,
        // evaluated over a tumbling window of 10 tuples
        JoinBolt joiner = new JoinBolt(uuidSource, joinKey)
                .join(wordSource, joinKey, uuidSource)
                .select("uuid,word,timestamp")
                .withTumblingWindow(BaseWindowedBolt.Count.of(10));

        // both inputs are grouped on the join key
        topology.setBolt("join", joiner, 1)
                .fieldsGrouping(uuidSource, new Fields(joinKey))
                .fieldsGrouping(wordSource, new Fields(joinKey));

        // all joined records are routed to a single printer task
        topology.setBolt("fileWriter", new FilePrinterBolt(), 1).globalGrouping("join");

        SubmitHelper.submitRemote("windowTopology", topology.createTopology());
    }

JoinBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

/**
 * Windowed bolt that joins the tuples of several streams that fall into the same window.
 *
 * <p>The join is configured fluently: the constructor names the first stream, {@link #join} / {@link #leftJoin}
 * add further streams, and {@link #select} chooses the output fields. On every window, {@link #execute} runs
 * {@code hashJoin} over the window's tuples and emits one tuple per joined record.
 */
public class JoinBolt extends BaseWindowedBolt {

    protected final Selector selectorType;
    // Map[StreamName -> JoinInfo]; insertion order is the order in which the streams are joined
    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
    protected FieldSelector[] outputFields;  // specified via bolt.select() ... used in declaring Output fields
    //    protected String[] dotSeparatedOutputFieldNames; // fieldNames in x.y.z format w/o stream name, used for naming output fields
    protected String outputStreamName;
    // Map[StreamName -> Map[Key -> List<Tuple>]  ]
    HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs = new HashMap<>(); // holds remaining streams
    private OutputCollector collector;

    /**
     * Calls  JoinBolt(Selector.SOURCE, sourceId, fieldName)
     *
     * @param sourceId  Id of source component (spout/bolt) from which this bolt is receiving data
     * @param fieldName the field to use for joining the stream (x.y.z format)
     */
    public JoinBolt(String sourceId, String fieldName) {
        this(Selector.SOURCE, sourceId, fieldName);
    }


    /**
     * Introduces the first stream to start the join with. Equivalent SQL ... select .... from srcOrStreamId ...
     *
     * @param type          Specifies whether 'srcOrStreamId' refers to stream name/source component
     * @param srcOrStreamId name of stream OR source component
     * @param fieldName     the field to use for joining the stream (x.y.z format)
     */
    public JoinBolt(Selector type, String srcOrStreamId, String fieldName) {
        selectorType = type;

        joinCriteria.put(srcOrStreamId, new JoinInfo(new FieldSelector(srcOrStreamId, fieldName)));
    }

    /**
     * Optional. Allows naming the output stream of this bolt. If not specified, the emits will happen on 'default' stream.
     *
     * @param streamName name of the stream to emit joined records on
     * @return this bolt, for method chaining
     */
    public JoinBolt withOutputStream(String streamName) {
        this.outputStreamName = streamName;
        return this;
    }

    /**
     * Performs inner Join with the newStream. SQL    :   from priorStream inner join newStream on newStream.field = priorStream.field1 same
     * as:   new JoinBolt(priorStream,field1). join(newStream, field, priorStream);
     *
     * Note: priorStream must be previously joined. Valid ex:    new JoinBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2); Invalid ex:
     * new JoinBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1);
     *
     * @param newStream   Either stream name or name of upstream component
     * @param field       the field on which to perform the join
     * @param priorStream a stream that is already part of the join and that newStream is joined against
     * @return this bolt, for method chaining
     * @throws IllegalArgumentException if newStream is already part of the join or priorStream is not
     */
    public JoinBolt join(String newStream, String field, String priorStream) {
        return joinCommon(newStream, field, priorStream, JoinType.INNER);
    }

    /**
     * Performs left Join with the newStream. SQL    :   from stream1  left join stream2  on stream2.field = stream1.field1 same as:   new
     * JoinBolt(stream1, field1). leftJoin(stream2, field, stream1);
     *
     * Note: priorStream must be previously joined Valid ex:    new JoinBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2);
     * Invalid ex:  new JoinBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1);
     *
     * @param newStream   Either a name of a stream or an upstream component
     * @param field       the field on which to perform the join
     * @param priorStream a stream that is already part of the join and that newStream is joined against
     * @return this bolt, for method chaining
     * @throws IllegalArgumentException if newStream is already part of the join or priorStream is not
     */
    public JoinBolt leftJoin(String newStream, String field, String priorStream) {
        return joinCommon(newStream, field, priorStream, JoinType.LEFT);
    }

    // Shared implementation of join()/leftJoin(): validates both stream names, then records the new join criterion.
    private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) {
        // joinCriteria is checked as well because the first stream (registered by the constructor) never appears
        // in hashedInputs, so joining with it again would otherwise silently replace its JoinInfo
        if (joinCriteria.containsKey(newStream) || hashedInputs.containsKey(newStream)) {
            throw new IllegalArgumentException("'" + newStream + "' is already part of join. Cannot join with it more than once.");
        }
        JoinInfo joinInfo = joinCriteria.get(priorStream);
        if (joinInfo == null) {
            throw new IllegalArgumentException("Stream '" + priorStream + "' was not previously declared");
        }

        // mutate state only after all validation passed, so a rejected call leaves the bolt unchanged
        hashedInputs.put(newStream, new HashMap<Object, ArrayList<Tuple>>());
        FieldSelector field = new FieldSelector(newStream, fieldDescriptor);
        joinCriteria.put(newStream, new JoinInfo(field, priorStream, joinInfo, joinType));
        return this;
    }

    /**
     * Specify projection fields. i.e. Specifies the fields to include in the output. e.g: .select("field1, stream2:field2, field3") Nested
     * Key names are supported for nested types: e.g: .select("outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3)" Inner
     * types (non leaf) must be Map<> in order to support nested lookup using this dot notation This selected fields implicitly declare the
     * output fieldNames for the bolt based.
     *
     * @param commaSeparatedKeys comma separated list of field descriptors to project into the output
     * @return this bolt, for method chaining
     */
    public JoinBolt select(String commaSeparatedKeys) {
        String[] fieldNames = commaSeparatedKeys.split(",");

        outputFields = new FieldSelector[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            outputFields[i] = new FieldSelector(fieldNames[i]);
        }
        return this;
    }

    /**
     * Declares one output field per selector configured through {@link #select}, on the named output stream if one was set.
     *
     * @throws IllegalArgumentException if {@link #select} was never called
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // fail with the same explicit message prepare() uses instead of a bare NullPointerException
        if (outputFields == null) {
            throw new IllegalArgumentException("Must specify output fields via .select() method.");
        }
        String[] outputFieldNames = new String[outputFields.length];
        for (int i = 0; i < outputFields.length; ++i) {
            outputFieldNames[i] = outputFields[i].getOutputName();
        }
        if (outputStreamName != null) {
            declarer.declareStream(outputStreamName, new Fields(outputFieldNames));
        } else {
            declarer.declare(new Fields(outputFieldNames));
        }
    }

    /**
     * Stores the collector and creates an empty hash table for every joined stream except the first one.
     *
     * @throws IllegalArgumentException if {@link #select} was never called
     */
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // initialize the hashedInputs data structure
        int i = 0;
        for (String stream : joinCriteria.keySet()) {
            if (i > 0) {
                hashedInputs.put(stream, new HashMap<Object, ArrayList<Tuple>>());
            }
            ++i;
        }
        if (outputFields == null) {
            throw new IllegalArgumentException("Must specify output fields via .select() method.");
        }
    }

    /**
     * Joins the tuples of the given window and emits one output tuple per joined record.
     */
    @Override
    public void execute(TupleWindow inputWindow) {
        // 1) Perform Join
        List<Tuple> currentWindow = inputWindow.get();
        JoinAccumulator joinResult = hashJoin(currentWindow);

        // 2) Emit results
        for (ResultRecord resultRecord : joinResult.getRecords()) {
            ArrayList<Object> outputTuple = resultRecord.getOutputFields();
            if (outputStreamName == null) {
                // explicit anchoring emits to corresponding input tuples only, as default window anchoring will anchor them to all
                // tuples in window
                collector.emit(resultRecord.tupleList, outputTuple);
            } else {
                // explicitly anchor emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples
                // in window
                collector.emit(outputStreamName, resultRecord.tupleList, outputTuple);
            }
        }
    }

    //......
}
  • JoinBolt繼承了BaseWindowedBolt,定義了Selector selectorType、LinkedHashMap<String, JoinInfo> joinCriteria、FieldSelector[] outputFields等屬性,用於記錄關聯類型及關聯關係
  • join、leftJoin方法用於設置join關聯關係,最後都是調用joinCommon方法,關聯關係使用JoinInfo對象,存儲在joinCriteria中
  • select方法用於選擇結果集的列,最後設置到outputFields,用於declareOutputFields
  • execute就是join的核心邏輯了,這裏調用了hashJoin

JoinBolt.hashJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

// Joins all tuples of one window: the first stream seeds the probe side, every other stream is hashed by its join key.
protected JoinAccumulator hashJoin(List<Tuple> tuples) {
        clearHashedInputs();

        JoinAccumulator joined = new JoinAccumulator();
        final String probeStream = joinCriteria.keySet().iterator().next();

        // Build phase: route each tuple either to the probe accumulator or to its stream's hash table
        for (Tuple current : tuples) {
            String source = getStreamSelector(current);
            if (source.equals(probeStream)) {
                // with a single stream there is no later join step, so the projection happens right here
                joined.insert(new ResultRecord(current, joinCriteria.size() == 1));
                continue;
            }

            Object joinKey = getJoinField(source, current);
            HashMap<Object, ArrayList<Tuple>> streamTable = hashedInputs.get(source);
            ArrayList<Tuple> bucket = streamTable.get(joinKey);
            if (bucket == null) {
                bucket = new ArrayList<Tuple>();
                streamTable.put(joinKey, bucket);
            }
            bucket.add(current);
        }

        // Join phase: fold the remaining streams into the probe, in the order they were declared
        final int lastPosition = joinCriteria.size() - 1;
        int position = 0;
        for (Map.Entry<String, JoinInfo> criterion : joinCriteria.entrySet()) {
            if (position != 0) { // position 0 is the probe stream itself
                joined = doJoin(joined, hashedInputs.get(criterion.getKey()), criterion.getValue(), position == lastPosition);
            }
            position++;
        }

        return joined;
    }
  • hashJoin方法先遍歷一下tuples,把tuples分爲兩類,firstStream的數據存到JoinAccumulator probe中,其他的存到HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs
  • 之後對剩餘的streamId,挨個遍歷調用doJoin,把結果整合到JoinAccumulator probe

JoinAccumulator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

// Append-only holder for the records produced so far by the join.
protected class JoinAccumulator {
        // records in insertion order
        ArrayList<ResultRecord> records = new ArrayList<>();

        // Appends one record to the accumulated result.
        public void insert(ResultRecord tuple) {
            this.records.add(tuple);
        }

        // Exposes the backing list itself (not a copy).
        public Collection<ResultRecord> getRecords() {
            return this.records;
        }
    }
  • JoinAccumulator就是一個ArrayList<ResultRecord>

ResultRecord

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

// One (partially) joined row: the tuples matched so far plus, once requested, the projected output values.
    protected class ResultRecord {

        ArrayList<Tuple> tupleList = new ArrayList<>(); // contains one Tuple per Stream being joined
        ArrayList<Object> outFields = null; // refs to fields that will be part of output fields

        // Starts a record from a single tuple.
        // 'generateOutputFields' enables us to avoid projection unless it is the final stream being joined
        public ResultRecord(Tuple tuple, boolean generateOutputFields) {
            tupleList.add(tuple);
            projectIfRequested(generateOutputFields);
        }

        // Extends 'lhs' with the tuple 'rhs'; either side may be null, in which case it contributes nothing.
        public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) {
            if (lhs != null) {
                tupleList.addAll(lhs.tupleList);
            }
            if (rhs != null) {
                tupleList.add(rhs);
            }
            projectIfRequested(generateOutputFields);
        }

        // Computes the output values from the collected tuples when asked to; otherwise outFields stays null.
        private void projectIfRequested(boolean generateOutputFields) {
            if (generateOutputFields) {
                outFields = doProjection(tupleList, outputFields);
            }
        }

        public ArrayList<Object> getOutputFields() {
            return outFields;
        }


        // Returns the first non-null value the selector resolves to among this record's tuples, or null if none does.
        public Object getField(FieldSelector fieldSelector) {
            Object found = null;
            for (int idx = 0; found == null && idx < tupleList.size(); idx++) {
                found = lookupField(fieldSelector, tupleList.get(idx));
            }
            return found;
        }
    }

    // Builds the output row: for each projection field, the first non-null value found among 'tuples', else null.
    protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) {
        ArrayList<Object> projected = new ArrayList<>(projectionFields.length);
        // Todo: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples
        for (FieldSelector selector : projectionFields) {
            Object value = null; // stays null for missing fields (usually in case of outer joins)
            for (Tuple candidate : tuples) {
                value = lookupField(selector, candidate);
                if (value != null) {
                    break;
                }
            }
            projected.add(value);
        }
        return projected;
    }

    // Extract the field from tuple. Field may be nested field (x.y.z).
    // Returns null when the selector names a different stream, when the tuple lacks the top-level field,
    // or when any value along the nested path is null. Intermediate (non leaf) values must be Maps;
    // any other type still fails with a ClassCastException.
    protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {

        // verify stream name matches, if stream name was specified
        if (fieldSelector.streamName != null
            && !fieldSelector.streamName.equalsIgnoreCase(getStreamSelector(tuple))) {
            return null;
        }

        if (fieldSelector.field.length == 0 || !tuple.contains(fieldSelector.field[0])) {
            return null;
        }

        Object curr = tuple.getValueByField(fieldSelector.field[0]);
        for (int i = 1; i < fieldSelector.field.length; i++) {
            // a null value (including a null top-level field) ends the descent instead of being dereferenced
            if (curr == null) {
                return null;
            }
            curr = ((Map<?, ?>) curr).get(fieldSelector.field[i]);
        }
        return curr;
    }
  • ResultRecord用於存儲joined之後的數據
  • 當joinCriteria.size() == 1或者finalJoin爲true的時候,ResultRecord的generateOutputFields爲true,會調用doProjection對結果集進行projection操作
  • finalJoin在hashJoin遍歷joinCriteria調用doJoin時計算,只有遍歷到最後一個stream時才爲true

JoinBolt.doJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

// Routes to the join implementation that matches joinInfo's join type; only INNER and LEFT are implemented.
    protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                     boolean finalJoin) {
        final JoinType requested = joinInfo.getJoinType();
        if (requested == JoinType.INNER) {
            return doInnerJoin(probe, buildInput, joinInfo, finalJoin);
        }
        if (requested == JoinType.LEFT) {
            return doLeftJoin(probe, buildInput, joinInfo, finalJoin);
        }
        // RIGHT, OUTER and anything else are rejected
        throw new RuntimeException("Unsupported join type : " + requested.name());
    }
  • doJoin封裝了各種join類型的方法,目前僅僅實現了INNER以及LEFT,分別調用doInnerJoin、doLeftJoin方法

doInnerJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

// Inner join: pairs every probe record with each build-side tuple that has the same key; unmatched records are dropped.
    protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                          boolean finalJoin) {
        String[] otherField = joinInfo.getOtherField();
        JoinAccumulator joined = new JoinAccumulator();
        FieldSelector probeSelector = new FieldSelector(joinInfo.other.getStreamName(), otherField);

        for (ResultRecord left : probe.getRecords()) {
            Object key = left.getField(probeSelector);
            if (key == null) {
                continue; // no key on the probe side, nothing to match against
            }
            ArrayList<Tuple> matches = buildInput.get(key);
            if (matches == null) {
                continue; // no build-side tuple carries this key
            }
            for (Tuple right : matches) {
                joined.insert(new ResultRecord(left, right, finalJoin));
            }
        }
        return joined;
    }
  • 這裏挨個對JoinAccumulator probe的records遍歷,然後通過probeKey從buildInput尋找對應的records,如果有找到則進行合併

doLeftJoin

storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java

// Left join: like the inner join, but a probe record without any build-side match is kept on its own.
    protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo,
                                         boolean finalJoin) {
        String[] otherField = joinInfo.getOtherField();
        JoinAccumulator joined = new JoinAccumulator();
        FieldSelector probeSelector = new FieldSelector(joinInfo.other.getStreamName(), otherField);

        for (ResultRecord left : probe.getRecords()) {
            Object key = left.getField(probeSelector);
            if (key == null) {
                continue; // records whose probe key resolves to null are skipped entirely
            }

            ArrayList<Tuple> matches = buildInput.get(key); // may legitimately be null
            if (matches == null || matches.isEmpty()) {
                // unmatched: carry the left side forward with no right-hand tuple
                joined.insert(new ResultRecord(left, null, finalJoin));
                continue;
            }
            for (Tuple right : matches) {
                joined.insert(new ResultRecord(left, right, finalJoin));
            }
        }
        return joined;
    }
  • left join與inner join的區別就在於沒有找到匹配記錄的話,仍舊保留左邊的記錄

小結

  • JoinBolt繼承了BaseWindowedBolt,目前僅僅支持inner join及left join,並且要求join的字段與fieldsGrouping的字段相同
  • JoinBolt對於多個stream數據的合併,使用分治的方式實現,採用JoinAccumulator不斷累加結果集,循環遍歷調用doJoin來完成
  • 由於JoinBolt是在內存進行操作,又需要匹配數據,需要消耗CPU及內存,有幾個點需要注意一下:

    • window的時間窗口不宜過大,否則內存堆積的數據過多,容易OOM,可根據情況調整時間窗口或者通過Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB設置worker的內存大小
    • 採取sliding window會造成數據重複join,因而需要使用withTumblingWindow
    • 如果開啓tuple處理超時,則要求Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS大於windowLength + slidingInterval + 處理時間,避免還沒有處理完就誤判爲超時而重新replay
    • 由於windowedBolt會自動對tupleWindow的數據進行anchor,數據量過多時anchor操作會給整個topology造成壓力,如無必要可以關閉ack(設置Config.TOPOLOGY_ACKER_EXECUTORS爲0)
    • Config.TOPOLOGY_MAX_SPOUT_PENDING要設置的大一點,給window的join操作及後續操作足夠的時間,在一定程度上避免spout發送tuple速度過快,下游bolt消費不過來
    • 生產上Config.TOPOLOGY_DEBUG設置爲false關閉debug日誌,Config.TOPOLOGY_EVENTLOGGER_EXECUTORS設置爲0關閉event logger

doc

相關文章
相關標籤/搜索