Apache Spark+PyTorch 案例實戰

Apache Spark+PyTorch 案例實戰
 隨着數據量和複雜性的不斷增加,深度學習是提供大數據預測分析解決方案的理想方法,須要增長計算處理能力和更先進的圖形處理器。經過深度學習,可以利用非結構化數據(例如圖像、文本和語音),應用到圖像識別、自動翻譯、天然語言處理等領域。圖像分類:識別和分類圖像,便於排序和更準確的搜索。目標檢測:快速的目標檢測使自動駕駛汽車和人臉識別成爲現實。天然語言處理:準確理解口語,爲語音到文本和智能家居提供動力。
深度學習面臨的挑戰:雖然大數據和人工智能提供了大量的潛力,但從大數據中提取可操做的洞察力並非一項普通的任務。隱藏在非結構化數據(圖像、聲音、文本等)中的大量快速增加的信息,須要先進技術的發展和跨學科團隊(數據工程、數據科學和業務)的密切合做。
    基於Databricks雲平臺可以輕鬆構建、訓練和部署深度學習應用程序。


node

  •  Databricks雲平臺集羣提供一個交互式環境,能夠輕鬆地使用深度學習的框架,如Tensorflow、Keras、Pytorch、Mxnet、Caffe、Cntk和Theano。
  •  Databricks提供處理數據準備、模型訓練和大規模預測的雲集羣平臺。 
  • Spark分佈式計算進行性能優化,能夠在強大的GPU硬件上大規模運行。
  • 交互式數據科學。Databricks雲平臺支持多種編程語言,支持實時數據集的深度學習模型訓練。

目錄

  • Spark+PyTorch案例簡介
  • Spark+PyTorch案例實戰
    • 建立Databricks雲平臺集羣Notebook
    • 導入庫
    • 準備預訓練模型和數據,廣播ResNet50模型
    • 構建函數方法
    • 檢查鮮花數據的文件目錄
    • 將圖像文件名加載到Spark數據框中
    • 構建自定義Pytorch的數據集類ImageDataset、 定義模型預測的函數
    • 將函數包裝爲Pandas UDF, 經過Pandas UDF進行模型預測
    • 加載保存的Parquet文件並檢查預測結果
  • 《Spark大數據商業實戰三部曲》第二版簡介

 
Spark+PyTorch案例簡介


   本文基於AWS+Databricks雲平臺,基於ResNet-50網絡模型,使用Spark3.0.0、Pytorch1.5.1 對鮮花圖像數據(鬱金香、向日葵、玫瑰、蒲公英、菊花)進行分佈式圖像識別實戰。
   
  Spark+PyTorch案例實戰示意圖:


python


  
ResNet網絡模型有不一樣的網絡層數,比較經常使用的是50-layer,101-layer和152-layer,都是由ResNet模塊堆疊在一塊兒實現的,ResNet網絡結構如圖所示。
 
