spark jdk8 單詞統計示例

在github上有spark-java8 實例地址:java

https://github.com/ypriverol/spark-java8git

https://github.com/ihr/java8-sparkgithub

 

學些java8 Lambda Expressions 的能夠參考下,同時本身也作下比較。sql

java8 代碼實例express

 1 /* 
 2  * Licensed to the Apache Software Foundation (ASF) under one or more 
 3  * contributor license agreements.  See the NOTICE file distributed with 
 4  * this work for additional information regarding copyright ownership. 
 5  * The ASF licenses this file to You under the Apache License, Version 2.0 
 6  * (the "License"); you may not use this file except in compliance with 
 7  * the License.  You may obtain a copy of the License at 
 8  * 
 9  *    http://www.apache.org/licenses/LICENSE-2.0 
10  * 
11  * Unless required by applicable law or agreed to in writing, software 
12  * distributed under the License is distributed on an "AS IS" BASIS, 
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14  * See the License for the specific language governing permissions and 
15  * limitations under the License. 
16  */  
17   
18 package com.east.spark.stream;  
19   
20 import java.util.Arrays;  
21 import java.util.List;  
22 import java.util.regex.Pattern;  
23   
24 import org.apache.spark.api.java.JavaPairRDD;  
25 import org.apache.spark.api.java.JavaRDD;  
26 import org.apache.spark.sql.SparkSession;  
27   
28 import scala.Tuple2;  
29   
30 public final class JavaWordCount2 {  
31     private static final Pattern SPACE = Pattern.compile(" ");  
32   
33     public static void main(String[] args) throws Exception {  
34   
35         args = new String[] { "D:/tmp/spark/test.txt" };  
36   
37         if (args.length < 1) {  
38             System.err.println("Usage: JavaWordCount <file>");  
39             System.exit(1);  
40         }  
41   
42         SparkSession spark = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();  
43   
44         // SparkConf conf = new  
45         // SparkConf().setAppName("ingini-spark-java8").setMaster("local");  
46   
47         JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();  
48   
49         JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());  
50   
51         JavaPairRDD<String, Integer> counts = words.mapToPair(w -> new Tuple2<String, Integer>(w, 1))  
52                 .reduceByKey((x, y) -> x + y);  
53         // counts.collect();  
54   
55         List<Tuple2<String, Integer>> output = counts.collect();  
56         for (Tuple2<?, ?> tuple : output) {  
57             System.out.println(tuple._1() + ":== " + tuple._2());  
58         }  
59   
60         spark.stop();  
61     }  
62 }  

 

更簡潔的寫法:apache

1 JavaRDD<String> lines = sc.textFile("src/main/resources/a.txt");
2         JavaPairRDD<String, Integer> counts = lines.flatMap(line -> Arrays.asList(line.split(REGEX)))
3                 .mapToPair(word -> new Tuple2(word, 1))
4                 .reduceByKey((x, y) -> (Integer) x + (Integer) y)
5                 .sortByKey();
6         
7         counts.foreach(stringIntegerTuple2 ->System.out.println( stringIntegerTuple2._1+":"+stringIntegerTuple2._2));
相關文章
相關標籤/搜索