我有一个巨大的培训csv文件(709m)和一个巨大的测试csv文件(125m),我想在使用高级TensorFlow API的上下文中发送到DNNClassifier
中。
似乎input_fn
和fit
接受的evaluate
参数必须将所有功能和标签数据保存在内存中,但我当前希望在本地计算机上运行它,因此如果我将这些文件读取到内存中,然后处理它们,它将很快耗尽内存。
我在streamed-reading of data上略读了文档,但用于读取CSV的示例代码似乎是用于低级TensorFlow API。
而且,如果您能原谅一些抱怨的话,那么对于将准备好的培训和测试数据文件发送到Estimator
这样一个微不足道的用例来说,似乎过于复杂了。尽管如此,也许在TensorFlow中培训和测试大量数据实际上需要这种复杂性?
在任何情况下,如果可能的话,我真的很欣赏将这种方法与高级API结合使用的例子,我开始怀疑这一点。
在四处搜寻之后,我确实找到了,并将尝试使用它进行培训。
如何使用这个方法的例子可以节省我一些时间,不过我希望在接下来的几个小时内能够找到正确的用法。
然而,似乎没有相应的DNNClassifier#partial_fit
。尽管我怀疑我可以将测试数据分解成更小的片段,并在每一批上连续运行DNNClassifier#partial_evaluate
,这实际上是一种很好的方法,因为我可以将测试数据分割成队列,从而获得每一队列的准确度。
===更新====
简短版本:
domjack的建议应该是公认的答案。
但是,我的Mac的16GB内存足以在内存中保存整个709MB的训练数据集而不会崩溃。所以,当我最终部署应用程序时,虽然我将使用数据集功能,但我还没有将其用于本地开发工作。
较长版本:
我开始使用如上所述的DNNClassifier#evaluate
API,但每次使用它都会发出警告。
因此,我查看了方法的源代码,发现它的完整实现如下所示:
logging.warning('The current implementation of partial_fit is not optimized'
' for use in a loop. Consider using fit() instead.')
return self.fit(x=x, y=y, input_fn=input_fn, steps=steps,
batch_size=batch_size, monitors=monitors)
…这让我想起了《搭便车指南》中的这个场景:
亚瑟·登特:如果我按这个按钮会怎么样?
福特州长:我不会的-
亚瑟·登特:哦。
福特州长:发生了什么事?
亚瑟·登特:一个牌子亮了,上面写着“请不要再按这个按钮”。
也就是说:
partial_fit
似乎是为了告诉你不要使用它而存在的。此外,在训练文件块上迭代使用
partial_fit
生成的模型比在整个训练文件上迭代使用partial_fit
生成的模型小得多,这有力地表明只有最后一个fit
训练块实际“占用”。 最佳答案
查看tf.data.Dataset
api。创建数据集有多种方法。我将概述四个-但您只需要实现一个。
我假设您的csv
文件的每一行都是n_features
浮点值,后跟一个int
值。
创建一个tf.data.Dataset
用Dataset.from_generator
最简单的开始方法是包装本机Python生成器。这可能会导致性能问题,但对于您的目的来说可能很好。
def read_csv(filename):
with open(filename, 'r') as f:
for line in f.readlines():
record = line.rstrip().split(',')
features = [float(n) for n in record[:-1]]
label = int(record[-1])
yield features, label
def get_dataset():
filename = 'my_train_dataset.csv'
generator = lambda: read_csv(filename)
return tf.data.Dataset.from_generator(
generator, (tf.float32, tf.int32), ((n_features,), ()))
这种方法非常通用,允许您独立于TensorFlow测试生成器函数(
read_csv
)。使用Tensorflow Datasets API
支持TensorFlow版本1.12+,TensorFlow数据集是我最喜欢的创建数据集的新方法。它自动序列化您的数据,收集统计信息,并通过
info
和builder
对象使其他元数据对您可用。它还可以处理自动下载和提取,从而简化协作。将TensorFlow数据集导入为TFD
class MyCsvDatasetBuilder(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version("0.0.1")
def _info(self):
return tfds.core.DatasetInfo(
builder=self,
description=(
"My dataset"),
features=tfds.features.FeaturesDict({
"features": tfds.features.Tensor(
shape=(FEATURE_SIZE,), dtype=tf.float32),
"label": tfds.features.ClassLabel(
names=CLASS_NAMES),
"index": tfds.features.Tensor(shape=(), dtype=tf.float32)
}),
supervised_keys=("features", "label"),
)
def _split_generators(self, dl_manager):
paths = dict(
train='/path/to/train.csv',
test='/path/to/test.csv',
)
# better yet, if the csv files were originally downloaded, use
# urls = dict(train=train_url, test=test_url)
# paths = dl_manager.download(urls)
return [
tfds.core.SplitGenerator(
name=tfds.Split.TRAIN,
num_shards=10,
gen_kwargs=dict(path=paths['train'])),
tfds.core.SplitGenerator(
name=tfds.Split.TEST,
num_shards=2,
gen_kwargs=dict(cvs_path=paths['test']))
]
def _generate_examples(self, csv_path):
with open(csv_path, 'r') as f:
for i, line in enumerate(f.readlines()):
record = line.rstrip().split(',')
features = [float(n) for n in record[:-1]]
label = int(record[-1])
yield dict(features=features, label=label, index=i)
用途:
builder = MyCsvDatasetBuilder()
builder.download_and_prepare() # will only take time to run first time
# as_supervised makes output (features, label) - good for model.fit
datasets = builder.as_dataset(as_supervised=True)
train_ds = datasets['train']
test_ds = datasets['test']
包装基于索引的python函数
上面的缺点之一是,使用大小为
n
的shuffle缓冲区对生成的数据集进行洗牌需要加载n
示例。这将在管道中创建周期性暂停(大的n
)或导致潜在的不良洗牌(小的n
)。def get_record(i):
# load the ith record using standard python, return numpy arrays
return features, labels
def get_inputs(batch_size, is_training):
def tf_map_fn(index):
features, labels = tf.py_func(
get_record, (index,), (tf.float32, tf.int32), stateful=False)
features.set_shape((n_features,))
labels.set_shape(())
# do data augmentation here
return features, labels
epoch_size = get_epoch_size()
dataset = tf.data.Dataset.from_tensor_slices((tf.range(epoch_size,))
if is_training:
dataset = dataset.repeat().shuffle(epoch_size)
dataset = dataset.map(tf_map_fn, (tf.float32, tf.int32), num_parallel_calls=8)
dataset = dataset.batch(batch_size)
# prefetch data to CPU while GPU processes previous batch
dataset = dataset.prefetch(1)
# Also possible
# dataset = dataset.apply(
# tf.contrib.data.prefetch_to_device('/gpu:0'))
features, labels = dataset.make_one_shot_iterator().get_next()
return features, labels
简而言之,我们只创建一个数据集,其中包含记录索引(或者任何可以完全加载到内存中的小记录ID)。然后,我们对这个最小的数据集执行洗牌/重复操作,然后通过
map
和tf.data.Dataset.map
将索引转换为实际数据。有关用法,请参阅下面的tf.py_func
和Using with Estimators
部分。注意:这要求您的数据可以按行访问,因此您可能需要从Testing in isolation
转换为其他格式。文本行数据集
还可以使用
csv
直接读取csv
文件。def get_record_defaults():
zf = tf.zeros(shape=(1,), dtype=tf.float32)
zi = tf.ones(shape=(1,), dtype=tf.int32)
return [zf]*n_features + [zi]
def parse_row(tf_string):
data = tf.decode_csv(
tf.expand_dims(tf_string, axis=0), get_record_defaults())
features = data[:-1]
features = tf.stack(features, axis=-1)
label = data[-1]
features = tf.squeeze(features, axis=0)
label = tf.squeeze(label, axis=0)
return features, label
def get_dataset():
dataset = tf.data.TextLineDataset(['data.csv'])
return dataset.map(parse_row, num_parallel_calls=8)
tf.data.TextLineDataset
函数有点复杂,因为parse_row
需要批处理。如果在解析前对数据集进行批处理,可以使其稍微简单一些。def parse_batch(tf_string):
data = tf.decode_csv(tf_string, get_record_defaults())
features = data[:-1]
labels = data[-1]
features = tf.stack(features, axis=-1)
return features, labels
def get_batched_dataset(batch_size):
dataset = tf.data.TextLineDataset(['data.csv'])
dataset = dataset.batch(batch_size)
dataset = dataset.map(parse_batch)
return dataset
tf记录数据集
或者,您可以将
tf.decode_csv
文件转换为tfrecord文件并使用TFRecordDataset。有一个全面的教程。步骤1:将
csv
数据转换为tfrecords数据。下面的示例代码(请参见上面的csv
示例中的read_csv
)。with tf.python_io.TFRecordWriter("my_train_dataset.tfrecords") as writer:
for features, labels in read_csv('my_train_dataset.csv'):
example = tf.train.Example()
example.features.feature[
"features"].float_list.value.extend(features)
example.features.feature[
"label"].int64_list.value.append(label)
writer.write(example.SerializeToString())
这只需要运行一次。
第2步:编写一个数据集来解码这些记录文件。
def parse_function(example_proto):
features = {
'features': tf.FixedLenFeature((n_features,), tf.float32),
'label': tf.FixedLenFeature((), tf.int64)
}
parsed_features = tf.parse_single_example(example_proto, features)
return parsed_features['features'], parsed_features['label']
def get_dataset():
dataset = tf.data.TFRecordDataset(['data.tfrecords'])
dataset = dataset.map(parse_function)
return dataset
使用带有估计量的数据集
def get_inputs(batch_size, shuffle_size):
dataset = get_dataset() # one of the above implementations
dataset = dataset.shuffle(shuffle_size)
dataset = dataset.repeat() # repeat indefinitely
dataset = dataset.batch(batch_size)
# prefetch data to CPU while GPU processes previous batch
dataset = dataset.prefetch(1)
# Also possible
# dataset = dataset.apply(
# tf.contrib.data.prefetch_to_device('/gpu:0'))
features, label = dataset.make_one_shot_iterator().get_next()
estimator.train(lambda: get_inputs(32, 1000), max_steps=1e7)
单独测试数据集
我强烈建议您独立于您的评估器来测试数据集。使用上面的命令,它应该像
batch_size = 4
shuffle_size = 100
features, labels = get_inputs(batch_size, shuffle_size)
with tf.Session() as sess:
f_data, l_data = sess.run([features, labels])
print(f_data, l_data) # or some better visualization function
性能
假设您使用GPU运行网络,除非
from_generator
文件的每一行都很大,并且网络很小,否则您可能不会注意到性能上的差异。这是因为get_inputs
实现强制在CPU上执行数据加载/预处理,而csv
意味着下一批可以在CPU上准备,因为当前批正在GPU上进行培训。唯一的例外是,如果在每个记录都有大量数据的数据集上有大量的无序排列大小,那么在通过GPU运行任何内容之前,首先要花一些时间加载大量示例。关于python - 将大型培训和测试文件流式传输到Tensorflow的DNNClassifier中,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45828616/