我有一个巨大的培训csv文件(709m)和一个巨大的测试csv文件(125m),我想在使用高级TensorFlow API的上下文中发送到DNNClassifier中。
似乎input_fnfit接受的evaluate参数必须将所有功能和标签数据保存在内存中,但我当前希望在本地计算机上运行它,因此如果我将这些文件读取到内存中,然后处理它们,它将很快耗尽内存。
我在streamed-reading of data上略读了文档,但用于读取CSV的示例代码似乎是用于低级TensorFlow API。
而且,如果您能原谅一些抱怨的话,那么对于将准备好的培训和测试数据文件发送到Estimator这样一个微不足道的用例来说,似乎过于复杂了。尽管如此,也许在TensorFlow中培训和测试大量数据实际上需要这种复杂性?
在任何情况下,如果可能的话,我真的很欣赏将这种方法与高级API结合使用的例子,我开始怀疑这一点。
在四处搜寻之后,我确实找到了,并将尝试使用它进行培训。
如何使用这个方法的例子可以节省我一些时间,不过我希望在接下来的几个小时内能够找到正确的用法。
然而,似乎没有相应的DNNClassifier#partial_fit。尽管我怀疑我可以将测试数据分解成更小的片段,并在每一批上连续运行DNNClassifier#partial_evaluate,这实际上是一种很好的方法,因为我可以将测试数据分割成队列,从而获得每一队列的准确度。
===更新====
简短版本:
domjack的建议应该是公认的答案。
但是,我的Mac的16GB内存足以在内存中保存整个709MB的训练数据集而不会崩溃。所以,当我最终部署应用程序时,虽然我将使用数据集功能,但我还没有将其用于本地开发工作。
较长版本:
我开始使用如上所述的DNNClassifier#evaluateAPI,但每次使用它都会发出警告。
因此,我查看了方法的源代码,发现它的完整实现如下所示:

logging.warning('The current implementation of partial_fit is not optimized'
                ' for use in a loop. Consider using fit() instead.')
return self.fit(x=x, y=y, input_fn=input_fn, steps=steps,
                batch_size=batch_size, monitors=monitors)

…这让我想起了《搭便车指南》中的这个场景:
亚瑟·登特:如果我按这个按钮会怎么样?
福特州长:我不会的-
亚瑟·登特:哦。
福特州长:发生了什么事?
亚瑟·登特:一个牌子亮了,上面写着“请不要再按这个按钮”。
也就是说:partial_fit似乎是为了告诉你不要使用它而存在的。
此外,在训练文件块上迭代使用partial_fit生成的模型比在整个训练文件上迭代使用partial_fit生成的模型小得多,这有力地表明只有最后一个fit训练块实际“占用”。

最佳答案

查看tf.data.Datasetapi。创建数据集有多种方法。我将概述四个-但您只需要实现一个。
我假设您的csv文件的每一行都是n_features浮点值,后跟一个int值。
创建一个tf.data.Dataset
Dataset.from_generator
最简单的开始方法是包装本机Python生成器。这可能会导致性能问题,但对于您的目的来说可能很好。

def read_csv(filename):
    with open(filename, 'r') as f:
        for line in f.readlines():
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield features, label

def get_dataset():
    filename = 'my_train_dataset.csv'
    generator = lambda: read_csv(filename)
    return tf.data.Dataset.from_generator(
        generator, (tf.float32, tf.int32), ((n_features,), ()))

这种方法非常通用,允许您独立于TensorFlow测试生成器函数(read_csv)。
使用Tensorflow Datasets API
支持TensorFlow版本1.12+,TensorFlow数据集是我最喜欢的创建数据集的新方法。它自动序列化您的数据,收集统计信息,并通过infobuilder对象使其他元数据对您可用。它还可以处理自动下载和提取,从而简化协作。
将TensorFlow数据集导入为TFD
class MyCsvDatasetBuilder(tfds.core.GeneratorBasedBuilder):
  VERSION = tfds.core.Version("0.0.1")

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        description=(
            "My dataset"),
        features=tfds.features.FeaturesDict({
            "features": tfds.features.Tensor(
              shape=(FEATURE_SIZE,), dtype=tf.float32),
            "label": tfds.features.ClassLabel(
                names=CLASS_NAMES),
            "index": tfds.features.Tensor(shape=(), dtype=tf.float32)
        }),
        supervised_keys=("features", "label"),
    )

  def _split_generators(self, dl_manager):
    paths = dict(
      train='/path/to/train.csv',
      test='/path/to/test.csv',
    )
    # better yet, if the csv files were originally downloaded, use
    # urls = dict(train=train_url, test=test_url)
    # paths = dl_manager.download(urls)
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            num_shards=10,
            gen_kwargs=dict(path=paths['train'])),
        tfds.core.SplitGenerator(
            name=tfds.Split.TEST,
            num_shards=2,
            gen_kwargs=dict(cvs_path=paths['test']))
    ]

  def _generate_examples(self, csv_path):
    with open(csv_path, 'r') as f:
        for i, line in enumerate(f.readlines()):
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield dict(features=features, label=label, index=i)

用途:
builder = MyCsvDatasetBuilder()
builder.download_and_prepare()  # will only take time to run first time
# as_supervised makes output (features, label) - good for model.fit
datasets = builder.as_dataset(as_supervised=True)

