Apache Spark+PyTorch 案例實戰
隨着數據量和複雜性的不斷增加,深度學習是提供大數據預測分析解決方案的理想方法,須要增長計算處理能力和更先進的圖形處理器。經過深度學習,可以利用非結構化數據(例如圖像、文本和語音),應用到圖像識別、自動翻譯、天然語言處理等領域。圖像分類:識別和分類圖像,便於排序和更準確的搜索。目標檢測:快速的目標檢測使自動駕駛汽車和人臉識別成爲現實。天然語言處理:準確理解口語,爲語音到文本和智能家居提供動力。
深度學習面臨的挑戰:雖然大數據和人工智能提供了大量的潛力,但從大數據中提取可操做的洞察力並非一項普通的任務。隱藏在非結構化數據(圖像、聲音、文本等)中的大量快速增加的信息,須要先進技術的發展和跨學科團隊(數據工程、數據科學和業務)的密切合做。
基於Databricks雲平臺可以輕鬆構建、訓練和部署深度學習應用程序。
node
- Databricks雲平臺集羣提供一個交互式環境,能夠輕鬆地使用深度學習的框架,如Tensorflow、Keras、Pytorch、Mxnet、Caffe、Cntk和Theano。
- Databricks提供處理數據準備、模型訓練和大規模預測的雲集羣平臺。
- Spark分佈式計算進行性能優化,能夠在強大的GPU硬件上大規模運行。
- 交互式數據科學。Databricks雲平臺支持多種編程語言,支持實時數據集的深度學習模型訓練。
目錄
- Spark+PyTorch案例簡介
- Spark+PyTorch案例實戰
- 建立Databricks雲平臺集羣Notebook
- 導入庫
- 準備預訓練模型和數據,廣播ResNet50模型
- 構建函數方法
- 檢查鮮花數據的文件目錄
- 將圖像文件名加載到Spark數據框中
- 構建自定義Pytorch的數據集類ImageDataset、 定義模型預測的函數
- 將函數包裝爲Pandas UDF, 經過Pandas UDF進行模型預測
- 加載保存的Parquet文件並檢查預測結果
- 《Spark大數據商業實戰三部曲》第二版簡介
Spark+PyTorch案例簡介
本文基於AWS+Databricks雲平臺,基於ResNet-50網絡模型,使用Spark3.0.0、Pytorch1.5.1 對鮮花圖像數據(鬱金香、向日葵、玫瑰、蒲公英、菊花)進行分佈式圖像識別實戰。
Spark+PyTorch案例實戰示意圖:
python
ResNet網絡模型有不一樣的網絡層數,比較經常使用的是50-layer,101-layer和152-layer,都是由ResNet模塊堆疊在一塊兒實現的,ResNet網絡結構如圖所示。
ResNet-50模型的論文連接地址( https://arxiv.org/pdf/1512.03385.pdf ),論文題目:Deep Residual Learning for Image Recognition,做者:Kaiming He、Xiangyu Zhang 、Shaoqing Ren 、Jian Sun。
sql
AWS+Databricks雲平臺基本操做: 編程
Spark+PyTorch案例實戰
使用Spark+PyTorch案例實戰的步驟以下。
數組
1. 準備預訓練的模型和鮮花集數據。
從torchvision.models加載預訓練的ResNet-50模型。
將鮮花數據下載到databricks文件系統空間。
2. Spark加載鮮花數據,並轉換爲Spark數據幀。
3. 經過Pandas UDF進行模型預測。
性能優化
建立Databricks雲平臺集羣Notebook
在Workspace欄目中單擊右鍵,在彈出菜單欄目中選擇Create,依次輸入名稱、開發語言(選擇python語言)、雲平臺集羣,單擊Create建立Notebook。
網絡
導入庫
在啓用CPU的Apache Spark集羣上運行notebook,設置變量cuda=False。在啓用GPU的Apache Spark集羣上運行notebook,設置變量cuda=True; 啓動 Arrow支持。Apache Arrow是一種內存中的列式數據格式,在Spark中用於高效傳輸JVM和Python進程之間的數據。將Spark數據幀轉換爲Pandas數據幀時,可使用Arrow 進行優化;導入pandas、pytorch、pyspark等庫。Pytorch設置是否使用GPU。
數據結構
```python cuda = False spark.conf.set("spark.sql.execution.arrow.enabled", "true") spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "2048") import os import shutil import tarfile import time import zipfile try: from urllib.request import urlretrieve except ImportError: from urllib import urlretrieve import pandas as pd import torch from torch.utils.data import Dataset from torchvision import datasets, models, transforms from torchvision.datasets.folder import default_loader # private API from pyspark.sql.functions import col, pandas_udf, PandasUDFType from pyspark.sql.types import ArrayType, FloatType use_cuda = cuda and torch.cuda.is_available() device = torch.device("cuda" if use_cuda else "cpu") ```
準備預訓練模型和數據,廣播ResNet50模型
定義輸入和輸出目錄。建議使用Databricks Runtime 5.4 ML或更高版本,將訓練集數據保存到Databricks文件系統dbfs:/ml目錄,該文件映射到Driver及Worker節點上的文件/dbfs/ml。dbfs:/ml是一個特殊的文件夾,爲深度學習工做負載提供高性能的I/O。在Spark Driver 節點上加載ResNet50預訓練模型,並廣播ResNet50模型的狀態。
架構
```python URL = "http://download.tensorflow.org/example_images/flower_photos.tgz" input_local_dir = "/dbfs/ml/tmp/flower/" output_file_path = "/tmp/predictions" bc_model_state = sc.broadcast(models.resnet50(pretrained=True).state_dict()) ```
廣播ResNet50預訓練模型參數如圖所示。
app
構建函數方法
定義get_model_for_eval方法,返回一個Pytorch ResNet50預訓練模型實例,其加載Spark廣播變量ResNet50模型的參數。定義maybe_download_and_extract方法。從Tensorflow網站(http://download.tensorflow.org/example_images/flower_photos.tgz)下載鮮花文件並解壓縮,解壓的文件包括鬱金香、向日葵、玫瑰、蒲公英、菊花等圖像類型。
```python def get_model_for_eval(): """Gets the broadcasted model.""" model = models.resnet50(pretrained=True) model.load_state_dict(bc_model_state.value) model.eval() return model def maybe_download_and_extract(url, download_dir): filename = url.split('/')[-1] file_path = os.path.join(download_dir, filename) print(file_path) if not os.path.exists(file_path): if not os.path.exists(download_dir): os.makedirs(download_dir) file_path, _ = urlretrieve(url=url, filename=file_path) print() print("Download finished. Extracting files.") if file_path.endswith(".zip"): # Unpack the zip-file. zipfile.ZipFile(file=file_path, mode="r").extractall(download_dir) elif file_path.endswith((".tar.gz", ".tgz")): # Unpack the tar-ball. tarfile.open(name=file_path, mode="r:gz").extractall(download_dir) print("Done.") else: print("Data has apparently already been downloaded and unpacked.") maybe_download_and_extract(url=URL, download_dir=input_local_dir) ```
檢查鮮花數據的文件目錄
在Databricks文件系統中檢查已經下載鮮花數據的文件目錄。在Databricks雲平臺Notebook中運行上述代碼,運行結果以下:
```python print(dbutils.fs.ls("dbfs:/ml/tmp/flower_photos/")) [FileInfo(path='dbfs:/ml/tmp/flower_photos/LICENSE.txt', name='LICENSE.txt', size=418049), FileInfo(path='dbfs:/ml/tmp/flower_photos/daisy/', name='daisy/', size=0), FileInfo(path='dbfs:/ml/tmp/flower_photos/dandelion/', name='dandelion/', size=0), FileInfo(path='dbfs:/ml/tmp/flower_photos/roses/', name='roses/', size=0), FileInfo(path='dbfs:/ml/tmp/flower_photos/sunflowers/', name='sunflowers/', size=0), FileInfo(path='dbfs:/ml/tmp/flower_photos/tulips/', name='tulips/', size=0)] ```
查看Databricks文件系統中菊花目錄的文件信息。
```python print(dbutils.fs.ls("dbfs:/ml/tmp/flower_photos/daisy/")) [FileInfo(path='dbfs:/ml/tmp/flower_photos/daisy/100080576_f52e8ee070_n.jpg', name='100080576_f52e8ee070_n.jpg', size=26797), FileInfo(path='dbfs:/ml/tmp/flower_photos/daisy/10140303196_b88d3d6cec.jpg', name='10140303196_b88d3d6cec.jpg', size=117247), FileInfo(path='dbfs:/ml/tmp/flower_photos/daisy/10172379554_b296050f82_n.jpg', name='10172379554_b296050f82_n.jpg', size=36410), FileInfo ...... ```
也能夠將鮮花文件下載到本地電腦,查看鮮花目錄如圖所示。
單擊向日葵的文件目錄,查看向日葵的圖片如圖所示。
獲取鮮花數據集各目錄中圖像文件的數量。
```python local_dir = input_local_dir + 'flower_photos/' files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(local_dir) for f in filenames if os.path.splitext(f)[1] == '.jpg'] len(files) ```
在Databricks雲平臺Notebook中運行上述代碼,運行結果以下:
```python Out[44]: 3670 ```
將圖像文件名加載到Spark數據框中
```python files_df = spark.createDataFrame( map(lambda path: (path,), files), ["path"] ).repartition(10) # number of partitions should be a small multiple of total number of nodes display(files_df.limit(10)) ```
- 第1行代碼調用spark.createDataFrame方法建立數據幀。
- 第2行代碼中createDataFrame方法的第一個輸入參數是map函數,在map函數中遍歷每個圖像的文件名,調用匿名函數將每個文件名組成(path,)的格式;createDataFrame方法的第二個參數是數據幀的列名。
- 第3行代碼調用Spark的repartition方法進行重分區,將圖像文件名的數據分爲10個分區。
在Databricks雲平臺Notebook中運行上述代碼,展現10條記錄的圖像路徑及文件名,運行結果如圖所示:
單擊圖中的View文本,能夠查詢Databricks雲平臺Spark Jobs的執行狀況,如圖所示。
構建自定義Pytorch的數據集類ImageDataset、 定義模型預測的函數
```python class ImageDataset(Dataset): def __init__(self, paths, transform=None): self.paths = paths self.transform = transform def __len__(self): return len(self.paths) def __getitem__(self, index): image = default_loader(self.paths[index]) if self.transform is not None: image = self.transform(image) return image ```
定義模型預測的函數
def predict_batch(paths): transform = transforms.Compose([ transforms.Resize(224), transforms.CenterCrop(224), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) images = ImageDataset(paths, transform=transform) loader = torch.utils.data.DataLoader(images, batch_size=500, num_workers=8) model = get_model_for_eval() model.to(device) all_predictions = [] with torch.no_grad(): for batch in loader: predictions = list(model(batch.to(device)).cpu().numpy()) for prediction in predictions: all_predictions.append(prediction) return pd.Series(all_predictions)
- 第2行代碼使用PyTorch對圖像數據進行數據加強,將多個數據轉換步驟整合在一塊兒。
- 第3行代碼按給定大小進行圖像尺寸變化。
- 第4行代碼圖像中心收縮到給定的大小。
- 第5行代碼將圖像數據或者數組轉換爲Tensor數據結構。
- 第6行代碼對圖像數據按通道進行標準化處理。
- 第10行代碼調用PyTorch的torch.utils.data.DataLoader方法加載圖像數據,每一個批次包含500張圖像。
- 第11行代碼獲取Spark廣播的ResNet50預訓練模型參數實例。
- 第16行代碼獲取每批次圖像數據的預測分類。
在本地測試函數。
```python predictions = predict_batch(pd.Series(files[:200])) ```
將函數包裝爲Pandas UDF, 經過Pandas UDF進行模型預測
```python predict_udf = pandas_udf(ArrayType(FloatType()), PandasUDFType.SCALAR)(predict_batch) predictions_df = files_df.select(col('path'), predict_udf(col('path')).alias("prediction")) predictions_df.write.mode("overwrite").parquet(output_file_path) ```
Pandas_UDF是在PySpark 2.3版本中新增的API,Spark經過Arrow傳輸數據,使用Pandas處理數據。Pandas_UDF使用關鍵字pandas_udf做爲裝飾器或聲明一個函數進行定義, Pandas_UDF包括Scalar(標量映射)和Grouped Map(分組映射)等類型。在Databricks雲平臺Notebook中運行上述代碼,運行結果如圖所示:
加載保存的Parquet文件並檢查預測結果
```python result_df = spark.read.load(output_file_path) display(result_df) ```
在Databricks雲平臺Notebook中運行上述代碼,運行結果如圖所示:
其中預測分類值是一個大小爲1000的數組,根據ResNet-50模型預測1000個分類的機率。本案例鮮花的分類實際爲5類:鬱金香、向日葵、玫瑰、蒲公英、菊花,感興趣的同窗能夠改寫ResNet-50模型代碼進行優化,獲得5個類別的預測值。
《Spark大數據商業實戰三部曲》第二版簡介
https://duanzhihua.blog.csdn.net/article/details/106294896
在大數據和AI緊密協同時代,最佳的AI系統依賴海量數據才能構建出高度複雜的模型,海量數據須要藉助Al才能挖掘出終極價值。本書以數據智能爲靈魂,以Spark 2.4.X版本爲載體,以Spark+ AI商業案例實戰和生產環境下幾乎全部類型的性能調優爲核心,對企業生產環境下的Spark+AI商業案例與性能調優抽絲剝繭地進行剖析。全書共分4篇,內核解密篇基於Spark源碼,從一個實戰案例入手,按部就班地全面解析Spark 2.4.X版本的新特性及Spark內核源碼;商業案例篇選取Spark開發中最具表明性的經典學習案例,在案例中綜合介紹Spark的大數據技術;性能調優篇覆蓋Spark在生產環境下的全部調優技術; Spark+ AI內幕解密篇講解深度學習動手實踐,經過整合Spark、PyTorch以及TensorFlow揭祕Spark上的深度學習內幕。
本書適合全部大數據和人工智能學習者及從業人員使用。對於有豐富大數據和AI應用經驗的人員,本書也能夠做爲大數據和AI高手修煉的參考用書。同時,本書也特別適合做爲高等院校的大數據和人工智能教材。
做者簡介
王家林,Apache Spark執牛耳者現工做於硅谷的AI實驗室,專一於NLP框架超過20本Spark、Al、Android書籍做者Toastmasters International Division Director GRE博士入學考試連續兩次滿分得到者
段智華,就任於中國電信股份有限公司上海分公司,系統架構師,CSDN博客專家,專一於Spark大數據技術研發及推廣,跟隨Spark核心源碼技術的發展,深刻研究Spark 2.1.1版本及Spark 2.4.0版本的源碼優化,對Spark大數據處理、機器學習等技術有豐富的實戰經驗和濃厚興趣。