Microsoft Cognitive Toolkit Tutorial

CNTK - Out-of-Memory Datasets

In this chapter, we will explain how to measure performance when working with out-of-memory datasets.

In previous sections, we discussed various methods to validate the performance of our NN, but the methods we discussed dealt with datasets that fit in memory.

Here the question arises: what about out-of-memory datasets? In production scenarios, we need a lot of data to train a NN. In this section, we are going to discuss how to measure performance when working with minibatch sources and a manual minibatch loop.

Minibatch sources

While working with an out-of-memory dataset, i.e. a minibatch source, we need a slightly different setup for the loss, as well as the metric, than the setup we used while working with small, in-memory datasets. First, we will see how to set up a way to feed data to the trainer of the NN model.

Following are the implementation steps −

Step 1 − First, from the cntk.io module, import the components for creating the minibatch source as follows −

from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer, INFINITELY_REPEAT

Step 2 − Next, create a new function named create_datasource. This function has two parameters, namely filename and limit, with a default value of INFINITELY_REPEAT.

def create_datasource(filename, limit=INFINITELY_REPEAT):

Step 3 − Now, within the function, use the StreamDef class to create a stream definition for the labels that reads from the labels field, which contains three values. We also need to set is_sparse to False, as follows −

labels_stream = StreamDef(field='labels', shape=3, is_sparse=False)

Step 4 − Next, to read the features field from the input file, create another instance of StreamDef as follows.

features_stream = StreamDef(field='features', shape=4, is_sparse=False)

Step 5 − Now, initialise an instance of the CTFDeserializer class. Specify the filename and the streams that we need to deserialize, as follows −

deserializer = CTFDeserializer(filename, StreamDefs(labels=labels_stream, features=features_stream))

Step 6 − Next, we need to create an instance of MinibatchSource by using the deserializer, as follows −

minibatch_source = MinibatchSource(deserializer, randomize=True, max_sweeps=limit)
return minibatch_source

Step 7 − At last, we need to provide the training and testing sources, which we also created in previous sections. We are using the iris flower dataset.

training_source = create_datasource('Iris_train.ctf')
test_source = create_datasource('Iris_test.ctf', limit=1)
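
For reference, CTFDeserializer reads the CNTK text format (CTF): one example per line, with each named stream introduced by a pipe character. Lines in Iris_train.ctf would therefore be shaped like the following (the values shown here are hypothetical) −

|labels 1 0 0 |features 5.1 3.5 1.4 0.2
|labels 0 1 0 |features 7.0 3.2 4.7 1.4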

Once the MinibatchSource instance is created, we need to train with it. We can use the same training logic as we used when working with small in-memory datasets. Here, we will use the MinibatchSource instance as the input for the train method on the loss function, as follows −

Following are the implementation steps −

Step 1 − In order to log the output of the training session, first import the ProgressPrinter from the cntk.logging module as follows −

from cntk.logging import ProgressPrinter

Step 2 − Next, to set up the training session, import the Trainer and training_session from the cntk.train module as follows −

from cntk.train import Trainer, training_session

Step 3 − Now, we need to define some constants, namely minibatch_size, samples_per_epoch and num_epochs, as follows −

minibatch_size = 16
samples_per_epoch = 150
num_epochs = 30
max_samples = samples_per_epoch * num_epochs

Step 4 − Next, in order for CNTK to know how to read data during training, we need to define a mapping between the input variables for the network and the streams in the minibatch source.

input_map = {
   features: training_source.streams.features,
   labels: training_source.streams.labels
}

Step 5 − Next, to log the output of the training process, initialize the progress_writer variable with a new ProgressPrinter instance. Also, initialize the trainer and provide it with the model, as follows −

progress_writer = ProgressPrinter(0)
# z, loss and learner are the model, criterion and learner created in
# previous sections; the original text omits this construction.
trainer = Trainer(z, loss, learner, progress_writer)

Step 6 − At last, to start the training process, we need to invoke the training_session function as follows −

session = training_session(trainer,
   mb_source=training_source,
   mb_size=minibatch_size,
   model_inputs_to_streams=input_map,
   max_samples=max_samples,
   test_config=test_config)
session.train()

To add validation to this setup, we use a TestConfig object and assign it to the test_config keyword argument of the training_session function. The TestConfig instance must exist before training_session is invoked.

Following are the implementation steps −

Step 1 − First, we need to import the TestConfig class from the cntk.train module as follows −

from cntk.train import TestConfig

Step 2 − Now, we need to create a new instance of TestConfig with test_source as input −

test_config = TestConfig(test_source)

Complete Example

from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer, INFINITELY_REPEAT
from cntk.logging import ProgressPrinter
from cntk.train import Trainer, training_session, TestConfig

def create_datasource(filename, limit=INFINITELY_REPEAT):
   labels_stream = StreamDef(field='labels', shape=3, is_sparse=False)
   features_stream = StreamDef(field='features', shape=4, is_sparse=False)
   deserializer = CTFDeserializer(filename, StreamDefs(labels=labels_stream, features=features_stream))
   minibatch_source = MinibatchSource(deserializer, randomize=True, max_sweeps=limit)
   return minibatch_source

training_source = create_datasource('Iris_train.ctf')
test_source = create_datasource('Iris_test.ctf', limit=1)

minibatch_size = 16
samples_per_epoch = 150
num_epochs = 30
max_samples = samples_per_epoch * num_epochs

input_map = {
   features: training_source.streams.features,
   labels: training_source.streams.labels
}

progress_writer = ProgressPrinter(0)
# z, loss and learner (and the features and labels input variables)
# are the model, criterion and learner created in previous sections.
trainer = Trainer(z, loss, learner, progress_writer)

test_config = TestConfig(test_source)

session = training_session(trainer,
   mb_source=training_source,
   mb_size=minibatch_size,
   model_inputs_to_streams=input_map,
   max_samples=max_samples,
   test_config=test_config)
session.train()

Output

-------------------------------------------------------------------
average   since   average   since  examples
loss      last    metric    last
------------------------------------------------------
Learning rate per minibatch: 0.1
1.57      1.57     0.214    0.214   16
1.38      1.28     0.264    0.289   48
[………]
Finished Evaluation [1]: Minibatch[1-1]:metric = 69.65*30;

Manual minibatch loop

As we saw above, it is easy to measure the performance of our NN model during and after training by using metrics, when training with the regular APIs in CNTK. But, on the other hand, things will not be that easy while working with a manual minibatch loop.

Here, we are using the model given below, with 4 inputs and 3 outputs, for the iris flower dataset, as created in previous sections −

from cntk import default_options, input_variable
from cntk.layers import Dense, Sequential
from cntk.ops import log_softmax, relu, sigmoid
from cntk.learners import sgd
model = Sequential([
   Dense(4, activation=sigmoid),
   Dense(3, activation=log_softmax)
])
features = input_variable(4)
labels = input_variable(3)
z = model(features)

Next, the loss for the model is defined as the combination of the cross-entropy loss function and the F-measure metric used in previous sections. We are going to use the criterion_factory utility to create this as a CNTK function object, as shown below −

import cntk
from cntk.losses import cross_entropy_with_softmax, fmeasure
@cntk.Function
def criterion_factory(outputs, targets):
   loss = cross_entropy_with_softmax(outputs, targets)
   metric = fmeasure(outputs, targets, beta=1)
   return loss, metric
loss = criterion_factory(z, labels)
learner = sgd(z.parameters, 0.1)
label_mapping = {
   'Iris-setosa': 0,
   'Iris-versicolor': 1,
   'Iris-virginica': 2
}
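
Note that, because criterion_factory returns both a loss and a metric, the loss variable above is a CNTK function object with two outputs. This is relied on later in this chapter when the Evaluator is created −

# loss.outputs[0] is the cross-entropy loss
# loss.outputs[1] is the F-measure metric, used later as:
#    evaluator = Evaluator(loss.outputs[1], [progress_writer])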

Now, as we have defined the loss function, we will see how we can use it in the trainer to set up a manual training session.

Following are the implementation steps −

Step 1 − First, we need to import the required packages, such as numpy and pandas, to load and preprocess the data.

import pandas as pd
import numpy as np

Step 2 − Next, in order to log information during training, import the ProgressPrinter class as follows −

from cntk.logging import ProgressPrinter

Step 3 − Then, we need to import the Trainer class from the cntk.train module as follows −

from cntk.train import Trainer

Step 4 − Next, create a new instance of ProgressPrinter as follows −

progress_writer = ProgressPrinter(0)

Step 5 − Now, we need to initialise the trainer with the model z, the loss, the learner and the progress_writer, as follows −

trainer = Trainer(z, loss, learner, progress_writer)

Step 6 − Next, in order to train the model, we will create a loop that iterates over the dataset thirty times. This will be the outer training loop.

for _ in range(0,30):

Step 7 − Now, we need to load the data from disk using pandas. Then, in order to load the dataset in mini-batches, set the chunksize keyword argument to 16.

input_data = pd.read_csv('iris.csv',
   names=['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species'],
   index_col=False, chunksize=16)
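
As a quick sanity check of what chunksize does (a minimal sketch, separate from the training loop): with chunksize set, read_csv returns an iterator of DataFrames rather than a single DataFrame, each chunk holding at most 16 rows −

# Illustration only: inspect the first chunk produced by read_csv.
reader = pd.read_csv('iris.csv',
   names=['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species'],
   index_col=False, chunksize=16)
first_chunk = next(iter(reader))
print(first_chunk.shape)   # (16, 5): at most 16 rows per chunk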

Step 8 − Now, create an inner for loop to iterate over each of the mini-batches.

for df_batch in input_data:

Step 9 − Now, inside this loop, read the first four columns using the iloc indexer as the features to train on, and convert them to float32 −

feature_values = df_batch.iloc[:,:4].values
feature_values = feature_values.astype(np.float32)

Step 10 − Now, read the last column as the labels to train on, as follows −

label_values = df_batch.iloc[:,-1]

Step 11 − Next, we will use one-hot vectors to convert the label strings to their numeric representation, as follows −

label_values = label_values.map(lambda x: label_mapping[x])

Step 12 − After that, take the numeric representation of the labels and convert them to a numpy array, so they are easier to work with, as follows −

label_values = label_values.values

Step 13 − Now, we need to create a new numpy array that has the same number of rows as the label values we have converted.

encoded_labels = np.zeros((label_values.shape[0], 3))

Step 14 − Now, in order to create one-hot encoded labels, select the columns based on the numeric label values.

encoded_labels[np.arange(label_values.shape[0]), label_values] = 1.
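
To see concretely what this fancy-indexing assignment does, here is a small self-contained numpy sketch (illustrative only, separate from the training loop) −

import numpy as np
label_values = np.array([0, 2, 1])   # numeric class indices
encoded = np.zeros((label_values.shape[0], 3))
encoded[np.arange(label_values.shape[0]), label_values] = 1.
print(encoded)
# [[1. 0. 0.]
#  [0. 0. 1.]
#  [0. 1. 0.]]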

Step 15 − At last, we need to invoke the train_minibatch method on the trainer and provide the processed features and labels for the minibatch.

trainer.train_minibatch({features: feature_values, labels: encoded_labels})

Complete Example

from cntk import default_options, input_variable
from cntk.layers import Dense, Sequential
from cntk.ops import log_softmax, relu, sigmoid
from cntk.learners import sgd
model = Sequential([
   Dense(4, activation=sigmoid),
   Dense(3, activation=log_softmax)
])
features = input_variable(4)
labels = input_variable(3)
z = model(features)
import cntk
from cntk.losses import cross_entropy_with_softmax, fmeasure
@cntk.Function
def criterion_factory(outputs, targets):
   loss = cross_entropy_with_softmax(outputs, targets)
   metric = fmeasure(outputs, targets, beta=1)
   return loss, metric
loss = criterion_factory(z, labels)
learner = sgd(z.parameters, 0.1)
label_mapping = {
   'Iris-setosa': 0,
   'Iris-versicolor': 1,
   'Iris-virginica': 2
}
import pandas as pd
import numpy as np
from cntk.logging import ProgressPrinter
from cntk.train import Trainer
progress_writer = ProgressPrinter(0)
trainer = Trainer(z, loss, learner, progress_writer)
for _ in range(0,30):
   input_data = pd.read_csv('iris.csv',
      names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'],
      index_col=False, chunksize=16)
   for df_batch in input_data:
      feature_values = df_batch.iloc[:,:4].values
      feature_values = feature_values.astype(np.float32)
      label_values = df_batch.iloc[:,-1]
      label_values = label_values.map(lambda x: label_mapping[x])
      label_values = label_values.values
      encoded_labels = np.zeros((label_values.shape[0], 3))
      encoded_labels[np.arange(label_values.shape[0]), label_values] = 1.
      trainer.train_minibatch({features: feature_values, labels: encoded_labels})

Output

-------------------------------------------------------------------
average    since    average   since  examples
loss       last      metric   last
------------------------------------------------------
Learning rate per minibatch: 0.1
1.45       1.45     -0.189    -0.189   16
1.24       1.13     -0.0382    0.0371  48
[………]

In the above output, we got both the loss and the metric during training. That is because we combined the metric and the loss in a function object and used a progress printer in the trainer configuration.

Now, in order to evaluate the model performance, we need to perform the same task as when training the model, but this time, we use an Evaluator instance to test the model. This is shown in the following Python code −

from cntk import Evaluator
evaluator = Evaluator(loss.outputs[1], [progress_writer])
input_data = pd.read_csv('iris.csv',
   names=['sepal_length', 'sepal_width','petal_length','petal_width', 'species'],
   index_col=False, chunksize=16)
for df_batch in input_data:
   feature_values = df_batch.iloc[:,:4].values
   feature_values = feature_values.astype(np.float32)
   label_values = df_batch.iloc[:,-1]
   label_values = label_values.map(lambda x: label_mapping[x])
   label_values = label_values.values
   encoded_labels = np.zeros((label_values.shape[0], 3))
   encoded_labels[np.arange(label_values.shape[0]), label_values] = 1.
   evaluator.test_minibatch({features: feature_values, labels: encoded_labels})
evaluator.summarize_test_progress()

Now, we will get output something like the following −

Output

Finished Evaluation [1]: Minibatch[1-11]:metric = 74.62*143;