1.背景java
以前使用spark進行數據計算,須要將計算結果發送到rocketmq上去,有兩種作法:第一種是將計算結果collect到Driver端,而後統一發送。第二種是直接在各個計算結果的partition(即foreachPartition函數)分片中發送。第一種存在的問題是,若是計算結果的數據量很是龐大,如上千萬,就須要很大的內存來支持,同時增長了網絡傳輸開銷。若是是第二種就不存在這種問題,直接在worker節點發送完畢,不存在數據堆積和網絡開銷。網絡
既然說是要發送數據到rocketMQ就要說到rocketmq客戶端DefaultMQProducer類,該類是沒有實現java的Serializable接口的,因此沒法定義一個全局變量,讓各個worker直接使用該變量來發送數據,因此須要用到另外一種寫法——靜態類工具。app
2.Java序列化基本規則函數
上面說到須要使用靜態類工具來實如今各個partition分別發送mq消息,其理論基礎就是Java序列化規則。咱們知道Java在默認狀況下,不會對被static和transient關鍵詞修飾的屬性進行序列化和反序列化。這個能夠驗證,靜態屬性反序列化有仍是默認值,利用這個原理封裝rocketmq工具。工具
public class JavaBean { private String name; private int version; } public class WrapperBean implements Serializable { private static JavaBean javaBean;//因爲改對象沒有實現Serializable接口,因此必須定義爲靜態屬性,不然報錯 private static String staticName="默認靜態變量值"; } ###序列化 public class JdkSerializableMain { public static void main(String[] args) { String file = "D:/demo/javabean.seri"; serializable(file); } private static void serializable(String file) { ObjectOutputStream oos = null; try{ oos = new ObjectOutputStream(new FileOutputStream(file)); Object object = getObject(); System.out.println("序列化對象:"+object.toString()); oos.writeObject(object); oos.flush(); }catch (Exception e){ e.printStackTrace(); }finally { if(oos != null){ try { oos.close(); } catch (IOException e) { e.printStackTrace(); } } } } private static Object getObject() { JavaBean javaBean = new JavaBean("Java設計本來", 44); WrapperBean wb = new WrapperBean(javaBean,"修改後的靜態變量值"); return wb; } } #####反序列化 public class JdkDeSerializableMain { public static void main(String[] args) { String file = "D:/demo/javabean.seri"; deserializable(file); } private static void deserializable(String file) { ObjectInputStream ois = null; try{ ois = new ObjectInputStream(new FileInputStream(file)); Object o = ois.readObject(); if(o != null){ System.out.println("Class :"+o.getClass()); WrapperBean jb = (WrapperBean)o; System.out.println("反序列化結果:"+jb.toString()); } }catch (Exception e){ e.printStackTrace(); }finally { if(ois != null){ try { ois.close(); } catch (IOException e) { e.printStackTrace(); } } } } } 結果: 序列化對象:WrapperBean{javaBean=JavaBean{name='Java設計本來', version=44}},staticName=修改後的靜態變量值 反序列化結果:WrapperBean{javaBean=null},staticName=默認靜態變量值
3.RocketMq工具spa
該工具利用靜態屬性沒法被序列化原理,在各個worker節點中調用getInstance()方法時,實際拿到的是該worker節點加載RocketMqUtils初始化靜態代碼塊拿到的DefaultMQProducer實例,因此能夠正常在foreachPartition()中調用發送rocketmq消息設計
public class RocketMqUtils implements Serializable { private static Logger log = LoggerFactory.getLogger(RocketMqUtils.class); private static DefaultMQProducer producer=null; private static RocketMqUtils rocketMqUtils = null; static { ClassPathResource classPathResource = new ClassPathResource("/task-config.properties"); Properties properties = null; try { properties = PropertiesLoaderUtils.loadProperties(classPathResource); String address = properties.getProperty("mq.namesrvAddr"); String produceGroup = properties.getProperty("mq.producerGroup"); log.info("初始化RocketMq,Address={},producerGroup={}",address,produceGroup); producer = new DefaultMQProducer(produceGroup); producer.setNamesrvAddr(address); producer.start(); } catch (Exception e) { log.error("初始化RocketMq失敗",e); } } public static synchronized RocketMqUtils getInstance(){ if(rocketMqUtils ==null){ rocketMqUtils = new RocketMqUtils(); } return rocketMqUtils; } public static void main(String[] args) throws Exception { RocketMqUtils rm = new RocketMqUtils(); Message msg = new Message(); msg.setTopic("test_jcc"); msg.setTags("jcc"); msg.setKeys("kkk"); msg.setBody("test msg".getBytes()); rm.sendMsg(msg); rm.shutDownMq(); } public void sendMsg(Message msg) throws Exception { try { SendResult sendResult = producer.send(msg); log.info("sendMsg = " + sendResult.toString()); System.out.println(sendResult.toString()); } catch (Exception var3) { log.error("MQ send ERROR", var3); throw new Exception("操做MQ出錯!"); } } public void shutDownMq(){ if (producer != null){ producer.shutdown(); } } }