train_ds = datasets['train']
test_ds = datasets['test']

包装基于索引的python函数
上面的缺点之一是,使用大小为n的shuffle缓冲区对生成的数据集进行洗牌需要加载n示例。这将在管道中创建周期性暂停(大的n)或导致潜在的不良洗牌(小的n)。
def get_record(i):
    # load the ith record using standard python, return numpy arrays
    return features, labels

def get_inputs(batch_size, is_training):

    def tf_map_fn(index):
        features, labels = tf.py_func(
            get_record, (index,), (tf.float32, tf.int32), stateful=False)
        features.set_shape((n_features,))
        labels.set_shape(())
        # do data augmentation here
        return features, labels

    epoch_size = get_epoch_size()
    dataset = tf.data.Dataset.from_tensor_slices((tf.range(epoch_size,))
    if is_training:
        dataset = dataset.repeat().shuffle(epoch_size)
    dataset = dataset.map(tf_map_fn, (tf.float32, tf.int32), num_parallel_calls=8)
    dataset = dataset.batch(batch_size)
    # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, labels = dataset.make_one_shot_iterator().get_next()
    return features, labels

简而言之,我们只创建一个数据集,其中包含记录索引(或者任何可以完全加载到内存中的小记录ID)。然后,我们对这个最小的数据集执行洗牌/重复操作,然后通过maptf.data.Dataset.map将索引转换为实际数据。有关用法,请参阅下面的tf.py_funcUsing with Estimators部分。注意:这要求您的数据可以按行访问,因此您可能需要从Testing in isolation转换为其他格式。
文本行数据集
还可以使用csv直接读取csv文件。
def get_record_defaults():
  zf = tf.zeros(shape=(1,), dtype=tf.float32)
  zi = tf.ones(shape=(1,), dtype=tf.int32)
  return [zf]*n_features + [zi]

def parse_row(tf_string):
    data = tf.decode_csv(
        tf.expand_dims(tf_string, axis=0), get_record_defaults())
    features = data[:-1]
    features = tf.stack(features, axis=-1)
    label = data[-1]
    features = tf.squeeze(features, axis=0)
    label = tf.squeeze(label, axis=0)
    return features, label

def get_dataset():
    dataset = tf.data.TextLineDataset(['data.csv'])
    return dataset.map(parse_row, num_parallel_calls=8)

tf.data.TextLineDataset函数有点复杂,因为parse_row需要批处理。如果在解析前对数据集进行批处理,可以使其稍微简单一些。
def parse_batch(tf_string):
    data = tf.decode_csv(tf_string, get_record_defaults())
    features = data[:-1]
    labels = data[-1]
    features = tf.stack(features, axis=-1)
    return features, labels

def get_batched_dataset(batch_size):
    dataset = tf.data.TextLineDataset(['data.csv'])
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(parse_batch)
    return dataset

tf记录数据集
或者,您可以将tf.decode_csv文件转换为tfrecord文件并使用TFRecordDataset。有一个全面的教程。
步骤1:将csv数据转换为tfrecords数据。下面的示例代码(请参见上面的csv示例中的read_csv)。
with tf.python_io.TFRecordWriter("my_train_dataset.tfrecords") as writer:
    for features, labels in read_csv('my_train_dataset.csv'):
        example = tf.train.Example()
        example.features.feature[
            "features"].float_list.value.extend(features)
        example.features.feature[
            "label"].int64_list.value.append(label)
        writer.write(example.SerializeToString())

这只需要运行一次。
第2步:编写一个数据集来解码这些记录文件。
def parse_function(example_proto):
    features = {
        'features': tf.FixedLenFeature((n_features,), tf.float32),
        'label': tf.FixedLenFeature((), tf.int64)
    }
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features['features'], parsed_features['label']

def get_dataset():
    dataset = tf.data.TFRecordDataset(['data.tfrecords'])
    dataset = dataset.map(parse_function)
    return dataset

使用带有估计量的数据集
def get_inputs(batch_size, shuffle_size):
    dataset = get_dataset()  # one of the above implementations
    dataset = dataset.shuffle(shuffle_size)
    dataset = dataset.repeat()  # repeat indefinitely
    dataset = dataset.batch(batch_size)
            # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, label = dataset.make_one_shot_iterator().get_next()

estimator.train(lambda: get_inputs(32, 1000), max_steps=1e7)

单独测试数据集
我强烈建议您独立于您的评估器来测试数据集。使用上面的命令,它应该像
batch_size = 4
shuffle_size = 100
features, labels = get_inputs(batch_size, shuffle_size)
with tf.Session() as sess:
    f_data, l_data = sess.run([features, labels])
print(f_data, l_data)  # or some better visualization function

性能
假设您使用GPU运行网络,除非from_generator文件的每一行都很大,并且网络很小,否则您可能不会注意到性能上的差异。这是因为get_inputs实现强制在CPU上执行数据加载/预处理,而csv意味着下一批可以在CPU上准备,因为当前批正在GPU上进行培训。唯一的例外是,如果在每个记录都有大量数据的数据集上有大量的无序排列大小,那么在通过GPU运行任何内容之前,首先要花一些时间加载大量示例。

关于python - 将大型培训和测试文件流式传输到Tensorflow的DNNClassifier中,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45828616/

10-14 18:25
查看更多