Spark data source operations

Data sources for a Spark application (a combined sketch follows this list):

1) A collection in the driver program (parallelizePairs, parallelize)

2) Local storage (file:///d:/test) or a network file system (hdfs://localhost:7777)

    textFile, wholeTextFiles

3) Streaming sources: Socket (socketTextStream)
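
A minimal combined sketch of these three source types in the Java API (the file path, host, port, and batch interval below are assumed for illustration; the socket source additionally needs a JavaStreamingContext and is only shown being created, not started):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class SparkSourcesSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Sources");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 1) a collection that already lives in the driver
        JavaRDD<Integer> fromList = sc.parallelize(Arrays.asList(1, 2, 3));
        JavaPairRDD<String, Integer> fromPairs =
                sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));

        // 2) a file addressed by URI (a local path here; an hdfs:// URI works the same way)
        JavaRDD<String> fromFile = sc.textFile("file:///d:/test/input.txt");

        // 3) a streaming socket source; it needs a streaming context and a batch interval,
        //    and a real job would then call ssc.start() / ssc.awaitTermination()
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5));
        JavaReceiverInputDStream<String> fromSocket = ssc.socketTextStream("localhost", 7777);

        sc.stop();
        sc.close();
    }
}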

 

1. Formats wrapped by Spark:

1) Plain text files

2) JSON

3) CSV

If none of the CSV fields contain a newline character, you can read and parse the data with textFile(); if newlines are embedded inside fields, you need wholeTextFiles() to read each file in full and then parse it segment by segment.

Because CSV does not write the field names into every record, you need to set up a mapping to keep the output consistent. A simple approach is to write a function that puts the fields into an array in a fixed order (see the sketch after this list).

4) Sequence files: binary key-value pairs

5) Object files: JDK serialization (this looks like a thin wrapper over sequence files; it lets you store an RDD that contains only values. Unlike sequence files, object files are written out with Java serialization, so the class of the stored objects must not change between writing and reading; reading the output depends on that class.)
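
As a sketch of the CSV mapping idea above: the hypothetical helper below puts the fields of the Mp3Info bean (defined in the JSON example further down) into a fixed column order and turns one record into a CSV line with the same opencsv CSVWriter used in the CSV example. The class and method names here are illustrative, not part of the original code.

import java.io.StringWriter;

import au.com.bytecode.opencsv.CSVWriter;

class CsvColumnMapper {
    // Always emit name, singer, album, path in this order, so every output row lines up.
    static String[] toColumns(Mp3Info song) {
        return new String[] { song.getName(), song.getSinger(), song.getAlbum(), song.getPath() };
    }

    // Turn one record into a single CSV-formatted line.
    static String toCsvLine(Mp3Info song) throws Exception {
        StringWriter buffer = new StringWriter();
        CSVWriter writer = new CSVWriter(buffer);
        writer.writeNext(toColumns(song));
        writer.close();
        return buffer.toString();
    }
}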

Plain text files

import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;
import au.com.bytecode.opencsv.CSVReader;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SparkIO_File {
    public static void main(String[] args) {
    	SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
		JavaSparkContext sc = new JavaSparkContext(conf);
		sc.setLogLevel("WARN");
		fileTest(sc);
		sc.stop();
		sc.close();
	}

    static void fileTest(JavaSparkContext sc){
    	// With textFile, each line of the file becomes one element of the RDD
//    	JavaRDD<String> rdd = sc.textFile("file:///E:/codes2016/workspace/Spark1/src/spark1106_StreamSpark/UpdateStateByKeyDemo.java");
		// wholeTextFiles returns key-value pairs: the key is the full file path, the value is the file content; the partition count here is 2
    	
 
    	JavaPairRDD<String, String> rdd = sc.wholeTextFiles("file:///E:/codes2016/workspace/Spark1/src/spark1106_StreamSpark");
       	System.out.println("分區數:"+rdd.getNumPartitions());          //分區數爲2
    	rdd.foreach(x->{
			System.out.println("當前元素:" + x);
		});
		System.out.println(rdd.count());
		rdd.saveAsTextFile("file:///d:/jsontext/filewholetext");
    }
}

 

JSON files

import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;
import au.com.bytecode.opencsv.CSVReader;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SparkIO_JSON {
    public static void main(String[] args) {
    	SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
		JavaSparkContext sc = new JavaSparkContext(conf);
		sc.setLogLevel("WARN");
		writeJsonTest(sc);
		sc.stop();
		sc.close();
	}

    // Read JSON
    static void readJsonTest(JavaSparkContext sc){
    	// If a JSON record is broken across lines it cannot be parsed; the records that are intact are still printed
    	JavaRDD<String> input = sc.textFile("file:///d:/jsontext/jsonsong.json");
    	// wholeTextFiles avoids the broken-line problem because it reads each file as a whole
//    	JavaRDD<String> input = sc.wholeTextFiles("file:///d:/jsontext/jsonsong.json");
//    	JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson());
    	JavaRDD<Mp3Info> result = input.map(x->{
    		ObjectMapper mapper=new ObjectMapper();
    		return mapper.readValue(x, Mp3Info.class);
    	});
    	result.foreach(x->System.out.println(x));
    }
    // Write JSON
    static void writeJsonTest(JavaSparkContext sc){
    	JavaRDD<String> input = sc.textFile("file:///d:/jsontext/jsonsong.json");
    	JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson()).
    			                      filter(
    			                          x->x.getAlbum().equals("懷舊專輯")
    			                      );
//    	JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
    	JavaRDD<String> formatted = result.map(x->{
    		ObjectMapper mapper=new ObjectMapper();
    		return mapper.writeValueAsString(x);
    	});
    	result.foreach(x->System.out.println(x));
    	formatted.saveAsTextFile("file:///d:/jsontext/jsonsongout");
    }
}

class ParseJson implements FlatMapFunction<Iterator<String>, Mp3Info>, Serializable {
	public Iterator<Mp3Info> call(Iterator<String> lines) throws Exception {
		ArrayList<Mp3Info> people = new ArrayList<Mp3Info>();
		ObjectMapper mapper = new ObjectMapper();
		while (lines.hasNext()) {
			String line = lines.next();
			try {
				people.add(mapper.readValue(line, Mp3Info.class));
			} catch (Exception e) {
			    e.printStackTrace();
			}
		}
		return people.iterator();
	}
}

class WriteJson implements FlatMapFunction<Iterator<Mp3Info>, String> {
	public Iterator<String> call(Iterator<Mp3Info> song) throws Exception {
		ArrayList<String> text = new ArrayList<String>();
		ObjectMapper mapper = new ObjectMapper();
		while (song.hasNext()) {
			Mp3Info person = song.next();
		    text.add(mapper.writeValueAsString(person));
		}
		return text.iterator();
	}
}

class Mp3Info implements Serializable{
	/*
{"name":"上海灘","singer":"葉麗儀","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"一輩子何求","singer":"陳百強","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"紅日","singer":"李克勤","album":"懷舊專輯","path":"mp3/shanghaitan.mp3"}
{"name":"愛如潮水","singer":"張信哲","album":"懷舊專輯","path":"mp3/airucaoshun.mp3"}
{"name":"紅茶館","singer":"陳惠嫻","album":"懷舊專輯","path":"mp3/redteabar.mp3"}
	 */
	private String name;
    private String album;
    private String path;
    private String singer;

    public String getSinger() {
		return singer;
	}
	public void setSinger(String singer) {
		this.singer = singer;
	}    
    public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getAlbum() {
		return album;
	}
	public void setAlbum(String album) {
		this.album = album;
	}
	public String getPath() {
		return path;
	}
	public void setPath(String path) {
		this.path = path;
	}
    @Override
	public String toString() {
		return "Mp3Info [name=" + name + ", album=" 
	             + album + ", path=" + path + ", singer=" + singer + "]";
	}
}

/*
{"name":"上海灘","singer":"葉麗儀","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"一輩子何求","singer":"陳百強","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"紅日","singer":"李克勤","album":"懷舊專輯","path":"mp3/shanghaitan.mp3"}
{"name":"愛如潮水","singer":"張信哲","album":"懷舊專輯","path":"mp3/airucaoshun.mp3"}
{"name":"紅茶館","singer":"陳惠嫻","album":"懷舊專輯","path":"mp3/redteabar.mp3"}
 */

 

CSV files

import java.io.StringReader;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;
import au.com.bytecode.opencsv.CSVReader;
import au.com.bytecode.opencsv.CSVWriter;

public class SparkIO_CSV {
    public static void main(String[] args) {
    	SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
		JavaSparkContext sc = new JavaSparkContext(conf);
		sc.setLogLevel("WARN");
		readCsv2(sc);
		sc.stop();
		sc.close();
	}

    static void readCsv1(JavaSparkContext sc) {
    	JavaRDD<String> csvFile1 = sc.textFile("file:///d:/jsontext/csvsong.csv");
//    	csvFile1.foreach(x->System.out.println(x));
    	JavaRDD<String[]> csvData = csvFile1.map(new ParseLine());
    	csvData.foreach(x->{
	    	                  for(String s : x){
	    	                      System.out.println(s);
	    	                  }
    	                   }
    	               );
    }

    static void writeCsv1(JavaSparkContext sc) {
    	JavaRDD<String> csvFile1 = sc.textFile("file:///d:/jsontext/csvsong.csv");
    	JavaRDD<String[]> parsedData = csvFile1.map(new ParseLine());
    	parsedData = parsedData.filter(x->x[2].equals("懷舊專輯"));  // filter; saving to a file at this point would store String[] objects, not CSV text
    	parsedData.foreach(
    			            x->{
    			                long id = Thread.currentThread().getId();
					            System.out.println("在線程 "+ id +" 中" + "打印當前數據元素:");
					            for(String s : x){
					                System.out.print(s+ " ");
					            }
					            System.out.println();
                            }
                        );
    	parsedData.map(x->{
    		 StringWriter stringWriter = new StringWriter();
    		 CSVWriter csvWriter = new CSVWriter(stringWriter);
    		 csvWriter.writeNext(x);  // convert the array into one CSV-formatted line
    		 csvWriter.close();
    		 return stringWriter.toString();
    	}).saveAsTextFile("file:///d:/jsontext/csvout");
    }
    
    public static class ParseLine implements Function<String, String[]> {
   	    public String[] call(String line) throws Exception {
	    	 CSVReader reader = new CSVReader(new StringReader(line));
	    	 String[] lineData = reader.readNext();
	    	 reader.close();    // close the reader
//   	    	String[] lineData = line.split(",");  // a simpler alternative, but it still has ...
   	    	return lineData;
   	    }
    }

    static void readCsv2(JavaSparkContext sc){
    	// If there are line breaks inside records, wholeTextFiles can get past them, because the whole file is read and parsed at once
    	   JavaPairRDD<String, String> csvData = sc.wholeTextFiles("d:/jsontext/csvsong.csv");
    	   JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLineWhole());
    	   keyedRDD.foreach(x->
				    	       {
					               for(String s : x){
					                   System.out.println(s);
					               }
				               }
                           );
    }
    public static class ParseLineWhole implements FlatMapFunction<Tuple2<String, String>, String[]> {
	    public Iterator<String[]> call(Tuple2<String, String> file) throws Exception {
		    CSVReader reader = new CSVReader(new StringReader(file._2()));
		    Iterator<String[]> data = reader.readAll().iterator();
		    reader.close();
		    return data;
        }
    }
}

/*
"上海灘","葉麗儀","香港電視劇主題歌","mp3/shanghaitan.mp3"
"一輩子何求","陳百強","香港電視劇主題歌","mp3/shanghaitan.mp3"
"紅日","李克勤","懷舊專輯","mp3/shanghaitan.mp3"
"愛如潮水","張信哲","懷舊專輯","mp3/airucaoshun.mp3"
"紅茶館","陳惠嫻","懷舊專輯","mp3/redteabar.mp3"   
 */

 

Sequence files (binary)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.serializer.KryoSerializer;

import scala.Tuple2;

public class SparkIO_SeqFile {
    public static void main(String[] args) {
    	// local[2]: run locally with two worker threads
    	SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO")
    			.set("spark.testing.memory", "2147480000")
    			.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
		JavaSparkContext sc = new JavaSparkContext(conf);
		sc.setLogLevel("WARN");
		// A sequence file stores key-value pairs; it is a serialized flat file (objects are converted to binary form)
		writeSeqFile(sc);
		readSeqFile(sc);
		sc.stop();
		sc.close();
	}
    
	private static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
	    public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
	        return new Tuple2<String, Integer>(record._1.toString(), record._2.get());
	    }
	}
	private static void writeSeqFile(JavaSparkContext sc) {
	    List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
	    data.add(new Tuple2<String, Integer>("ABC", 1));
	    data.add(new Tuple2<String, Integer>("DEF", 3));
	    data.add(new Tuple2<String, Integer>("GHI", 2));
	    data.add(new Tuple2<String, Integer>("JKL", 4));
	    data.add(new Tuple2<String, Integer>("ABC", 1));

//	    JavaPairRDD<String, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(("d",1)),1);
	    
	    // set the number of partitions; each partition produces one output file
	    JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data, 1);
	    String dir = "file:///D:jsontext/sequenceFile";
	    // use mapToPair to convert the key-value pairs into Writable types (Text, IntWritable)
	    JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
	    // four arguments: path, output key class, output value class, output format; saveAsNewAPIHadoopFile uses the new Hadoop API
	    result.saveAsNewAPIHadoopFile(dir, Text.class, IntWritable.class, SequenceFileOutputFormat.class);
	    
	}

	static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
		public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
		    return new Tuple2<Text, IntWritable>(new Text(record._1), new IntWritable(record._2));
	    }
    }
	private static void readSeqFile(JavaSparkContext sc) {
		
		// read the sequence file into a JavaPairRDD; three arguments: path, key class, value class
		JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(
					                               "file:///D:/jsontext/sequenceFile", 
					                               Text.class,
					                               IntWritable.class);
//		input.foreach(System.out::println);
	    // use mapToPair to convert the Writable key-value pairs back to String/Integer, then print them
	    JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());
	    result.foreach(x->System.out.println(x));
	}

	

}

 

Object files

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkIO_ObjFile {
    public static void main(String[] args) {
    	SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
		JavaSparkContext sc = new JavaSparkContext(conf);
		sc.setLogLevel("WARN");		
		writeObjFile(sc);
		// The file is read back as Person objects and printed as Person objects, so if the Person class is missing, the foreach below will fail
		readObjFile(sc);
		sc.stop();
		sc.close();
	}

	private static void readObjFile(JavaSparkContext sc) {
		// read the binary object file back into an RDD
		JavaRDD<Object> input = sc.objectFile("file:///D:/jsontext/objFile");
		
		// printing the object file deserializes the referenced Person objects; if the Person class does not exist, the job fails and stops
		input.foreach(x->System.out.println(x));
	}
    
	private static void writeObjFile(JavaSparkContext sc) {
	    List<Person> data = new ArrayList<Person>();
	    data.add(new Person("ABC", 1));
	    data.add(new Person("DEF", 3));
	    data.add(new Person("GHI", 2));
	    data.add(new Person("JKL", 4));
	    data.add(new Person("ABC", 1));

	    // set the number of partitions; each partition produces one output file
	    JavaRDD<Person> rdd = sc.parallelize(data, 2);
	    // save as a text file; the visible text is whatever toString() returns
	    String dir = "file:///D:/jsontext/textFile";
        rdd.saveAsTextFile(dir);  
        
        // save as an object file: a binary file that stores the class information (as text) and the serialized values, written with saveAsObjectFile
        // objectFile stores an RDD that contains only values
	    String dir1 = "file:///D:/jsontext/objFile";
        rdd.saveAsObjectFile(dir1);
	}
	
	static class Person implements Serializable{
		public Person(String name, int id) {
			super();
			this.name = name;
			this.id = id;
		}
		@Override
		public String toString() {
			return "Person [name=" + name + ", id=" + id + "]";
		}
		String name;
		int id;
	}
}

 

2. Hadoop-supported formats

    1) For example, KeyValueTextInputFormat is one of the simplest Hadoop input formats; it reads key-value data from text files. Each line is processed independently, with the key and value separated by a tab (see the sketches after this list).

newAPIHadoopFile/saveAsNewAPIHadoopFile

    2) Data that does not live on a file system (HBase/MongoDB)

Use newAPIHadoopDataset/saveAsNewAPIHadoopDataset

    3) Protocol Buffers (PB for short, https://github.com/google/protobuf)
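
A minimal sketch of reading tab-separated key-value text through the new Hadoop API (the input path is assumed; the keys and values arrive as Hadoop Text and are converted to String before printing):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SparkIO_KeyValueText {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        // Each line is split on the first tab: the part before it is the key, the rest is the value.
        JavaPairRDD<Text, Text> input = sc.newAPIHadoopFile(
                "file:///d:/jsontext/kvtext.txt",      // assumed input path
                KeyValueTextInputFormat.class,
                Text.class,
                Text.class,
                new Configuration());
        // Text is not Java-serializable, so convert to String before doing anything else with the pairs.
        JavaPairRDD<String, String> pairs =
                input.mapToPair(t -> new Tuple2<>(t._1.toString(), t._2.toString()));
        pairs.foreach(x -> System.out.println(x));
        sc.stop();
        sc.close();
    }
}

For data that is not stored on a file system, saveAsNewAPIHadoopDataset writes through an OutputFormat configured on a Hadoop Job. The following is only a sketch of writing a pair RDD into HBase; it assumes an already existing table "songs" with column family "info", a ZooKeeper quorum at localhost, and the HBase client libraries on the classpath:

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SparkIO_HBaseWrite {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");

        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", "localhost");      // assumed ZooKeeper quorum
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "songs");    // assumed existing table
        Job job = Job.getInstance(hbaseConf);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        // Build (row key, Put) pairs; the column family "info" and qualifier "name" are assumptions.
        JavaPairRDD<ImmutableBytesWritable, Put> puts = sc
                .parallelize(Arrays.asList(new Tuple2<>("row1", "紅日")))
                .mapToPair(t -> {
                    Put put = new Put(Bytes.toBytes(t._1));
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(t._2));
                    return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(t._1)), put);
                });
        puts.saveAsNewAPIHadoopDataset(job.getConfiguration());

        sc.stop();
        sc.close();
    }
}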

 

3. File compression

 

4. File systems

1) Local file system

file:///D:/sequenceFile

file:///home/sequenceFile

Spark can read files from the local file system, but it requires the file to be available at the same path on every node in the cluster.

Some network file systems, such as NFS, AFS, and MapR's NFS layer, expose files to users as a local file system. If your data is already on one of those systems, you only need to specify the input as a file:// path; as long as the file system is mounted at the same path on every node, Spark handles it automatically (as shown in Example 5-29). If the file is not yet available on every node of the cluster, you can read it locally in the driver program without involving the whole cluster, and then call parallelize to distribute the contents to the worker nodes. This approach can be slow, however, so the recommended way is to put the file on a shared file system such as HDFS, NFS, or S3 first.
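
A minimal sketch of the fallback just described, assuming a small file at a hypothetical path that only the driver machine can see: it is read with plain Java IO on the driver and then distributed with parallelize.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DriverLocalRead {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DriverLocalRead");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the file with ordinary Java IO; only the driver needs access to this path.
        List<String> lines = Files.readAllLines(Paths.get("d:/jsontext/driver_only.txt"));
        // Hand the in-memory lines to the cluster as an RDD with two partitions.
        JavaRDD<String> rdd = sc.parallelize(lines, 2);
        System.out.println(rdd.count());
        sc.stop();
        sc.close();
    }
}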

2) Network file systems

hdfs://localhost:7088/sequenceFile

 

5. Databases

1) JDBC (see the sketch after this list)

2) Cassandra

3) HBase

4) Elasticsearch
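
A minimal sketch of the JDBC case using org.apache.spark.rdd.JdbcRDD (the database URL, credentials, the table songs(id, name), and the bounds are all assumptions, and the JDBC driver must be on the classpath). The query must contain two '?' placeholders, which Spark fills with per-partition lower and upper bounds:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.JdbcRDD;

public class SparkIO_Jdbc {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");

        // Read rows with id between 1 and 1000, split into 2 partitions.
        JavaRDD<String> rows = JdbcRDD.create(
                sc,
                new JdbcRDD.ConnectionFactory() {
                    public Connection getConnection() throws Exception {
                        // assumed database, user, and password
                        return DriverManager.getConnection(
                                "jdbc:mysql://localhost:3306/test", "user", "password");
                    }
                },
                "SELECT id, name FROM songs WHERE id >= ? AND id <= ?",
                1, 1000, 2,
                (ResultSet rs) -> rs.getInt(1) + "," + rs.getString(2));

        rows.foreach(x -> System.out.println(x));
        sc.stop();
        sc.close();
    }
}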