ResNet-50模型的論文連接地址( https://arxiv.org/pdf/1512.03385.pdf ),論文題目:Deep Residual Learning for Image Recognition,做者:Kaiming He、Xiangyu Zhang 、Shaoqing Ren 、Jian Sun。



sql

AWS+Databricks雲平臺基本操做: 編程

Spark+PyTorch案例實戰


  使用Spark+PyTorch案例實戰的步驟以下。
數組

1. 準備預訓練的模型和鮮花集數據。
      從torchvision.models加載預訓練的ResNet-50模型。
      將鮮花數據下載到databricks文件系統空間。
2. Spark加載鮮花數據,並轉換爲Spark數據幀。
3. 經過Pandas UDF進行模型預測。  



性能優化

建立Databricks雲平臺集羣Notebook


在Workspace欄目中單擊右鍵,在彈出菜單欄目中選擇Create,依次輸入名稱、開發語言(選擇python語言)、雲平臺集羣,單擊Create建立Notebook。
 

網絡

導入庫


在啓用CPU的Apache Spark集羣上運行notebook,設置變量cuda=False。在啓用GPU的Apache Spark集羣上運行notebook,設置變量cuda=True; 啓動 Arrow支持。Apache Arrow是一種內存中的列式數據格式,在Spark中用於高效傳輸JVM和Python進程之間的數據。將Spark數據幀轉換爲Pandas數據幀時,可使用Arrow 進行優化;導入pandas、pytorch、pyspark等庫。Pytorch設置是否使用GPU。
數據結構

```python
cuda = False
 
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "2048")


import os
import shutil
import tarfile
import time
import zipfile

try:
    from urllib.request import urlretrieve
except ImportError:
    from urllib import urlretrieve

import pandas as pd

import torch
from torch.utils.data import Dataset
from torchvision import datasets, models, transforms
from torchvision.datasets.folder import default_loader  # private API

from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, FloatType



use_cuda = cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
```


準備預訓練模型和數據,廣播ResNet50模型


定義輸入和輸出目錄。建議使用Databricks Runtime 5.4 ML或更高版本,將訓練集數據保存到Databricks文件系統dbfs:/ml目錄,該文件映射到Driver及Worker節點上的文件/dbfs/ml。dbfs:/ml是一個特殊的文件夾,爲深度學習工做負載提供高性能的I/O。在Spark Driver 節點上加載ResNet50預訓練模型,並廣播ResNet50模型的狀態。
架構

```python
URL = "http://download.tensorflow.org/example_images/flower_photos.tgz"
input_local_dir = "/dbfs/ml/tmp/flower/"
output_file_path = "/tmp/predictions"

bc_model_state = sc.broadcast(models.resnet50(pretrained=True).state_dict())

```


廣播ResNet50預訓練模型參數如圖所示。
 

app

構建函數方法


定義get_model_for_eval方法,返回一個Pytorch ResNet50預訓練模型實例,其加載Spark廣播變量ResNet50模型的參數。定義maybe_download_and_extract方法。從Tensorflow網站(http://download.tensorflow.org/example_images/flower_photos.tgz)下載鮮花文件並解壓縮,解壓的文件包括鬱金香、向日葵、玫瑰、蒲公英、菊花等圖像類型。

```python

def get_model_for_eval():
  """Gets the broadcasted model."""
  model = models.resnet50(pretrained=True)
  model.load_state_dict(bc_model_state.value)
  model.eval()
  return model



def maybe_download_and_extract(url, download_dir):
    filename = url.split('/')[-1]
    file_path = os.path.join(download_dir, filename)
    print(file_path)
    if not os.path.exists(file_path):
        if not os.path.exists(download_dir):
            os.makedirs(download_dir)
            
        file_path, _ = urlretrieve(url=url, filename=file_path)
        print()
        print("Download finished. Extracting files.")

        if file_path.endswith(".zip"):
            # Unpack the zip-file.
            zipfile.ZipFile(file=file_path, mode="r").extractall(download_dir)
        elif file_path.endswith((".tar.gz", ".tgz")):
            # Unpack the tar-ball.
            tarfile.open(name=file_path, mode="r:gz").extractall(download_dir)

        print("Done.")
    else:
        print("Data has apparently already been downloaded and unpacked.")
maybe_download_and_extract(url=URL, download_dir=input_local_dir)
```


檢查鮮花數據的文件目錄


在Databricks文件系統中檢查已經下載鮮花數據的文件目錄。在Databricks雲平臺Notebook中運行上述代碼,運行結果以下:

```python
 print(dbutils.fs.ls("dbfs:/ml/tmp/flower_photos/"))

[FileInfo(path='dbfs:/ml/tmp/flower_photos/LICENSE.txt', name='LICENSE.txt', size=418049), FileInfo(path='dbfs:/ml/tmp/flower_photos/daisy/', name='daisy/', size=0), FileInfo(path='dbfs:/ml/tmp/flower_photos/dandelion/', name='dandelion/', size=0), FileInfo(path='dbfs:/ml/tmp/flower_photos/roses/', name='roses/', size=0), FileInfo(path='dbfs:/ml/tmp/flower_photos/sunflowers/', name='sunflowers/', size=0), FileInfo(path='dbfs:/ml/tmp/flower_photos/tulips/', name='tulips/', size=0)]
```


查看Databricks文件系統中菊花目錄的文件信息。

```python
 print(dbutils.fs.ls("dbfs:/ml/tmp/flower_photos/daisy/"))
[FileInfo(path='dbfs:/ml/tmp/flower_photos/daisy/100080576_f52e8ee070_n.jpg', name='100080576_f52e8ee070_n.jpg', size=26797), FileInfo(path='dbfs:/ml/tmp/flower_photos/daisy/10140303196_b88d3d6cec.jpg', name='10140303196_b88d3d6cec.jpg', size=117247), FileInfo(path='dbfs:/ml/tmp/flower_photos/daisy/10172379554_b296050f82_n.jpg', name='10172379554_b296050f82_n.jpg', size=36410), FileInfo 
......

```


也能夠將鮮花文件下載到本地電腦,查看鮮花目錄如圖所示。
 
單擊向日葵的文件目錄,查看向日葵的圖片如圖所示。
 
獲取鮮花數據集各目錄中圖像文件的數量。




```python
local_dir = input_local_dir + 'flower_photos/'
files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(local_dir) for f in filenames if os.path.splitext(f)[1] == '.jpg']
len(files)

```


在Databricks雲平臺Notebook中運行上述代碼,運行結果以下:

```python
Out[44]: 3670 
```


將圖像文件名加載到Spark數據框中

```python
files_df = spark.createDataFrame(
  map(lambda path: (path,), files), ["path"]
).repartition(10)  # number of partitions should be a small multiple of total number of nodes
display(files_df.limit(10))
```


- 第1行代碼調用spark.createDataFrame方法建立數據幀。
- 第2行代碼中createDataFrame方法的第一個輸入參數是map函數,在map函數中遍歷每個圖像的文件名,調用匿名函數將每個文件名組成(path,)的格式;createDataFrame方法的第二個參數是數據幀的列名。
- 第3行代碼調用Spark的repartition方法進行重分區,將圖像文件名的數據分爲10個分區。


在Databricks雲平臺Notebook中運行上述代碼,展現10條記錄的圖像路徑及文件名,運行結果如圖所示:
 
單擊圖中的View文本,能夠查詢Databricks雲平臺Spark Jobs的執行狀況,如圖所示。
 
構建自定義Pytorch的數據集類ImageDataset、 定義模型預測的函數



```python

class ImageDataset(Dataset):
  def __init__(self, paths, transform=None):
    self.paths = paths
    self.transform = transform
  def __len__(self):
    return len(self.paths)
  def __getitem__(self, index):
    image = default_loader(self.paths[index])
    if self.transform is not None:
      image = self.transform(image)
    return image
```


定義模型預測的函數

def predict_batch(paths):
  transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                       std=[0.229, 0.224, 0.225])
  ])
  images = ImageDataset(paths, transform=transform)
  loader = torch.utils.data.DataLoader(images, batch_size=500, num_workers=8)
  model = get_model_for_eval()
  model.to(device)
  all_predictions = []
  with torch.no_grad():
    for batch in loader:
      predictions = list(model(batch.to(device)).cpu().numpy())
      for prediction in predictions:
        all_predictions.append(prediction)
  return pd.Series(all_predictions)

