Getting Started
PySpark MLlib
1. Basic Introduction
This section covers MLlib. The recommended API nowadays is the ml library, which works directly on DataFrames; the older library is used here mainly to get familiar with it.
Apache Spark provides a machine learning API called MLlib, and PySpark exposes this machine learning API in Python. It supports different types of algorithms, as described below:
- mllib.classification - the spark.mllib package supports various methods for binary classification, multiclass classification, and regression analysis. Some of the most popular classification algorithms are random forests, naive Bayes, decision trees, and so on.
- mllib.clustering - clustering is an unsupervised learning problem in which subsets of entities are grouped together based on some notion of similarity.
- mllib.fpm - frequent pattern mining finds frequent items, itemsets, subsequences, or other substructures, which is usually among the first steps in analyzing a large-scale dataset. It has been an active research topic in data mining for years.
- mllib.linalg - MLlib utilities for linear algebra.
- mllib.recommendation - collaborative filtering is commonly used in recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix.
- spark.mllib - it currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.mllib uses the alternating least squares (ALS) algorithm to learn these latent factors.
- mllib.regression - linear regression belongs to the family of regression algorithms. The goal of regression is to find relationships and dependencies between variables. The interface for working with linear regression models and model summaries is similar to the logistic regression case.
2. Code Example
A simple dataset is provided.
Dataset used - test.data (each line is user,item,rating):
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
from __future__ import print_function
from pyspark import SparkContext
# This uses mllib, the older RDD-based API; the newer one is ml.
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

if __name__ == "__main__":
    sc = SparkContext(appName="PySpark mllib Example")
    data = sc.textFile("test.data")

    # Train the model
    ratings = data.map(lambda s: s.split(',')).map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
    rank = 10
    numIterations = 10
    model = ALS.train(ratings, rank, numIterations)

    # Evaluate the model on the training data
    testdata = ratings.map(lambda p: (p[0], p[1]))
    predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
    ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
    MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    print("Mean Squared Error = " + str(MSE))

    # Save and load the model
    model.save(sc, "target/tmp/myCollaborativeFilter")
    sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
3. The spark.ml Library
Ref: http://spark.apache.org/docs/latest/ml-guide.html
As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.
- spark.mllib: the data abstraction is the RDD.
- spark.ml: the data abstraction is the DataFrame.
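A minimal sketch of this difference in data abstraction (illustrative only; it assumes a local SparkSession named spark):
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Vectors as MLlibVectors  # RDD-based API
from pyspark.ml.linalg import Vectors as MLVectors        # DataFrame-based API

spark = SparkSession.builder.master("local").appName("mllib-vs-ml").getOrCreate()

# spark.mllib operates on RDDs of vectors / LabeledPoints
rdd = spark.sparkContext.parallelize([MLlibVectors.dense(1.0, 0.0), MLlibVectors.dense(0.0, 1.0)])

# spark.ml operates on DataFrames with Vector-typed columns
df = spark.createDataFrame([(MLVectors.dense(1.0, 0.0),), (MLVectors.dense(0.0, 1.0),)], ["features"])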

NLP Basics
1. TF-IDF: Word Importance
Step 1: split the sentences into words to build the word sets
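For reference, the weight of a term t in document d over corpus D is TFIDF(t, d, D) = TF(t, d) × IDF(t, D), where MLlib computes IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1)), so a term that appears in every document receives a weight close to 0.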
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
    (0, "..."),
    (1, "..."),
    ...
]).toDF("label", "sentence")
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show()
As the output shows, a new words column has been appended.

Step 2: hash the words into feature vectors
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=2000)
featurizedData = hashingTF.transform(wordsData)
featurizedData.select("words", "rawFeatures").show(truncate=False)
Step 3: build the IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
Step 4: rescale the weights
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "label").show(truncate=False)
2. Word2Vec: Word Vectorization
The org.apache.spark.ml.feature package provides:
- StringIndexer
- IndexToString
- OneHotEncoder
- VectorIndexer
Method 1: StringIndexer
The most frequent value is assigned index 0. IndexToString() is the inverse operation (a sketch of it follows the example below).
from pyspark.ml.feature import StringIndexer
# Build a DataFrame and set the input and output column names for the StringIndexer
df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")], ["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
# Fit the indexer and transform the DataFrame
model = indexer.fit(df)
indexed = model.transform(df)
indexed.show()
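As a sketch of the inverse operation mentioned above (reusing the fitted model and the indexed DataFrame from this example; the output column name originalCategory is illustrative):
from pyspark.ml.feature import IndexToString

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory", labels=model.labels)
converted = converter.transform(indexed)
converted.select("id", "categoryIndex", "originalCategory").show()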

Method 2: VectorIndexer
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.linalg import Vector, Vectors
df = spark.createDataFrame([
    (Vectors.dense(-1.0, 1.0, 1.0), ),
    (Vectors.dense(-1.0, 3.0, 1.0), ),
    (Vectors.dense( 0.0, 5.0, 1.0), )], ["features"])
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=2)
indexerModel = indexer.fit(df)
categoricalFeatures = indexerModel.categoryMaps.keys()
Inspect the result after the transformation:
indexed = indexerModel.transform(df)
indexed.show()
Column 1 has only two distinct values, so it is treated as a categorical feature and indexed;
column 2 has three distinct values, so it is not treated as categorical and is left unchanged;
column 3 has only one distinct value, so it is treated as a categorical feature and indexed.

Pipeline Workflows
Logistic Regression Classifier
(1) Obtain a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("WordCount").getOrCreate()
(2) Prepare the training data
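The training DataFrame itself is not shown in the source; a minimal sketch of the expected shape, assuming the columns id, text, and label (the rows are purely illustrative):
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])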

(3) Build the pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
(4) Test and predict
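The test DataFrame is likewise not shown; a minimal sketch, assuming the columns id and text (the rows are illustrative):
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])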

Now run predictions on the data above to obtain the results.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
Decision Tree Classifier
(1) Import the packages
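The import block is not shown in the source; judging from the code that follows, it would look roughly like this:
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer, IndexToString, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator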

(2) Construct the data
def f(x):
    rel = {}
    rel['features'] = Vectors.dense(float(x[0]), float(x[1]), float(x[2]), float(x[3]))
    rel['label'] = str(x[4])
    return rel

data = spark.sparkContext.textFile("file:///usr/local/spark/iris.txt") \
    .map(lambda line: line.split(',')) \
    .map(lambda p: Row(**f(p))) \
    .toDF()  # becomes a two-dimensional table (a DataFrame)
(3) Define the transformers
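The transformer definitions are not shown in the source; a plausible sketch of the labelIndexer, featureIndexer, and labelConverter used by the pipeline below, plus a random train/test split (maxCategories=4 and the 0.7/0.3 split are assumptions):
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)
trainingData, testData = data.randomSplit([0.7, 0.3])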

(4) The classification model
dtClassifier = DecisionTreeClassifier(). \
    setLabelCol("indexedLabel"). \
    setFeaturesCol("indexedFeatures")
(5) Build the pipeline
dtPipeline = Pipeline().setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
dtPipelineModel = dtPipeline.fit(trainingData)
dtPredictions = dtPipelineModel.transform(testData)
dtPredictions.select("predictedLabel", "label", "features").show(20)

(6) Evaluate the model
evaluator = MulticlassClassificationEvaluator(). \
    setLabelCol("indexedLabel"). \
    setPredictionCol("prediction"). \
    setMetricName("accuracy")  # the default metric is f1, so request accuracy explicitly
dtAccuracy = evaluator.evaluate(dtPredictions)
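Optionally, the evaluation result and the learned tree can be printed (an illustrative addition, not in the source; stages[2] is the fitted DecisionTreeClassificationModel inside the pipeline model):
print("Decision tree accuracy = " + str(dtAccuracy))
treeModel = dtPipelineModel.stages[2]  # the fitted DecisionTreeClassificationModel
print(treeModel.toDebugString)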
AWS ETL Pipeline
1. Learning Resources
Ref: Serverless autonomous machine learning with AWS Glue and Amazon Athena
Ref: AWS Glue FAQs
Extract is the process of reading data from a database. In this stage, the data is collected, often from multiple and different types of sources.
Transform is the process of converting the extracted data from its previous form into the form it needs to be in so that it can be placed into another database. Transformation occurs by using rules or lookup tables or by combining the data with other data.
Load is the process of writing the data into the target database.
2. Code Example
ETL naturally follows the pipeline idea; a Pipeline is not used here, but one could be added (see the sketch after the script).
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import SelectFields
from awsglue.transforms import RenameField
from awsglue.dynamicframe import DynamicFrame, DynamicFrameReader, DynamicFrameWriter, DynamicFrameCollection
from pyspark.context import SparkContext
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
#JOB INPUT DATA
destination = "s3://luiscarosnaprds/gluescripts/results/ClusterResults3.parquet"
namespace = "nyc-transportation-version2"
tablename = "green"
# Boilerplate Glue job setup
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#Load table and select fields
datasource0 = glueContext.create_dynamic_frame.from_catalog(name_space = namespace, table_name = tablename)
SelectFields0 = SelectFields.apply(frame = datasource0, paths=["trip_distance","fare_amount","pickup_longitude","pickup_latitude" ])
DataFrame0 = DynamicFrame.toDF(SelectFields0) # convert the DynamicFrame into a Spark DataFrame
#------------------------------------------------------------
#Filter some unwanted values
DataFrameFiltered = DataFrame0.filter("pickup_latitude > 40.472278 AND pickup_latitude < 41.160886 AND pickup_longitude > -74.300074 AND pickup_longitude < -71.844077")
#Select features and convert to SparkML required format
features = ["pickup_longitude","pickup_latitude"]
assembler = VectorAssembler(inputCols=features,outputCol='features')
assembled_df = assembler.transform(DataFrameFiltered)
#Fit and Run Kmeans
kmeans = KMeans(k=100, seed=1)
model = kmeans.fit(assembled_df)
transformed = model.transform(assembled_df)
#Save data to destination
transformed.write.mode('overwrite').parquet(destination)
job.commit()
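As noted before the script, the VectorAssembler and KMeans stages could also be chained into a single ml Pipeline instead of being applied one by one; a minimal sketch reusing the assembler, kmeans, and DataFrameFiltered objects defined above:
from pyspark.ml import Pipeline

kmeansPipeline = Pipeline(stages=[assembler, kmeans])
pipelineModel = kmeansPipeline.fit(DataFrameFiltered)
transformed = pipelineModel.transform(DataFrameFiltered)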
End.