Working with Spark Streaming feels very much like working with ordinary RDDs, which makes stream processing straightforward. By combining Spark's streaming components with MLlib's SGD-based online learning, we can build real-time machine learning models that are updated as each new batch of data arrives.
[Spark] 04 - What is Spark Streaming
[Spark] 06 - Structured Streaming
[Link] http://shartoo.github.io/spark-python-example/ [a number of useful examples]
[Scikit-learn] 1.5 Generalized Linear Models - SGD for Regression
[Scikit-learn] 1.5 Generalized Linear Models - SGD for Classification
[Scikit-learn] 1.1 Generalized Linear Models - Comparing online solvers
For example: LinearRegressionWithSGD
SGD-related algorithms:
Algorithms are all implemented in Scala; these APIs exist only in mllib.
StreamingLinearRegressionWithSGD(...)
Train or predict a linear regression model on streaming data. Training uses Stochastic Gradient Descent to update the model based on each new batch of incoming data from a DStream (see LinearRegressionWithSGD for model equation).
Each batch of data is assumed to be an RDD of LabeledPoints. The number of data points per batch can vary, but the number of features must be constant. An initial weight vector must be provided.
Ref: How does Spark's StreamingLinearRegressionWithSGD work?
Whether spark.ml has a corresponding API is something that can only be determined by reading the source code.
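Since the rest of the post uses Python, here is a minimal sketch of how this mllib streaming regression API fits together; the input paths, feature dimension and parameter values are illustrative assumptions, not from the original text.

from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD

sc = SparkContext(conf=SparkConf().setAppName("streaming-lr-sketch"))
ssc = StreamingContext(sc, 1)  # 1-second batch interval

def parse_point(line):
    # "(y,[x1,x2,x3])" -> LabeledPoint(y, [x1, x2, x3])
    label = float(line[line.find('(') + 1: line.find(',')])
    vec = Vectors.dense(line[line.find('[') + 1: line.find(']')].split(','))
    return LabeledPoint(label, vec)

# Each batch is an RDD of LabeledPoints; the number of features must stay constant.
trainStream = ssc.textFileStream("/test/lr_train").map(parse_point)
testStream = ssc.textFileStream("/test/lr_test").map(parse_point)

# An initial weight vector of the same size as the feature vector must be provided.
model = StreamingLinearRegressionWithSGD(stepSize=0.1, numIterations=50)
model.setInitialWeights([0.0, 0.0, 0.0])

model.trainOn(trainStream)  # SGD update of the weights on every incoming batch
model.predictOnValues(testStream.map(lambda lp: (lp.label, lp.features))).pprint()

ssc.start()
ssc.awaitTermination()

Note that trainOn and predictOnValues are registered on different streams, so the model keeps learning from one stream while serving predictions on another.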
Ref: Machine Learning with Spark, Chapter 9 - Real-Time Machine Learning (Scala with sbt)
E-book download: https://www.iteblog.com/download/2150.html
Code download: https://github.com/PacktPublishing/Machine-Learning-with-Spark-Second-Edition
Chapter 1: Getting Up and Running with Spark
Chapter 2: Math for Machine Learning
Chapter 3: Designing a Machine Learning System
Chapter 4: Obtaining, Processing, and Preparing Data with Spark
Chapter 5: Building a Recommendation Engine with Spark
Chapter 6: Building a Classification Model with Spark
Chapter 7: Building a Regression Model with Spark
Chapter 8: Building a Clustering Model with Spark
Chapter 9: Dimensionality Reduction with Spark
Chapter 10: Advanced Text Processing with Spark
Chapter 11: Real-Time Machine Learning with Spark Streaming
Chapter 12: Pipeline APIs for Spark ML
Vectors.dense / Vectors.sparse
The sparse representation (4,[0,2,3],[1.0,1.0,3.0]) reads as follows: the leading 4 is the vector length (number of elements), [0,2,3] is the indices array, and [1.0,1.0,3.0] is the values array. That is, position 0 holds 1.0, position 2 holds 1.0, position 3 holds 3.0, and every other position is 0.
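A quick PySpark illustration of the same vector in both forms (a sketch; the values match the example above):

from pyspark.mllib.linalg import Vectors

dense = Vectors.dense([1.0, 0.0, 1.0, 3.0])             # stores every element explicitly
sparse = Vectors.sparse(4, [0, 2, 3], [1.0, 1.0, 3.0])  # length 4, values at indices 0, 2, 3
print(sparse)            # (4,[0,2,3],[1.0,1.0,3.0])
print(sparse.toArray())  # [1. 0. 1. 3.]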
LabeledPoint
// Create a labeled point with label 1.0 (a positive example in classification) and a dense feature vector
scala> val pos = LabeledPoint(1.0, Vectors.dense(2.0, 0.0, 8.0))
pos: org.apache.spark.mllib.regression.LabeledPoint = (1.0,[2.0,0.0,8.0])

// Create a labeled point with label 0.0 (a negative example in classification) and a sparse feature vector
scala> val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(2.0, 8.0)))
neg: org.apache.spark.mllib.regression.LabeledPoint = (0.0,(3,[0,2],[2.0,8.0]))
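For reference, a PySpark equivalent of the two Scala snippets above (a sketch with the same values):

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Label 1.0 (a positive example in classification) with a dense feature vector
pos = LabeledPoint(1.0, Vectors.dense(2.0, 0.0, 8.0))
# Label 0.0 (a negative example) with a sparse feature vector of length 3
neg = LabeledPoint(0.0, Vectors.sparse(3, [0, 2], [2.0, 8.0]))
print(pos)  # (1.0,[2.0,0.0,8.0])
print(neg)  # (0.0,(3,[0,2],[2.0,8.0]))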
Spark Streaming's streamingContext.queueStream(queueOfRDDs) creates a DStream backed by a queue of RDDs; each RDD in the queue is treated as one batch of the stream and processed accordingly.
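A minimal, self-contained queueStream sketch (the local master, batch interval and data are illustrative assumptions):

import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "queue-stream-sketch")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Each RDD in the queue is consumed as one batch of the DStream
rddQueue = [sc.parallelize(range(i * 10, (i + 1) * 10)) for i in range(3)]
ssc.queueStream(rddQueue).map(lambda x: x * 2).pprint()

ssc.start()
time.sleep(5)  # give the queued batches time to be processed
ssc.stop(stopSparkContext=True, stopGraceFully=True)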
trainOn: update the model by training on batches of data from a DStream. This operation registers a DStream for training the model and updates the model on every subsequent batch of data from the stream (used below as model.trainOn(trainingStream) in the full example).
from __future__ import print_function  # __future__ imports must appear at the top of the module

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
import datetime
def fnGetAppName():
    # Build a unique application name from the current timestamp
    now = datetime.datetime.now()
    return "{}-{}-{}_{}-{}-{}".format(now.year, now.month, now.day, now.hour, now.minute, now.second)
def fn_timer(a_func):
    # Simple timing decorator: prints how long the wrapped function took
    def wrapTheFunction(*args, **kwargs):
        import time
        time_start = time.time()
        result = a_func(*args, **kwargs)
        time_end = time.time()
        print('totally cost {} sec'.format(time_end - time_start))
        return result
    return wrapTheFunction
appName = fnGetAppName()
print("appName: {}".format(appName))
conf = SparkConf().setMaster("spark://node-master:7077").setAppName(appName)
# conf = SparkConf().setMaster("local").setAppName(appName)
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()  # reuses the existing SparkContext
ssc = StreamingContext(sc, 1)  # batch interval: 1 second
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
    # Parse a test line of the form "(label), [x1,x2,x3]" into a LabeledPoint
    label = float(lp[lp.find('(') + 1: lp.find(')')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)
# (1)
trainingData = sc.textFile("/test/kmeans_data.txt")\
.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("/test/streaming_kmeans_data_test.txt").map(parse)
# (2)
trainingQueue = [trainingData]
testingQueue = [testingData]
# RDD queue streams, used here to feed batches of data to the model
# More details: [Spark] 04 - What is Spark Streaming
# (3)
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)
# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
# (4)
model.trainOn(trainingStream)
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)  # graceful stop: wait for the received data to be processed
print("Final centers: " + str(model.latestModel().centers))
Practice code:
/* implement */