Microsoft Cognitive Toolkit 简明教程
CNTK - Out-of-Memory Datasets
在本章中,将阐明如何测量超出内存数据集的性能。
在前几部分中,我们已经讨论了验证我们 NN 性能的各种方法,但我们讨论的方法都是针对可以放入内存的数据集。
这里就会产生一个问题,超出内存的数据集怎么办,因为在生产场景中,我们需要大量数据来训练 NN 。在本部分中,我们将讨论在使用 Minibatch 群集和手动 Minibatch 循环后如何测量性能。
Minibatch sources
在使用超出内存数据集(即 Minibatch 群集)时,我们需要设置一个与处理小型数据集(即内存中数据集)时不同的损失函数和度量值。首先,我们将了解如何设置一种方法来向 NN 模型的训练器输入数据。
以下为实现步骤 −
Step 1 − 首先,从 *cntk.*io 模块导入用于创建 Minibatch 群集的组件,如下所示 −
from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer,
INFINITY_REPEAT
Step 2 − 接下来,创建一个名为 create_datasource 的新函数。此函数有 2 个参数,即 filename 和 limit,默认值为 INFINITELY_REPEAT 。
def create_datasource(filename, limit =INFINITELY_REPEAT)
Step 3 − 现在,在函数中,使用 StreamDef 类为标签创建流定义,读取具有 3 个特征的标签字段。我们还需要将 is_sparse 设置为 False ,如下所示 −
labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False)
Step 4 − 接下来,创建用于从输入文件中读取特征字段的 StreamDef 的另一个实例,如下所示。
feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False)
Step 5 − 现在,初始化 CTFDeserializer 实例类。指定我们需要反序列化的 filename 和流,如下所示 −
deserializer = CTFDeserializer(filename, StreamDefs(labels=
label_stream, features=features_stream)
Step 6 − 接下来,我们需要使用反序列化器创建 minisourceBatch 的实例,如下所示 −
Minibatch_source = MinibatchSource(deserializer, randomize=True, max_sweeps=limit)
return minibatch_source
Step 7 − 最后,我们需要提供我们在前几部分创建的训练和测试源。我们正在使用鸢尾属数据集。
training_source = create_datasource(‘Iris_train.ctf’)
test_source = create_datasource(‘Iris_test.ctf’, limit=1)
创建了 MinibatchSource 实例后,就需要对它进行训练了。我们可以使用与处理小型内存中数据集时相同的训练逻辑。在这里,我们将使用 MinibatchSource 实例作为损失函数训练方法的输入,如下所示 −
以下为实现步骤 −
Step 1 − 为了记录训练会话的输出,首先需要从 cntk.logging 模块中导入 ProgressPrinter ,如下所示 −
from cntk.logging import ProgressPrinter
Step 2 − 接下來,要設定訓練階段,匯入 trainer 及 training_session *from *cntk.train 模組如下 −
from cntk.train import Trainer, training_session
Step 3 − 現在,我們需要定義一些常數組,如下列 minibatch_size 、 samples_per_epoch 及 num_epochs −
minbatch_size = 16
samples_per_epoch = 150
num_epochs = 30
max_samples = samples_per_epoch * num_epochs
Step 4 − 接下來,為了瞭解如何在 CNTK 訓練期間讀取資料,我們需要定義網路輸入變數與 minibatch 來源中的串流之間的對應。
input_map = {
features: training_source.streams.features,
labels: training_source.streams.labels
}
Step 5 − 接著登入訓練程式的輸出,初始化 progress_printer *variable with a new *ProgressPrinter 實例。初始化 trainer 並提供模型如下 −
progress_writer = ProgressPrinter(0)
trainer: training_source.streams.labels
Step 6 − 最後,要開始訓練程式,我們需要呼叫 training_session 函式如下 −
session = training_session(trainer,
mb_source=training_source,
mb_size=minibatch_size,
model_inputs_to_streams=input_map,
max_samples=max_samples,
test_config=test_config)
session.train()
一旦訓練好模型,我們可透過 TestConfig 物件新增驗證至這個設定,並將它指派給 train_session 函式的 test_config 關鍵字引數。
以下为实现步骤 −
Step 1 − 首先,我們需要從 cntk.train 模組中匯入 TestConfig 類別如下 −
from cntk.train import TestConfig
Step 2 − 現在,我們需要使用 test_source 作為輸入,建立一個新的 TestConfig 實例 −
Test_config = TestConfig(test_source)
Complete Example
from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer, INFINITY_REPEAT
def create_datasource(filename, limit =INFINITELY_REPEAT)
labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False)
feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False)
deserializer = CTFDeserializer(filename, StreamDefs(labels=label_stream, features=features_stream)
Minibatch_source = MinibatchSource(deserializer, randomize=True, max_sweeps=limit)
return minibatch_source
training_source = create_datasource(‘Iris_train.ctf’)
test_source = create_datasource(‘Iris_test.ctf’, limit=1)
from cntk.logging import ProgressPrinter
from cntk.train import Trainer, training_session
minbatch_size = 16
samples_per_epoch = 150
num_epochs = 30
max_samples = samples_per_epoch * num_epochs
input_map = {
features: training_source.streams.features,
labels: training_source.streams.labels
}
progress_writer = ProgressPrinter(0)
trainer: training_source.streams.labels
session = training_session(trainer,
mb_source=training_source,
mb_size=minibatch_size,
model_inputs_to_streams=input_map,
max_samples=max_samples,
test_config=test_config)
session.train()
from cntk.train import TestConfig
Test_config = TestConfig(test_source)
Output
-------------------------------------------------------------------
average since average since examples
loss last metric last
------------------------------------------------------
Learning rate per minibatch: 0.1
1.57 1.57 0.214 0.214 16
1.38 1.28 0.264 0.289 48
[………]
Finished Evaluation [1]: Minibatch[1-1]:metric = 69.65*30;
Manual minibatch loop
如同我們上面看到的,透過 CNTK 以一般 API 進行訓練時,利用量測值,可以在訓練期間及之後輕鬆量測神經網路模型的執行效能。但是,另一方面,使用手動 minibatch 迴圈時,事情就不是那麼容易。
在這裡,我們使用下面這個具有 4 個輸入及 3 個輸出,也在前一段建立的鳶尾花資料集的模型 −
from cntk import default_options, input_variable
from cntk.layers import Dense, Sequential
from cntk.ops import log_softmax, relu, sigmoid
from cntk.learners import sgd
model = Sequential([
Dense(4, activation=sigmoid),
Dense(3, activation=log_softmax)
])
features = input_variable(4)
labels = input_variable(3)
z = model(features)
接著,定義模型的損失,為交叉熵損失函式及在先前部分中使用的 F 量測值的組合。我們將使用 criterion_factory 工具程式,建立為 CNTK 函式物件,如下所示 −
import cntk
from cntk.losses import cross_entropy_with_softmax, fmeasure
@cntk.Function
def criterion_factory(outputs, targets):
loss = cross_entropy_with_softmax(outputs, targets)
metric = fmeasure(outputs, targets, beta=1)
return loss, metric
loss = criterion_factory(z, labels)
learner = sgd(z.parameters, 0.1)
label_mapping = {
'Iris-setosa': 0,
'Iris-versicolor': 1,
'Iris-virginica': 2
}
現在,由於我們定義了損失函式,我們將瞭解如何將它用於訓練器中,設定手動訓練階段。
以下是執行步驟 −
Step 1 − 首先,我們需要匯入必要的套件,例如 numpy 及 pandas ,以載入及預處理資料。
import pandas as pd
import numpy as np
Step 2 − 接下來,為了登入訓練期間的資訊,匯入 ProgressPrinter 類別如下 −
from cntk.logging import ProgressPrinter
Step 3 − 然後,需要從 cntk.train 模組匯入訓練器模組如下 −
from cntk.train import Trainer
Step 4 − 接著,建立 ProgressPrinter 的新實例如下 −
progress_writer = ProgressPrinter(0)
Step 5 − 現在,我們需要使用損失、學習器及 progress_writer 等參數,來初始化訓練器如下 −
trainer = Trainer(z, loss, learner, progress_writer)
Step 6 − 接下來,為了訓練模型,我們將建立一個迴圈,迴圈將會對資料集重複 30 次。這將會是外部訓練迴圈。
for _ in range(0,30):
Step 7 - 现在,我们需要使用 pandas 从磁盘中加载数据。然后,为了加载 mini-batches 中的数据集,将 chunksize 关键字参数设为 16。
input_data = pd.read_csv('iris.csv',
names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'],
index_col=False, chunksize=16)
Step 8 - 现在,创建一个内部训练循环以遍历每个 mini-batches 。
for df_batch in input_data:
Step 9 - 现在在这个循环内,使用 iloc *indexer, as the *features 读取前四列以进行训练,并将它们转换为 float32 −
feature_values = df_batch.iloc[:,:4].values
feature_values = feature_values.astype(np.float32)
Step 10 - 现在,按照如下方式读取最后一列作为训练标签 −
label_values = df_batch.iloc[:,-1]
Step 11 - 接下来的,我们将使用独热向量将标签字符串转换为它们的数字演示,如下所示 −
label_values = label_values.map(lambda x: label_mapping[x])
Step 12 - 在此之后,获得标签的数字演示。然后,将它们转换为 numpy 数组,这样可以更轻松地使用它们,如下所示 −
label_values = label_values.values
Step 13 - 现在,我们需要创建一个新的 numpy 数组,它与我们转换的标签值具有相同数量的行。
encoded_labels = np.zeros((label_values.shape[0], 3))
Step 14 - 现在,为了创建独热编码的标签,基于数字标签值选择列。
encoded_labels[np.arange(label_values.shape[0]), label_values] = 1.
Step 15 - 最后,我们需要在训练器上调用 train_minibatch 方法,并为小批量提供已处理的特征和标签。
trainer.train_minibatch({features: feature_values, labels: encoded_labels})
Complete Example
from cntk import default_options, input_variable
from cntk.layers import Dense, Sequential
from cntk.ops import log_softmax, relu, sigmoid
from cntk.learners import sgd
model = Sequential([
Dense(4, activation=sigmoid),
Dense(3, activation=log_softmax)
])
features = input_variable(4)
labels = input_variable(3)
z = model(features)
import cntk
from cntk.losses import cross_entropy_with_softmax, fmeasure
@cntk.Function
def criterion_factory(outputs, targets):
loss = cross_entropy_with_softmax(outputs, targets)
metric = fmeasure(outputs, targets, beta=1)
return loss, metric
loss = criterion_factory(z, labels)
learner = sgd(z.parameters, 0.1)
label_mapping = {
'Iris-setosa': 0,
'Iris-versicolor': 1,
'Iris-virginica': 2
}
import pandas as pd
import numpy as np
from cntk.logging import ProgressPrinter
from cntk.train import Trainer
progress_writer = ProgressPrinter(0)
trainer = Trainer(z, loss, learner, progress_writer)
for _ in range(0,30):
input_data = pd.read_csv('iris.csv',
names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'],
index_col=False, chunksize=16)
for df_batch in input_data:
feature_values = df_batch.iloc[:,:4].values
feature_values = feature_values.astype(np.float32)
label_values = df_batch.iloc[:,-1]
label_values = label_values.map(lambda x: label_mapping[x])
label_values = label_values.values
encoded_labels = np.zeros((label_values.shape[0], 3))
encoded_labels[np.arange(label_values.shape[0]),
label_values] = 1.
trainer.train_minibatch({features: feature_values, labels: encoded_labels})
Output
-------------------------------------------------------------------
average since average since examples
loss last metric last
------------------------------------------------------
Learning rate per minibatch: 0.1
1.45 1.45 -0.189 -0.189 16
1.24 1.13 -0.0382 0.0371 48
[………]
在以上输出中,我们获得了损失和训练期间指标的输出。这是因为我们在一个 function 对象中合并了一个指标和损失,并在训练器配置中使用了进度打印机。
现在,为了评估模型性能,我们需要执行与训练模型相同的任务,但这次,我们需要使用 Evaluator 实例来测试模型。在下面的 Python 代码中展示了这一点−
from cntk import Evaluator
evaluator = Evaluator(loss.outputs[1], [progress_writer])
input_data = pd.read_csv('iris.csv',
names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'],
index_col=False, chunksize=16)
for df_batch in input_data:
feature_values = df_batch.iloc[:,:4].values
feature_values = feature_values.astype(np.float32)
label_values = df_batch.iloc[:,-1]
label_values = label_values.map(lambda x: label_mapping[x])
label_values = label_values.values
encoded_labels = np.zeros((label_values.shape[0], 3))
encoded_labels[np.arange(label_values.shape[0]), label_values] = 1.
evaluator.test_minibatch({ features: feature_values, labels:
encoded_labels})
evaluator.summarize_test_progress()
现在,我们将获得类似于以下内容的输出−