Strom序列化機制

  Storm 中的 tuple能夠包含任何類型的對象。因爲Storm 是一個分佈式系統,因此在不一樣的任務之間傳遞消息時Storm必須知道怎樣序列化、反序列化消息對象。html

  Storm 使用 Kryo庫對對象進行序列化。Kryo 是一個靈活、快速的序列化庫。Storm 默認支持基礎類型、string、byte arrays、ArrayList、HashMap、HashSet 以及 Clojure 的集合類型的序列化。若是須要在tuple中使用其餘的對象類型,就須要註冊一個自定義的序列化器。java

原文和做者一塊兒討論:http://www.cnblogs.com/intsmaze/p/7044042.html程序員

微信:intsmaze微信

避免微信回覆重複諮詢問題,技術諮詢請博客留言。網絡

自定義序列化

  TORM使用Kryo來序列化。要實現自定義序列化器,咱們須要使用Kryo註冊新的序列化器。添加自定義序列化器是經過拓撲配置的topology.kryo.register屬性完成的。它須要一個註冊的列表,每一個註冊項能夠採起兩種形式:架構

1:類名註冊,在這種狀況下,Storm將使用Kryo的FieldsSerializer來序列化該類。這多是也可能不是該類最好的選擇,更多的細節能夠查看Kryo文檔。分佈式

2:實現了com.esotericsoftware.kryo.Serializer接口的類名註冊的映射。ide

Storm爲拓撲配置裏的註冊序列化提供了幫助。Config類中有一個名爲registerSerialization的方法,能夠把註冊添加到配置中。性能

public void registerSerialization(Class klass);
public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass);

java序列化

  一個拓撲中不一樣的任務傳遞消息時Storm發現了一個沒有註冊序列化器的類型,它會使用 Java 序列化器來代替,若是這個對象沒法被Java序列化器序列化,Storm 就會拋出異常。測試

  注意,Java 自身的序列化機制很是耗費資源,並且無論在 CPU 的性能上仍是在序列化對象的大小上都沒有優點。強烈建議讀者在生產環境中運行topology 的時候註冊一個自定義的序列化器。

  能夠經過將 Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION 配置爲 false 的方式來將禁止序列化器回退到Java的序列化機制。

Config.setFallBackOnJavaSerialization(conf,false);這個時候若是storm使用java序列化就會拋出異常告訴開發人員去註冊一個kryo序列化。

實現storm序列化

建立傳輸的對象。

package cn.intsmaze.serializable.bean;
public class Person {
  private int age;
  private Studnet studnet;
  private ArrayList arrayList=new ArrayList();
  private LinkedList linkedList=new LinkedList();
  public Person() {
  }
  public Person(int age,Studnet s) {
    this.age = age;
    this.studnet=s;
    arrayList.add("ArrayList中的"+s.getName());
    linkedList.add("linkedList中的"+s.getName());
  }
  @Override
  public String toString() {
    return "Person [age=" + age + ", studnet=" + studnet + ", arrayList="
    + arrayList + ", linkedList=" + linkedList + "]";
  }
    get(),set()......
}

package cn.intsmaze.serializable.bean;
public class Studnet {
  private String name;
  public Studnet() {
  }
  public Studnet(String name) {
    this.name = name;
  }
  @Override
  public String toString() {
    return "Studnet [name=" + name + "]";
  } 
  get(),set()......
}

spout和bolt的實現,spout每次會建立一個person對象將該對象發送到bolt,bolt類接收到該對象將該對象打印出來。

package cn.intsmaze.serializable;
import cn.intsmaze.serializable.bean.Person;
import cn.intsmaze.serializable.bean.Studnet;
public class SpoutBean extends BaseRichSpout {
  SpoutOutputCollector collector;
  public void open(Map conf, TopologyContext context,
    SpoutOutputCollector collector) {
    this.collector = collector;
  }
  public void nextTuple() {
    Studnet s=new Studnet("xiaoxi");
    collector.emit(new Values(new Person(100,s)));
    Utils.sleep(500);
  }
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("person"));
  }
} 
 
package cn.intsmaze.serializable;
import cn.intsmaze.serializable.bean.Person;
public class BoltBean extends BaseBasicBolt {
  public void prepare(Map stormConf, TopologyContext context) {
    super.prepare(stormConf, context);
  }
  public void execute(Tuple input, BasicOutputCollector collector) {
    Person person = (Person)input.getValueByField("person");
    System.out.println("接收到spout節點傳來的數據:"+person);
  }
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  }
}

場景一:

使用public void registerSerialization(Class klass);

package cn.intsmaze.serializable;
import java.util.LinkedList;
import cn.intsmaze.serializable.bean.Person;
import cn.intsmaze.serializable.bean.Studnet;
public class TopologyBean {
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new SpoutBean(), 1);
    builder.setBolt("bolt", new BoltBean(), 1).shuffleGrouping("spout");
    Config conf = new Config();
    conf.registerSerialization(Person.class);
    conf.registerSerialization(Studnet.class);
    //註釋掉後,但Studnet沒實現java序列化,則會報錯。有兩種方法,一種註冊該類,一種實現java序列化。
    conf.registerSerialization(LinkedList.class);
    //這裏若是註釋掉,則會使用java序列化方式,若是咱們取消掉禁止使用java序列化方法,則會提示註冊LinkedList類報錯。
    conf.setNumWorkers(2);
     // Config.setFallBackOnJavaSerialization(conf, false);//禁止使用java語言本身的序列化
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
   }
}

  第11行,咱們註冊person類使用Kryo序列化,person對象除了有基本類型int字段外,還有arraylist,linkedlist類型以及自定義的student類型。arraylist類型是storm默認已經提供了支持。

  這裏若是咱們不對linkedlist類型和自定義類型student進行註冊則該拓撲在運行時則會報沒法序列化student類型異常。

