三分鐘快速上手TensorFlow 2.0 (上)——前置基礎、模型創建與可視化

本文學習筆記參照來源:https://tf.wiki/zh/basic/basic.htmlhtml

學習筆記相似提綱,具體細節參照上文連接python

 

一些前置的基礎git

隨機數      tf.random uniform(shape())算法

兩個元素零向量  tf.zeros(shape=(2))api

2x2常量      tf.constant([1,2],[3,4])數組

查看形狀、類型、值     A.shape A.dtype A.numpy()網絡

矩陣相加         tf.add(A,B)app

矩陣相乘         tf.matmul(A,B)dom

 自動求導機制        tf.GradientTape()ide

import tensorflow as tf

x = tf.Variable(initial_value=3.)
with tf.GradientTape() as tape:     # 在 tf.GradientTape() 的上下文內,全部計算步驟都會被記錄以用於求導
    y = tf.square(x)
y_grad = tape.gradient(y, x)        # 計算y關於x的導數
print([y, y_grad])
單元函數求偏導
X = tf.constant([[1., 2.], [3., 4.]])
y = tf.constant([[1.], [2.]])
w = tf.Variable(initial_value=[[1.], [2.]])
b = tf.Variable(initial_value=1.)
with tf.GradientTape() as tape:
    L = 0.5 * tf.reduce_sum(tf.square(tf.matmul(X, w) + b - y))
w_grad, b_grad = tape.gradient(L, [w, b])        # 計算L(w, b)關於w, b的偏導數
print([L.numpy(), w_grad.numpy(), b_grad.numpy()])
多元函數求偏導

 

事例:線性迴歸(梯度降低)

import numpy as np

X_raw = np.array([2013, 2014, 2015, 2016, 2017], dtype=np.float32)
y_raw = np.array([12000, 14000, 15000, 16500, 17500], dtype=np.float32)

X = (X_raw - X_raw.min()) / (X_raw.max() - X_raw.min())
y = (y_raw - y_raw.min()) / (y_raw.max() - y_raw.min())
基本的歸一化操做
對於多元函數 f(x) 求局部極小值,梯度降低 的過程以下:

初始化自變量爲 x_0 , k=0
迭代進行下列步驟直到知足收斂條件:

求函數 f(x) 關於自變量的梯度 \nabla f(x_k)
更新自變量: x_{k+1} = x_{k} - \gamma \nabla f(x_k) 。這裏 \gamma 是學習率(也就是梯度降低一次邁出的 「步子」 大小)
k \leftarrow k+1
接下來,咱們考慮如何使用程序來實現梯度降低方法,求得線性迴歸的解 \min_{a, b} L(a, b) = \sum_{i=1}^n(ax_i + b - y_i)^2 。
梯度降低知識點
a, b = 0, 0

num_epoch = 10000
learning_rate = 1e-3
for e in range(num_epoch):
    # 手動計算損失函數關於自變量(模型參數)的梯度
    y_pred = a * X + b
    grad_a, grad_b = (y_pred - y).dot(X), (y_pred - y).sum()

    # 更新參數
    a, b = a - learning_rate * grad_a, b - learning_rate * grad_b

print(a, b)


# np.dot() 是求內積, np.sum() 是求和
# 手工求損失函數關於參數 a 和 b 的偏導數
NumPy 下的線性迴歸
#使用 tape.gradient(ys, xs) 自動計算梯度;
#使用 optimizer.apply_gradients(grads_and_vars) 自動更新模型參數。

X = tf.constant(X)
y = tf.constant(y)

a = tf.Variable(initial_value=0.)
b = tf.Variable(initial_value=0.)
variables = [a, b]