- 第2行代碼使用PyTorch對圖像數據進行數據加強,將多個數據轉換步驟整合在一塊兒。
- 第3行代碼按給定大小進行圖像尺寸變化。
- 第4行代碼圖像中心收縮到給定的大小。
- 第5行代碼將圖像數據或者數組轉換爲Tensor數據結構。
- 第6行代碼對圖像數據按通道進行標準化處理。
- 第10行代碼調用PyTorch的torch.utils.data.DataLoader方法加載圖像數據,每一個批次包含500張圖像。
- 第11行代碼獲取Spark廣播的ResNet50預訓練模型參數實例。
- 第16行代碼獲取每批次圖像數據的預測分類。






在本地測試函數。

```python
predictions = predict_batch(pd.Series(files[:200]))
```


將函數包裝爲Pandas UDF, 經過Pandas UDF進行模型預測

```python
predict_udf = pandas_udf(ArrayType(FloatType()), PandasUDFType.SCALAR)(predict_batch)


predictions_df = files_df.select(col('path'), predict_udf(col('path')).alias("prediction"))
predictions_df.write.mode("overwrite").parquet(output_file_path)
```


 Pandas_UDF是在PySpark 2.3版本中新增的API,Spark經過Arrow傳輸數據,使用Pandas處理數據。Pandas_UDF使用關鍵字pandas_udf做爲裝飾器或聲明一個函數進行定義, Pandas_UDF包括Scalar(標量映射)和Grouped Map(分組映射)等類型。在Databricks雲平臺Notebook中運行上述代碼,運行結果如圖所示:
 
