本文主要研究一下storm tuple的序列化。
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
// Every executor has an instance of this class public class ExecutorTransfer { private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class); private final WorkerState workerData; private final KryoTupleSerializer serializer; private final boolean isDebug; private int indexingBase = 0; private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker private AtomicReferenceArray<JCQueue> queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) { this.workerData = workerData; this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext()); this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false); } //...... // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null) public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) { if (isDebug) { LOG.info("TRANSFERRING tuple {}", addressedTuple); } JCQueue localQueue = getLocalQueue(addressedTuple); if (localQueue != null) { return tryTransferLocal(addressedTuple, localQueue, pendingEmits); } return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer); } //...... }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
/**
 * Non-blocking hand-off of a tuple whose destination task lives in another worker.
 *
 * <p>Simply forwards to this worker's {@code workerTransfer}. When the tuple cannot be emitted
 * right away it is appended to {@code pendingEmits} and {@code false} is returned;
 * {@code pendingEmits} may be null.
 *
 * @param tuple the tuple and its destination task
 * @param pendingEmits overflow queue for tuples that could not be sent yet, or null
 * @param serializer serializer used to turn the tuple into bytes
 * @return {@code true} if the tuple was handed off, {@code false} otherwise
 */
public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
    final boolean handedOff = this.workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
    return handedOff;
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
/**
 * Non-blocking send of a tuple to a task hosted by another worker.
 *
 * <p>Decision order:
 * <ol>
 *   <li>If {@code pendingEmits} already holds tuples, this one is queued behind them
 *       (presumably to keep emit order) and {@code false} is returned.</li>
 *   <li>If the destination task reports no back pressure, the tuple is serialized into a
 *       {@code TaskMessage} and offered to {@code transferQueue}.</li>
 *   <li>Otherwise (back pressure, or the queue rejected the message) the tuple is appended to
 *       {@code pendingEmits} when that queue is non-null.</li>
 * </ol>
 *
 * @param addressedTuple the tuple and its destination task
 * @param pendingEmits overflow queue; may be null
 * @param serializer serializer producing the message payload
 * @return {@code true} only when {@code transferQueue} accepted the message
 */
public boolean tryTransferRemote(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
    final boolean canBuffer = pendingEmits != null;

    if (canBuffer && !pendingEmits.isEmpty()) {
        pendingEmits.add(addressedTuple);
        return false;
    }

    final boolean backPressured = remoteBackPressureStatus[addressedTuple.dest].get();
    if (backPressured) {
        LOG.debug("Noticed Back Pressure in remote task {}", addressedTuple.dest);
    } else {
        final int destTask = addressedTuple.getDest();
        final byte[] payload = serializer.serialize(addressedTuple.getTuple());
        if (transferQueue.tryPublish(new TaskMessage(destTask, payload))) {
            return true;
        }
    }

    if (canBuffer) {
        pendingEmits.add(addressedTuple);
    }
    return false;
}
storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java
public class KryoTupleSerializer implements ITupleSerializer { KryoValuesSerializer _kryo; SerializationFactory.IdDictionary _ids; Output _kryoOut; public KryoTupleSerializer(final Map<String, Object> conf, final GeneralTopologyContext context) { _kryo = new KryoValuesSerializer(conf); _kryoOut = new Output(2000, 2000000000); _ids = new SerializationFactory.IdDictionary(context.getRawTopology()); } public byte[] serialize(Tuple tuple) { try { _kryoOut.clear(); _kryoOut.writeInt(tuple.getSourceTask(), true); _kryoOut.writeInt(_ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true); tuple.getMessageId().serialize(_kryoOut); _kryo.serializeInto(tuple.getValues(), _kryoOut); return _kryoOut.toBytes(); } catch (IOException e) { throw new RuntimeException(e); } } // public long crc32(Tuple tuple) { // try { // CRC32OutputStream hasher = new CRC32OutputStream(); // _kryo.serializeInto(tuple.getValues(), hasher); // return hasher.getValue(); // } catch (IOException e) { // throw new RuntimeException(e); // } // } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java
public class KryoValuesSerializer { Kryo _kryo; ListDelegate _delegate; Output _kryoOut; public KryoValuesSerializer(Map<String, Object> conf) { _kryo = SerializationFactory.getKryo(conf); _delegate = new ListDelegate(); _kryoOut = new Output(2000, 2000000000); } public void serializeInto(List<Object> values, Output out) { // this ensures that list of values is always written the same way, regardless // of whether it's a java collection or one of clojure's persistent collections // (which have different serializers) // Doing this lets us deserialize as ArrayList and avoid writing the class here _delegate.setDelegate(values); _kryo.writeObject(out, _delegate); } public byte[] serialize(List<Object> values) { _kryoOut.clear(); serializeInto(values, _kryoOut); return _kryoOut.toBytes(); } public byte[] serializeObject(Object obj) { _kryoOut.clear(); _kryo.writeClassAndObject(_kryoOut, obj); return _kryoOut.toBytes(); } }
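KryoValuesSerializer在serializeInto中使用了ListDelegate(即用它來包裝一下List<Object> values),這樣無論傳入的是java集合還是clojure的持久化集合,都會以同樣的方式寫出,反序列化時也可以統一使用ArrayList;它的構造器通過SerializationFactory.getKryo(conf)創建kryo,_kryoOut爲new Output(2000, 2000000000)(初始緩衝區2000字節,最大可以擴容到2000000000字節)。
storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java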
/**
 * Builds the Kryo instance used to serialize tuples.
 *
 * <p>Registration happens in a fixed sequence:
 * <ol>
 *   <li>built-in classes;</li>
 *   <li>ListDelegate with the serializer named by topology.tuple.serializer;</li>
 *   <li>Storm's own classes and any SerializationRegister from {@code loader};</li>
 *   <li>preRegister, then topology.kryo.register entries, then postRegister;</li>
 *   <li>topology.kryo.decorators, then postDecorate.</li>
 * </ol>
 * The order matters because Kryo gives classes registered without an explicit ID the next free
 * integer ID, and the order must be identical when serializing and deserializing.
 *
 * @param conf topology configuration
 * @return the configured Kryo instance
 * @throws RuntimeException if a required class cannot be loaded (subject to
 *     topology.skip.missing.kryo.registrations for user registrations and decorators) or a decorator
 *     cannot be instantiated
 */
public static Kryo getKryo(Map<String, Object> conf) {
    // The factory class is taken from topology.kryo.factory and instantiated reflectively.
    IKryoFactory kryoFactory = (IKryoFactory) ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
    Kryo k = kryoFactory.getKryo(conf);
    k.register(byte[].class);

    /* tuple payload serializer is specified via configuration */
    String payloadSerializerName = (String) conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
    try {
        Class serializerClass = Class.forName(payloadSerializerName);
        Serializer serializer = resolveSerializerInstance(k, ListDelegate.class, serializerClass, conf);
        k.register(ListDelegate.class, serializer);
    } catch (ClassNotFoundException ex) {
        throw new RuntimeException(ex);
    }

    k.register(ArrayList.class, new ArrayListSerializer());
    k.register(HashMap.class, new HashMapSerializer());
    k.register(HashSet.class, new HashSetSerializer());
    k.register(BigInteger.class, new BigIntegerSerializer());
    k.register(TransactionAttempt.class);
    k.register(Values.class);
    k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class);
    k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class);
    k.register(ConsList.class);
    k.register(BackPressureStatus.class);

    // `loader` is declared outside this excerpt; it is iterated as SerializationRegister instances
    // while holding its monitor.
    synchronized (loader) {
        for (SerializationRegister sr : loader) {
            try {
                sr.register(k);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    kryoFactory.preRegister(k, conf);

    boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);
    register(k, conf.get(Config.TOPOLOGY_KRYO_REGISTER), conf, skipMissing);

    // With DefaultKryoFactory this flips getDefaultSerializer to SerializableSerializer, so every
    // class registered from here on without an explicit serializer (decorators, implicit
    // registrations) uses it.
    kryoFactory.postRegister(k, conf);

    if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) {
        for (String klassName : (List<String>) conf.get(Config.TOPOLOGY_KRYO_DECORATORS)) {
            try {
                Class klass = Class.forName(klassName);
                IKryoDecorator decorator = (IKryoDecorator) klass.newInstance();
                decorator.decorate(k);
            } catch (ClassNotFoundException e) {
                // Only a missing decorator class can be skipped; instantiation failures always propagate.
                if (skipMissing) {
                    LOG.info("Could not find kryo decorator named " + klassName + ". Skipping registration...");
                } else {
                    throw new RuntimeException(e);
                }
            } catch (InstantiationException e) {
                throw new RuntimeException(e);
            } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        }
    }

    kryoFactory.postDecorate(k, conf);

    return k;
}

/**
 * Registers the classes listed in topology.kryo.register.
 *
 * <p>Each entry maps a class name to an optional serializer class name. Entries without a
 * serializer are registered with Kryo's default serializer; the others use the resolved
 * serializer instance.
 *
 * @param k the Kryo instance to register into
 * @param kryoRegistrations raw configuration value, normalized by normalizeKryoRegister
 * @param conf topology configuration, passed on to resolveSerializerInstance
 * @param skipMissing when true, a class or serializer that cannot be loaded is logged and skipped
 *     instead of failing
 * @throws RuntimeException wrapping ClassNotFoundException when skipMissing is false
 */
public static void register(Kryo k, Object kryoRegistrations, Map<String, Object> conf, boolean skipMissing) {
    Map<String, String> registrations = normalizeKryoRegister(kryoRegistrations);
    for (Map.Entry<String, String> entry : registrations.entrySet()) {
        String serializerClassName = entry.getValue();
        try {
            Class klass = Class.forName(entry.getKey());
            Class serializerClass = null;
            if (serializerClassName != null) {
                serializerClass = Class.forName(serializerClassName);
            }
            if (serializerClass == null) {
                k.register(klass);
            } else {
                k.register(klass, resolveSerializerInstance(k, klass, serializerClass, conf));
            }
        } catch (ClassNotFoundException e) {
            // NOTE(review): when the registered class itself (entry.getKey()) is missing and no
            // serializer was configured, serializerClassName is null, so this message prints "null"
            // instead of naming the missing class.
            if (skipMissing) {
                LOG.info("Could not find serialization or class for " + serializerClassName + ". Skipping registration...");
            } else {
                throw new RuntimeException(e);
            }
        }
    }
}
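SerializationFactory.getKryo中爲ListDelegate.class註冊的serializer,來自Config.TOPOLOGY_TUPLE_SERIALIZER(topology.tuple.serializer,默認是org.apache.storm.serialization.types.ListDelegateSerializer)配置的類,即用該類對tuple的values進行序列化;Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS(topology.skip.missing.kryo.registrations,默認爲false)決定了當storm找不到topology.kryo.register中配置的class或其對應的serializer,或者找不到配置的decorator類的時候,是拋出異常還是直接跳過註冊;最後通過Config.TOPOLOGY_KRYO_DECORATORS(topology.kryo.decorators)加載自定義的serialization。
storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/DefaultKryoFactory.java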
public class DefaultKryoFactory implements IKryoFactory { @Override public Kryo getKryo(Map<String, Object> conf) { KryoSerializableDefault k = new KryoSerializableDefault(); k.setRegistrationRequired(!((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION))); k.setReferences(false); return k; } @Override public void preRegister(Kryo k, Map<String, Object> conf) { } public void postRegister(Kryo k, Map<String, Object> conf) { ((KryoSerializableDefault) k).overrideDefault(true); } @Override public void postDecorate(Kryo k, Map<String, Object> conf) { } public static class KryoSerializableDefault extends Kryo { boolean _override = false; public void overrideDefault(boolean value) { _override = value; } @Override public Serializer getDefaultSerializer(Class type) { if (_override) { return new SerializableSerializer(); } else { return super.getDefaultSerializer(type); } } } }
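DefaultKryoFactory讀取Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION(topology.fall.back.on.java.serialization),默認該值爲true,則registrationRequired這裏設置爲false,即序列化的時候不要求該class必須在已註冊的列表中。另外,postRegister中調用了overrideDefault(true),此後KryoSerializableDefault.getDefaultSerializer對任何class都返回SerializableSerializer,也就是說在這之後註冊(包括隱式註冊)且沒有指定serializer的class,都會使用java自身的序列化機制。
kryo-4.0.2-sources.jar!/com/esotericsoftware/kryo/Kryo.java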
/** If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is automatically registered
 * using the {@link Kryo#addDefaultSerializer(Class, Class) default serializer}.
 * @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true.
 * @see ClassResolver#getRegistration(Class) */
// Note: the implicit registration goes through classResolver.registerImplicit, which (in
// DefaultClassResolver) asks kryo.getDefaultSerializer(type) for the serializer — so a subclass that
// overrides getDefaultSerializer (e.g. Storm's DefaultKryoFactory.KryoSerializableDefault) decides it.
public Registration getRegistration (Class type) {
    if (type == null) throw new IllegalArgumentException("type cannot be null.");

    Registration registration = classResolver.getRegistration(type);
    if (registration == null) {
        // Special cases that can reuse the registration of a related type.
        if (Proxy.isProxyClass(type)) {
            // If a Proxy class, treat it like an InvocationHandler because the concrete class for a proxy is generated.
            registration = getRegistration(InvocationHandler.class);
        } else if (!type.isEnum() && Enum.class.isAssignableFrom(type) && !Enum.class.equals(type)) {
            // This handles an enum value that is an inner class. Eg: enum A {b{}};
            registration = getRegistration(type.getEnclosingClass());
        } else if (EnumSet.class.isAssignableFrom(type)) {
            registration = classResolver.getRegistration(EnumSet.class);
        } else if (isClosure(type)) {
            registration = classResolver.getRegistration(ClosureSerializer.Closure.class);
        }
        if (registration == null) {
            // Truly unregistered: fail, or (optionally warn and) register implicitly.
            if (registrationRequired) {
                throw new IllegalArgumentException(unregisteredClassMessage(type));
            }
            if (warnUnregisteredClasses) {
                warn(unregisteredClassMessage(type));
            }
            registration = classResolver.registerImplicit(type);
        }
    }
    return registration;
}

/** Registers the class using the lowest, next available integer ID and the {@link Kryo#getDefaultSerializer(Class) default
 * serializer}. If the class is already registered, no change will be made and the existing registration will be returned.
 * Registering a primitive also affects the corresponding primitive wrapper.
 * <p>
 * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when
 * using this method. The order must be the same at deserialization as it was for serialization. */
public Registration register (Class type) {
    Registration registration = classResolver.getRegistration(type);
    if (registration != null) return registration;
    return register(type, getDefaultSerializer(type));
}

/** Returns the best matching serializer for a class. This method can be overridden to implement custom logic to choose a
 * serializer. */
// Lookup order: @DefaultSerializer-style annotation on the type, then the first matching entry in
// defaultSerializers (in list order), then newDefaultSerializer.
public Serializer getDefaultSerializer (Class type) {
    if (type == null) throw new IllegalArgumentException("type cannot be null.");

    final Serializer serializerForAnnotation = getDefaultSerializerForAnnotatedType(type);
    if (serializerForAnnotation != null) return serializerForAnnotation;

    for (int i = 0, n = defaultSerializers.size(); i < n; i++) {
        DefaultSerializerEntry entry = defaultSerializers.get(i);
        if (entry.type.isAssignableFrom(type)) {
            Serializer defaultSerializer = entry.serializerFactory.makeSerializer(this, type);
            return defaultSerializer;
        }
    }

    return newDefaultSerializer(type);
}

/** Called by {@link #getDefaultSerializer(Class)} when no default serializers matched the type. Subclasses can override this
 * method to customize behavior. The default implementation calls {@link SerializerFactory#makeSerializer(Kryo, Class)} using
 * the {@link #setDefaultSerializer(Class) default serializer}. */
protected Serializer newDefaultSerializer (Class type) {
    return defaultSerializer.makeSerializer(this, type);
}

/** Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already
 * registered, the existing entry is updated with the new serializer. Registering a primitive also affects the corresponding
 * primitive wrapper.
 * <p>
 * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when
 * using this method. The order must be the same at deserialization as it was for serialization. */
public Registration register (Class type, Serializer serializer) {
    Registration registration = classResolver.getRegistration(type);
    if (registration != null) {
        // Already registered: keep the existing ID, only swap the serializer.
        registration.setSerializer(serializer);
        return registration;
    }
    return classResolver.register(new Registration(type, serializer, getNextRegistrationId()));
}

/** Returns the lowest, next available integer ID. */
// Scans upward from nextRegisterID, skipping IDs already taken; -2 acts as the exhaustion sentinel
// (presumably chosen because IDs are written as id + 2 on the wire — verify).
public int getNextRegistrationId () {
    while (nextRegisterID != -2) {
        if (classResolver.getRegistration(nextRegisterID) == null) return nextRegisterID;
        nextRegisterID++;
    }
    throw new KryoException("No registration IDs are available.");
}
kryo-4.0.2-sources.jar!/com/esotericsoftware/kryo/util/DefaultClassResolver.java
// Pseudo-ID of implicitly registered classes: they are identified by class name, not by an integer ID.
static public final byte NAME = -1;

// ID -> registration; only filled for registrations whose ID is not NAME.
protected final IntMap<Registration> idToRegistration = new IntMap();
// Class -> registration; filled for every registration, including NAME ones.
protected final ObjectMap<Class, Registration> classToRegistration = new ObjectMap();

// Class -> nameId assigned the first time a NAME class is written; created lazily in writeName.
protected IdentityObjectIntMap<Class> classToNameId;

/** Registers {@code type} without an integer ID, using the serializer chosen by kryo.getDefaultSerializer. */
public Registration registerImplicit (Class type) {
    return register(new Registration(type, kryo.getDefaultSerializer(type), NAME));
}

/** Stores a registration; primitives are also stored under their wrapper class. */
public Registration register (Registration registration) {
    if (registration == null) throw new IllegalArgumentException("registration cannot be null.");
    if (registration.getId() != NAME) {
        if (TRACE) {
            trace("kryo", "Register class ID " + registration.getId() + ": " + className(registration.getType()) + " ("
                + registration.getSerializer().getClass().getName() + ")");
        }
        idToRegistration.put(registration.getId(), registration);
    } else if (TRACE) {
        trace("kryo", "Register class name: " + className(registration.getType()) + " ("
            + registration.getSerializer().getClass().getName() + ")");
    }
    classToRegistration.put(registration.getType(), registration);
    if (registration.getType().isPrimitive()) classToRegistration.put(getWrapperClass(registration.getType()), registration);
    return registration;
}

/**
 * Writes the class of an object.
 *
 * <p>null is written as Kryo.NULL; a NAME registration is written by writeName; any other
 * registration is written as varint (id + 2). Presumably the +2 keeps the small values free for the
 * null marker and for NAME + 2 (= 1) — confirm against Kryo.NULL.
 */
public Registration writeClass (Output output, Class type) {
    if (type == null) {
        if (TRACE || (DEBUG && kryo.getDepth() == 1)) log("Write", null);
        output.writeVarInt(Kryo.NULL, true);
        return null;
    }
    // May implicitly register the class (see Kryo.getRegistration).
    Registration registration = kryo.getRegistration(type);
    if (registration.getId() == NAME)
        writeName(output, type, registration);
    else {
        if (TRACE) trace("kryo", "Write class " + registration.getId() + ": " + className(type));
        output.writeVarInt(registration.getId() + 2, true);
    }
    return registration;
}

/**
 * Writes a NAME class: marker NAME + 2, then its nameId, and — only the first time the class is seen
 * since the last reset — the full class name.
 */
protected void writeName (Output output, Class type, Registration registration) {
    output.writeVarInt(NAME + 2, true);
    if (classToNameId != null) {
        int nameId = classToNameId.get(type, -1);
        if (nameId != -1) {
            // Seen before: the nameId alone identifies the class.
            if (TRACE) trace("kryo", "Write class name reference " + nameId + ": " + className(type));
            output.writeVarInt(nameId, true);
            return;
        }
    }
    // Only write the class name the first time encountered in object graph.
    if (TRACE) trace("kryo", "Write class name: " + className(type));
    int nameId = nextNameId++;
    if (classToNameId == null) classToNameId = new IdentityObjectIntMap();
    classToNameId.put(type, nameId);
    output.writeVarInt(nameId, true);
    output.writeString(type.getName());
}

/**
 * Forgets the class-name/nameId mappings, but only when registration is not required.
 * Afterwards the next write of a NAME class writes its full class name again.
 * (nameIdToClass and nextNameId are declared outside this excerpt.)
 */
public void reset () {
    if (!kryo.isRegistrationRequired()) {
        if (classToNameId != null) classToNameId.clear(2048);
        if (nameIdToClass != null) nameIdToClass.clear();
        nextNameId = 0;
    }
}
隱式註冊的registration的id爲NAME(這裏用-1表示),它只會被放入ObjectMap<Class, Registration> classToRegistration;如果id不是NAME,則除了classToRegistration之外,還會註冊到IntMap<Registration> idToRegistration。如果要序列化的類的字段中不僅僅有基本類型,還有未註冊的類,會調用這裏的writeClass方法(其中對NAME做了專門處理):從代碼可以看到,如果是NAME,則使用的是writeName;不是NAME的則直接使用output.writeVarInt(registration.getId() + 2, true)寫入一個int。writeName方法會先寫入NAME + 2(即1)作爲標記;第一次遇到某個NAME的class時會給它生成一個nameId,然後放入到IdentityObjectIntMap<Class> classToNameId中,接著寫入nameId,再寫入class.getName();第二次遇到該class的時候,因爲classToNameId中已經存在nameId,因而只寫入nameId。但是DefaultClassResolver的reset方法在registrationRequired是false這種情況下會調用classToNameId.clear(2048)進行清空或者resize,一旦這個方法被調用,那麼下次可能就無法利用classToNameId用id替代className來序列化了。
kryo-4.0.2-sources.jar!/com/esotericsoftware/kryo/Kryo.java
/** Writes an object using the registered serializer. */
// The class itself is NOT written here; the reader must already know it (e.g. via readObject(Input, Class)).
// getRegistration may implicitly register the class when registration is not required.
public void writeObject (Output output, Object object) {
    if (output == null) throw new IllegalArgumentException("output cannot be null.");
    if (object == null) throw new IllegalArgumentException("object cannot be null.");
    beginObject();
    try {
        // With reference tracking on, a previously written object is emitted as a back-reference.
        if (references && writeReferenceOrNull(output, object, false)) {
            getRegistration(object.getClass()).getSerializer().setGenerics(this, null);
            return;
        }
        if (TRACE || (DEBUG && depth == 1)) log("Write", object);
        getRegistration(object.getClass()).getSerializer().write(this, output, object);
    } finally {
        // Leaving the outermost writeObject (depth back to 0) triggers reset() when autoReset is set,
        // which also clears the class resolver's class-name cache.
        if (--depth == 0 && autoReset) reset();
    }
}

/** Resets unregistered class names, references to previously serialized or deserialized objects, and the
 * {@link #getGraphContext() graph context}. If {@link #setAutoReset(boolean) auto reset} is true, this method is called
 * automatically when an object graph has been completely serialized or deserialized. If overridden, the super method must be
 * called. */
public void reset () {
    depth = 0;
    if (graphContext != null) graphContext.clear();
    // Delegates to the ClassResolver (DefaultClassResolver clears classToNameId when registration is not required).
    classResolver.reset();
    if (references) {
        referenceResolver.reset();
        readObject = null;
    }

    copyDepth = 0;
    if (originalToCopy != null) originalToCopy.clear(2048);

    if (TRACE) trace("kryo", "Object graph complete.");
}
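writeObject方法在頂層對象寫完(depth減回0)且autoReset爲true的時候會調用reset方法,Kryo的reset方法又會調用classResolver.reset(),而DefaultClassResolver的reset方法在registrationRequired爲false的時候會調用classToNameId.clear(2048)。
小結
Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION(topology.fall.back.on.java.serialization)如果爲true,則kryo.setRegistrationRequired(false),也就是如果一個class沒有在kryo進行註冊,不會拋異常。這個配置表達的是對於遇到沒有註冊的class要不要fallback:如果不fallback則直接拋異常;如果fallback,則會進行隱式註冊,在classToNameId不會被reset的前提下,第一次使用className來序列化,同時分配一個id寫入classToNameId,第二次則直接使用classToNameId中獲取到的id,也就相當於手工註冊的效果。需要注意的是,這個fallback確實會用到java自身的序列化機制:DefaultKryoFactory在postRegister中調用了overrideDefault(true),此後getDefaultSerializer一律返回SerializableSerializer,因此隱式註冊的class(以及在postRegister之後註冊且沒有指定serializer的class)的內容是通過java序列化寫出的,kryo只負責寫入className或id。
Config.TOPOLOGY_TUPLE_SERIALIZER(topology.tuple.serializer,默認是org.apache.storm.serialization.types.ListDelegateSerializer)用於配置tuple的payload的序列化類。
Config.TOPOLOGY_KRYO_DECORATORS(topology.kryo.decorators)用於加載自定義的serialization,可以直接通過Config.registerDecorator註冊一個IKryoDecorator,在decorate方法中對Kryo註冊要序列化的class。
Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS(topology.skip.missing.kryo.registrations,默認爲false)這個屬性容易跟Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION(topology.fall.back.on.java.serialization)混淆起來,前者是storm自身的屬性,而後者對應的是storm包裝的kryo的屬性(registrationRequired);Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS配置的是當storm加載不到topology.kryo.register中配置的class或serializer,或者在有自定義Config.TOPOLOGY_KRYO_DECORATORS的場景下加載不到用戶自定義的IKryoDecorator類時,是skip還是拋異常。
Kryo的registrationRequired爲false的話,則會自動對未註冊的class進行隱式註冊(並在寫出時把分配的nameId記錄到classToNameId),只在第一次序列化的時候使用className,之後都用id替代,來節省空間;不過要注意的是如果Kryo的autoReset爲true的話,那麼classToNameId會被reset,因而隱式註冊在非第一次遇到未註冊的class的時候並不能一直走使用id代替className來序列化。Kryo的autoReset默認爲true,DefaultKryoFactory也沒有修改它,而每個tuple的values都是通過一次頂層的writeObject寫出的,所以實際上每個tuple序列化完成後classToNameId都會被清空,未註冊class的className在每個tuple裏都會重新寫一遍。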