The requirement: data from Kafka has to be written to AFS, and every record carries a start_time field in timestamp format. Each record must be stored under the directory for the day and hour derived from start_time, i.e. a layout like xxx/2020-01-01 (date)/16 (hour)/
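The core of the task is mapping a record's start_time to that date/hour sub-path. A minimal sketch of the conversion, assuming start_time is an epoch-second timestamp and the target directories use UTC+8 (the class name and the sample timestamp are only illustrative):

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class PathKeyDemo {
    // Derive the "yyyy-MM-dd/H" sub-path from an epoch-second start_time (assumed UTC+8)
    static String pathKey(long startTimeSeconds) {
        LocalDateTime time = LocalDateTime.ofEpochSecond(startTimeSeconds, 0, ZoneOffset.ofHours(8));
        return time.toLocalDate() + "/" + time.getHour();
    }

    public static void main(String[] args) {
        // 2020-01-01 16:30:00 UTC+8 -> "2020-01-01/16"
        System.out.println(pathKey(1577867400L));
    }
}

The Spark Streaming job below applies exactly this mapping per record and then writes each group of records to its own directory.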
Let's get started.
pom.xml
<properties>
    <target.dir>log-processor</target.dir>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <log4j.version>2.8.2</log4j.version>
    <disruptor.version>3.3.6</disruptor.version>
    <org.springframework.version>4.3.10.RELEASE</org.springframework.version>
    <scala.version>2.11.0</scala.version>
    <hadoop.version>2.7.4</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>io.netty</groupId>
                <artifactId>netty</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.18.Final</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.4.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-yarn_2.11</artifactId>
        <version>2.4.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <!-- Exclude net.jpountz.lz4 to avoid an exception caused by a jar conflict -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.1</version>
        <exclusions>
            <exclusion>
                <groupId>net.jpountz.lz4</groupId>
                <artifactId>lz4</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

public class SparkStreaming {

    public static void main(String[] args) throws InterruptedException {
        // Kafka consumer configuration (bootstrap servers and group id are placeholders)
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokers");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroupId");
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        SparkConf conf = new SparkConf().setAppName("sparkTask");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        jssc.sparkContext().setLogLevel("WARN");

        // Kafka topic to subscribe to (placeholder name)
        String topics = "topicName";
        final JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(new HashSet<>(Arrays.asList(topics)), kafkaParams));

        // Turn each record into (date/hour key, raw line); records whose start_time cannot be parsed become null
        lines.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
            @Override
            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                try {
                    JsonObject obj = new JsonParser().parse(record.value()).getAsJsonObject();
                    // start_time is an epoch-second timestamp; interpret it as UTC+8 local time
                    long startTime = Long.parseLong(obj.get("start_time").getAsString());
                    LocalDateTime time = LocalDateTime.ofEpochSecond(startTime, 0, ZoneOffset.ofHours(8));
                    String date = time.toLocalDate() + "/" + time.getHour();
                    return new Tuple2<String, String>(date, record.value());
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        }).filter(new Function<Tuple2<String, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, String> v1) throws Exception {
                // Drop records that failed to parse, otherwise they would NPE downstream
                return v1 != null;
            }
        }).foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
            @Override
            public void call(JavaPairRDD<String, String> dateStringJavaPairRDD) {
                try {
                    // Collect the distinct date/hour keys present in this batch
                    List<String> dateKeys = dateStringJavaPairRDD.map(new Function<Tuple2<String, String>, String>() {
                        @Override
                        public String call(Tuple2<String, String> v1) throws Exception {
                            return v1._1;
                        }
                    }).distinct().collect();

                    // Filter the batch by each key and write it to its own directory
                    for (final String dateKey : dateKeys) {
                        String savePath = String.join("/", "feature", dateKey,
                                String.valueOf(Timestamp.valueOf(LocalDateTime.now()).getTime()));
                        JavaRDD<String> resultRdd = dateStringJavaPairRDD.filter(new Function<Tuple2<String, String>, Boolean>() {
                            @Override
                            public Boolean call(Tuple2<String, String> v1) throws Exception {
                                return v1._1.equals(dateKey);
                            }
                        }).map(new Function<Tuple2<String, String>, String>() {
                            @Override
                            public String call(Tuple2<String, String> v1) throws Exception {
                                return v1._2;
                            }
                        });
                        resultRdd.repartition(1).saveAsTextFile(savePath);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
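Each micro-batch ends up under feature/&lt;date&gt;/&lt;hour&gt;/&lt;batch millis&gt;/, e.g. feature/2020-01-01/16/1577867400000/part-00000, so batches that fall into the same hour never overwrite each other. Note that every batch collects its distinct keys to the driver and then filters the whole batch once per key; with a 10-second batch interval there are normally only one or two distinct date/hour keys, so the extra passes stay cheap. As a quick sanity check outside Spark, the same parsing logic can be run against a sample record (the JSON line below is hypothetical; it only uses the Gson dependency already declared in the pom):

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class RecordKeyCheck {
    public static void main(String[] args) {
        // Hypothetical Kafka record value carrying an epoch-second start_time
        String line = "{\"start_time\":\"1577867400\",\"user\":\"demo\"}";

        JsonObject obj = new JsonParser().parse(line).getAsJsonObject();
        long startTime = Long.parseLong(obj.get("start_time").getAsString());
        LocalDateTime time = LocalDateTime.ofEpochSecond(startTime, 0, ZoneOffset.ofHours(8));

        // Prints "2020-01-01/16", the sub-path this record would be written under
        System.out.println(time.toLocalDate() + "/" + time.getHour());
    }
}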