使用 Google Cloud 上的 tf.Transform 對 TensorFlow 管道模式進行預處理

文 / Matthias Feys,ML6 首席技術官html

來源 | TensorFlow 公衆號git

機器學習模型須要數據來訓練,可是一般須要對這些數據進行預處理,以便在訓練模型時發揮做用。這種預處理(一般稱爲 「特徵工程」)採用多種形式,例如:規範化和縮放數據,將分類值編碼爲數值,造成詞彙表,以及連續數值的分級。github

在生產過程當中利用機器學習時,爲了確保在模型的離線培訓期間應用的特徵工程步驟與使用模型用於預測時應用的特徵工程步驟保持相同,這每每就成爲一項極具挑戰性的任務。此外,放眼當今世界,機器學習模型會在超大型的數據集上進行訓練,所以在訓練期間應用的預處理步驟將會在大規模分佈式計算框架(例如 Google Cloud Dataflow 或 Apache Spark)上實現。因爲訓練環境一般與服務環境截然不同,在訓練和服務期間執行的特徵工程之間可能會產生不一致的狀況。apache

幸運的是,咱們如今有了 tf.Transform,這是一個 TensorFlow 庫,它提供了一個優雅的解決方案,以確保在訓練和服務期間特徵工程步驟的一致性。 在這篇文章中,咱們將提供在 Google Cloud Dataflow 上使用 tf.Transform,以及在 Cloud ML Engine 上進行模型訓練和服務的具體示例。 注:tf.Transform 連接 github.com/tensorflow/…bash

應用於機器模擬上的變換用例

ecc.ai 是一個有助於優化機器配置的平臺。 咱們模擬物理機器(例如瓶灌裝機或餅乾機)以便找到更優化的參數設置。 因爲每一個模擬的物理機器的目標是具備與實際機器相同的輸入/輸出特性,咱們稱之爲 「數字孿生」。框架

這篇文章將展現這個 「數字孿生」 的設計和實現過程。 在最後一段中,您能夠找到有關咱們以後如何使用這些數字孿生來優化機器配置的更多信息。 注:ecc.ai 連接 ecc.ai/機器學習

tf.Transform 釋義

tf.Transform 是 TensorFlow 的一個庫,它容許用戶定義預處理管道模式並使用大規模數據處理框架運行這些管道模式,同時還以能夠做爲 TensorFlow 圖形的一部分運行的方式導出管道。 用戶經過組合模塊化 Python 函數來定義管道,而後 tf.Transform 隨着 Apache Beam 一塊兒運行。 tf.Transform 導出的 TensorFlow 圖形能夠在使用訓練模型進行預測時複製預處理步驟,好比在使用 TensorFlow Serving 服務模型時。 注:Apache Beam 連接 beam.apache.org/ TensorFlow Serving 連接 ai.googleblog.com/2016/02/run…分佈式

tf.Transform 容許用戶定義預處理管道。 用戶能夠實現預處理數據以用於 TensorFlow 訓練,還能夠將轉換編碼爲 TensorFlow 圖形後導出。而後將該變換圖形結合到用於推斷的模型圖中

tf.Transform 創建數字孿生

數字雙模型的目標是可以根據其輸入預測機器的全部輸出參數。 爲了訓練這個模型,咱們分析了包含這種關係的觀察記錄歷史的日誌數據。 因爲日誌的數據量可能會至關普遍,理想的狀況是應該以分佈式方式運行此步驟。 此外,必須在訓練和服務的時間之間使用相同的概念和代碼,這樣對預處理代碼的改動最小。模塊化

開發伊始,咱們在任何現有的開源項目中都找不到此功能。 所以,咱們開始構建用於 Apache Beam 預處理的自定義工具,這使咱們可以分配咱們的工做負載並輕鬆地在多臺機器之間切換。 可是不太幸運的是,這種方法不容許咱們在服務時(即在生產環境中使用訓練模型時)重複使用相同的代碼做爲 TensorFlow 圖形的一部分運行。函數

在實踐中,咱們必須在 Apache Beam 中編寫自定義分析步驟,計算並保存每一個變量所需的元數據,以便在後續步驟中進行實際的預處理。 咱們在訓練期間使用 Apache Beam 執行後續預處理步驟,並在服務期間做爲 API 的一部分執行。 不幸的是,因爲它不是 TensorFlow 圖形的一部分,咱們不能簡單地使用 ML Engine 將咱們的模型部署爲 API,而咱們的 API 老是由預處理部分和模型部分組成,這使得統一升級變得更加困難。並且,對於全部想要使用的那些已有的和全新的轉換,咱們須要爲此實施和維護分析並轉換步驟。

TensorFlow Transform 解決了這些問題。 自發布以來,咱們將其直接整合爲咱們完整管道模式的主要構建塊。

