案例引入
某銀行A與某互聯網公司B達成了企業級的合做。互聯網公司A與銀行B有着一大部分重合的用戶,A有着客戶上網行爲等特徵信息。B有着客戶的存貸狀況等特徵信息以及客戶的標籤信息——客戶的還貸狀況(Y)。B但願可以將他所獨有的特徵信息與A所獨有的特徵信息相結合,訓練出一個更強大的識別客戶信用風險的模型,但因爲不一樣行業之間的行政手續,用戶數據隱私安全等因素,企業A,B沒法直接互通數據,聯邦學習應運而生。python
聯邦學習概述
聯邦學習的定義
聯邦學習旨在創建一個基於分佈數據集的聯邦學習模型。在模型訓練的過程當中,模型相關的信息可以在各方之間交換(或者是以加密形式交換),但原始數據不能。這一交換不會暴露每一個站點上數據的任何受保護的隱私部分。已訓練好的聯邦學習模型能夠置於聯邦學習系統的各參與方,也能夠在多方之間共享。 設有N位參與方協做使用各自的訓練數據集
來訓練機器學習模型。傳統的方法是將全部的數據
收集起來而且存儲在同一個地方,例如存儲在某一臺雲端數據服務器上,從而在該服務器上使用集中後的數據集訓練獲得一個機器學習模型
。在傳統方法的訓練過程當中,任何一位參與方會將本身的數據暴露給服務器甚至其餘參與方。聯邦學習是一種不須要收集各參與方全部的數據便能協做訓練一個模型
的機器學習過程。 設
和
分別爲集中型模型
和聯邦型模型
的性能度量。在使用安全的聯邦學習在分佈式數據源上構建機器學習模型時,咱們容許在保護用戶隱私的狀況下,聯邦學習模型的性能略低於集中型模型的性能。
其中
即爲容許的性能損失。算法
聯邦學習的分類
根據聯邦學習所使用數據在各參與方的不一樣分佈狀況,咱們能夠將聯邦學習劃分爲三類:橫向聯邦學習(Horizontal Federated Learning, HFL)、縱向聯邦學習(Vertical Federated Learning, VFL)和聯邦遷移學習(Federated Transfer Learning, FTL)。下面是這三種類型聯邦學習所針對的不一樣數據分佈狀況:安全
- 橫向聯邦學習:不一樣參與方的數據有較大的特徵的重疊(橫向),但數據樣本(縱向),即特徵所屬的樣本的重疊度不高。例如,聯邦學習的參與方是兩家服務於不一樣區域市場的銀行,他們所服務的客戶羣體差異較大,但客戶的特徵可能會由於類似的商業模式而重疊度較高。
- 縱向聯邦學習:不一樣參與方的數據樣本有較大的重疊,但樣本特徵的重疊度不高。例如,兩家公司(銀行和電子商務公司)向客戶提供不一樣的服務,擁有客戶不一樣方面的數據,但他們所服務的客戶羣體有較大的重疊。
- 聯邦遷移學習:不一樣參與方的數據在特徵和樣本維度重疊度都不是很是高。
縱向聯邦學習算法
縱向聯邦學習算法有利於各企業之間創建合做,使用各自的特有數據,共同創建更增強大的模型。本篇將着重介紹一種基於加法同態加密的縱向聯邦學習算法。服務器
應用情景
細化開頭的案例,企業B 有特徵X3 和Y(標籤),可獨立建模,企業A 有特徵X一、X2,缺少Y,沒法獨立建模,如今企業A,B 合做,創建聯合模型,顯然效果會超過企業B單邊數據建模。 但兩方之間如何合做來共同訓練一個模型呢?以邏輯迴歸爲例,一個經典的邏輯迴歸的損失函數和梯度公式以下所示:
app
能夠看到,梯度的計算離不開特徵數據(x)和標籤數據(y)。所以,一種最直接的數據交互方向就是其中一方將本身獨有的數據直接以明文的方式發送給對方,由對方計算出梯度後再返回。但這樣的交互方式會產生信息的泄露,其中一方會得到所有的信息,這顯然是不符合規範的。 既然明文的傳輸不行,一種解決思路就是將須要的數據以密文的形式發送,但這又會產生另外一個問題,其中一方得到另外一方的密文數據後沒法解密,又如何進行計算呢?這時就須要引入同態加密算法。
dom
同態加密算法簡介
因爲篇幅所限,這裏將只介紹同態加密算法的做用,而不介紹其具體細節。 同態加密(Homomorphic Encryption)是一種特殊的加密方法,容許對密文進行處理獲得仍然是加密的結果,即對密文直接進行處理,跟對明文進行處理後再對處理結果加密,獲得的結果相同。從抽象代數的角度講,保持了同態性。 假設存在兩個數x、y,OP(x,y)表示x與y之間的一種操做運算(加、減、乘、除、指數……)。E(x)表示對x的加密操做,D(x)表示對x的解密操做,則當某種加密算法對某個操做OP知足同態性時,表達式以下: 或
根據算法所能支持的操做運算的範圍和次數的大小,能夠將同態加密算法分爲部分同態加密算法(PHE)、些許同態加密算法(SHE)和全同態加密算法(FHE),其支持的運算範圍與次數依次擴大。本文以後的縱向聯邦學習算法將基於Paillier算法實現,它是一種部分同態加密算法,支持加法以及與常數的乘法運算。下面我將基於Python的phe庫演示Paillier算法的做用。機器學習
#phe庫須要安裝 from phe import paillier #生成公鑰與私鑰 public_key, private_key = paillier.generate_paillier_keypair() #須要加密的數據 secret_number_list = [3.141592653, 300, -4.6e-12] #公鑰加密 encrypted_number_list = [public_key.encrypt(x) for x in secret_number_list] #私鑰解密 [private_key.decrypt(x) for x in encrypted_number_list]
支持加減法以及與常數的乘除法分佈式
a, b, c = encrypted_number_list a_plus_5 = a + 5 #= a + 5 print("a + 5 =",private_key.decrypt(a_plus_5)) a_plus_b = a + b #= a + b print("a + b =",private_key.decrypt(a_plus_b)) a_times_3_5 = a * 3.5 #= a * 3.5 print("a * 3.5 =",private_key.decrypt(a_times_3_5)) a_minus_1 = a - 1 #= a + (-1) print("a - 1=",private_key.decrypt(a_minus_1)) a_div_minus_3_1 = a / -3.1 #= a * (-1/3.1) print("a / -3.1 =",private_key.decrypt(a_div_minus_3_1)) a_minus_b = a - b #= a + (b*-1) print("a - b =",private_key.decrypt(a_minus_b))
若一些函數內部的邏輯是加法或者是與常數的乘法,一樣支持。函數
import numpy as np enc_mean = np.mean(encrypted_number_list) enc_dot = np.dot(encrypted_number_list, [2, -400.1, 5318008]) print("enc_mean:", private_key.decrypt(enc_mean)) print("enc_dot:", private_key.decrypt(enc_dot))
算法流程
邏輯迴歸的損失和梯度的公式中包含着指數運算,所以,若是要用Paillier算法進行加密,須要對原公式進行必定的改造,使其僅用加法和乘法來表示。將指數運算改造爲加法與乘法運算的一個經常使用方法就是用泰勒展開來進行近似。 最終獲得的轉化後的梯度矩陣的上半部分就是參與方A更新其參數須要的梯度(其中包含了正則項),下半部分對應B。咱們的目標是但願參與方A、B可以儘可能地進行單獨的計算,再經過加密信息的交互得到各自的梯度計算結果,所以咱們須要對計算的任務進行必定的劃分,能夠採用如下的一種設計流程。 在每一輪參數更新中,各參與方須要按序進行以下的計算和交互:性能
- 參與方A和B各自初始化本身的參數,參與方C生成祕鑰對並分發公鑰給A和B。
- 參與方A計算
,使用公鑰加密後發送給B。參與方B計算
,使用公鑰加密後發送給A。
- 此時A和B能各自計算
以及
([[x]]表示x的同態加密形式)。
- A和B須要加密的梯度發送給C來進行解密,但爲了不C直接得到梯度信息,A和B能夠將梯度加上一個隨機數
與
再發送給C。C得到加密梯度進行後進行解密再返還A和B。
- A和B只須要再減去之間加的隨機數就能得到真實的梯度,更新其參數。
代碼實現
下面咱們將基於Python代碼來實現這整個算法流程。爲了更清晰地展示算法的流程,將極度簡化交互流程的實現。
導入所需模塊
import math import numpy as np from phe import paillier import pandas as pd from sklearn import datasets from sklearn.datasets import load_diabetes from sklearn.preprocessing import StandardScaler from sklearn.datasets import load_breast_cancer from sklearn.model_selection import train_test_split from sklearn.utils import shuffle
各參與方的定義
設置參與方的父類,各參與方都須要保存模型的參數、一些中間計算結果以及與其餘參與方的鏈接情況。
class Client: def __init__(self, config): ## 模型參數 self.config = config ## 中間計算結果 self.data = {} ## 與其餘節點的鏈接情況 self.other_client = {} ## 與其餘參與方創建鏈接 def connect(self, client_name, target_client): self.other_client[client_name] = target_client ## 向特定參與方發送數據 def send_data(self, data, target_client): target_client.data.update(data)
參與方A在訓練過程當中僅提供特徵數據。
class ClientA(Client): def __init__(self, X, config): super().__init__(config) self.X = X self.weights = np.zeros(X.shape[1]) def compute_z_a(self): z_a = np.dot(self.X, self.weights) return z_a ## 加密梯度的計算,對應step4 def compute_encrypted_dJ_a(self, encrypted_u): encrypted_dJ_a = self.X.T.dot(encrypted_u) + self.config['lambda'] * self.weights return encrypted_dJ_a ##參數的更新 def update_weight(self, dJ_a): self.weights = self.weights - self.config["lr"] * dJ_a / len(self.X) return ## A: step2 def task_1(self, client_B_name): dt = self.data assert "public_key" in dt.keys(), "Error: 'public_key' from C in step 1 not successfully received." public_key = dt['public_key'] z_a = self.compute_z_a() u_a = 0.25 * z_a z_a_square = z_a ** 2 encrypted_u_a = np.asarray([public_key.encrypt(x) for x in u_a]) encrypted_z_a_square = np.asarray([public_key.encrypt(x) for x in z_a_square]) dt.update({"encrypted_u_a": encrypted_u_a}) data_to_B = {"encrypted_u_a": encrypted_u_a, "encrypted_z_a_square": encrypted_z_a_square} self.send_data(data_to_B, self.other_client[client_B_name]) ## A: step三、4 def task_2(self, client_C_name): dt = self.data assert "encrypted_u_b" in dt.keys(), "Error: 'encrypted_u_b' from B in step 1 not successfully received." encrypted_u_b = dt['encrypted_u_b'] encrypted_u = encrypted_u_b + dt['encrypted_u_a'] encrypted_dJ_a = self.compute_encrypted_dJ_a(encrypted_u) mask = np.random.rand(len(encrypted_dJ_a)) encrypted_masked_dJ_a = encrypted_dJ_a + mask dt.update({"mask": mask}) data_to_C = {'encrypted_masked_dJ_a': encrypted_masked_dJ_a} self.send_data(data_to_C, self.other_client[client_C_name]) ## A: step6 def task_3(self): dt = self.data assert "masked_dJ_a" in dt.keys(), "Error: 'masked_dJ_a' from C in step 2 not successfully received." masked_dJ_a = dt['masked_dJ_a'] dJ_a = masked_dJ_a - dt['mask'] self.update_weight(dJ_a) print(f"A weight: {self.weights}") return
參與方B在訓練過程當中既提供特徵數據,又提供標籤數據。
class ClientB(Client): def __init__(self, X, y, config): super().__init__(config) self.X = X self.y = y self.weights = np.zeros(X.shape[1]) self.data = {} def compute_u_b(self): z_b = np.dot(self.X, self.weights) u_b = 0.25 * z_b - self.y + 0.5 return z_b, u_b def compute_encrypted_dJ_b(self, encrypted_u): encrypted_dJ_b = self.X.T.dot(encrypted_u) + self.config['lambda'] * self.weights return encrypted_dJ_b def update_weight(self, dJ_b): self.weights = self.weights - self.config["lr"] * dJ_b / len(self.X) ## B: step2 def task_1(self, client_A_name): try: dt = self.data assert "public_key" in dt.keys(), "Error: 'public_key' from C in step 1 not successfully received." public_key = dt['public_key'] except Exception as e: print("B step 1 exception: %s" % e) try: z_b, u_b = self.compute_u_b() encrypted_u_b = np.asarray([public_key.encrypt(x) for x in u_b]) dt.update({"encrypted_u_b": encrypted_u_b}) dt.update({"z_b": z_b}) except Exception as e: print("Wrong 1 in B: %s" % e) data_to_A= {"encrypted_u_b": encrypted_u_b} self.send_data(data_to_A, self.other_client[client_A_name]) ## B: step三、4 def task_2(self,client_C_name): try: dt = self.data assert "encrypted_u_a" in dt.keys(), "Error: 'encrypt_u_a' from A in step 1 not successfully received." encrypted_u_a = dt['encrypted_u_a'] encrypted_u = encrypted_u_a + dt['encrypted_u_b'] encrypted_dJ_b = self.compute_encrypted_dJ_b(encrypted_u) mask = np.random.rand(len(encrypted_dJ_b)) encrypted_masked_dJ_b = encrypted_dJ_b + mask dt.update({"mask": mask}) except Exception as e: print("B step 2 exception: %s" % e) try: assert "encrypted_z_a_square" in dt.keys(), "Error: 'encrypted_z_a_square' from A in step 1 not successfully received." encrypted_z = 4*encrypted_u_a + dt['z_b'] encrypted_loss = np.sum((0.5-self.y)*encrypted_z + 0.125*dt["encrypted_z_a_square"] + 0.125*dt["z_b"] * (encrypted_z+4*encrypted_u_a)) except Exception as e: print("B step 2 exception: %s" % e) data_to_C = {"encrypted_masked_dJ_b": encrypted_masked_dJ_b, "encrypted_loss": encrypted_loss} self.send_data(data_to_C, self.other_client[client_C_name]) ## B: step6 def task_3(self): try: dt = self.data assert "masked_dJ_b" in dt.keys(), "Error: 'masked_dJ_b' from C in step 2 not successfully received." masked_dJ_b = dt['masked_dJ_b'] dJ_b = masked_dJ_b - dt['mask'] self.update_weight(dJ_b) except Exception as e: print("A step 3 exception: %s" % e) print(f"B weight: {self.weights}") return
參與方C在整個訓練過程當中主要的做用就是分發祕鑰,以及最後的對A和B加密梯度的解密。
class ClientC(Client): """ Client C as trusted dealer. """ def __init__(self, A_d_shape, B_d_shape, config): super().__init__(config) self.A_data_shape = A_d_shape self.B_data_shape = B_d_shape self.public_key = None self.private_key = None ## 保存訓練中的損失值(泰展開近似) self.loss = [] ## C: step1 def task_1(self, client_A_name, client_B_name): try: public_key, private_key = paillier.generate_paillier_keypair() self.public_key = public_key self.private_key = private_key except Exception as e: print("C step 1 error 1: %s" % e) data_to_AB = {"public_key": public_key} self.send_data(data_to_AB, self.other_client[client_A_name]) self.send_data(data_to_AB, self.other_client[client_B_name]) return ## C: step5 def task_2(self, client_A_name, client_B_name): try: dt = self.data assert "encrypted_masked_dJ_a" in dt.keys() and "encrypted_masked_dJ_b" in dt.keys(), "Error: 'masked_dJ_a' from A or 'masked_dJ_b' from B in step 2 not successfully received." encrypted_masked_dJ_a = dt['encrypted_masked_dJ_a'] encrypted_masked_dJ_b = dt['encrypted_masked_dJ_b'] masked_dJ_a = np.asarray([self.private_key.decrypt(x) for x in encrypted_masked_dJ_a]) masked_dJ_b = np.asarray([self.private_key.decrypt(x) for x in encrypted_masked_dJ_b]) except Exception as e: print("C step 2 exception: %s" % e) try: assert "encrypted_loss" in dt.keys(), "Error: 'encrypted_loss' from B in step 2 not successfully received." encrypted_loss = dt['encrypted_loss'] loss = self.private_key.decrypt(encrypted_loss) / self.A_data_shape[0] + math.log(2) print("******loss: ", loss, "******") self.loss.append(loss) except Exception as e: print("C step 2 exception: %s" % e) data_to_A = {"masked_dJ_a": masked_dJ_a} data_to_B = {"masked_dJ_b": masked_dJ_b} self.send_data(data_to_A, self.other_client[client_A_name]) self.send_data(data_to_B, self.other_client[client_B_name]) return
模擬數據的生成
這裏將基於sklearn中的乳腺癌數據集生成一組模擬數據,參與方A得到部分特徵數據,參與方B得到部分特徵數據與標籤數據。
def load_data(): # 加載數據 breast = load_breast_cancer() # 數據拆分 X_train, X_test, y_train, y_test = train_test_split(breast.data, breast.target, random_state=1) # 數據標準化 std = StandardScaler() X_train = std.fit_transform(X_train) X_test = std.transform(X_test) return X_train, y_train, X_test, y_test ## 將特徵分配給A和B def vertically_partition_data(X, X_test, A_idx, B_idx): """ Vertically partition feature for party A and B :param X: train feature :param X_test: test feature :param A_idx: feature index of party A :param B_idx: feature index of party B :return: train data for A, B; test data for A, B """ XA = X[:, A_idx] XB = X[:, B_idx] XB = np.c_[np.ones(X.shape[0]), XB] XA_test = X_test[:, A_idx] XB_test = X_test[:, B_idx] XB_test = np.c_[np.ones(XB_test.shape[0]), XB_test] return XA, XB, XA_test, XB_test
訓練流程的實現
def vertical_logistic_regression(X, y, X_test, y_test, config): """ Start the processes of the three clients: A, B and C. :param X: features of the training dataset :param y: labels of the training dataset :param X_test: features of the test dataset :param y_test: labels of the test dataset :param config: the config dict :return: True """ ## 獲取數據 XA, XB, XA_test, XB_test = vertically_partition_data(X, X_test, config['A_idx'], config['B_idx']) print('XA:',XA.shape, ' XB:',XB.shape) ## 各參與方的初始化 client_A = ClientA(XA, config) print("Client_A successfully initialized.") client_B = ClientB(XB, y, config) print("Client_B successfully initialized.") client_C = ClientC(XA.shape, XB.shape, config) print("Client_C successfully initialized.") ## 各參與方之間鏈接的創建 client_A.connect("B", client_B) client_A.connect("C", client_C) client_B.connect("A", client_A) client_B.connect("C", client_C) client_C.connect("A", client_A) client_C.connect("B", client_B) ## 訓練 for i in range(config['n_iter']): client_C.task_1("A", "B") client_A.task_1("B") client_B.task_1("A") client_A.task_2("C") client_B.task_2("C") client_C.task_2("A", "B") client_A.task_3() client_B.task_3() print("All process done.") return True config = { 'n_iter': 100, 'lambda': 10, 'lr': 0.05, 'A_idx': [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], 'B_idx': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], } X, y, X_test, y_test = load_data() vertical_logistic_regression(X, y, X_test, y_test, config)
訓練效果
爲測試該縱向聯邦學習算法的訓練效果。能夠設置普通的集中式訓練的邏輯迴歸算法做爲對照組,基於乳腺癌數據集,使用相同的訓練集數據及相同的邏輯迴歸模型來進行訓練,觀察其損失值的降低曲線以及在相同測試集上的預測準確率。 如下是兩種狀況下,訓練的損失值的降低狀況: 各曲線表明的情形: Logistic: 普通邏輯迴歸的損失值變化曲線,使用的是正常的損失函數 Taylor_Logistic: 普通邏輯迴歸的損失值變化曲線,使用的是泰勒展開擬合的損失函數 Taylor_Taylor:縱向邏輯迴歸的損失值變化曲線,使用的是泰勒展開擬合的損失函數
如下是在sklearn中不一樣數據集上,普通邏輯迴歸與縱向邏輯迴歸的訓練結果的正確率及AUC的差別,其中rows表明樣本數量,feat表明特徵數量,logistic表明集中式邏輯迴歸的訓練結果,Vertical表明縱向聯邦學習算法的訓練效果。 由訓練結果的比較能夠看到,與普通的邏輯迴歸相比,該縱向邏輯迴歸算法在保證各方數據隱私性的同時,在實驗數據集上可以達到不錯的訓練效果。
參考文獻
[1] Yang Q , Liu Y , Chen T , et al. Federated Machine Learning: Concept and Applications[J]. ACM Transactions on Intelligent Systems and Technology, 2019, 10(2):1-19. [2] Hardy S , Henecka W , Ivey-Law H , et al. Private federated learning on vertically partitioned data via entity resolution and additively homomorphic encryption[J]. 2017. [3] https://zhuanlan.zhihu.com/p/94105330