做者|LAKSHAY ARORA
編譯|VK
來源|Analytics Vidhyapython
流數據是機器學習領域的一個新興概念git
學習如何使用機器學習模型(如logistic迴歸)使用PySpark對流數據進行預測程序員
咱們將介紹流數據和Spark流的基礎知識,而後深刻到實現部分github
想象一下,每秒有超過8500條微博被髮送,900多張照片被上傳到Instagram上,超過4200個Skype電話被打,超過78000個谷歌搜索發生,超過200萬封電子郵件被髮送(根據互聯網實時統計)。算法
咱們正在之前所未有的速度和規模生成數據。在數據科學領域工做真是太好了!可是,隨着大量數據的出現,一樣面臨着複雜的挑戰。sql
主要是,咱們如何收集這種規模的數據?咱們如何確保咱們的機器學習管道在數據生成和收集後繼續產生結果?這些都是業界面臨的重大挑戰,也是爲何流式數據的概念在各組織中愈來愈受到重視的緣由。緩存
增長處理流式數據的能力將大大提升你當前的數據科學能力。這是業界急需的技能,若是你能掌握它,它將幫助你得到下一個數據科學的角色。bash
所以,在本文中,咱們將瞭解什麼是流數據,瞭解Spark流的基本原理,而後研究一個與行業相關的數據集,以使用Spark實現流數據。服務器
什麼是流數據?session
Spark流基礎
離散流
緩存
檢查點
流數據中的共享變量
累加器變量
廣播變量
利用PySpark對流數據進行情感分析
咱們看到了上面的社交媒體數據——咱們正在處理的數據使人難以置信。你能想象存儲全部這些數據須要什麼嗎?這是一個複雜的過程!所以,在咱們深刻討論本文的Spark方面以前,讓咱們花點時間瞭解流式數據究竟是什麼。
流數據沒有離散的開始或結束。這些數據是每秒從數千個數據源生成的,須要儘快進行處理和分析。至關多的流數據須要實時處理,好比Google搜索結果。
咱們知道,一些結論在事件發生後更具價值,它們每每會隨着時間而失去價值。舉個體育賽事的例子——咱們但願看到即時分析、即時統計得出的結論,以便在那一刻真正享受比賽,對吧?
Spark流是Spark API的擴展,它支持對實時數據流進行可伸縮和容錯的流處理。
在跳到實現部分以前,讓咱們先了解Spark流的不一樣組件。
離散流或數據流表明一個連續的數據流。這裏,數據流要麼直接從任何源接收,要麼在咱們對原始數據作了一些處理以後接收。
構建流應用程序的第一步是定義咱們從數據源收集數據的批處理時間。若是批處理時間爲2秒,則數據將每2秒收集一次並存儲在RDD中。而這些RDD的連續序列鏈是一個不可變的離散流,Spark能夠將其做爲一個分佈式數據集使用。
想一想一個典型的數據科學項目。在數據預處理階段,咱們須要對變量進行轉換,包括將分類變量轉換爲數值變量、刪除異常值等。Spark維護咱們在任何數據上定義的全部轉換的歷史。所以,不管什麼時候發生任何錯誤,它均可以追溯轉換的路徑並從新生成計算結果。
咱們但願Spark應用程序運行24小時 x 7,而且不管什麼時候出現任何故障,咱們都但願它儘快恢復。可是,Spark在處理大規模數據時,出現任何錯誤時須要從新計算全部轉換。你能夠想象,這很是昂貴。
如下是應對這一挑戰的一種方法。咱們能夠臨時存儲計算(緩存)的結果,以維護在數據上定義的轉換的結果。這樣,當出現任何錯誤時,咱們沒必要一次又一次地從新計算這些轉換。
數據流容許咱們將流數據保存在內存中。當咱們要計算同一數據上的多個操做時,這頗有幫助。
當咱們正確使用緩存時,它很是有用,但它須要大量內存。並非每一個人都有數百臺擁有128GB內存的機器來緩存全部東西。
這就引入了檢查點的概念。
檢查點是保存轉換數據幀結果的另外一種技術。它將運行中的應用程序的狀態不時地保存在任何可靠的存儲器(如HDFS)上。可是,它比緩存速度慢,靈活性低。
當咱們有流數據時,咱們可使用檢查點。轉換結果取決於之前的轉換結果,須要保留才能使用它。咱們還檢查元數據信息,好比用於建立流數據的配置和一組DStream(離散流)操做的結果等等。
有時咱們須要爲Spark應用程序定義map、reduce或filter等函數,這些函數必須在多個集羣上執行。此函數中使用的變量將複製到每一個計算機(集羣)。
在這裏,每一個集羣有一個不一樣的執行器,咱們須要一些東西,能夠給咱們這些變量之間的關係。
例如,假設咱們的Spark應用程序運行在100個不一樣的集羣上,捕獲來自不一樣國家的人發佈的Instagram圖片。咱們須要一個在他們的帖子中提到的特定標籤的計數。
如今,每一個集羣的執行器將計算該集羣上存在的數據的結果。可是咱們須要一些東西來幫助這些集羣進行通訊,這樣咱們就能夠獲得聚合的結果。在Spark中,咱們有一些共享變量能夠幫助咱們克服這個問題。
用例,好比錯誤發生的次數、空白日誌的次數、咱們從某個特定國家收到請求的次數,全部這些均可以使用累加器來解決。
每一個集羣上的執行器將數據發送回驅動程序進程,以更新累加器變量的值。累加器僅適用於關聯和交換的操做。例如,sum和maximum有效,而mean無效。
當咱們處理位置數據時,好比城市名稱和郵政編碼的映射,這些都是固定變量。如今,若是任何集羣上的特定轉換每次都須要此類數據,咱們不須要向驅動程序發送請求,由於這太昂貴了。
相反,咱們能夠在每一個集羣上存儲此數據的副本。這些類型的變量稱爲廣播變量。
廣播變量容許程序員在每臺機器上緩存一個只讀變量。一般,Spark會使用有效的廣播算法自動分配廣播變量,但若是咱們有多個階段須要相同數據的任務,咱們也能夠定義它們。
是時候啓動你最喜歡的IDE了!讓咱們在本節中進行寫代碼,並以實際的方式理解流數據。
在本節中,咱們將使用真實的數據集。咱們的目標是在推特上發現仇恨言論。爲了簡單起見,若是推特帶有種族主義或性別歧視情緒,咱們說它包含仇恨言論。
所以,任務是將種族主義或性別歧視的推文與其餘推文進行分類。咱們將使用Tweets和label的訓練樣本,其中label'1'表示Tweet是種族主義/性別歧視,label'0'表示其餘。
爲何這個項目與流處理相關?由於社交媒體平臺以評論和狀態更新的形式接收海量流媒體數據。這個項目將幫助咱們限制公開發布的內容。
你能夠在這裏更詳細地查看問題陳述-練習問題:Twitter情感分析(https://datahack.analyticsvidhya.com/contest/practice-problem-twitter-sentiment-analysis/?utm_source=blog&utm_medium=streaming-data-pyspark-machine-learning-model)。咱們開始吧!
模型構建:咱們將創建一個邏輯迴歸模型管道來分類tweet是否包含仇恨言論。在這裏,咱們的重點不是創建一個很是精確的分類模型,而是查看如何使用任何模型並返回流數據的結果
初始化Spark流上下文:一旦構建了模型,咱們就須要定義從中獲取流數據的主機名和端口號
流數據:接下來,咱們將從定義的端口添加netcat服務器的tweets,Spark API將在指定的持續時間後接收數據
預測並返回結果:一旦咱們收到tweet文本,咱們將數據傳遞到咱們建立的機器學習管道中,並從模型返回預測的情緒
下面是咱們工做流程的一個簡潔說明:
咱們在映射到標籤的CSV文件中有關於Tweets的數據。咱們將使用logistic迴歸模型來預測tweet是否包含仇恨言論。若是是,那麼咱們的模型將預測標籤爲1(不然爲0)。
你能夠在這裏下載數據集和代碼(https://github.com/lakshay-arora/PySpark/tree/master/spark_streaming)。
首先,咱們須要定義CSV文件的模式,不然,Spark將把每列的數據類型視爲字符串。咱們讀取數據並檢查:
# 導入所需庫 from pyspark import SparkContext from pyspark.sql.session import SparkSession from pyspark.streaming import StreamingContext import pyspark.sql.types as tp from pyspark.ml import Pipeline from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer from pyspark.ml.classification import LogisticRegression from pyspark.sql import Row # 初始化spark session sc = SparkContext(appName="PySparkShell") spark = SparkSession(sc) # 定義方案 my_schema = tp.StructType([ tp.StructField(name= 'id', dataType= tp.IntegerType(), nullable= True), tp.StructField(name= 'label', dataType= tp.IntegerType(), nullable= True), tp.StructField(name= 'tweet', dataType= tp.StringType(), nullable= True) ]) # 讀取數據集 my_data = spark.read.csv('twitter_sentiments.csv', schema=my_schema, header=True) # 查看數據 my_data.show(5) # 輸出方案 my_data.printSchema()
如今咱們已經在Spark數據幀中有了數據,咱們須要定義轉換數據的不一樣階段,而後使用它從咱們的模型中獲取預測的標籤。
在第一階段中,咱們將使用RegexTokenizer 將Tweet文本轉換爲單詞列表。而後,咱們將從單詞列表中刪除停用詞並建立單詞向量。在最後階段,咱們將使用這些詞向量創建一個邏輯迴歸模型,並獲得預測情緒。
請記住,咱們的重點不是創建一個很是精確的分類模型,而是看看如何在預測模型中得到流數據的結果。
# 定義階段1:標記tweet文本 stage_1 = RegexTokenizer(inputCol= 'tweet' , outputCol= 'tokens', pattern= '\\W') # 定義階段2:刪除停用字 stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words') # 定義階段3:建立大小爲100的詞向量 stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 100) # 定義階段4:邏輯迴歸模型 model = LogisticRegression(featuresCol= 'vector', labelCol= 'label')
讓咱們在Pipeline對象中添加stages變量,而後按順序執行這些轉換。將管道與訓練數據集匹配,如今,每當咱們有新的Tweet時,咱們只須要將其傳遞到管道對象並轉換數據以得到預測:
# 設置管道 pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model]) #擬合模型 pipelineFit = pipeline.fit(my_data)
假設咱們每秒收到數百條評論,咱們但願經過阻止發佈包含仇恨言論的評論的用戶來保持平臺的乾淨。因此,每當咱們收到新的文本,咱們就會把它傳遞到管道中,獲得預測的情緒。
咱們將定義一個函數 get_prediction,它將刪除空白語句並建立一個數據框,其中每行包含一條推特。
所以,初始化Spark流上下文並定義3秒的批處理持續時間。這意味着咱們將對每3秒收到的數據進行預測:
#定義一個函數來計算情感 def get_prediction(tweet_text): try: # 過濾獲得長度大於0的tweets tweet_text = tweet_text.filter(lambda x: len(x) > 0) # 建立一個列名爲「tweet」的數據框,每行將包含一條tweet rowRdd = tweet_text.map(lambda w: Row(tweet=w)) # 建立spark數據框 wordsDataFrame = spark.createDataFrame(rowRdd) # 利用管道對數據進行轉換,獲得預測的情緒 pipelineFit.transform(wordsDataFrame).select('tweet','prediction').show() except : print('No data') # 初始化流上下文 ssc = StreamingContext(sc, batchDuration= 3) # 建立一個將鏈接到hostname:port的數據流,如localhost:9991 lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) # 用一個關鍵字「tweet_APP」分割tweet文本,這樣咱們就能夠從一條tweet中識別出一組單詞 words = lines.flatMap(lambda line : line.split('TWEET_APP')) # 獲取收到的推文的預期情緒 words.foreachRDD(get_prediction) #開始計算 ssc.start() # 等待結束 ssc.awaitTermination()
在一個終端上運行程序並使用Netcat(一個實用工具,可用於將數據發送到定義的主機名和端口號)。可使用如下命令啓動TCP鏈接:
nc -lk port_number
最後,在第二個終端中鍵入文本,你將在另外一個終端中實時得到預測:
視頻演示地址:https://cdn.analyticsvidhya.com/wp-content/uploads/2019/12/final_twitter_sentiment.mp4?_=1
流數據在將來幾年會增長的愈來愈多,因此你應該開始熟悉這個話題。記住,數據科學不只僅是創建模型,還有一個完整的管道須要處理。
本文介紹了Spark流的基本原理以及如何在真實數據集上實現它。我鼓勵你使用另外一個數據集或收集實時數據並實現咱們剛剛介紹的內容(你也能夠嘗試其餘模型)。
原文連接:https://www.analyticsvidhya.com/blog/2019/12/streaming-data-pyspark-machine-learning-model/
歡迎關注磐創AI博客站:
http://panchuang.net/
sklearn機器學習中文官方文檔:
http://sklearn123.com/
歡迎關注磐創博客資源彙總站:
http://docs.panchuang.net/