簡化數字孿生示例流程

咱們如今將專一於構建和使用特定機器的數字孿生。 舉個例子,咱們假設有一個布朗尼麪糰機器。 這臺機器對不一樣的原料進行加熱、攪拌,直到麪糰產生完美的質地。 咱們將從批次問題開始,這意味着數據在完整的生產批次中進行彙總,而不是在接二連三的生產線上進行彙總。

數據 咱們有兩種類型的數據: 輸入數據:原料描述(綠色)和布朗尼麪糰機(藍色)的設置。 您能夠在下面找到列名稱和 3 個示例行。

輸出數據:帶有這些原料的機器設置結果:消耗的能量,輸出的質量度量和輸出量。 您能夠在下面找到列名稱和 3 個示例行。

製做數字孿生

在這裏,咱們在雲存儲中根據兩種不一樣類型文件的歷史日誌數據來訓練系統的數字孿生。 該數字孿生可以基於輸入數據預測輸出數據。上圖顯示咱們在此流程中使用的 Google 服務。

預處理 使用 tf.Transform 函數,Apache Beam 將完成預處理(製做訓練示例)。

預處理階段包括 4 個步驟,代碼以下:

  1. 組合輸入/輸出數據,並製做原始數據 PCollection。
1    raw_data_input = (    
2            p    
3            | 'ReadInputData' >> textio.ReadFromText(train_data_file) 
4            | 'ParseInputCSV'>> beam.Map(converter_input.decode)
5            | 'ExtractBatchKeyIn'>> beam.Map(extract_batchkey))
6
7    raw_data_output = (    
8            p    
9           | 'ReadOutputData' >> textio.ReadFromText(train_data_file)    
10          | 'ParseOutputCSV'>> beam.Map(converter_output.decode)    
11          | 'ExtractBatchKeyOut'>> beam.Map(extract_batchkey))
12
13    raw_data = (    
14          (raw_data_input, raw_data_output)    
15          | 'JoinData' >> CoGroupByKey()    
16          | 'RemoveKeys'>> beam.Map(remove_keys))
複製代碼
  1. 定義將預處理原始數據的預處理功能。 此函數將組合多個 TF-Transform 函數,以生成 TensorFlow Estimators 的示例。

Language: Python

1    def preprocessing_fn(inputs):    
2        """Preprocess input columns into transformed columns."""    
3        outputs = {}    
4        # Encode categorical column: 
5        outputs['Mixing Speed'] = tft.string_to_int(inputs['Mixing Speed'])    
6        # Calculate Derived Features: 
7        outputs['Total Mass'] = inputs['Butter Mass'] + inputs['Sugar Mass'] + inputs['Flour Mass']    
8        for ingredient in ['Butter', 'Sugar', 'Flour']:      
9                ingredient_percentage = inputs['{} Mass'.format(ingredient)] / outputs['Total Mass']       
10                outputs['Norm {} perc'.format(ingredient)] = tft.scale_to_z_score(ingredient_percentage)    
11        # Keep absolute numeric columns 
12        for key in ['Total Volume', 'Energy']:        
13                outputs[key]=inputs[key]    
14        # Normalize other numeric columns 
15        for key in [          
16                        'Butter Temperature',          
17                        'Sugar Humidity',          
18                        'Flour Humidity'          
19                        'Heating Time',          
20                        'Mixing Time',          
21                        'Density',          
22                        'Temperature',          
23                        'Humidity',      
24                ]:        
25                    outputs[key] = tft.scale_to_z_score(inputs[key])  26        # Extract Specific Problems 
27        chunks_detected_str = tf.regex_replace(        
28                inputs['Problems'],        
29                '.*chunk.*'        
30                'chunk',        
31                name='Detect Chunk')    
32        outputs['Chunks']=tf.equal(chunks_detected_str,'chunk')
33    return outputs
複製代碼
  1. 使用預處理功能分析和轉換整個數據集。這部分代碼將採用預處理功能,首先分析數據集,即完整傳遞數據集以計算分類列的詞彙表,而後計算平均值和標準化列的標準誤差。 接下來,Analyze 步驟的輸出用於轉換整個數據集。
1    transform_fn = raw_data | AnalyzeDataset(preprocessing_fn)
2    transformed_data = (raw_data, transform_fn) | TransformDataset()
複製代碼
  1. 保存數據並將 TransformFn 和元數據文件序列化。
1    transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(    
2        transformed_eval_data_base,    
3        coder=example_proto_coder.ExampleProtoCoder(transformed_metadata))
4
5    _ = (    
6        transform_fn    
7        | "WriteTransformFn" >>    
8        transform_fn_io.WriteTransformFn(working_dir))
9
10
11    transformed_metadata | 'WriteMetadata' >> beam_metadata_io.WriteMetadata(    
12            transformed_metadata_file, pipeline=p)
複製代碼

