如何在Spark的Worker節點中給RocketMq發送消息

1.背景java

    以前使用spark進行數據計算,須要將計算結果發送到rocketmq上去,有兩種作法:第一種是將計算結果collect到Driver端,而後統一發送。第二種是直接在各個計算結果的partition(即foreachPartition函數)分片中發送。第一種存在的問題是,若是計算結果的數據量很是龐大,如上千萬,就須要很大的內存來支持,同時增長了網絡傳輸開銷。若是是第二種就不存在這種問題,直接在worker節點發送完畢,不存在數據堆積和網絡開銷。網絡

    既然說是要發送數據到rocketMQ就要說到rocketmq客戶端DefaultMQProducer類,該類是沒有實現java的Serializable接口的,因此沒法定義一個全局變量,讓各個worker直接使用該變量來發送數據,因此須要用到另外一種寫法——靜態類工具。app

2.Java序列化基本規則函數

    上面說到須要使用靜態類工具來實如今各個partition分別發送mq消息,其理論基礎就是Java序列化規則。咱們知道Java在默認狀況下,不會對被static和transient關鍵詞修飾的屬性進行序列化和反序列化。這個能夠驗證,靜態屬性反序列化有仍是默認值,利用這個原理封裝rocketmq工具。工具

public class JavaBean {

    private String name;
    private int version;
}

public class WrapperBean implements Serializable {

    private static JavaBean javaBean;//因爲改對象沒有實現Serializable接口,因此必須定義爲靜態屬性,不然報錯

    private static String staticName="默認靜態變量值";
}

###序列化
public class JdkSerializableMain {

    public static void main(String[] args) {
        String file = "D:/demo/javabean.seri";
        serializable(file);
    }

    private static void serializable(String file) {
        ObjectOutputStream oos = null;
        try{
            oos = new ObjectOutputStream(new FileOutputStream(file));
            Object object = getObject();
            System.out.println("序列化對象:"+object.toString());
            oos.writeObject(object);
            oos.flush();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(oos !=  null){
                try {
                    oos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static Object getObject() {
        JavaBean javaBean = new JavaBean("Java設計本來", 44);
        WrapperBean wb = new WrapperBean(javaBean,"修改後的靜態變量值");
        return wb;
    }
}
#####反序列化
public class JdkDeSerializableMain {

    public static void main(String[] args) {
        String file = "D:/demo/javabean.seri";
        deserializable(file);
    }

    private static void deserializable(String file) {
        ObjectInputStream ois = null;
        try{
            ois = new ObjectInputStream(new FileInputStream(file));
            Object o = ois.readObject();
            if(o != null){
                System.out.println("Class :"+o.getClass());
                WrapperBean jb = (WrapperBean)o;
                System.out.println("反序列化結果:"+jb.toString());
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if(ois != null){
                try {
                    ois.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

結果:
序列化對象:WrapperBean{javaBean=JavaBean{name='Java設計本來', version=44}},staticName=修改後的靜態變量值
反序列化結果:WrapperBean{javaBean=null},staticName=默認靜態變量值

3.RocketMq工具spa

    該工具利用靜態屬性沒法被序列化原理,在各個worker節點中調用getInstance()方法時,實際拿到的是該worker節點加載RocketMqUtils初始化靜態代碼塊拿到的DefaultMQProducer實例,因此能夠正常在foreachPartition()中調用發送rocketmq消息設計

public class RocketMqUtils implements Serializable {

    private static Logger log = LoggerFactory.getLogger(RocketMqUtils.class);

    private static DefaultMQProducer producer=null;
    private static  RocketMqUtils rocketMqUtils = null;
    static {
        ClassPathResource classPathResource = new ClassPathResource("/task-config.properties");
        Properties properties = null;
        try {
            properties = PropertiesLoaderUtils.loadProperties(classPathResource);
            String address = properties.getProperty("mq.namesrvAddr");
            String produceGroup = properties.getProperty("mq.producerGroup");
            log.info("初始化RocketMq,Address={},producerGroup={}",address,produceGroup);
            producer = new DefaultMQProducer(produceGroup);
            producer.setNamesrvAddr(address);
            producer.start();
        } catch (Exception e) {
            log.error("初始化RocketMq失敗",e);
        }
    }

    public static synchronized RocketMqUtils getInstance(){
        if(rocketMqUtils ==null){
            rocketMqUtils = new RocketMqUtils();
        }
        return rocketMqUtils;
    }

    public static void main(String[] args) throws Exception {
        RocketMqUtils rm = new RocketMqUtils();
        Message msg = new Message();
        msg.setTopic("test_jcc");
        msg.setTags("jcc");
        msg.setKeys("kkk");
        msg.setBody("test msg".getBytes());
        rm.sendMsg(msg);
        rm.shutDownMq();
    }

    public  void sendMsg(Message msg) throws Exception {
        try {
            SendResult sendResult = producer.send(msg);
            log.info("sendMsg = " + sendResult.toString());
            System.out.println(sendResult.toString());
        } catch (Exception var3) {
            log.error("MQ send ERROR", var3);
            throw new Exception("操做MQ出錯!");
        }
    }

    public void shutDownMq(){
        if (producer != null){
            producer.shutdown();
        }
    }
}
相關文章
相關標籤/搜索