Spark Streaming: reading Kafka data and saving it as a single file

The requirement: data from Kafka has to be stored on AFS. Every record carries a start_time field in epoch-timestamp format, and each record must be written under the day/hour path derived from start_time, with directories like: xxx/2020-01-01(date)/16(hour)/xxx

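The core of the requirement is just turning the start_time epoch seconds into a date/hour sub-path. A minimal sketch of that conversion (the sample value is made up, and UTC+8 is assumed, as in the job below):

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class PathDemo {
    public static void main(String[] args) {
        // hypothetical sample value: 1577865600 is 2020-01-01 16:00:00 at UTC+8
        long startTime = 1577865600L;
        LocalDateTime time = LocalDateTime.ofEpochSecond(startTime, 0, ZoneOffset.ofHours(8));
        // prints "2020-01-01/16"; note that single-digit hours come out without a leading zero
        System.out.println(time.toLocalDate() + "/" + time.getHour());
    }
}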
Let's get started:
pom.xml

<properties>
        <target.dir>log-processor</target.dir>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <log4j.version>2.8.2</log4j.version>
        <disruptor.version>3.3.6</disruptor.version>
        <org.springframework.version>4.3.10.RELEASE</org.springframework.version>
        <scala.version>2.11.0</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty</artifactId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.18.Final</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- A jar conflict can cause a runtime exception, so exclude net.jpountz.lz4 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

The streaming job itself (SparkStreaming.java):

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

public class SparkStreaming {
    public static void main(String[] args) throws InterruptedException {
        Map<String, Object> kafkaParams = new HashMap<>();
        // Placeholder broker list; the Kafka bootstrap servers belong here, not in the subscribe call below.
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1,broker2");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroupId");
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        SparkConf conf = new SparkConf().setAppName("sparkTask");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        jssc.sparkContext().setLogLevel("WARN");

        // Subscribe() expects topic names, not broker addresses; "topicName" is a placeholder.
        String topics = "topicName";
        final JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(new HashSet<>(Arrays.asList(topics.split(","))), kafkaParams));

        // Key each record by its "yyyy-MM-dd/H" sub-path derived from the start_time field.
        lines.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
            @Override
            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                JsonObject obj = null;
                String date = null;
                try {
                    obj = new JsonParser().parse(record.value()).getAsJsonObject();
                    Long startTime = Long.parseLong(obj.get("start_time").getAsString());
                    LocalDateTime time = LocalDateTime.ofEpochSecond(startTime, 0, ZoneOffset.ofHours(8));
                    date = time.toLocalDate() + "/" + time.getHour();
                } catch (Exception e) {
                    // Records that fail to parse are mapped to null and dropped by the filter below.
                    e.printStackTrace();
                    return null;
                }
                return new Tuple2<String, String>(date, record.value());
            }
        }).filter(new Function<Tuple2<String, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, String> v1) {
                return v1 != null;
            }
        }).foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
            @Override
            public void call(JavaPairRDD<String, String> dateStringJavaPairRDD) {
                try {
                    // Collect the distinct date keys from the (date, line) pairs
                    List<String> dateKeys = dateStringJavaPairRDD.map(new Function<Tuple2<String, String>, String>() {
                        @Override
                        public String call(Tuple2<String, String> v1) throws Exception {
                            return v1._1;
                        }
                    }).distinct().collect();
                    // For each date key, filter its records and write them to a separate directory;
                    // the per-batch timestamp keeps saveAsTextFile from colliding with an existing path.
                    for (String dateKey : dateKeys) {
                        String savePath = String.join("/",
                                new String[]{"feature", dateKey,
                                        String.valueOf(Timestamp.valueOf(LocalDateTime.now()).getTime())});
                        JavaRDD<String> resultRdd = dateStringJavaPairRDD.filter(new Function<Tuple2<String, String>, Boolean>() {
                            @Override
                            public Boolean call(Tuple2<String, String> v1) throws Exception {
                                return v1._1.equals(dateKey);
                            }
                        }).map(new Function<Tuple2<String, String>, String>() {

                            @Override
                            public String call(Tuple2<String, String> v1) throws Exception {
                                return v1._2;
                            }
                        });
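                        // repartition(1) collapses the batch to a single partition so each key gets one part file per batch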
                        resultRdd.repartition(1).saveAsTextFile(savePath);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        jssc.start();
        jssc.awaitTermination();
    }
}
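With a 10-second batch interval and repartition(1), each (date, hour) key gets a new directory per batch, named with the current millisecond timestamp and holding a single part file. Assuming the feature output prefix used above, the result on AFS would look roughly like:

feature/2020-01-01/16/1577865600000/_SUCCESS
feature/2020-01-01/16/1577865600000/part-00000
feature/2020-01-01/17/1577866210000/_SUCCESS
feature/2020-01-01/17/1577866210000/part-00000

The per-batch timestamp in the path matters because saveAsTextFile refuses to write into a directory that already exists; downstream jobs can simply read everything under a date/hour directory, no matter how many batches wrote into it.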