Deep Interest Network(DIN)是阿里媽媽精準定向檢索及基礎算法團隊在2017年6月提出的。其針對電子商務領域(e-commerce industry)的CTR預估,重點在於充分利用/挖掘用戶歷史行爲數據中的信息。python
本系列文章解讀論文以及源碼,順便梳理一些深度學習相關概念和TensorFlow的實現。git
本文經過DIN源碼 https://github.com/mouna99/dien 分析,來深刻展開看看embedding層如何自動更新。github
在上文中,咱們分析了embedding層的做用,可是留了一個問題還沒有解答:算法
即DIN代碼中,以下變量怎麼更新:編程
self.uid_embeddings_var = tf.get_variable("uid_embedding_var", [n_uid, EMBEDDING_DIM]) self.mid_embeddings_var = tf.get_variable("mid_embedding_var", [n_mid, EMBEDDING_DIM]) self.cat_embeddings_var = tf.get_variable("cat_embedding_var", [n_cat, EMBEDDING_DIM])
由於在DIN中,只有這一處初始化 embeddings 的地方,沒有找到迭代更新的代碼,這會給初學者帶來一些困擾。網絡
先簡要說一下答案,embedding層經過 optimizer 進行更新(自動求導),經過 session.run 進行調用更新。session
通常意義的 embedding 大可能是神經網絡倒數第二層的參數權重,只具備總體意義和相對意義,不具有局部意義和絕對含義,這與 embedding 的產生過程有關,app
任何 embedding 一開始都是一個隨機數,而後隨着優化算法,不斷迭代更新,最後網絡收斂中止迭代的時候,網絡各個層的參數就相對固化,獲得隱層權重表(此時就至關於獲得了咱們想要的 embedding),而後在經過查表能夠單獨查看每一個元素的 embedding。框架
DIN中對應代碼以下:dom
# 優化更新(自動求導) self.optimizer = tf.train.AdamOptimizer(learning_rate=self.lr).minimize(self.loss) ...... # 經過 session.run 進行調用更新 def train(self, sess, inps): if self.use_negsampling: loss, accuracy, aux_loss, _ = sess.run([self.loss, self.accuracy, self.aux_loss, self.optimizer], feed_dict={ self.uid_batch_ph: inps[0], self.mid_batch_ph: inps[1], self.cat_batch_ph: inps[2], self.mid_his_batch_ph: inps[3], self.cat_his_batch_ph: inps[4], self.mask: inps[5], self.target_ph: inps[6], self.seq_len_ph: inps[7], self.lr: inps[8], self.noclk_mid_batch_ph: inps[9], self.noclk_cat_batch_ph: inps[10], }) return loss, accuracy, aux_loss else: loss, accuracy, _ = sess.run([self.loss, self.accuracy, self.optimizer], feed_dict={ self.uid_batch_ph: inps[0], self.mid_batch_ph: inps[1], self.cat_batch_ph: inps[2], self.mid_his_batch_ph: inps[3], self.cat_his_batch_ph: inps[4], self.mask: inps[5], self.target_ph: inps[6], self.seq_len_ph: inps[7], self.lr: inps[8], }) return loss, accuracy, 0
這涉及的部分不少,咱們須要一一闡釋。
大多數機器學習(深度學習)任務就是最小化損失,在損失函數定義好的狀況下,使用一種優化器進行求解最小損失。
而爲了讓loss降低,深度學習框架常見的優化方式通常採用的是梯度降低(Gradient Descent)算法,這要求對loss公式上的每一個op都須要求偏導,而後使用鏈式法則結合起來。
給定一個可微函數,理論上能夠用解析法找到它的最小值:函數的最小值是導數爲 0 的點,所以你只需找到全部導數爲 0 的點,而後計算函數在其中哪一個點具備最小值。
將這一方法應用於神經網絡,就是用解析法求出最小損失函數對應的全部權重值。能夠經過對方程 gradient(f)(W) = 0 求解 W 來實現這一方法。
即便用基於梯度的優化方式進行求解,基於當前在隨機數據批量上的損失,一點一點地對參數進行調節。因爲處理的是一個可微函數,你能夠計算出它的梯度,而後沿着梯度的反方向更新權重,損失每次都會變小一點。
W -= step * gradient
,從而使這批數據這就叫做小批量隨機梯度降低(mini-batch stochastic gradient descent,又稱爲小批量SGD)。
術語隨機(stochastic)是指每批數據都是隨機抽取的(stochastic 是random在科學上的同義詞)。
反向傳播 算法的訓練過程則是根據網絡計算獲得的 Y_out 和實際的真實結果 Y_label 來計算偏差,而且沿着網絡反向傳播來調整公式中的全部 Wi 和 bi,使偏差達到最小。強調一下,深度學習裏面 BP 的本質目標是讓偏差達到最小,因此要用偏差對中間出現過的全部影響因素求偏導。
經過反向傳播算法優化神經網絡是一個迭代的過程。
前向求導是從第一層開始,逐層計算梯度 ∂ / ∂X 到最後一層。反向求導是從最後一層開始,逐層計算梯度 ∂Z / ∂ 到第一層。前向求導關注的是輸入是怎麼影響到每一層的,反向求導則是關注於每一層是怎麼影響到最終的輸出結果的。
自動求導就是每個op/layer本身依據本身的輸入和輸出作前向計算/反向求導,而框架則負責組裝調度這些op/layer,表現出來就是你經過框架去定義網絡/計算圖,框架自動前向計算並自動求導。
常見的深度學習框架裏每一個op(op指的是最小的計算單元,caffe裏叫layer)都預先定義好了 forward 和backward(或者叫grad)兩個函數,這裏的 backward 也就是求導。也就是說每一個op的求導都是預先定義好的,或者說是人手推的。
當你定義好了一個神經網絡,常見的深度學習框架將其解釋爲一個dag(有向無環圖),dag裏每一個節點就是op,從loss function這個節點開始,經過鏈式法則一步一步從後往前計算每一層神經網絡的梯度,整個dag梯度計算的最小粒度就是op的 backward 函數(這裏是手動的),而鏈式法則則是自動的。
TensorFlow也是如此。
TensorFlow 提供的是聲明式的編程接口,用戶不須要關心求導的細節,只須要定義好模型獲得一個loss方程,而後使用TensorFlow實現的各類Optimizer來進行運算便可。
這要求TensorFlow自己提供了每一個op的求偏導方法,並且雖然咱們使用的是Python的加減乘除運算符,其實是TensorFlow重載了運算符實際上會建立「Square」這樣的op,能夠方便用戶更容易得構建表達式。
所以TensorFlow的求導,其實是先提供每個op求導的數學實現,而後使用鏈式法則求出整個表達式的導數。
具體咱們能夠參見RegisterGradient的實現,以及nn_grad.py,math_grad.py等幾個文件
這些文件的全部的函數都用RegisterGradient裝飾器包裝了起來,這些函數都接受兩個參數,op和grad。其餘的只要註冊了op的地方也有各類使用這個裝飾器,例如batch。
RegisterGradient使用舉例以下:
@ops.RegisterGradient("Abs") def _AbsGrad(op, grad): x = op.inputs[0] return grad * math_ops.sign(x)
RegisterGradient定義以下,就是註冊op梯度函數的裝飾器:
class RegisterGradient(object): def __init__(self, op_type): if not isinstance(op_type, six.string_types): raise TypeError("op_type must be a string") self._op_type = op_type def __call__(self, f): """Registers the function `f` as gradient function for `op_type`.""" _gradient_registry.register(f, self._op_type) return f
道理說着還不錯,可是神經網絡是究竟怎麼反向傳遞更新呢?這就須要看Optimizer了。
回到 TensorFlow 的 Python 代碼層面,自動求導的部分是靠各類各樣的 Optimizer 串起來的:
minimize()
方法就會自動完成反向部分的數據流圖構建。在DIEN這裏,代碼以下:
ctr_loss = - tf.reduce_mean(tf.log(self.y_hat) * self.target_ph) self.loss = ctr_loss if self.use_negsampling: self.loss += self.aux_loss self.optimizer = tf.train.AdamOptimizer(learning_rate=self.lr).minimize(self.loss)
TF的optimizer都繼承自Optimizer這個類,這個類的方法很是多,幾個重要方法是 minimize、compute_gradients、apply_gradients、slot系列。
Optimizer 基類的這個方法爲每一個實現子類預留了_create_slots()
,_prepare()
,_apply_dense()
,_apply_sparse()
四個接口出來,後面新構建的 Optimizer 只須要重寫或者擴展 Optimizer 類的某幾個函數便可;
整個反向傳播過程可分爲三步,這三步僅需經過一個minimize()函數完成:
compute_gradients()
;apply_gradients();
即往最小化 loss 的方向更新 var_list 中的每個參數;代碼以下:
def minimize(self, loss, global_step=None, var_list=None, gate_gradients=GATE_OP, aggregation_method=None, colocate_gradients_with_ops=False, name=None, grad_loss=None): grads_and_vars = self.compute_gradients( loss, var_list=var_list, gate_gradients=gate_gradients, aggregation_method=aggregation_method, colocate_gradients_with_ops=colocate_gradients_with_ops, grad_loss=grad_loss) vars_with_grad = [v for g, v in grads_and_vars if g is not None] return self.apply_gradients(grads_and_vars, global_step=global_step, name=name)
該函數用於計算loss對於可訓練變量val_list的梯度,最終返回的是元組列表,即 [(gradient, variable),...]。
參數含義:
tf.Variable
to update to minimize loss
. Defaults to the list of variables collected in the graph under the key GraphKeys.TRAINABLE_VARIABLES
.基本邏輯以下:
其中,_get_processor函數可理解爲一種快速更新variables的方法,每一個processor都會包含一個update_op這樣的函數來進行variable更新操做。
變量更新公式:
代碼以下:
def compute_gradients(self, loss, var_list=None, gate_gradients=GATE_OP, aggregation_method=None, colocate_gradients_with_ops=False, grad_loss=None): self._assert_valid_dtypes([loss]) if grad_loss is not None: self._assert_valid_dtypes([grad_loss]) if var_list is None: var_list = ( variables.trainable_variables() + ops.get_collection(ops.GraphKeys.TRAINABLE_RESOURCE_VARIABLES)) else: var_list = nest.flatten(var_list) var_list += ops.get_collection(ops.GraphKeys._STREAMING_MODEL_PORTS) processors = [_get_processor(v) for v in var_list] var_refs = [p.target() for p in processors] grads = gradients.gradients( loss, var_refs, grad_ys=grad_loss, gate_gradients=(gate_gradients == Optimizer.GATE_OP), aggregation_method=aggregation_method, colocate_gradients_with_ops=colocate_gradients_with_ops) if gate_gradients == Optimizer.GATE_GRAPH: grads = control_flow_ops.tuple(grads) grads_and_vars = list(zip(grads, var_list)) return grads_and_vars
gradients 的實際定義在 tensorflow/python/ops/gradients_impl.py
中。把整個求導過程抽象成一個 ys=f(xs) 的函數。
簡單說,它就是爲了計算一組輸出張量ys = [y0, y1, ...]
對輸入張量xs = [x0, x1, ...]
的梯度,對每一個xi
有grad_i = sum[dy_j/dx_i for y_j in ys]
。默認狀況下,grad_loss
是None
,此時grad_ys
被初始化爲全1向量。
gradients 部分參數以下:
grad_ys
存儲計算出的梯度;gate_gradients
是一個布爾變量,指示全部梯度是否在使用前被算出,若是設爲True
,能夠避免競爭條件;這個方法會維護兩個重要變量
queue
,隊列裏存放計算圖裏全部出度爲0的操做符grads
,字典的鍵是操做符自己,值是該操做符每一個輸出端收到的梯度列表反向傳播求梯度時,每從隊列中彈出一個操做符,都會把它輸出變量的梯度加起來(對應全微分定理)獲得out_grads
,而後獲取對應的梯度計算函數grad_fn
。操做符op
自己和out_grads
會傳遞給grad_fn
作參數,求出輸入的梯度。
基本邏輯以下:
具體代碼以下:
def gradients(ys, xs, grad_ys=None, name="gradients", colocate_gradients_with_ops=False, gate_gradients=False, aggregation_method=None, stop_gradients=None): to_ops = [t.op for t in ys] from_ops = [t.op for t in xs] grads = {} # Add the initial gradients for the ys. for y, grad_y in zip(ys, grad_ys): _SetGrad(grads, y, grad_y) # Initialize queue with to_ops. queue = collections.deque() # Add the ops in 'to_ops' into the queue. to_ops_set = set() for op in to_ops: ready = (pending_count[op._id] == 0) if ready and op._id not in to_ops_set: to_ops_set.add(op._id) queue.append(op) while queue: # generate gradient subgraph for op. op = queue.popleft() with _maybe_colocate_with(op, colocate_gradients_with_ops): if loop_state: loop_state.EnterGradWhileContext(op, before=True) out_grads = _AggregatedGrads(grads, op, loop_state, aggregation_method) if loop_state: loop_state.ExitGradWhileContext(op, before=True) if has_out_grads and (op._id not in stop_ops): if is_func_call: func_call = ops.get_default_graph()._get_function(op.type) grad_fn = func_call.python_grad_func else: try: grad_fn = ops.get_gradient_function(op) for i, (t_in, in_grad) in enumerate(zip(op.inputs, in_grads)): if in_grad is not None: if (isinstance(in_grad, ops.Tensor) and t_in.dtype != dtypes.resource): try: in_grad.set_shape(t_in.get_shape()) _SetGrad(grads, t_in, in_grad) if loop_state: loop_state.ExitGradWhileContext(op, before=False)
該函數的做用是將compute_gradients()
返回的值做爲輸入參數對variable進行更新,即根據前面求得的梯度,把梯度進行方向傳播給weights和biases進行參數更新。
那爲何minimize()
會分開兩個步驟呢?緣由是由於在某些狀況下咱們須要對梯度作必定的修正,例如爲了防止梯度消失(gradient vanishing)或者梯度爆炸(gradient explosion),咱們須要事先干預一下以避免程序出現Nan的尷尬狀況;有的時候也許咱們須要給計算獲得的梯度乘以一個權重或者其餘亂七八糟的緣由,因此才分開了兩個步驟。
基本邏輯以下:
grad, var, processor in converted_grads_and_vars
,應用 ops.colocate_with(var),做用是保證每一個參數var的更新都在同一個device上;update_ops.append(processor.update_op(self, grad))
,若是有global_step
的話,global_step需加個1。train_op
。train_op
是一般訓練過程當中,client爲session的fetches提供的參數之一,也就是這個Operation被執行以後,模型的參數將會完成更新,並開始下一個batch的訓練。那麼這也就意味着,這個方法中涉及到的計算圖將會實現說明文檔中的訓練邏輯。具體代碼是:
def apply_gradients(self, grads_and_vars, global_step=None, name=None): grads_and_vars = tuple(grads_and_vars) # Make sure repeat iteration works. converted_grads_and_vars = [] for g, v in grads_and_vars: if g is not None: # Convert the grad to Tensor or IndexedSlices if necessary. g = ops.convert_to_tensor_or_indexed_slices(g) p = _get_processor(v) converted_grads_and_vars.append((g, v, p)) converted_grads_and_vars = tuple(converted_grads_and_vars) var_list = [v for g, v, _ in converted_grads_and_vars if g is not None] with ops.control_dependencies(None): self._create_slots([_get_variable_for(v) for v in var_list]) update_ops = [] with ops.name_scope(name, self._name) as name: self._prepare() for grad, var, processor in converted_grads_and_vars: if grad is None: continue scope_name = var.op.name if context.in_graph_mode() else "" with ops.name_scope("update_" + scope_name), ops.colocate_with(var): update_ops.append(processor.update_op(self, grad)) if global_step is None: apply_updates = self._finish(update_ops, name) else: with ops.control_dependencies([self._finish(update_ops, "update")]): with ops.colocate_with(global_step): apply_updates = state_ops.assign_add(global_step, 1, name=name).op train_op = ops.get_collection_ref(ops.GraphKeys.TRAIN_OP) if apply_updates not in train_op: train_op.append(apply_updates) return apply_updates
DIEN使用的是AdamOptimizer優化器。
Adam 這個名字來源於自適應矩估計(Adaptive Moment Estimation),也是梯度降低算法的一種變形,可是每次迭代參數的學習率都有必定的範圍,不會由於梯度很大而致使學習率(步長)也變得很大,參數的值相對比較穩定。
機率論中矩的含義是:若是一個隨機變量 X 服從某個分佈,X 的一階矩是 E(X),也就是樣本平均值,X 的二階矩就是 E(X^2),也就是樣本平方的平均值。
Adam 算法利用梯度的一階矩估計和二階矩估計動態調整每一個參數的學習率。TensorFlow提供的tf.train.AdamOptimizer可控制學習速度,通過偏置校訂後,每一次迭代學習率都有個肯定範圍,使得參數比較平穩。
在利用計算好的導數對權重進行修正時,對Embedding矩陣的梯度進行特殊處理,只更新局部,見optimization.py中Adagrad.update函數。
在_prepare
函數中經過convert_to_tensor
方法來存儲了輸入參數的 Tensor 版本。
def _prepare(self): self._lr_t = ops.convert_to_tensor(self._lr, name="learning_rate") self._beta1_t = ops.convert_to_tensor(self._beta1, name="beta1") self._beta2_t = ops.convert_to_tensor(self._beta2, name="beta2") self._epsilon_t = ops.convert_to_tensor(self._epsilon, name="epsilon")
_create_slots
函數用來建立參數,好比 _beta1_power,_beta2_power
def _create_slots(self, var_list): first_var = min(var_list, key=lambda x: x.name) create_new = self._beta1_power is None if not create_new and context.in_graph_mode(): create_new = (self._beta1_power.graph is not first_var.graph) if create_new: with ops.colocate_with(first_var): self._beta1_power = variable_scope.variable(self._beta1, name="beta1_power", trainable=False) self._beta2_power = variable_scope.variable(self._beta2, name="beta2_power", trainable=False) # Create slots for the first and second moments. for v in var_list: self._zeros_slot(v, "m", self._name) self._zeros_slot(v, "v", self._name)
函數_apply_dense
和_resource_apply_dense
的實現中分別使用了training_ops.apply_adam
和training_ops.resource_apply_adam
方法。
函數_apply_sparse
和_resource_apply_sparse
主要用在稀疏向量的更新操做上,而具體的實現是在函數_apply_sparse_shared
中。
_apply_sparse_shared
函數,首先獲取所須要的參數值並存儲到變量裏,接着按照 Adam 算法的流程,首先計算學習率,接着計算兩個 Momentum ,因爲是稀疏 tensor 的更新,因此在算出更新值以後要使用scatter_add
來完成加法操做, 最後將var_update
和m_t
、v_t
的更新操做放進control_flow_ops.group
中。
優化器已經搭建好,剩下就是調用 session.run
進行更新。
調用一次 run 是執行一遍數據流圖, 在 TensorFlow 的訓練代碼中一般是在一個循環中屢次調用 sess.run()
,一次 run 即爲訓練過程當中的一步。
fetches 是 run 方法的一個輸入參數,這個參數能夠是不少種形式的數據,run 最後的 返回值也會和 fetches 有相同的結構。
至此,DIN分析暫時告一段落,下篇開始 DIEN 的分析,敬請期待。
TensorFlow SyncReplicasOptimizer 解讀
tensorflow中有向圖(計算圖、Graph)、上下文環境(Session)和執行流程
TensorFlow 拆包(一):Session.Run ()
tensorflow源碼分析(五)session.run()
Tensorflow中優化器--AdamOptimizer詳解
【TensorFlow】優化器AdamOptimizer的源碼分析
TensorFlow中Session、Graph、Operation以及Tensor詳解
TensorFlow 拆包(二):TF 的數據流模型實現以及自動求導
分佈式Tensorflow中同步梯度更新tf.train.SyncReplicasOptimizer解讀(backup_worker的用法)
TensorFlow學習筆記之--[compute_gradients和apply_gradients原理淺析]