spark Core的使用基礎知識java
rdd爲spark的一個分佈式數據源的計算的抽象sql
sparkContext爲spark環境上下文用於保持集羣鏈接,建立RDD 並行數據 accumular boardcast變量 用戶建立spark job做業apache
SparkConf conf = new SparkConf().setAppName("indeximage").setMaster("local"); JavaSparkContext context = new JavaSparkContext(conf); JavaPairRDD<String, PortableDataStream> imagefiles=context.binaryFiles("C:/baidu/features/sift", 2);
RDD是個分佈式不變的抽象數據計算源 ,被劃分多個分區,並在多臺機器上分佈計算,懶加載模式,當rdd開始計算時候纔會加載來源使用,因此使用時候rdd 能夠設置store級別,能夠存儲在內存 磁盤 或者 系列化 等幾種方式 默認cache就是存儲在內存中 其餘幾種見storagelevel數組
JavaRDD<File> imagefiles=context.parallelize(getFiles("C:/baidu/imagetest/"), 2);//.binaryFiles("C:/baidu/imagetest/*").coalesce(1); imagefiles.persist(StorageLevel.MEMORY_ONLY());
它可以從多種數據來源加載 經常使用的數組 文件 以及 hadoop格式文件(能夠用來自定義後面叫你們建立自定義)dom
context.textFile(path)
機器學習
context.paralllize/binarryFile分佈式
spark有不少行爲操做,這裏不一一介紹了 只介紹 幾種經常使用的 ,好比ide
1,map map其實就是一個轉換 從一個格式 轉換從 另外一種格式的操做 oop
2,flatMap 同map基本一致,可是flatmap 返回的是一個數組的格式 學習
3.filter 主要是用來過濾掉rdd中 的數據
另外javaRDD 與 javaPairRDD 基本無非就是List 與 map的區別,分佈式上 key value的轉換
因此會javaPairRDD會涉及一些group by key combie key value的操做
因爲就是key value map能夠至關於list 因此我這裏只講一下List的狀況,其餘都是相似的方法加上byKey values等等關鍵字
distinct() 同sql 同樣 去重 union() 由於rdd是不可變 因此外部添加聯合 intersection() 交集 subtract() 差集 cartesian() 笛卡爾積 reduce() reduce運算 把多個rdd的的元素最後合併成一個值 aggregate()/fold() 同上面相似,不過能夠設定初值 以及 聚合後的值類型能夠與合併前的不同,好比多個integer 聚合成double collect() 把RDD中全部元素 最後聚合彙總到主分區中 返回List 或者 map foreach 故名思意 遍歷 rdd的元素 並進行操做,好比遍歷元素把它存儲到Hbase中 cogroup、join 前者就是全鏈接 後者 就是內關聯 count 同sql同樣統計數目 coalesce、repartition 意義同樣 都是設定RDD分區數目
Spark 計數器 同hadoop中的couter差很少 ,不過主要是外部最後使用 ,內部中不能使用實時的值,智能使用localValue
Accumulator<Integer> counter = sc.accumulator(0); counter.add(1);
spark 共享數據變量
不少狀況須要數據變量進行共享
Broadcast<Object> share = sc.broadcast(object) 發送廣播通知 會實時更新,在內部能夠獲取調用
Spark的MLIB的使用基礎知識
本地向量
本地向量的基類是 Vector,咱們提供了兩個實現 DenseVector 和 SparseVector。咱們建議經過 Vectors中實現的工廠方法來建立本地向量:(注意:Scala語言默認引入的是 scala.collection.immutable.Vector,爲了使用MLlib的Vector,你必須顯示引入org.apache.spark.mllib.linalg.Vector。)
import org.apache.spark.mllib.linalg.{Vector, Vectors} // Create a dense vector (1.0, 0.0, 3.0). val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) // Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries. val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) // Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries. val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
2.含類標籤的點
含有類標籤的點經過case class LabeledPoint來表示。
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint // Create a labeled point with a positive label and a dense feature vector. val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) // Create a labeled point with a negative label and a sparse feature vector. val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
3.稀疏數據Sparse data
實際運用中,稀疏數據是很常見的。MLlib能夠讀取以LIBSVM格式存儲的訓練實例,LIBSVM格式是 LIBSVM 和 LIBLINEAR的默認格式,這是一種文本格式,每行表明一個含類標籤的稀疏特徵向量。格式以下:
label index1:value1 index2:value2 ...
索引是從 1 開始而且遞增。加載完成後,索引被轉換爲從 0 開始。
經過 MLUtils.loadLibSVMFile讀取訓練實例並以LIBSVM 格式存儲。
import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
4.本地矩陣
一個本地矩陣由整型的行列索引數據和對應的 double 型值數據組成,存儲在某一個機器中。MLlib 支持密集矩陣(暫無稀疏矩陣!),實體值以列優先的方式存儲在一個 double數組中。
本 地 矩 陣 的 基 類 是 Matrix , 我 們 提 供 了 一 個 實 現 DenseMatrix 。 我 們 建 議 經過 Matrices 中實現的工廠方法來建立本地矩陣:
import org.apache.spark.mllib.linalg.{Matrix, Matrices} // Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
5.分佈式矩陣
一個分佈式矩陣由 long 型行列索引數據和對應的 double 型值數據組成,分佈式存儲在一個或多個 RDD 中。對於巨大的分佈式的矩陣來講,選擇正確的存儲格式很是重要。將一個分佈式矩陣轉換爲另外一個不一樣格式須要全局洗牌(shuffle),因此代價很高。目前,實現了三類分佈式矩陣存儲格式。最基本的類型是 RowMatrix。一個 RowMatrix 是一個面向行的分佈式矩陣,其行索引是沒有具體含義的。好比一系列特徵向量的一個集合。經過一個 RDD 來表明全部的行,每一行就是一個本地向量。對於 RowMatrix,咱們假定其列數量並不巨大,因此一個本地向量能夠恰當的與驅動節點(driver)交換信息,而且可以在某一節點中存儲和操做。
IndexedRowMatrix 與 RowMatrix 類似,但有行索引,能夠用來識別行和進行 join 操做。而 CoordinateMatrix 是一個以三元組列表格式(coordinate list ,COO)存儲的分佈式矩陣,其實體集合是一個 RDD。注 意 : 因 爲 我 們 需 要 緩 存 矩 陣 大 小 , 分 布 式 矩 陣 的 底 層 RDD 必 須 是 確 定 的(deterministic)。一般來講,使用非肯定的 RDD(non-deterministic RDDs)會致使錯誤。
5.1 面向行的分佈式矩陣(RowMatrix)
一個 RowMatrix 是一個面向行的分佈式矩陣,其行索引是沒有具體含義的。好比一系列特徵向量的一個集合。經過一個 RDD 來表明全部的行,每一行就是一個本地向量。既然每一行由一個本地向量表示,因此其列數就被整型數據大小所限制,其實實踐中列數是一個很小的數值。
一個 RowMatrix可從一個RDD[Vector]實例建立。而後咱們能夠計算出其概要統計信息。
import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.distributed.RowMatrix val rows: RDD[Vector] = ... // an RDD of local vectors // Create a RowMatrix from an RDD[Vector]. val mat: RowMatrix = new RowMatrix(rows) // Get its size. val m = mat.numRows() val n = mat.numCols()
5.2行索引矩陣(IndexedRowMatrix)
IndexedRowMatrix 與 RowMatrix 類似,但其行索引具備特定含義,本質上是一個含有索引信息的行數據集合(an RDD of indexed rows)。每一行由 long 型索引和一個本地向量組成。一個 IndexedRowMatrix可從一個RDD[IndexedRow]實例建立,這裏的 IndexedRow是 (Long, Vector) 的 封 裝 類 。 剔 除 IndexedRowMatrix 中 的 行 索 引 信 息 就 變 成 一 個RowMatrix。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix} val rows: RDD[IndexedRow] = ... // an RDD of indexed rows // Create an IndexedRowMatrix from an RDD[IndexedRow]. val mat: IndexedRowMatrix = new IndexedRowMatrix(rows) // Get its size. val m = mat.numRows() val n = mat.numCols() // Drop its row indices. val rowMat: RowMatrix = mat.toRowMatrix(
)
5.3三元組矩陣(CoordinateMatrix)
一個 CoordinateMatrix 是一個分佈式矩陣,其實體集合是一個 RDD。每個實體是一個(i: Long, j: Long, value: Double)三元組,其中 i 表明行索引,j 表明列索引,value 表明實體的值。只有當矩陣的行和列都很巨大,而且矩陣很稀疏時才使用 CoordinateMatrix。
一個 CoordinateMatrix可從一個RDD[MatrixEntry]實例建立,這裏的 MatrixEntry是 (Long, Long, Double) 的 封 裝 類 。 通 過 調 用 toIndexedRowMatrix 可 以 將 一 個CoordinateMatrix轉變爲一個IndexedRowMatrix(但其行是稀疏的)。目前暫不支持其餘計算操做。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries // Create a CoordinateMatrix from an RDD[MatrixEntry]. val mat: CoordinateMatrix = new CoordinateMatrix(entries) // Get its size. val m = mat.numRows() val n = mat.numCols() // Convert it to an IndexRowMatrix whose rows are sparse vectors. val indexedRowMatrix = mat.toIndexedRowMatrix()
Spark MLIB測試基本使用
理解了vector 就是double[] 而後咱們就好辦了 只須要double[] 轉爲vector 或者加上標籤的labelPoint 帶入spark 的Mlib包的機器學習的類 就好了 下面咱們看看kmean聚類使用。其餘相似
static{ System.loadLibrary(Core.NATIVE_LIBRARY_NAME); } final static FeatureDetector detector = FeatureDetector.create(FeatureDetector.ORB);//ORB final static DescriptorExtractor extractor = DescriptorExtractor.create(DescriptorExtractor.ORB); //BRIEF public static List<double[]> readFeatureByStream(DataInputStream open) throws Exception { MatOfKeyPoint keypoints=new MatOfKeyPoint(); Mat mat=OpenCVUtil.bufferedImageToMat(ImageIO.read(open)); Mat descriptors=new Mat(); detector.detect(mat, keypoints); // List<KeyPoint> referenceKeypointsList = // keypoints.toList(); extractor.compute(mat, keypoints, descriptors); int numPoints = (int) keypoints.rows(); int descrpnum=(int) descriptors.rows(); // double[][] descriptions = new double[numPoints][descrpnum]; List<double[]> descriptions=Lists.newArrayList(); System.out.println(numPoints+"=============="+descrpnum+"=================="+descriptors.rows()+"=================="+descriptors.cols()); for (int i = 0; i < descriptors.rows(); i++) { int cols=descriptors.cols(); double[] desc=new double[cols]; for (int j = 0; j < cols; j++) { desc[j]=descriptors.get(i, j)[0]; } //descriptions[i]=desc; descriptions.add(desc); } return descriptions; }
SparkConf conf = new SparkConf().setAppName("indeximage").setMaster("local"); JavaSparkContext context = new JavaSparkContext(conf); JavaRDD<File> imagefiles=context.parallelize(getFiles("C:/baidu/imagetest/"), 2);//.binaryFiles("C:/baidu/imagetest/*").coalesce(1); imagefiles.persist(StorageLevel.MEMORY_ONLY()); JavaRDD<Vector> vectors=imagefiles.map(new Function<File, List<Vector>>() { @Override public List<Vector> call(File v1) throws Exception { try{ List<Vector> sample=Lists.newArrayList(); final List<double[]> fkeys =readFeatureByFile(v1); final int[] indices = RandomData.getUniqueRandomInts((int) (fkeys.size() * 0.1f), 0, fkeys.size()); for (int i : indices) { sample.add(Vectors.dense(fkeys.get(i))); } return sample; }catch(Exception e){ e.printStackTrace(); return null; } } }).flatMap(new FlatMapFunction<List<Vector>, Vector>() { //這裏多餘的其實不須要這樣寫,我爲了演示flatMap的用法 @Override public Iterable<Vector> call( List<Vector> t) throws Exception { return t; } });//.repartition(1); vectors.persist(StorageLevel.MEMORY_ONLY()); int numClusters = 64; int numIterations = 1000; long startTime = System.nanoTime(); // BisectingKMeans kmeans=new BisectingKMeans(); // kmeans.setK(64); // kmeans.setMaxIterations(100); // kmeans.setMinDivisibleClusterSize(1.0); // BisectingKMeansModel clusters=kmeans.run(vectors.rdd()); KMeansModel clusters = KMeans.train(vectors.rdd(), numClusters, numIterations); //KMeansModel clusters = KMeans.train(vectors.rdd(), numClusters, numIterations); System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"); double WSSSE = clusters.computeCost(vectors.rdd()); long endTime = System.nanoTime(); System.out.println("Execution Time: " + (endTime - startTime)/1000000 + " ms"); System.out.println("Within Set Sum of Squared Errors = " + WSSSE); System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$聚類成功¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥"+clusters.k()+"============="); Vector[] vs=clusters.clusterCenters();
下面運行結果展現
這是我本地剛剛運行的結果。,。。spark很方便 跟storm 同樣 本地 調試很是方便快捷。