加載保存的Parquet文件並檢查預測結果


```python
result_df = spark.read.load(output_file_path)
display(result_df)
```


在Databricks雲平臺Notebook中運行上述代碼,運行結果如圖所示:


 
其中預測分類值是一個大小爲1000的數組,根據ResNet-50模型預測1000個分類的機率。本案例鮮花的分類實際爲5類:鬱金香、向日葵、玫瑰、蒲公英、菊花,感興趣的同窗能夠改寫ResNet-50模型代碼進行優化,獲得5個類別的預測值。
 


 《Spark大數據商業實戰三部曲》第二版簡介

 https://duanzhihua.blog.csdn.net/article/details/106294896
在大數據和AI緊密協同時代,最佳的AI系統依賴海量數據才能構建出高度複雜的模型,海量數據須要藉助Al才能挖掘出終極價值。本書以數據智能爲靈魂,以Spark 2.4.X版本爲載體,以Spark+ AI商業案例實戰和生產環境下幾乎全部類型的性能調優爲核心,對企業生產環境下的Spark+AI商業案例與性能調優抽絲剝繭地進行剖析。全書共分4篇,內核解密篇基於Spark源碼,從一個實戰案例入手,按部就班地全面解析Spark 2.4.X版本的新特性及Spark內核源碼;商業案例篇選取Spark開發中最具表明性的經典學習案例,在案例中綜合介紹Spark的大數據技術;性能調優篇覆蓋Spark在生產環境下的全部調優技術; Spark+ AI內幕解密篇講解深度學習動手實踐,經過整合Spark、PyTorch以及TensorFlow揭祕Spark上的深度學習內幕。
本書適合全部大數據和人工智能學習者及從業人員使用。對於有豐富大數據和AI應用經驗的人員,本書也能夠做爲大數據和AI高手修煉的參考用書。同時,本書也特別適合做爲高等院校的大數據和人工智能教材。

做者簡介
王家林,Apache Spark執牛耳者現工做於硅谷的AI實驗室,專一於NLP框架超過20本Spark、Al、Android書籍做者Toastmasters International Division Director GRE博士入學考試連續兩次滿分得到者
  
段智華,就任於中國電信股份有限公司上海分公司,系統架構師,CSDN博客專家,專一於Spark大數據技術研發及推廣,跟隨Spark核心源碼技術的發展,深刻研究Spark 2.1.1版本及Spark 2.4.0版本的源碼優化,對Spark大數據處理、機器學習等技術有豐富的實戰經驗和濃厚興趣。


本文的視頻講解課程:
 https://edu.csdn.net/course/detail/29974/433229

相關文章
相關標籤/搜索