spark-stream 訪問 Redis

最近在spark-stream上寫了一些流計算處理程序,程序架構以下java

clipboard.png

程序運行在Spark-stream上,個人目標是kafka、Redis的參數都支持在啓動時指定。redis

在寫代碼時參考了這篇文章 https://www.iteblog.com/archi...,該文講的比較清楚,可是有兩個問題:apache

  1. 用scala實現的性能優化

  2. Redis服務器的地址是寫死的,個人程序要挪個位置,要從新改代碼編譯。服務器

當時倒騰了一些時間,如今寫出來和你們分享,提升後來者的效率。架構

clipboard.png

如上圖Spark是分佈式引擎,Driver中建立的Redis Pool,在Worker上又得從新建立,參考文章中是定義一個Redis鏈接池管理類,Redis Pool是類的靜態變量,類加載時由JVM自動建立。這個和個人預期有差距。分佈式

在Driver中建立Redis管理對象,而後將該對象廣播,而後在Worker上獲取該廣播對象,從而實現參數可變,可是Redis管理對象在每一個Worker上又只實例化了一次。ide

Driver

Driver 指定序列化方式,Spark支持兩種序列化方式,Java 和 Kryo,Kryo更高效。函數

資料上說Kryo方式須要註冊類,可是我沒有註冊也能成功運行。性能

public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("Usage: kafka_spark_redis <brokers> <topics> <redisServer>\n" +
                    "  <brokers> Kafka broker列表\n" +
                    "  <topics> 要消費的topic列表\n" +
                    " <redisServer> redis 服務器地址 \n\n");
            System.exit(1);
        }

        /* 解析參數 */
        String brokers = args[0];
        String topics = args[1];
        String redisServer = args[2];

        // 建立stream context,兩秒鐘的數據算一批
        SparkConf sparkConf = new SparkConf().setAppName("kafka_spark_redis");
//        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");//java的序列號速度沒有Kryo速度快
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//        sparkConf.set("spark.kryo.registrator", "MyRegistrator");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
        JavaSparkContext sc = jssc.sparkContext();

        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("group.id","kakou-test");

        //Redis鏈接池管理類
        RedisClient redisClient = new RedisClient(redisServer);//建立redis鏈接池管理類

        //廣播Reids鏈接池管理對象
        final Broadcast<RedisClient> broadcastRedis = sc.broadcast(redisClient);

        // 建立流處理對象
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,               /* kafka key class */
                String.class,               /* kafka value class */
                StringDecoder.class,        /* key 解碼類 */
                StringDecoder.class,        /* value 解碼類 */
                kafkaParams,                /* kafka 參數,如設置kafka broker */
                topicsSet                   /* 待消費的topic名稱 */
        );

        // 將行分拆爲單詞
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            //@Override
            // kafka傳來key-value對
            public String call(Tuple2<String, String> tuple2) {

                // 取value值
                return tuple2._2();
            }
        });
        /* 大量省略 */
        ........
    }

RedisClient

RedisClient 是本身實現的類,在類中重載write/read這兩個序列化和反序列化函數,須要注意的是若是是Java Serializer 須要實現其它的接口。

在Driver廣播時會觸發調用write序列化函數。

public class RedisClient implements KryoSerializable {
    public static JedisPool jedisPool;
    public String host;

    public RedisClient(){
        Runtime.getRuntime().addShutdownHook(new CleanWorkThread());
    }

    public RedisClient(String host){
        this.host=host;
        Runtime.getRuntime().addShutdownHook(new CleanWorkThread());
        jedisPool = new JedisPool(new GenericObjectPoolConfig(), host);
    }

    static class CleanWorkThread extends Thread{
        @Override
        public void run() {
            System.out.println("Destroy jedis pool");
            if (null != jedisPool){
                jedisPool.destroy();
                jedisPool = null;
            }
        }
    }

    public Jedis getResource(){
        return jedisPool.getResource();
    }

    public void returnResource(Jedis jedis){
        jedisPool.returnResource(jedis);
    }

    public void write(Kryo kryo, Output output) {
        kryo.writeObject(output, host);
    }

    public void read(Kryo kryo, Input input) {
        host=kryo.readObject(input, String.class);
        this.jedisPool =new JedisPool(new GenericObjectPoolConfig(), host) ;
    }
}

Worker

在foreachRDD中獲取廣播變量,由廣播變量觸發先調用RedisClient的無參反序列化函數,而後再調用反序列化函數,咱們的作法是在反序列化函數中建立Redis Pool。

//標準輸出,對車輛的車牌和黑名單進行匹配,對與匹配成功的,保存到redis上。
        paircar.foreachRDD(new Function2<JavaRDD<HashMap<String, String>>, Time, Void>() {
            public Void call(JavaRDD<HashMap<String, String>> rdd, Time time) throws Exception {
                Date now=new Date();
                rdd.foreachPartition(new VoidFunction<Iterator<HashMap<String, String>>>() {
                    public void call(Iterator<HashMap<String, String>> it) throws Exception {
                        String tmp1;
                        String tmp2;
                        Date now=new Date();
                        RedisClient redisClient=broadcastRedis.getValue();
                        Jedis jedis=redisClient.getResource();

                        ......

                        redisClient.returnResource(jedis);
                    }
                });

結語

Spark對分佈式計算作了封裝,但不少場景下仍是要了解它的工做機制,不少問題和性能優化都和Spark的工做機制緊密相關。

相關文章
相關標籤/搜索