1) A collection that already exists in the Driver (parallelize / parallelizePairs) (see the sketch after this list)
2) Local storage (file:///d:/test) or a network/distributed file system (hdfs://localhost:7777)
   via textFile / wholeTextFiles
3) A streaming source: a socket (socketTextStream)
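A minimal, hedged sketch of source types 1) and 3) above (type 2 is covered by the textFile/wholeTextFiles examples later in these notes); the class name, host, port and data are made-up placeholders:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class SourceTypesSketch {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SourceTypes");

        // 1) RDDs built from collections that already live in the driver
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> fromList = sc.parallelize(Arrays.asList("a", "b", "c"), 2);
        JavaPairRDD<String, Integer> fromPairs = sc.parallelizePairs(
                Arrays.asList(new Tuple2<String, Integer>("a", 1), new Tuple2<String, Integer>("b", 2)), 2);
        System.out.println(fromList.count() + " / " + fromPairs.count());
        sc.stop();

        // 3) Streaming source: lines received from a socket (host/port are placeholders)
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 7777);
        lines.print();
        ssc.start();
        ssc.awaitTermination();
    }
}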
1. Plain text files
2. JSON
3. CSV
If none of the fields in the CSV contain newline characters, you can read the data with textFile() and parse it line by line. If newlines are embedded inside fields, you have to read each file in full with wholeTextFiles() and then parse the records.
Because CSV records do not carry field names, you need to define a mapping to keep the output consistent. A simple approach is to write a function that converts the fields into an array in a fixed order.
4. SequenceFile: binary key-value pairs
5. Object files: JDK serialization (this appears to be a thin wrapper around SequenceFile; it lets you store an RDD that contains only values. Unlike SequenceFile, object files are written with Java serialization, so the class of the stored objects must not change between writing and reading, because reading back depends on that class definition).
The full examples for each of these formats follow below, in the same order.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkIO_File {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        fileTest(sc);
        sc.stop();
        sc.close();
    }

    static void fileTest(JavaSparkContext sc) {
        // textFile: each line of the file becomes one element of the RDD
        // JavaRDD<String> rdd = sc.textFile("file:///E:/codes2016/workspace/Spark1/src/spark1106_StreamSpark/UpdateStateByKeyDemo.java");
        // wholeTextFiles returns key-value pairs: the key is the full file path, the value is the file content
        JavaPairRDD<String, String> rdd = sc.wholeTextFiles("file:///E:/codes2016/workspace/Spark1/src/spark1106_StreamSpark");
        System.out.println("Number of partitions: " + rdd.getNumPartitions());   // 2 partitions here
        rdd.foreach(x -> {
            System.out.println("Current element: " + x);
        });
        System.out.println(rdd.count());
        rdd.saveAsTextFile("file:///d:/jsontext/filewholetext");
    }
}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SparkIO_JSON {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        writeJsonTest(sc);
        sc.stop();
        sc.close();
    }

    // Read JSON
    static void readJsonTest(JavaSparkContext sc) {
        // If a JSON record is broken across lines in the file, that record cannot be parsed;
        // the records that are not broken are still printed
        JavaRDD<String> input = sc.textFile("file:///d:/jsontext/jsonsong.json");
        // wholeTextFiles avoids the broken-line problem because it reads the whole file at once
        // (note it returns a JavaPairRDD<String, String> keyed by file path, not a JavaRDD<String>)
        // JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson());
        JavaRDD<Mp3Info> result = input.map(x -> {
            ObjectMapper mapper = new ObjectMapper();
            return mapper.readValue(x, Mp3Info.class);
        });
        result.foreach(x -> System.out.println(x));
    }

    // Write JSON
    static void writeJsonTest(JavaSparkContext sc) {
        JavaRDD<String> input = sc.textFile("file:///d:/jsontext/jsonsong.json");
        JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson())
                .filter(x -> x.getAlbum().equals("懷舊專輯"));
        // JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
        JavaRDD<String> formatted = result.map(x -> {
            ObjectMapper mapper = new ObjectMapper();
            return mapper.writeValueAsString(x);
        });
        result.foreach(x -> System.out.println(x));
        formatted.saveAsTextFile("file:///d:/jsontext/jsonsongout");
    }
}

class ParseJson implements FlatMapFunction<Iterator<String>, Mp3Info>, Serializable {
    public Iterator<Mp3Info> call(Iterator<String> lines) throws Exception {
        ArrayList<Mp3Info> people = new ArrayList<Mp3Info>();
        ObjectMapper mapper = new ObjectMapper();
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                people.add(mapper.readValue(line, Mp3Info.class));
            } catch (Exception e) {
                e.printStackTrace();   // skip malformed records
            }
        }
        return people.iterator();
    }
}

class WriteJson implements FlatMapFunction<Iterator<Mp3Info>, String> {
    public Iterator<String> call(Iterator<Mp3Info> song) throws Exception {
        ArrayList<String> text = new ArrayList<String>();
        ObjectMapper mapper = new ObjectMapper();
        while (song.hasNext()) {
            Mp3Info person = song.next();
            text.add(mapper.writeValueAsString(person));
        }
        return text.iterator();
    }
}

class Mp3Info implements Serializable {
    /* Sample data (jsonsong.json, one JSON object per line):
    {"name":"上海灘","singer":"葉麗儀","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
    {"name":"一輩子何求","singer":"陳百強","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
    {"name":"紅日","singer":"李克勤","album":"懷舊專輯","path":"mp3/shanghaitan.mp3"}
    {"name":"愛如潮水","singer":"張信哲","album":"懷舊專輯","path":"mp3/airucaoshun.mp3"}
    {"name":"紅茶館","singer":"陳惠嫻","album":"懷舊專輯","path":"mp3/redteabar.mp3"}
    */
    private String name;
    private String album;
    private String path;
    private String singer;

    public String getSinger() { return singer; }
    public void setSinger(String singer) { this.singer = singer; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAlbum() { return album; }
    public void setAlbum(String album) { this.album = album; }
    public String getPath() { return path; }
    public void setPath(String path) { this.path = path; }

    @Override
    public String toString() {
        return "Mp3Info [name=" + name + ", album=" + album + ", path=" + path + ", singer=" + singer + "]";
    }
}
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import au.com.bytecode.opencsv.CSVReader;
import au.com.bytecode.opencsv.CSVWriter;

public class SparkIO_CSV {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        readCsv2(sc);
        sc.stop();
        sc.close();
    }

    // Read line by line with textFile(), then parse each line as one CSV record
    static void readCsv1(JavaSparkContext sc) {
        JavaRDD<String> csvFile1 = sc.textFile("file:///d:/jsontext/csvsong.csv");
        // csvFile1.foreach(x -> System.out.println(x));
        JavaRDD<String[]> csvData = csvFile1.map(new ParseLine());
        csvData.foreach(x -> {
            for (String s : x) {
                System.out.println(s);
            }
        });
    }

    static void writeCsv1(JavaSparkContext sc) {
        JavaRDD<String> csvFile1 = sc.textFile("file:///d:/jsontext/csvsong.csv");
        JavaRDD<String[]> parsedData = csvFile1.map(new ParseLine());
        // Filter on the album field; saving the RDD at this point would write String[] objects, not CSV text
        parsedData = parsedData.filter(x -> x[2].equals("懷舊專輯"));
        parsedData.foreach(x -> {
            long id = Thread.currentThread().getId();
            System.out.println("Printing the current element in thread " + id + ":");
            for (String s : x) {
                System.out.print(s + " ");
            }
            System.out.println();
        });
        parsedData.map(x -> {
            StringWriter stringWriter = new StringWriter();
            CSVWriter csvWriter = new CSVWriter(stringWriter);
            csvWriter.writeNext(x);   // convert the array back into a CSV line
            csvWriter.close();
            return stringWriter.toString();
        }).saveAsTextFile("file:///d:/jsontext/csvout");
    }

    public static class ParseLine implements Function<String, String[]> {
        public String[] call(String line) throws Exception {
            CSVReader reader = new CSVReader(new StringReader(line));
            String[] lineData = reader.readNext();
            reader.close();   // release the stream
            // String[] lineData = line.split(",");   // naive alternative; still has problems (quoted commas)
            return lineData;
        }
    }

    // Read whole files with wholeTextFiles(), so records with embedded newlines are parsed correctly
    static void readCsv2(JavaSparkContext sc) {
        JavaPairRDD<String, String> csvData = sc.wholeTextFiles("d:/jsontext/csvsong.csv");
        JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLineWhole());
        keyedRDD.foreach(x -> {
            for (String s : x) {
                System.out.println(s);
            }
        });
    }

    public static class ParseLineWhole implements FlatMapFunction<Tuple2<String, String>, String[]> {
        public Iterator<String[]> call(Tuple2<String, String> file) throws Exception {
            CSVReader reader = new CSVReader(new StringReader(file._2()));
            Iterator<String[]> data = reader.readAll().iterator();
            reader.close();
            return data;
        }
    }
}
/* Sample data (csvsong.csv):
"上海灘","葉麗儀","香港電視劇主題歌","mp3/shanghaitan.mp3"
"一輩子何求","陳百強","香港電視劇主題歌","mp3/shanghaitan.mp3"
"紅日","李克勤","懷舊專輯","mp3/shanghaitan.mp3"
"愛如潮水","張信哲","懷舊專輯","mp3/airucaoshun.mp3"
"紅茶館","陳惠嫻","懷舊專輯","mp3/redteabar.mp3"
*/
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class SparkIO_SeqFile {
    public static void main(String[] args) {
        // local[2]: run with two worker threads
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO")
                .set("spark.testing.memory", "2147480000")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        // A SequenceFile stores key-value pairs in serialized (binary) form
        writeSeqFile(sc);
        readSeqFile(sc);
        sc.stop();
        sc.close();
    }

    private static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
        public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
            return new Tuple2<String, Integer>(record._1.toString(), record._2.get());
        }
    }

    private static void writeSeqFile(JavaSparkContext sc) {
        List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        data.add(new Tuple2<String, Integer>("ABC", 1));
        data.add(new Tuple2<String, Integer>("DEF", 3));
        data.add(new Tuple2<String, Integer>("GHI", 2));
        data.add(new Tuple2<String, Integer>("JKL", 4));
        data.add(new Tuple2<String, Integer>("ABC", 1));
        // The number of partitions determines the number of output files
        JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data, 1);
        String dir = "file:///D:/jsontext/sequenceFile";
        // Convert the native key-value pairs to Writable types with mapToPair
        JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
        // Four arguments: path, key class, value class, output format; saveAsNewAPIHadoopFile uses the new Hadoop API
        result.saveAsNewAPIHadoopFile(dir, Text.class, IntWritable.class, SequenceFileOutputFormat.class);
    }

    static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
        public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
            return new Tuple2<Text, IntWritable>(new Text(record._1), new IntWritable(record._2));
        }
    }

    private static void readSeqFile(JavaSparkContext sc) {
        // Read the SequenceFile into a pair RDD; three arguments: path, key class, value class
        JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(
                "file:///D:/jsontext/sequenceFile", Text.class, IntWritable.class);
        // input.foreach(System.out::println);
        // Convert the Writable key-value pairs back to native String/Integer pairs and print them
        JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());
        result.foreach(x -> System.out.println(x));
    }
}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkIO_ObjFile {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        writeObjFile(sc);
        // The file stores serialized Person objects and they are read back as Person objects,
        // so if the Person class is missing, the foreach below fails
        readObjFile(sc);
        sc.stop();
        sc.close();
    }

    private static void readObjFile(JavaSparkContext sc) {
        // Read the binary object file back into an RDD
        JavaRDD<Object> input = sc.objectFile("file:///D:/jsontext/objFile");
        // Printing the objects automatically loads the referenced Person class;
        // if that class is not available, the job aborts with an error
        input.foreach(x -> System.out.println(x));
    }

    private static void writeObjFile(JavaSparkContext sc) {
        List<Person> data = new ArrayList<Person>();
        data.add(new Person("ABC", 1));
        data.add(new Person("DEF", 3));
        data.add(new Person("GHI", 2));
        data.add(new Person("JKL", 4));
        data.add(new Person("ABC", 1));
        // The number of partitions determines the number of output files
        JavaRDD<Person> rdd = sc.parallelize(data, 2);
        // saveAsTextFile writes a plain text file; the visible text is each element's toString()
        String dir = "file:///D:/jsontext/textFile";
        rdd.saveAsTextFile(dir);
        // saveAsObjectFile writes a binary file holding the objects' type and values
        // (the type information is textual, the values are binary); an object file stores a value-only RDD
        String dir1 = "file:///D:/jsontext/objFile";
        rdd.saveAsObjectFile(dir1);
    }

    static class Person implements Serializable {
        String name;
        int id;

        public Person(String name, int id) {
            super();
            this.name = name;
            this.id = id;
        }

        @Override
        public String toString() {
            return "Person [name=" + name + ", id=" + id + "]";
        }
    }
}
1. For example, KeyValueTextInputFormat is one of the simplest Hadoop input formats: it reads key-value pair data from text files. Each line is processed as one record, with the key and the value separated by a tab.
Read and write with newAPIHadoopFile / saveAsNewAPIHadoopFile, as in the sketch below.
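A minimal sketch of this, assuming a tab-separated input file at a made-up path; newAPIHadoopFile takes the path, the input format class, the key/value classes and a Hadoop Configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SparkIO_KeyValueText {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        // Each line of the input is one record; key and value are separated by a tab
        JavaPairRDD<Text, Text> input = sc.newAPIHadoopFile(
                "file:///d:/jsontext/kvtext.txt",   // placeholder path
                KeyValueTextInputFormat.class,      // input format
                Text.class, Text.class,             // key class, value class
                new Configuration());
        // Convert the Writable Text objects to plain Strings before further processing
        input.mapToPair(t -> new Tuple2<>(t._1.toString(), t._2.toString()))
             .foreach(x -> System.out.println(x));
        sc.stop();
        sc.close();
    }
}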
2. Non-filesystem data sources (HBase / MongoDB)
Read with newAPIHadoopRDD (which takes a Configuration instead of a file path) and write with saveAsNewAPIHadoopDataset; see the sketch below.
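A hedged sketch for the HBase case (not part of the original notes): reading a table with newAPIHadoopRDD and writing Puts back with saveAsNewAPIHadoopDataset. The ZooKeeper address, table names and column family are placeholders, and the HBase client/mapreduce jars are assumed to be on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SparkIO_HBase {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read: the table to scan is set in the Configuration, not passed as a path
        Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "localhost");   // placeholder ZooKeeper quorum
        hconf.set(TableInputFormat.INPUT_TABLE, "songs");    // placeholder table name
        JavaPairRDD<ImmutableBytesWritable, Result> rows = sc.newAPIHadoopRDD(
                hconf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        System.out.println("rows: " + rows.count());

        // Write: build Put objects and save them through TableOutputFormat
        Job job = Job.getInstance(hconf);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "songs_copy");   // placeholder table name
        job.setOutputFormatClass(TableOutputFormat.class);
        JavaPairRDD<ImmutableBytesWritable, Put> puts = rows.mapToPair(r -> {
            Put put = new Put(r._1.copyBytes());
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("copied"), Bytes.toBytes("1"));   // placeholder column
            return new Tuple2<>(r._1, put);
        });
        puts.saveAsNewAPIHadoopDataset(job.getConfiguration());

        sc.stop();
        sc.close();
    }
}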
3. Protocol Buffers (PB for short, https://github.com/google/protobuf)
1. Local file system
file:///D:/sequenceFile
file:///home/sequenceFile
Spark can read files from the local file system, but it requires the file to be available at the same path on every node of the cluster.
Some network file systems, such as NFS, AFS, and MapR's NFS layer, expose files to the user as a local file system. If your data already lives in one of these systems, you only need to specify the input as a file:// path; as long as the file system is mounted at the same path on every node, Spark handles it automatically. If the file is not yet available on all nodes of the cluster, you can read it on the driver without involving the cluster and then call parallelize to distribute the contents to the workers, as sketched below. This approach can be slow, however, so the recommended way is to put the file on a shared file system such as HDFS, NFS, or S3 first.
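A minimal sketch of that fallback, with a made-up helper name and a placeholder path:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Assumes an existing JavaSparkContext `sc`
public class DriverSideRead {
    static JavaRDD<String> readOnDriver(JavaSparkContext sc, String localPath) throws Exception {
        // The file only needs to exist on the driver machine, not on the workers
        List<String> lines = Files.readAllLines(Paths.get(localPath));
        // Distribute the contents to the workers; slow for large files, so prefer HDFS/NFS/S3
        return sc.parallelize(lines, 2);
    }
}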
2. Network file systems
hdfs://localhost:7088/sequenceFile
1. JDBC (see the JdbcRDD sketch after this list)
2. Cassandra
3. HBase
4. Elasticsearch
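For the JDBC item, a hedged sketch using Spark's built-in JdbcRDD.create; the MySQL URL, table, column names and id bounds are placeholders, and the JDBC driver is assumed to be on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

public class SparkIO_Jdbc {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        // The query must contain two '?' placeholders that JdbcRDD fills with the partition bounds
        JavaRDD<Object[]> rows = JdbcRDD.create(
                sc,
                new JdbcRDD.ConnectionFactory() {
                    public Connection getConnection() throws Exception {
                        return DriverManager.getConnection(
                                "jdbc:mysql://localhost:3306/test?user=root&password=root");   // placeholder URL
                    }
                },
                "SELECT id, name FROM song WHERE ? <= id AND id <= ?",   // placeholder table/columns
                1, 1000, 2,                                              // lowerBound, upperBound, numPartitions
                new Function<ResultSet, Object[]>() {
                    public Object[] call(ResultSet r) throws Exception {
                        return JdbcRDD.resultSetToObjectArray(r);
                    }
                });
        rows.foreach(x -> System.out.println(x[0] + " " + x[1]));
        sc.stop();
        sc.close();
    }
}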