问题描述
我尝试优化我的数据输入管道.该数据集是一组 450 个 TFRecord 文件,每个文件大小约为 70MB,托管在 GCS 上.该作业使用 GCP ML Engine 执行.没有 GPU.
这是管道:
def build_dataset(file_pattern):返回 tf.data.Dataset.list_files(文件模式). 交错(tf.data.TFRecordDataset,num_parallel_calls=tf.data.experimental.AUTOTUNE).洗牌(缓冲区大小=2048).批(批量大小=2048,drop_remainder=真,).缓存().重复().地图(map_func=_parse_example_batch,num_parallel_calls=tf.data.experimental.AUTOTUNE). 预取(缓冲区大小=1)
使用映射函数:
def _bit_to_float(string_batch: tf.Tensor):返回 tf.reshape(tf.math.floormod(tf.dtypes.cast(tf.bitwise.right_shift(tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))), tf.float32), 2), (tf.shape(string_batch)[0], -1))def _parse_example_batch(example_batch):preprocessed_sample_columns = {功能":tf.io.VarLenFeature(tf.float32),"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),标签":tf.io.FixedLenFeature((), tf.float32, -1)}样本 = tf.io.parse_example(example_batch, preprocessed_sample_columns)密集浮动 = tf.sparse.to_dense(样本[特征"])bits_to_float = _bit_to_float(samples["booleanFeatures"])返回 (tf.concat([dense_float, bits_to_float], 1),tf.reshape(samples["label"], (-1, 1)))
我尝试遵循数据管道教程的最佳实践,并将我的映射函数(根据 mrry 的建议).
在此设置下,虽然数据以高速下载(带宽约为 200MB/s),但 CPU 使用率不足 (14%),训练速度非常慢(一个 epoch 超过 1 小时).
>我尝试了一些参数配置,更改了 interleave()
参数,例如 num_parallel_calls
或 cycle_length
或 TFRecordDataset
类似 num_parallel_calls
的参数.
最快的配置使用这组参数:
interleave.num_parallel_calls
: 1interleave.cycle_length
:8TFRecordDataset.num_parallel_calls
:8
有了这个,一个 epoch 只需大约 20 分钟即可运行.然而,CPU 使用率仅为 50%,而带宽消耗约为 55MB/s
问题:
- 如何优化管道以达到 100% 的 CPU 使用率(以及大约 100MB/s 的带宽消耗)?
- 为什么
tf.data.experimental.AUTOTUNE
没有找到加速训练的最佳值?
亲切,亚历克西斯.
编辑
经过一些更多的实验,我得出了以下解决方案.
- 如果
num_parallel_calls
大于 0,则移除interleave
步骤,该步骤已经由TFRecordDataset
处理. - 更新映射函数只做
parse_example
和decode_raw
,返回一个元组`((, ), ()) cache
在map
之后- 将
_bit_to_float
函数作为模型的一个组件移动
最后,这里是数据管道代码:
def build_dataset(file_pattern):返回 tf.data.TFRecordDataset(tf.data.Dataset.list_files(file_pattern),num_parallel_reads=multiprocessing.cpu_count(),缓冲区大小=70*1000*1000).洗牌(缓冲区大小=2048).地图(map_func=split,num_parallel_calls=tf.data.experimental.AUTOTUNE).批(批量大小=2048,drop_remainder=真,).缓存().重复(). 预取(缓冲区大小=32)定义拆分(示例):preprocessed_sample_columns = {功能":tf.io.VarLenFeature(tf.float32),"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),标签":tf.io.FixedLenFeature((), tf.float32, -1)}样本 = tf.io.parse_single_example(例如,preprocessed_sample_columns)密集浮点 = tf.sparse.to_dense(样本[特征"])bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)返回 ((dense_float, bits_to_float),tf.reshape(samples["label"], (1,)))def build_model(input_shape):特征 = keras.Input(shape=(N,))bool_feature = keras.Input(shape=(M,), dtype="uint8")one_hot = dataset._bit_to_float(bool_feature)密集输入 = tf.reshape(keras.backend.concatenate([feature, one_hot], 1),输入形状)输出 = 实际模型(密集输入)模型 = keras.Model([feature, bool_feature], 输出)回报模式def _bit_to_float(string_batch: tf.Tensor):返回 tf.dtypes.cast(tf.reshape(tf.bitwise.bitwise_and(tf.bitwise.right_shift(tf.expand_dims(string_batch, 2),tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),(1, 1, 8)),),tf.constant(0x01,dtype=tf.uint8)),(tf.shape(string_batch)[0], -1)), tf.float32)
感谢所有这些优化:
- 带宽消耗约为 90MB/s
- CPU 使用率约为 20%
- 第一个 epoch 花费 20 分钟
- 连续的 epoch 每个花费 5 分钟
所以这似乎是一个很好的第一次设置.但是 CPU 和 BW 仍然没有被过度使用,所以仍然欢迎任何建议!
编辑 Bis
因此,经过一些基准测试后,我发现了我认为最好的输入管道:
def build_dataset(file_pattern):tf.data.Dataset.list_files(文件模式). 交错(TFRecordDataset,cycle_length=tf.data.experimental.AUTOTUNE,num_parallel_calls=tf.data.experimental.AUTOTUNE).洗牌(2048).批(批量大小=64,drop_remainder=真,).地图(map_func=parse_examples_batch,num_parallel_calls=tf.data.experimental.AUTOTUNE).缓存(). 预取(tf.data.experimental.AUTOTUNE)def parse_examples_batch(示例):preprocessed_sample_columns = {功能":tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),标签":tf.io.FixedLenFeature((), tf.float32, -1)}样本 = tf.io.parse_example(examples, preprocessed_sample_columns)bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)返回 ((samples['features'], bits_to_float),tf.expand_dims(样本[标签"],1))
那么,有什么新鲜事:
- 根据这个GitHub issue,
TFRecordDataset
interleaving 是一个遗留的,所以interleave
功能更好. batch
在map
之前是一个好习惯(向量化您的函数) 并减少调用映射函数的次数.- 不再需要
repeat
.从 TF2.0 开始,Keras 模型 API 支持数据集 API 并且可以使用缓存(参见 SO 帖子) - 从
VarLenFeature
切换到FixedLenSequenceFeature
,删除对tf.sparse.to_dense
的无用调用.
希望这能有所帮助.仍然欢迎提供建议.
在回答部分提及解决方案和@AlexisBRENON 的重要观察,以造福社区.p>
下面提到的是重要的观察:
- 根据这个GitHub issue,
TFRecordDataset
interleaving
是遗留的,所以interleave
功能更好. batch
在map
之前是一个好习惯(向量化您的函数) 并减少调用映射函数的次数.- 不再需要
repeat
.从 TF2.0 开始,Keras 模型 API 支持数据集 API 并且可以使用缓存(参见 SO post) - 从
VarLenFeature
切换到FixedLenSequenceFeature
,删除对tf.sparse.to_dense
的无用调用.
流水线代码,具有改进的性能,符合上述观察结果如下:
def build_dataset(file_pattern):tf.data.Dataset.list_files(文件模式). 交错(TFRecordDataset,cycle_length=tf.data.experimental.AUTOTUNE,num_parallel_calls=tf.data.experimental.AUTOTUNE).洗牌(2048).批(批量大小=64,drop_remainder=真,).地图(map_func=parse_examples_batch,num_parallel_calls=tf.data.experimental.AUTOTUNE).缓存(). 预取(tf.data.experimental.AUTOTUNE)def parse_examples_batch(示例):preprocessed_sample_columns = {功能":tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),标签":tf.io.FixedLenFeature((), tf.float32, -1)}样本 = tf.io.parse_example(examples, preprocessed_sample_columns)bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)返回 ((samples['features'], bits_to_float),tf.expand_dims(样本[标签"],1))
I try to optimize my data input pipeline.The dataset is a set of 450 TFRecord files of size ~70MB each, hosted on GCS.The job is executed with GCP ML Engine. There is no GPU.
Here is the pipeline:
def build_dataset(file_pattern):
return tf.data.Dataset.list_files(
file_pattern
).interleave(
tf.data.TFRecordDataset,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
buffer_size=2048
).batch(
batch_size=2048,
drop_remainder=True,
).cache(
).repeat(
).map(
map_func=_parse_example_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).prefetch(
buffer_size=1
)
With the mapped function:
def _bit_to_float(string_batch: tf.Tensor):
return tf.reshape(tf.math.floormod(tf.dtypes.cast(tf.bitwise.right_shift(
tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
), tf.float32), 2), (tf.shape(string_batch)[0], -1))
def _parse_example_batch(example_batch):
preprocessed_sample_columns = {
"features": tf.io.VarLenFeature(tf.float32),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
dense_float = tf.sparse.to_dense(samples["features"])
bits_to_float = _bit_to_float(samples["booleanFeatures"])
return (
tf.concat([dense_float, bits_to_float], 1),
tf.reshape(samples["label"], (-1, 1))
)
I tried to follow the best practices of the data pipeline tutorial, and vectorize my mapped function (as advised by mrry).
With this settings, while data are downloaded at high-speed (bandwidth is around 200MB/s) the CPU is under-used (14%) and the training is very slow (more than 1hour for a epoch).
I tried some parameters configuration, changing the interleave()
arguments like num_parallel_calls
or cycle_length
or the TFRecordDataset
arguments like num_parallel_calls
.
The fastest configuration uses this set of parameters:
interleave.num_parallel_calls
: 1interleave.cycle_length
: 8TFRecordDataset.num_parallel_calls
: 8
With this one, one epoch only take ~20 minutes to run. However, CPU usage is only at 50% while bandwidth consumption is around 55MB/s
Questions:
- How to optimize the pipeline to reach 100% CPU usage (and something like 100MB/s of bandwidth consumption)?
- Why does
tf.data.experimental.AUTOTUNE
not find best value to speed up the training?
Kind,Alexis.
Edit
After some more experimentations, I came to the following solution.
- Remove the
interleave
step which is already handled byTFRecordDataset
ifnum_parallel_calls
is greater than 0. - Update the mapped function to only do
parse_example
anddecode_raw
, returning a tuple `((, ), ()) cache
after themap
- Move the
_bit_to_float
function as a component of the model
Finally, here is the data pipeline code:
def build_dataset(file_pattern):
return tf.data.TFRecordDataset(
tf.data.Dataset.list_files(file_pattern),
num_parallel_reads=multiprocessing.cpu_count(),
buffer_size=70*1000*1000
).shuffle(
buffer_size=2048
).map(
map_func=split,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).batch(
batch_size=2048,
drop_remainder=True,
).cache(
).repeat(
).prefetch(
buffer_size=32
)
def split(example):
preprocessed_sample_columns = {
"features": tf.io.VarLenFeature(tf.float32),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
dense_float = tf.sparse.to_dense(samples["features"])
bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
return (
(dense_float, bits_to_float),
tf.reshape(samples["label"], (1,))
)
def build_model(input_shape):
feature = keras.Input(shape=(N,))
bool_feature = keras.Input(shape=(M,), dtype="uint8")
one_hot = dataset._bit_to_float(bool_feature)
dense_input = tf.reshape(
keras.backend.concatenate([feature, one_hot], 1),
input_shape)
output = actual_model(dense_input)
model = keras.Model([feature, bool_feature], output)
return model
def _bit_to_float(string_batch: tf.Tensor):
return tf.dtypes.cast(tf.reshape(
tf.bitwise.bitwise_and(
tf.bitwise.right_shift(
tf.expand_dims(string_batch, 2),
tf.reshape(
tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
(1, 1, 8)
),
),
tf.constant(0x01, dtype=tf.uint8)
),
(tf.shape(string_batch)[0], -1)
), tf.float32)
Thanks to all these optimizations:
- Bandwidth consumption is around 90MB/s
- CPU usage is around 20%
- First epoch spends 20 minutes
- Successives epochs spend 5 minutes each
So this seems to be a good first setup. But CPU and BW are still not overused, so any advice is still welcomed!
Edit Bis
So, after some benchmarking I came accross what I think is our best input pipeline:
def build_dataset(file_pattern):
tf.data.Dataset.list_files(
file_pattern
).interleave(
TFRecordDataset,
cycle_length=tf.data.experimental.AUTOTUNE,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
2048
).batch(
batch_size=64,
drop_remainder=True,
).map(
map_func=parse_examples_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).cache(
).prefetch(
tf.data.experimental.AUTOTUNE
)
def parse_examples_batch(examples):
preprocessed_sample_columns = {
"features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_example(examples, preprocessed_sample_columns)
bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
return (
(samples['features'], bits_to_float),
tf.expand_dims(samples["label"], 1)
)
So, what's new:
- According to this GitHub issue, the
TFRecordDataset
interleaving is a legacy one, sointerleave
function is better. batch
beforemap
is a good habit (vectorizing your function) and reduce the number of times the mapped function is called.- No need of
repeat
anymore. Since TF2.0, the Keras model API supports the dataset API and can use cache (see the SO post) - Switch from a
VarLenFeature
to aFixedLenSequenceFeature
, removing a useless call totf.sparse.to_dense
.
Hope this can help. Advices are still welcomed.
Mentioning the Solution and the Important observations of @AlexisBRENON in the Answer Section, for the benefit of the Community.
Below mentioned are the Important Observations:
- According to this GitHub issue, the
TFRecordDataset
interleaving
is a legacy one, sointerleave
function is better. batch
beforemap
is a good habit (vectorizing your function) and reduce the number of times the mapped function is called.- No need of
repeat
anymore. Since TF2.0, the Keras model API supports the dataset API and can use cache (see the SO post) - Switch from a
VarLenFeature
to aFixedLenSequenceFeature
, removing a useless call totf.sparse.to_dense
.
Code for the Pipeline, with improved performance, in line with above observations is mentioned below:
def build_dataset(file_pattern):
tf.data.Dataset.list_files(
file_pattern
).interleave(
TFRecordDataset,
cycle_length=tf.data.experimental.AUTOTUNE,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
2048
).batch(
batch_size=64,
drop_remainder=True,
).map(
map_func=parse_examples_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).cache(
).prefetch(
tf.data.experimental.AUTOTUNE
)
def parse_examples_batch(examples):
preprocessed_sample_columns = {
"features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_example(examples, preprocessed_sample_columns)
bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
return (
(samples['features'], bits_to_float),
tf.expand_dims(samples["label"], 1)
)
这篇关于如何提高数据输入管道的性能?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!