num_epoch = 10000
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
for e in range(num_epoch):
    # 使用tf.GradientTape()記錄損失函數的梯度信息
    with tf.GradientTape() as tape:
        y_pred = a * X + b
        loss = 0.5 * tf.reduce_sum(tf.square(y_pred - y))
    # TensorFlow自動計算損失函數關於自變量(模型參數)的梯度
    grads = tape.gradient(loss, variables)
    # TensorFlow自動根據梯度更新參數
    optimizer.apply_gradients(grads_and_vars=zip(grads, variables))

print(a, b)
TensorFlow 下的線性迴歸

 

TensorFlow 模型創建與訓練 

模型的構建: tf.keras.Model 和 tf.keras.layers

模型的損失函數: tf.keras.losses

模型的優化器: tf.keras.optimizer

模型的評估: tf.keras.metrics

 

Keras 模型以類的形式呈現

經過繼承 tf.keras.Model 這個 Python 類來定義本身的模型。

須要重寫 __init__() (構造函數,初始化)和 call(input)(模型調用)兩個方法

同時也能夠根據須要增長自定義的方法。

class MyModel(tf.keras.Model):
    def __init__(self):
        super().__init__()     # Python 2 下使用 super(MyModel, self).__init__()
        # 此處添加初始化代碼(包含 call 方法中會用到的層),例如
        # layer1 = tf.keras.layers.BuiltInLayer(...)
        # layer2 = MyCustomLayer(...)

    def call(self, input):
        # 此處添加模型調用的代碼(處理輸入並返回輸出),例如
        # x = layer1(input)
        # output = layer2(x)
        return output

    # 還能夠添加自定義的方法
繼承 tf.keras.Model

 

 

import tensorflow as tf

X = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
y = tf.constant([[10.0], [20.0]])