這個時候有兩種辦法解決:

  一種就是使student實現java的public class Studnet implements Serializable接口,則該拓撲會成功運行。由於storm若是發現傳輸的對象若是沒有註冊爲Kryo,則就會使用java的序列化對象,而linkedlist默認已經實現了該接口,因此纔會出現前面報student對象沒法序列化,而後使得student實現java的序列化接口便可。

  第二種方案就是,咱們對student類進行註冊conf.registerSerialization(Studnet.class);。

雖然linkedlist不註冊,會默認使用java的序列化,可是出於效率的考慮,咱們將其註冊爲Kryo。

  提示:由於有些集合類型,storm沒有提供序列化支持,可是實現了java序列化接口,因此若是咱們不加以控制,會使用java序列化而拖累整個系統。因此推薦使用

Config.setFallBackOnJavaSerialization(conf, false);禁止使用java語言本身的序列化來能夠在本地模式時及時發現報錯信息,將問題儘早解決。

場景二:

  咱們使用kryo序列化,可是有時候咱們並不但願傳輸對象的全部字段,而只是傳輸對象的某些字段,從而進一步提升消息的傳遞速率,這個時候咱們可使用kryo的自定義序列化機制來指定傳輸的值。

package cn.intsmaze.serializable.bean;
import java.util.ArrayList;
import java.util.LinkedList;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public class PersonSerializable extends Serializer<Person>{
  @Override
  public Person read(Kryo kryo, Input input, Class<Person> arg2) {
    System.out.println("序列化");
    Person person=new Person();
    person.setAge(input.readInt());
    person.setArrayList(kryo.readObject(input, ArrayList.class));
    person.setLinkedList(kryo.readObject(input, LinkedList.class));//該類型,storm默認不支持,因此要在topology中註冊該類型,若是不註冊,則會使用java序列化。 person.setStudnet(kryo.readObject(input, Studnet.class));//該類型,storm默認不支持,因此要在topology中註冊該類型,若是不註冊,且java序列化沒有實現,則會報錯。
    return person;
  }
  @Override
  public void write(Kryo kryo, Output output, Person person) {
    System.out.println("反序列化");
    output.writeInt(person.getAge());
    kryo.writeObject(output, person.getArrayList());
    kryo.writeObject(output, person.getLinkedList());
    kryo.writeObject(output, person.getStudnet());
  }
}

 

package cn.intsmaze.serializable;
import java.util.LinkedList;
import cn.intsmaze.serializable.bean.Person;
import cn.intsmaze.serializable.bean.PersonSerializable;
import cn.intsmaze.serializable.bean.Studnet;
public class TopologyBean {
  public static void main(String[] args) throws Exception {
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("spout", new SpoutBean(), 1);
      builder.setBolt("bolt", new BoltBean(), 1).shuffleGrouping("spout");
      Config conf = new Config();
      conf.registerSerialization(Studnet.class);
      conf.registerSerialization(LinkedList.class);
      conf.registerSerialization(Person.class, PersonSerializable.class);
      conf.setNumWorkers(2);     
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
      builder.createTopology());
    }
}

  由於PersonSerializable類中指定了要傳輸person對象的int,studne,ArrayList,LinkedList 類型。

若是咱們註釋掉第12行 conf.registerSerialization(Studnet.class);且Studnet類沒有實現java的序列化,則拓撲的任務間傳遞消息進行序列化時就會報沒法序列化該類的錯誤,感興趣的同窗能夠試試註釋掉該行,看看storm會報什麼異常。

第13行,咱們必須註冊對LinkedList序列化,storm默認支持了對ArrayList類的序列化,但沒有提供對LinkedList序列化,須要咱們手動註冊,若是不註冊,由於LinkedList實現了java的序列化接口,因此會使用java序列化,則不會報錯。

  強烈建議,在開發中就算註冊了kyro序列化方式,也要設置該conf.setFallBackOnJavaSerialization(false)方法來禁止使用java序列化方式,由於實際開發中,核心架構搭建好了,會讓團隊成員直接在現成架構上編寫,他們不須要了解storm的一些機制,可是這也帶來問題,一種場景就是,開發人員對傳輸對象增長了一個LinkedList字段,可是他沒有註冊序列化類,storm就會對LinkedList使用java序列化,就會拖累系統的性能,因此在架構的時候,經過設置禁止java序列化方法,就能夠在測試中及時發現問題所在。

補充:上面的全部一切,在本地運行以及部署到集羣時,work數量設置爲1時,都不會生效的。由於同一個對象公有一個內存,不會涉及網絡傳輸的,也就不須要序列化和反序列化。

生產場景回顧:

  本人intsmaze生產上碰見的問題:storm工程中對傳輸對象使用了conf.registerSerialization(Person.class, PersonSerializable.class);方式來指定序列化該對象的某些字段。初級程序員在storm工程上開發時,由於業務須要對傳輸對象增長了一個字段,可是沒有在PersonSerializable中序列化和反序列化該對象。恰巧的時,初級工程師本地模式和準生產測試時,topology的work的數量都爲1,致使對象在bolt和bolt節點傳輸時並無走序列化方式,結果測試一切正常,可是上生產後,由於work數量是10個,立馬在後一個bolt中報大量的空指針異常,形成很嚴重的生產問題。

相關文章
相關標籤/搜索