本文主要研究一下storm的JoinBolt
@Test public void testJoinBolt() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("uuid-spout", new RandomWordSpout(new String[]{"uuid", "timestamp"}), 1); builder.setSpout("word-spout", new RandomWordSpout(new String[]{"word", "timestamp"}), 1); JoinBolt joinBolt = new JoinBolt("uuid-spout", "timestamp") //from priorStream inner join newStream on newStream.field = priorStream.field1 .join("word-spout", "timestamp", "uuid-spout") .select("uuid,word,timestamp") .withTumblingWindow(BaseWindowedBolt.Count.of(10)); builder.setBolt("join", joinBolt,1) .fieldsGrouping("uuid-spout",new Fields("timestamp")) .fieldsGrouping("word-spout",new Fields("timestamp")); builder.setBolt("fileWriter",new FilePrinterBolt(),1).globalGrouping("join"); SubmitHelper.submitRemote("windowTopology",builder.createTopology()); }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
public class JoinBolt extends BaseWindowedBolt { protected final Selector selectorType; // Map[StreamName -> JoinInfo] protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>(); protected FieldSelector[] outputFields; // specified via bolt.select() ... used in declaring Output fields // protected String[] dotSeparatedOutputFieldNames; // fieldNames in x.y.z format w/o stream name, used for naming output fields protected String outputStreamName; // Map[StreamName -> Map[Key -> List<Tuple>] ] HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs = new HashMap<>(); // holds remaining streams private OutputCollector collector; /** * Calls JoinBolt(Selector.SOURCE, sourceId, fieldName) * * @param sourceId Id of source component (spout/bolt) from which this bolt is receiving data * @param fieldName the field to use for joining the stream (x.y.z format) */ public JoinBolt(String sourceId, String fieldName) { this(Selector.SOURCE, sourceId, fieldName); } /** * Introduces the first stream to start the join with. Equivalent SQL ... select .... from srcOrStreamId ... * * @param type Specifies whether 'srcOrStreamId' refers to stream name/source component * @param srcOrStreamId name of stream OR source component * @param fieldName the field to use for joining the stream (x.y.z format) */ public JoinBolt(Selector type, String srcOrStreamId, String fieldName) { selectorType = type; joinCriteria.put(srcOrStreamId, new JoinInfo(new FieldSelector(srcOrStreamId, fieldName))); } /** * Optional. Allows naming the output stream of this bolt. If not specified, the emits will happen on 'default' stream. */ public JoinBolt withOutputStream(String streamName) { this.outputStreamName = streamName; return this; } /** * Performs inner Join with the newStream. SQL : from priorStream inner join newStream on newStream.field = priorStream.field1 same * as: new WindowedQueryBolt(priorStream,field1). 
join(newStream, field, priorStream); * * Note: priorStream must be previously joined. Valid ex: new WindowedQueryBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2); Invalid ex: * new WindowedQueryBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1); * * @param newStream Either stream name or name of upstream component * @param field the field on which to perform the join */ public JoinBolt join(String newStream, String field, String priorStream) { return joinCommon(newStream, field, priorStream, JoinType.INNER); } /** * Performs left Join with the newStream. SQL : from stream1 left join stream2 on stream2.field = stream1.field1 same as: new * WindowedQueryBolt(stream1, field1). leftJoin(stream2, field, stream1); * * Note: priorStream must be previously joined Valid ex: new WindowedQueryBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2); * Invalid ex: new WindowedQueryBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1); * * @param newStream Either a name of a stream or an upstream component * @param field the field on which to perform the join */ public JoinBolt leftJoin(String newStream, String field, String priorStream) { return joinCommon(newStream, field, priorStream, JoinType.LEFT); } private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) { if (hashedInputs.containsKey(newStream)) { throw new IllegalArgumentException("'" + newStream + "' is already part of join. Cannot join with it more than once."); } hashedInputs.put(newStream, new HashMap<Object, ArrayList<Tuple>>()); JoinInfo joinInfo = joinCriteria.get(priorStream); if (joinInfo == null) { throw new IllegalArgumentException("Stream '" + priorStream + "' was not previously declared"); } FieldSelector field = new FieldSelector(newStream, fieldDescriptor); joinCriteria.put(newStream, new JoinInfo(field, priorStream, joinInfo, joinType)); return this; } /** * Specify projection fields. i.e. Specifies the fields to include in the output. 
e.g: .select("field1, stream2:field2, field3") Nested * Key names are supported for nested types: e.g: .select("outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3)" Inner * types (non leaf) must be Map<> in order to support nested lookup using this dot notation This selected fields implicitly declare the * output fieldNames for the bolt based. * * @param commaSeparatedKeys * @return */ public JoinBolt select(String commaSeparatedKeys) { String[] fieldNames = commaSeparatedKeys.split(","); outputFields = new FieldSelector[fieldNames.length]; for (int i = 0; i < fieldNames.length; i++) { outputFields[i] = new FieldSelector(fieldNames[i]); } return this; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { String[] outputFieldNames = new String[outputFields.length]; for (int i = 0; i < outputFields.length; ++i) { outputFieldNames[i] = outputFields[i].getOutputName(); } if (outputStreamName != null) { declarer.declareStream(outputStreamName, new Fields(outputFieldNames)); } else { declarer.declare(new Fields(outputFieldNames)); } } @Override public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { this.collector = collector; // initialize the hashedInputs data structure int i = 0; for (String stream : joinCriteria.keySet()) { if (i > 0) { hashedInputs.put(stream, new HashMap<Object, ArrayList<Tuple>>()); } ++i; } if (outputFields == null) { throw new IllegalArgumentException("Must specify output fields via .select() method."); } } @Override public void execute(TupleWindow inputWindow) { // 1) Perform Join List<Tuple> currentWindow = inputWindow.get(); JoinAccumulator joinResult = hashJoin(currentWindow); // 2) Emit results for (ResultRecord resultRecord : joinResult.getRecords()) { ArrayList<Object> outputTuple = resultRecord.getOutputFields(); if (outputStreamName == null) { // explicit anchoring emits to corresponding input tuples only, as default window anchoring will anchor 
them to all // tuples in window collector.emit(resultRecord.tupleList, outputTuple); } else { // explicitly anchor emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples // in window collector.emit(outputStreamName, resultRecord.tupleList, outputTuple); } } } //...... }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
protected JoinAccumulator hashJoin(List<Tuple> tuples) { clearHashedInputs(); JoinAccumulator probe = new JoinAccumulator(); // 1) Build phase - Segregate tuples in the Window into streams. // First stream's tuples go into probe, rest into HashMaps in hashedInputs String firstStream = joinCriteria.keySet().iterator().next(); for (Tuple tuple : tuples) { String streamId = getStreamSelector(tuple); if (!streamId.equals(firstStream)) { Object field = getJoinField(streamId, tuple); ArrayList<Tuple> recs = hashedInputs.get(streamId).get(field); if (recs == null) { recs = new ArrayList<Tuple>(); hashedInputs.get(streamId).put(field, recs); } recs.add(tuple); } else { ResultRecord probeRecord = new ResultRecord(tuple, joinCriteria.size() == 1); probe.insert(probeRecord); // first stream's data goes into the probe } } // 2) Join the streams in order of streamJoinOrder int i = 0; for (String streamName : joinCriteria.keySet()) { boolean finalJoin = (i == joinCriteria.size() - 1); if (i > 0) { probe = doJoin(probe, hashedInputs.get(streamName), joinCriteria.get(streamName), finalJoin); } ++i; } return probe; }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
/**
 * Append-only holder for the records produced so far by a join.
 */
protected class JoinAccumulator {
    ArrayList<ResultRecord> records = new ArrayList<>();

    /**
     * Appends a record to the accumulated result.
     *
     * @param tuple the record to add
     */
    public void insert(ResultRecord tuple) {
        this.records.add(tuple);
    }

    /**
     * Returns the backing collection of accumulated records (not a copy).
     *
     * @return the accumulated records, in insertion order
     */
    public Collection<ResultRecord> getRecords() {
        return this.records;
    }
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
// Join helper to concat fields to the record protected class ResultRecord { ArrayList<Tuple> tupleList = new ArrayList<>(); // contains one Tuple per Stream being joined ArrayList<Object> outFields = null; // refs to fields that will be part of output fields // 'generateOutputFields' enables us to avoid projection unless it is the final stream being joined public ResultRecord(Tuple tuple, boolean generateOutputFields) { tupleList.add(tuple); if (generateOutputFields) { outFields = doProjection(tupleList, outputFields); } } public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) { if (lhs != null) { tupleList.addAll(lhs.tupleList); } if (rhs != null) { tupleList.add(rhs); } if (generateOutputFields) { outFields = doProjection(tupleList, outputFields); } } public ArrayList<Object> getOutputFields() { return outFields; } // 'stream' cannot be null, public Object getField(FieldSelector fieldSelector) { for (Tuple tuple : tupleList) { Object result = lookupField(fieldSelector, tuple); if (result != null) { return result; } } return null; } } // Performs projection on the tuples based on 'projectionFields' protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) { ArrayList<Object> result = new ArrayList<>(projectionFields.length); // Todo: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples for (int i = 0; i < projectionFields.length; i++) { boolean missingField = true; for (Tuple tuple : tuples) { Object field = lookupField(projectionFields[i], tuple); if (field != null) { result.add(field); missingField = false; break; } } if (missingField) { // add a null for missing fields (usually in case of outer joins) result.add(null); } } return result; } // Extract the field from tuple. 
Field may be nested field (x.y.z) protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) { // very stream name matches, it stream name was specified if (fieldSelector.streamName != null && !fieldSelector.streamName.equalsIgnoreCase(getStreamSelector(tuple))) { return null; } Object curr = null; for (int i = 0; i < fieldSelector.field.length; i++) { if (i == 0) { if (tuple.contains(fieldSelector.field[i])) { curr = tuple.getValueByField(fieldSelector.field[i]); } else { return null; } } else { curr = ((Map) curr).get(fieldSelector.field[i]); if (curr == null) { return null; } } } return curr; }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
// Dispatches to the right join method (inner/left/right/outer) based on the joinInfo.joinType protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) { final JoinType joinType = joinInfo.getJoinType(); switch (joinType) { case INNER: return doInnerJoin(probe, buildInput, joinInfo, finalJoin); case LEFT: return doLeftJoin(probe, buildInput, joinInfo, finalJoin); case RIGHT: case OUTER: default: throw new RuntimeException("Unsupported join type : " + joinType.name()); } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
// inner join - core implementation protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) { String[] probeKeyName = joinInfo.getOtherField(); JoinAccumulator result = new JoinAccumulator(); FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName); for (ResultRecord rec : probe.getRecords()) { Object probeKey = rec.getField(fieldSelector); if (probeKey != null) { ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey); if (matchingBuildRecs != null) { for (Tuple matchingRec : matchingBuildRecs) { ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin); result.insert(mergedRecord); } } } } return result; }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java
// left join - core implementation protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) { String[] probeKeyName = joinInfo.getOtherField(); JoinAccumulator result = new JoinAccumulator(); FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName); for (ResultRecord rec : probe.getRecords()) { Object probeKey = rec.getField(fieldSelector); if (probeKey != null) { ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey); // ok if its return null if (matchingBuildRecs != null && !matchingBuildRecs.isEmpty()) { for (Tuple matchingRec : matchingBuildRecs) { ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin); result.insert(mergedRecord); } } else { ResultRecord mergedRecord = new ResultRecord(rec, null, finalJoin); result.insert(mergedRecord); } } } return result; }
因爲JoinBolt是在內存進行操作,又需要匹配數據,需要消耗CPU及內存,有幾個點需要注意一下:
設置Config.TOPOLOGY_ACKER_EXECUTORS爲0
)