class Linear(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense = tf.keras.layers.Dense(
            units=1,
            activation=None,
            kernel_initializer=tf.zeros_initializer(),
            bias_initializer=tf.zeros_initializer()
        )

    def call(self, input):
        output = self.dense(input)
        return output


# 如下代碼結構與前節相似
model = Linear()
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
for i in range(100):
    with tf.GradientTape() as tape:
        y_pred = model(X)      # 調用模型 y_pred = model(X) 而不是顯式寫出 y_pred = a * X + b
        loss = tf.reduce_mean(tf.square(y_pred - y))
    grads = tape.gradient(loss, model.variables)    # 使用 model.variables 這一屬性直接得到模型中的全部變量
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
print(model.variables)
線性模型 y_pred = a * X + b


全鏈接層 是 Keras 中最基礎和經常使用的層之一

對輸入矩陣 A 進行 f(AW + b) 的線性變換 + 激活函數操做。

 

這裏的等號實際上是通過了一個激活函數後獲得最後的結果

 

主要參數以下:

  • units :輸出張量的維度;

  • activation :激活函數,對應於 f(AW + b) 中的 f ,默認爲無激活函數( a(x) x )。經常使用的激活函數包括 tf.nn.relu 、 tf.nn.tanh 和 tf.nn.sigmoid

  • use_bias :是否加入偏置向量 bias ,即 f(AW + b) 中的 b。默認爲 True ;

  • kernel_initializer 、 bias_initializer :權重矩陣 kernel 和偏置向量 bias 兩個變量的初始化器。默認爲 tf.glorot_uniform_initializer 1 。設置爲 tf.zeros_initializer 表示將兩個變量均初始化爲全 0;

該層包含權重矩陣 kernel [input_dim, units] 和偏置向量 bias [units] 2 兩個可訓練變量,對應於 f(AW + b) 中的 W 和 b

 

基礎示例:多層感知機MLP

數據獲取及預處理: tf.keras.datasets

tf.keras.datasets
這裏讀入的是灰度圖片,色彩通道數爲 1(彩色 RGB 圖像色彩通道數爲 3),因此咱們使用  np.expand_dims()函數爲圖像數據手動在最後添加一維通道。

模型的構建: tf.keras.Model 和 tf.keras.layers

class MLP(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.flatten = tf.keras.layers.Flatten()    # Flatten層將除第一維(batch_size)之外的維度展平
        self.dense1 = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):         # [batch_size, 28, 28, 1]
        x = self.flatten(inputs)    # [batch_size, 784]
        x = self.dense1(x)          # [batch_size, 100]
        x = self.dense2(x)          # [batch_size, 10]
        output = tf.nn.softmax(x)
        return output
tf.keras.Model 和 tf.keras.layers

輸出 「輸入圖片分別屬於 0 到 9 的機率」,也就是一個 10 維的離散機率分佈

這個 10 維向量至少知足兩個條件:

  • 該向量中的每一個元素均在 [0, 1] 之間;

  • 該向量的全部元素之和爲 1。

softmax 函數可以凸顯原始向量中最大的值,並抑制遠低於最大值的其餘份量

模型的訓練: tf.keras.losses 和 tf.keras.optimizer

num_epochs = 5
batch_size = 50
learning_rate = 0.001
定義一些模型超參數
model = MLP()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
實例化模型和數據讀取類,並實例化一個 tf.keras.optimizer 的優化器
num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
    for batch_index in range(num_batches):
        X, y = data_loader.get_batch(batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
            loss = tf.reduce_mean(loss)
            print("batch %d: loss %f" % (batch_index, loss.numpy()))
        grads = tape.gradient(loss, model.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
迭代
    • 從 DataLoader 中隨機取一批訓練數據;

    • 將這批數據送入模型,計算出模型的預測值;

    • 將模型預測值與真實值進行比較,計算損失函數(loss)。這裏使用 tf.keras.losses中的交叉熵函數做爲損失函數;

    • 計算損失函數關於模型變量的導數;

    • 將求出的導數值傳入優化器,使用優化器的 apply_gradients 方法更新模型參數以最小化損失函數。

交叉熵做爲損失函數,在分類問題中被普遍應用。其離散形式爲 H(y, \hat{y}) = -\sum_{i=1}^{n}y_i \log(\hat{y_i}) ,其中 y 爲真實機率分佈, \hat{y} 爲預測機率分佈, n 爲分類任務的類別個數。預測機率分佈與真實分佈越接近,則交叉熵的值越小,反之則越大。

在 tf.keras 中,有兩個交叉熵相關的損失函數 tf.keras.losses.categorical_crossentropy 和 tf.keras.losses.sparse_categorical_crossentropy 。其中 sparse 的含義是,真實的標籤值 y_true 能夠直接傳入 int 類型的標籤類別。具體而言:

loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred) 

loss = tf.keras.losses.categorical_crossentropy( y_true=tf.one_hot(y, depth=tf.shape(y_pred)[-1]), y_pred=y_pred ) 

的結果相同。

模型的評估: tf.keras.metrics

sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
    num_batches = int(data_loader.num_test_data // batch_size)
    for batch_index in range(num_batches):
        start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
        y_pred = model.predict(data_loader.test_data[start_index: end_index])
        sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
    print("test accuracy: %f" % sparse_categorical_accuracy.result())
tf.keras.metrics
每次經過 update_state() 方法向評估器輸入兩個參數: y_pred 和 y_true ,即模型預測出的結果和真實結果。

 

 

 

 

卷積神經網絡(CNN)

包含一個或多個卷積層(Convolutional Layer)、池化層(Pooling Layer)和全鏈接層(Fully-connected Layer)

class CNN(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(
            filters=32,             # 卷積層神經元(卷積核)數目
            kernel_size=[5, 5],     # 感覺野大小
            padding='same',         # padding策略(vaild 或 same)
            activation=tf.nn.relu   # 激活函數
        )
        self.pool1 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.conv2 = tf.keras.layers.Conv2D(
            filters=64,
            kernel_size=[5, 5],
            padding='same',
            activation=tf.nn.relu
        )
        self.pool2 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.flatten = tf.keras.layers.Reshape(target_shape=(7 * 7 * 64,))
        self.dense1 = tf.keras.layers.Dense(units=1024, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):
        x = self.conv1(inputs)                  # [batch_size, 28, 28, 32]
        x = self.pool1(x)                       # [batch_size, 14, 14, 32]
        x = self.conv2(x)                       # [batch_size, 14, 14, 64]
        x = self.pool2(x)                       # [batch_size, 7, 7, 64]
        x = self.flatten(x)                     # [batch_size, 7 * 7 * 64]
        x = self.dense1(x)                      # [batch_size, 1024]
        x = self.dense2(x)                      # [batch_size, 10]
        output = tf.nn.softmax(x)
        return output
使用 Keras 實現卷積神經網絡

將模型的訓練中實例化模型的model MLP() 更換成 model CNN()便可

import tensorflow as tf
import tensorflow_datasets as tfds

num_batches = 1000
batch_size = 50
learning_rate = 0.001

dataset = tfds.load("tf_flowers", split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(lambda img, label: (tf.image.resize(img, [224, 224]) / 255.0, label)).shuffle(1024).batch(32)
model = tf.keras.applications.MobileNetV2(weights=None, classes=5)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
for images, labels in dataset:
    with tf.GradientTape() as tape:
        labels_pred = model(images)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=labels, y_pred=labels_pred)
        loss = tf.reduce_mean(loss)
        print("loss %f" % loss.numpy())
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.trainable_variables))
使用 Keras 中預約義的經典卷積神經網絡結構
model = tf.keras.applications.MobileNetV2() 實例化一個  網絡結構

MobileNetV2

共通的經常使用參數以下:

  • input_shape :輸入張量的形狀(不含第一維的 Batch),大多默認爲 224 × 224 × 3。通常而言,模型對輸入張量的大小有下限,長和寬至少爲 32 × 32 或 75 × 75 ;

  • include_top :在網絡的最後是否包含全鏈接層,默認爲 True ;

  • weights :預訓練權值,默認爲 'imagenet' ,即爲當前模型載入在 ImageNet 數據集上預訓練的權值。如需隨機初始化變量可設爲 None ;

  • classes :分類數,默認爲 1000。修改該參數須要 include_top 參數爲 True 且 weights 參數爲 None 。

 卷積示意圖

一個單通道的 7×7 圖像在經過一個感覺野爲 3×3 ,參數爲 10 個的卷積層神經元后,獲得 5×5 的矩陣

在 tf.keras.layers.Conv2D 中 padding 參數設爲 same 時,會將周圍缺乏的部分使用 0 補齊,使得輸出的矩陣大小和輸入一致。

經過 tf.keras.layers.Conv2D 的 strides 參數便可設置步長(默認爲 1)。好比,在上面的例子中,若是咱們將步長設定爲 2,輸出的卷積結果即會是一個 3×3 的矩陣。

 

池化層(Pooling Layer)的理解則簡單得多,其能夠理解爲對圖像進行降採樣的過程,對於每一次滑動窗口中的全部值,輸出其中的最大值(MaxPooling)、均值或其餘方法產生的值。

例如,對於一個三通道的 16×16 圖像(即一個 16*16*3 的張量),通過感覺野爲 2×2,滑動步長爲 2 的池化層,則獲得一個 8*8*3 的張量。

 

 

 

循環神經網絡(RNN)

適宜於處理序列數據的神經網絡,被普遍用於語言模型、文本生成、機器翻譯等

來看一下尼采風格文本的自動生成

class DataLoader():
    def __init__(self):
        path = tf.keras.utils.get_file('nietzsche.txt',
            origin='https://s3.amazonaws.com/text-datasets/nietzsche.txt')
        with open(path, encoding='utf-8') as f:
            self.raw_text = f.read().lower()
        self.chars = sorted(list(set(self.raw_text)))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        self.text = [self.char_indices[c] for c in self.raw_text]

    def get_batch(self, seq_length, batch_size):
        seq = []
        next_char = []
        for i in range(batch_size):
            index = np.random.randint(0, len(self.text) - seq_length)
            seq.append(self.text[index:index+seq_length])
            next_char.append(self.text[index+seq_length])
        return np.array(seq), np.array(next_char)       # [batch_size, seq_length], [num_batch]
讀取文本,並以字符爲單位進行編碼
字符種類數爲 num_chars ,則每種字符賦予一個 0 到 num_chars 1 之間的惟一整數編號 i
class RNN(tf.keras.Model):
    def __init__(self, num_chars, batch_size, seq_length):
        super().__init__()
        self.num_chars = num_chars
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.cell = tf.keras.layers.LSTMCell(units=256)
        self.dense = tf.keras.layers.Dense(units=self.num_chars)

    def call(self, inputs, from_logits=False):
        inputs = tf.one_hot(inputs, depth=self.num_chars)       # [batch_size, seq_length, num_chars]
        state = self.cell.get_initial_state(batch_size=self.batch_size, dtype=tf.float32)
        for t in range(self.seq_length):
            output, state = self.cell(inputs[:, t, :], state)
        logits = self.dense(output)
        if from_logits:
            return logits
        else:
            return tf.nn.softmax(logits)
模型的實現
在 __init__ 方法中咱們實例化一個經常使用的 LSTMCell 單元,以及一個線性變換用的全鏈接層,咱們首先對序列進行 「One Hot」 操做,即將序列中的每一個字符的編碼 i 均變換爲一個 num_char 維向量,其第 i 位爲 1,其他均爲 0。變換後的序列張量形狀爲 [seq_length, num_chars] 。而後,咱們初始化 RNN 單元的狀態,存入變量 state 中。接下來,將序列從頭至尾依次送入 RNN 單元,即在 t 時刻,將上一個時刻 t-1 的 RNN 單元狀態 state 和序列的第 t 個元素 inputs[t, :] 送入 RNN 單元,獲得當前時刻的輸出 output 和 RNN 單元狀態。取 RNN 單元最後一次的輸出,經過全鏈接層變換到 num_chars 維,即做爲模型的輸出。

 

 

num_batches = 1000
seq_length = 40
batch_size = 50
learning_rate = 1e-3
定義一些模型超參數
data_loader = DataLoader()
    model = RNN(num_chars=len(data_loader.chars), batch_size=batch_size, seq_length=seq_length)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    for batch_index in range(num_batches):
        X, y = data_loader.get_batch(seq_length, batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
            loss = tf.reduce_mean(loss)
            print("batch %d: loss %f" % (batch_index, loss.numpy()))
        grads = tape.gradient(loss, model.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
模型訓練
    • 從 DataLoader 中隨機取一批訓練數據;

    • 將這批數據送入模型,計算出模型的預測值;

    • 將模型預測值與真實值進行比較,計算損失函數(loss);

    • 計算損失函數關於模型變量的導數;

    • 使用優化器更新模型參數以最小化損失函數。

 def predict(self, inputs, temperature=1.):
        batch_size, _ = tf.shape(inputs)
        logits = self(inputs, from_logits=True)
        prob = tf.nn.softmax(logits / temperature).numpy()
        return np.array([np.random.choice(self.num_chars, p=prob[i, :])
                         for i in range(batch_size.numpy())])

X_, _ = data_loader.get_batch(seq_length, 1)
    for diversity in [0.2, 0.5, 1.0, 1.2]:
        X = X_
        print("diversity %f:" % diversity)
        for t in range(400):
            y_pred = model.predict(X, diversity)
            print(data_loader.indices_char[y_pred[0]], end='', flush=True)
            X = np.concatenate([X[:, 1:], np.expand_dims(y_pred, axis=1)], axis=-1)
        print("\n")
連續預測獲得生成文本
使用 tf.argmax() 函數,將對應機率最大的值做爲預測值對於文本生成而言,這樣的預測方式過於絕對,會使得生成的文本失去豐富性。
使用 np.random.choice() 函數按照生成的機率分佈取樣。這樣,即便是對應機率較小的字符,也有機會被取樣到。
同時,加入一個 temperature 參數控制分佈的形狀,參數值越大則分佈越平緩(最大值和最小值的差值越小),生成文本的豐富度越高;參數值越小則分佈越陡峭,生成文本的豐富度越低。

 

 

 

 

 

 

深度強化學習(DRL)

強調如何基於環境而行動,以取得最大化的預期利益。

使用深度強化學習玩 CartPole(倒立擺)遊戲

import gym

env = gym.make('CartPole-v1')       # 實例化一個遊戲環境,參數爲遊戲名稱
state = env.reset()                 # 初始化環境,得到初始狀態
while True:
    env.render()                    # 對當前幀進行渲染,繪圖到屏幕
    action = model.predict(state)   # 假設咱們有一個訓練好的模型,可以經過當前狀態預測出這時應該進行的動做
    next_state, reward, done, info = env.step(action)   # 讓環境執行動做,得到執行完動做的下一個狀態,動做的獎勵,遊戲是否已結束以及額外信息
    if done:                        # 若是遊戲結束則退出循環
        break
Gym 的基本調用方法
import tensorflow as tf
import numpy as np
import gym
import random
from collections import deque

num_episodes = 500              # 遊戲訓練的總episode數量
num_exploration_episodes = 100  # 探索過程所佔的episode數量
max_len_episode = 1000          # 每一個episode的最大回合數
batch_size = 32                 # 批次大小
learning_rate = 1e-3            # 學習率
gamma = 1.                      # 折扣因子
initial_epsilon = 1.            # 探索起始時的探索率
final_epsilon = 0.01            # 探索終止時的探索率
定義一些模型超參數
class QNetwork(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense3 = tf.keras.layers.Dense(units=2)

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        x = self.dense3(x)
        return x

    def predict(self, inputs):
        q_values = self(inputs)
        return tf.argmax(q_values, axis=-1)
tf.keras.Model 創建一個 Q 函數網絡(Q-network)
使用較簡單的多層全鏈接神經網絡進行擬合。該網絡輸入當前狀態,輸出各個動做下的 Q-value(CartPole 下爲 2 維,即向左和向右推進小車)。
if __name__ == '__main__':
    env = gym.make('CartPole-v1')       # 實例化一個遊戲環境,參數爲遊戲名稱
    model = QNetwork()
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    replay_buffer = deque(maxlen=10000) # 使用一個 deque 做爲 Q Learning 的經驗回放池
    epsilon = initial_epsilon
    for episode_id in range(num_episodes):
        state = env.reset()             # 初始化環境,得到初始狀態
        epsilon = max(                  # 計算當前探索率
            initial_epsilon * (num_exploration_episodes - episode_id) / num_exploration_episodes,
            final_epsilon)
        for t in range(max_len_episode):
            env.render()                                # 對當前幀進行渲染,繪圖到屏幕
            if random.random() < epsilon:               # epsilon-greedy 探索策略,以 epsilon 的機率選擇隨機動做
                action = env.action_space.sample()      # 選擇隨機動做(探索)
            else:
                action = model.predict(np.expand_dims(state, axis=0)).numpy()   # 選擇模型計算出的 Q Value 最大的動做
                action = action[0]

            # 讓環境執行動做,得到執行完動做的下一個狀態,動做的獎勵,遊戲是否已結束以及額外信息
            next_state, reward, done, info = env.step(action)
            # 若是遊戲Game Over,給予大的負獎勵
            reward = -10. if done else reward
            # 將(state, action, reward, next_state)的四元組(外加 done 標籤表示是否結束)放入經驗回放池
            replay_buffer.append((state, action, reward, next_state, 1 if done else 0))
            # 更新當前 state
            state = next_state

            if done:                                    # 遊戲結束則退出本輪循環,進行下一個 episode
                print("episode %d, epsilon %f, score %d" % (episode_id, epsilon, t))
                break

            if len(replay_buffer) >= batch_size:
                # 從經驗回放池中隨機取一個批次的四元組,並分別轉換爲 NumPy 數組
                batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(
                    *random.sample(replay_buffer, batch_size))
                batch_state, batch_reward, batch_next_state, batch_done = \
                    [np.array(a, dtype=np.float32) for a in [batch_state, batch_reward, batch_next_state, batch_done]]
                batch_action = np.array(batch_action, dtype=np.int32)

                q_value = model(batch_next_state)
                y = batch_reward + (gamma * tf.reduce_max(q_value, axis=1)) * (1 - batch_done)  # 計算 y 值
                with tf.GradientTape() as tape:
                    loss = tf.keras.losses.mean_squared_error(  # 最小化 y 和 Q-value 的距離
                        y_true=y,
                        y_pred=tf.reduce_sum(model(batch_state) * tf.one_hot(batch_action, depth=2), axis=1)
                    )
                grads = tape.gradient(loss, model.variables)
                optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))       # 計算梯度並更新參數
主程序中實現 Q Learning 算法
將上面的 QNetwork 更換爲 CNN 網絡,並對狀態作一些修改,便可用於玩一些簡單的視頻遊戲

 

 以上示例均使用了 Keras 的 Subclassing API 創建模型,即對 tf.keras.Model 類進行擴展以定義本身的新模型,同時手工編寫了訓練和評估模型的流程。

 

Keras Pipeline *

只須要創建一個結構相對簡單和典型的神經網絡(好比上文中的 MLP 和 CNN),並使用常規的手段進行訓練。這時,Keras 也給咱們提供了另外一套更爲簡單高效的內置方法來創建、訓練和評估模型。

model = tf.keras.models.Sequential([
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(100, activation=tf.nn.relu),
            tf.keras.layers.Dense(10),
            tf.keras.layers.Softmax()
        ])
Keras Sequential/Functional API 模式創建模型
經過向 tf.keras.models.Sequential() 提供一個層的列表,就能快速地創建一個 tf.keras.Model 模型並返回
 
 
這種層疊結構並不能表示任意的神經網絡結構,Functional API,幫助咱們創建更爲複雜的模型
inputs = tf.keras.Input(shape=(28, 28, 1))
        x = tf.keras.layers.Flatten()(inputs)
        x = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)(x)
        x = tf.keras.layers.Dense(units=10)(x)
        outputs = tf.keras.layers.Softmax()(x)
        model = tf.keras.Model(inputs=inputs, outputs=outputs)
創建更爲複雜的模型
使用方法是將層做爲可調用的對象並返回張量(這點與以前章節的使用方法一致),並將輸入向量和輸出向量提供給 tf.keras.Model 的 inputs 和 outputs 參數
model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        metrics=[tf.keras.metrics.sparse_categorical_accuracy]
    )
模型創建完成後,經過 tf.keras.Model 的 compile 方法配置訓練過程

接受 3 個重要的參數:

  • oplimizer :優化器,可從 tf.keras.optimizers 中選擇;

  • loss :損失函數,可從 tf.keras.losses 中選擇;

  • metrics :評估指標,可從 tf.keras.metrics 中選擇

model.fit(data_loader.train_data, data_loader.train_label, epochs=num_epochs, batch_size=batch_size)
使用 tf.keras.Model 的 fit 方法訓練模型

接受 5 個重要的參數:

  • x :訓練數據;

  • y :目標數據(數據標籤);

  • epochs :將訓練數據迭代多少遍;

  • batch_size :批次的大小;

  • validation_data :驗證數據,可用於在訓練過程當中監控模型的性能。

print(model.evaluate(data_loader.test_data, data_loader.test_label))
使用 tf.keras.Model.evaluate 評估訓練效果
提供測試數據及標籤便可

 

若是現有的這些層沒法知足個人要求,我須要定義本身的層怎麼辦?

事實上,不只能夠繼承 tf.keras.Model 編寫本身的模型類,也能夠繼承 tf.keras.layers.Layer 編寫本身的層。

class MyLayer(tf.keras.layers.Layer):
    def __init__(self):
        super().__init__()
        # 初始化代碼

    def build(self, input_shape):     # input_shape 是一個 TensorShape 類型對象,提供輸入的形狀
        # 在第一次使用該層的時候調用該部分代碼,在這裏建立變量可使得變量的形狀自適應輸入的形狀
        # 而不須要使用者額外指定變量形狀。
        # 若是已經能夠徹底肯定變量的形狀,也能夠在__init__部分建立變量
        self.variable_0 = self.add_weight(...)
        self.variable_1 = self.add_weight(...)

    def call(self, inputs):
        # 模型調用的代碼(處理輸入並返回輸出)
        return output
自定義層
須要繼承 tf.keras.layers.Layer 類,並重寫 __init__ 、 build 和 call 三個方法
 
事例
class LinearLayer(tf.keras.layers.Layer):
    def __init__(self, units):
        super().__init__()
        self.units = units

    def build(self, input_shape):     # 這裏 input_shape 是第一次運行call()時參數inputs的形狀
        self.w = self.add_variable(name='w',
            shape=[input_shape[-1], self.units], initializer=tf.zeros_initializer())
        self.b = self.add_variable(name='b',
            shape=[self.units], initializer=tf.zeros_initializer())

    def call(self, inputs):
        y_pred = tf.matmul(inputs, self.w) + self.b
        return y_pred
本身實現一個前文提到的的全鏈接層( tf.keras.layers.Dense )
在 build 方法中建立兩個變量,並在 call 方法中使用建立的變量進行運算
class LinearModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.layer = LinearLayer(units=1)

    def call(self, inputs):
        output = self.layer(inputs)
        return output
定義模型的時候,咱們即可以如同 Keras 中的其餘層同樣,調用咱們自定義的層 LinearLayer

 

class MeanSquaredError(tf.keras.losses.Loss):
    def call(self, y_true, y_pred):
        return tf.reduce_mean(tf.square(y_pred - y_true))
自定義損失函數
須要繼承 tf.keras.losses.Loss 類,重寫 call 方法便可,輸入真實值 y_true 和模型預測值 y_pred ,輸出模型預測值和真實值之間經過自定義的損失函數計算出的損失值。上面的示例爲均方差損失函數

 

class SparseCategoricalAccuracy(tf.keras.metrics.Metric):
    def __init__(self):
        super().__init__()
        self.total = self.add_weight(name='total', dtype=tf.int32, initializer=tf.zeros_initializer())
        self.count = self.add_weight(name='count', dtype=tf.int32, initializer=tf.zeros_initializer())

    def update_state(self, y_true, y_pred, sample_weight=None):
        values = tf.cast(tf.equal(y_true, tf.argmax(y_pred, axis=-1, output_type=tf.int32)), tf.int32)
        self.total.assign_add(tf.shape(y_true)[0])
        self.count.assign_add(tf.reduce_sum(values))

    def result(self):
        return self.count / self.total
自定義評估指標
須要繼承 tf.keras.metrics.Metric 類,並重寫 __init__ 、 update_state 和 result 三個方法。上面的示例對前面用到的 SparseCategoricalAccuracy評估指標類作了一個簡單的重實現:

 

下一篇:

三分鐘快速上手TensorFlow 2.0 (中)——經常使用模塊和模型的部署

相關文章
相關標籤/搜索