MLlib支持存儲在單機上的local vectors和metrices,也支持分佈式的matrics(背後經過一或多個RDD實現)。
local vectors和local matrices都是簡單數據類型,做爲公共接口使用。
底層的線性算法操做則由Breeze和jblas來實現。MLlib中,監督學習的一個訓練樣本,被稱爲「labeled point」。html
存儲在單機上的local vector,由一個整數類型的從0開始的索引(indice),double類型的值(value)組成。MLlib支持兩種類型的local vectors: dense和sparse。dense vector 背後經過一個double array來表示它的條目值,而sparse vector則由兩個並列數組實現:索引(indices)和值(values)。例如,一個vector(1.0, 0.0, 3.0),能夠表示成dense格式:[1.0, 0.0, 3.0],也能夠表示成sparse格式:(3, [0, 2], [1.0, 3.0]),其中,3就是vector的size。算法
import org.apache.spark.mllib.linalg.{Vector, Vectors} // Create a dense vector (1.0, 0.0, 3.0). val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) // Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries. val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) // Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries. val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
local matrix由整數型的行索引、列索引(indices),以及浮點型的值(values)組成,存儲在單機上。MLlib支持dense matrices,它的條目值存儲在單個double array上,以列爲主(column-major)的順序。而sparse matrices,它是非零條目值以壓縮稀疏列(CSC: Compressed Sparse Column)的格式存儲,以列爲主(column-major)的順序。apache
local matrices的基類是Matrix,它提供了兩種實現:DenseMatrix和SparseMatrix. 咱們推薦使用Matrices的工廠方法來建立local matrices。記住,MLlib的local matrices被存成以列爲主的順序。後端
import org.apache.spark.mllib.linalg.{Matrix, Matrices} // Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) // https://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.mllib.linalg.Matrices$ // Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) // colPtrs: Array[Int], rowIndices: Array[Int] val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
distributed matrix由long型的行索引和列索引,以及double型的值組成,以一或多個RDD的方式分佈式存儲。選擇合適的格式來存儲分佈式大矩陣至關重要。將一個分佈式矩陣轉換成一個不一樣的格式,可能須要一個全局的shuffle,計算代價高昂!至今支持三種類型的分佈式矩陣。api
基本類型被稱爲RowMatrix。數組
一個RowMatrix是以行爲主的分佈式矩陣,它沒有行索引,只是一個特徵向量的集合。背後由一個包含這些行的RDD實現,其中,每一個行(row)都是一個local vector。咱們假設,對於一個RowMatrix,列的數目並不大,於是,單個local vector能夠合理地與driver進行通訊,也可使用單個節點被存儲/操做。app
一個IndexedRowMatrix與一個RowMatrix類似,但有行索引,它能夠被用來標識出行(rows)以及正在執行的join操做(executing joins)。分佈式
一個CoordinateMatrix是一個分佈式矩陣,以coordinate list(COO)的格式進行存儲,後端由一個包含這些條目的RDD實現。學習
注意:spa
一個分佈式矩陣的底層RDD實現必須是肯定的(deterministic),由於咱們會對matrix size進行cache。總之,使用非肯定的RDD會致使errors。
rowMatrix是面向row的分佈式矩陣,沒有行索引,背後由一個包含這些行的RDD實現,基中,每一個行是一個local vector。由於每一個row都被表示成一個local vector,列的數目被限制在一個整數範圍內,實際使用時會更小。
RowMatrix能夠經過一個RDD[Vector]實例被建立。接着,咱們能夠計算它的列概括統計(column summary statistics),以及分解(decompositions)。QR deceompsition的格式:A=QR,其中Q是一個正交矩陣(orthogonal matrix),R是一個上三角矩陣(upper triangular matrix)。對於SVD和PCA,請參考降維這一節。
import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.distributed.RowMatrix val rows: RDD[Vector] = ... // an RDD of local vectors // Create a RowMatrix from an RDD[Vector]. val mat: RowMatrix = new RowMatrix(rows) // Get its size. val m = mat.numRows() val n = mat.numCols() // QR decomposition val qrResult = mat.tallSkinnyQR(true)
IndexedRowMatrix與RowMatrix類似,但有行索引。它背後由一個帶索引的行的RDD實現,所以,每一個行能夠被表示成long型索引和local vector。
一個IndexedRowMatrix由RDD[IndexedRow]實例實現,其中,IndexedRow是一個在(Long,Vector)上的封裝wrapper。IndexedRowMatrix能夠被轉換成一個RowMatrix,經過drop掉它的行索引來完成。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix} val rows: RDD[IndexedRow] = ... // an RDD of indexed rows // Create an IndexedRowMatrix from an RDD[IndexedRow]. val mat: IndexedRowMatrix = new IndexedRowMatrix(rows) // Get its size. val m = mat.numRows() val n = mat.numCols() // Drop its row indices. val rowMat: RowMatrix = mat.toRowMatrix()
CoordinateMatrix是一個分佈式矩陣,背後由一個包含這些條目(entries)的RDD實現。每一個條目(entry)是一個三元組(tuple):(i: Long, j: Long, value: Double), 其中: i是行索引,j是列索引,value是entry value。當矩陣的維度很大,而且很稀疏時,推薦使用CoordinateMatrix。
CoordinateMatrix能夠經過一個RDD[MatrixEntry]實例來建立,其中MatrixEntry是一個(Long,Long,Double)的Wrapper。經過調用toIndexedRowMatrix,一個CoordinateMatrix能夠被轉化成一個帶有稀疏行的IndexedRowMatrix。CoordinateMatrix的其它計算目前不支持。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries // Create a CoordinateMatrix from an RDD[MatrixEntry]. val mat: CoordinateMatrix = new CoordinateMatrix(entries) // Get its size. val m = mat.numRows() val n = mat.numCols() // Convert it to an IndexRowMatrix whose rows are sparse vectors. val indexedRowMatrix = mat.toIndexedRowMatrix()
BlockMatrix是一個分佈式矩陣,背後由一個MatrixBlocks的RDD實現,其中MatrixBlock是一個tuple: ((Int,Int),Matrix),其中(Int,Int)是block的索引,Matrix是由rowsPerBlock x colsPerBlock的sub-matrix。BlockMatrix支持方法:add和multiply。BlockMatrix也有一個helper function:validate,它能夠被用於確認BlockMatrix的設置是否正確。
BlockMatrix 能夠由一個IndexedRowMatrix或CoordinateMatrix,經過調用toBlockMatrix很容易地建立。toBlockMatrix缺省會建立1024x1024的blocks。能夠經過toBlockMatrix(rowsPerBlock, colsPerBlock)進行修改。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries // Create a CoordinateMatrix from an RDD[MatrixEntry]. val coordMat: CoordinateMatrix = new CoordinateMatrix(entries) // Transform the CoordinateMatrix to a BlockMatrix val matA: BlockMatrix = coordMat.toBlockMatrix().cache() // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid. // Nothing happens if it is valid. matA.validate() // Calculate A^T A. val ata = matA.transpose.multiply(matA)