訓練

使用預處理數據做爲 TFRecords,咱們如今可使用 Estimators 輕鬆訓練帶有標準 TensorFlow 代碼的 TensorFlow 模型。

導出訓練的模型

在分析數據集的結構化方法旁邊,tf.Transform 的實際功能在於能夠導出預處理圖。 您能夠導出 TensorFlow 模型,該模型包含與訓練數據徹底相同的預處理步驟。

爲此,咱們只須要使用 tf.Transform 輸入函數導出訓練模型:

1 tf_transform_output = tft.TFTransformOutput(working_dir) 2 serving_input_fn = _make_serving_input_fn(tf_transform_output) 3 exported_model_dir = os.path.join(working_dir, EXPORTED_MODEL_DIR) 4 estimator.export_savedmodel(exported_model_dir, serving_input_fn)

_make_serving_input_fn 函數是一個很是通用的函數,無論項目的邏輯如何,您均可以簡單地在不一樣項目之間重用:

Language: Python

1 def _make_serving_input_fn(tf_transform_output):
2 raw_feature_spec = RAW_DATA_METADATA.schema.as_feature_spec()
3 raw_feature_spec.pop(LABEL_KEY)
4 5 def serving_input_fn():
6 raw_input_fn = input_fn_utils.build_parsing_serving_input_fn(
7 raw_feature_spec)
8 raw_features, _, default_inputs = raw_input_fn()
9 transformed_features = tf_transform_output.transform_raw_features(
10 raw_features)
11 return input_fn_utils.InputFnOps(transformed_features, None, default_inputs)
12 13 return serving_input_fn

使用數字孿生

數字孿生示例流程的最後一部分使用保存的模型根據輸入預測系統的輸出。 這是咱們能夠充分利用 tf.Transform 的地方,由於這使得在 Cloud ML Engine 上部署 「TrainedModel」(包括預處理)變得很是容易。

要部署訓練模型,您只需運行 2 個命令:

1    gcloud ml-engine models create MODEL_NAME
2    gcloud ml-engine versions create VERSION --model=MODEL_NAME --origin=ORIGIN
複製代碼

如今,咱們可使用如下代碼輕鬆地與咱們的數字孿生進行交互

1    def get_predictions(project, model, instances, version=None):    
2        service = discovery.build('ml', 'v1')    
3        name = 'projects/{}/models/{}'.format(project, model)    
4
5        if version is not None:        
6                    name += '/versions/{}'.format(version)    
7
8        response = service.projects().predict(        
9                name=name,        
10                body={'instances': instances}    
11            ).execute()    
12
13            if 'error' in response:        
14                raise RuntimeError(response['error'])
15 
16            return response['predictions']
17
18
19    if __name__ == "__main__":    
20            predictions = get_predictions(        
21                project="<project_id>",        
22                model="<model_name>",        
23                instances=[            
24                        {                  
25                                    "Butter Mass": 121,
26    "Butter Temperature": 20,
27    "Sugar Mass": 200,
28    "Sugar Humidity": 0.22,
29    "Flour Mass ": 50,
30    "Flour Humidity": 0.23,
31    "Heating Time": 50,
32    "Mixing Speed": "Max Speed",
33    "Mixing Time": 200            
34                        }]    
35            )
複製代碼

ecc.ai,咱們使用數字孿生來優化物理機器的參數。

簡而言之,咱們的方法包括 3 個步驟(以下圖 1 所示):

使用歷史機器數據建立模擬環境。機器的這種 「數字孿生」 則將做爲可以容許加強代理來學習最佳控制策略的環境。 利用數字孿生使用咱們的強化學習(RL)代理查找(新的)最佳參數設置。 使用 RL 代理配置真實機器的參數。

總結

經過 tf.Transform,咱們如今已將咱們的模型部署在 ML Engine 上做爲一個 API,成爲特定布朗尼麪糰機的數字孿生:它採用原始輸入功能(成分描述和機器設置),並將反饋機器的預測輸出。

好處是咱們不須要維護 API 而且包含全部內容 - 由於預處理是服務圖形的一部分。 若是咱們須要更新 API,只須要使用最新的版原本刷新模型,全部相關的預處理步驟將會自動爲您更新。

此外,若是咱們須要爲另外一個布朗尼麪糰機器(使用相同數據格式的機器)製做數字孿生模型,可是是在不一樣的工廠或設置中運行,咱們也能夠輕鬆地從新運行相同的代碼,無需手動調整預處理代碼或執行自定義分析步驟。

您能夠在 GitHub 上找到這篇文章的代碼。 注:GitHub 連接 github.com/Fematich/tf…

相關文章
相關標籤/搜索