I am trying to figure out how to set up the input pipeline for TensorFlow in distributed training. It is not clear whether the reader reads data from a single process and sends it to all workers, or whether each server starts its own input pipeline. How do we make sure every worker gets different inputs?
Best Answer
I'll give an example of how I do it:
import tensorflow as tf
batch_size = 50
task_index = 2        # index of this worker
num_workers = 10      # total number of workers
num_epochs = 5        # passes over the input files (string_input_producer needs it below)
input_pattern = "gs://backet/dir/part-00*"
# get all file names in the bucket that match input_pattern
files_names = tf.train.match_filenames_once(input_pattern, name="myFiles")

# take the names for worker task_index; tf.strided_slice works like the list
# slice files_names[task_index::num_workers], so each worker gets every
# num_workers-th file and no two workers share a file
to_process = tf.strided_slice(files_names, [task_index],
                              [999999999], strides=[num_workers])
filename_queue = tf.train.string_input_producer(to_process,
                                                shuffle=True,  # shuffle the files
                                                num_epochs=num_epochs)
# read the queued files line by line
reader = tf.TextLineReader()
_, value = reader.read(filename_queue)

# parse each tab-separated line into two integer columns
col1, col2 = tf.decode_csv(value,
                           record_defaults=[[1], [1]], field_delim="\t")
train_inputs, train_labels = tf.train.shuffle_batch([col1, [col2]],
                                                    batch_size=batch_size,
                                                    capacity=50 * batch_size,
                                                    num_threads=10,
                                                    min_after_dequeue=10 * batch_size,
                                                    allow_smaller_final_batch=True)
loss = f(...,train_inputs, train_labels)
optimizer = ...
with tf.train.MonitoredTrainingSession(...) as mon_sess:
    coord = tf.train.Coordinator()
    with coord.stop_on_exception():
        _ = tf.train.start_queue_runners(sess=mon_sess, coord=coord)
        while not coord.should_stop() and not mon_sess.should_stop():
            optimizer.run()
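As a quick sanity check that the slice really gives each worker a disjoint set of files: tf.strided_slice behaves like the Python list slice files[task_index::num_workers]. A minimal standalone sketch (the file names here are made up):

import tensorflow as tf

files = tf.constant(["part-%05d" % i for i in range(10)])
task_index, num_workers = 2, 4
shard = tf.strided_slice(files, [task_index], [999999999], strides=[num_workers])

with tf.Session() as sess:
    print(sess.run(shard))  # [b'part-00002' b'part-00006'], i.e. files[2::4]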
In the distributed TensorFlow case I am not sure my approach is the best way to implement the input pipeline, because every worker reads the names of all files in the bucket.
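For what it's worth, the same per-worker split can also be written with the tf.data API (TensorFlow 1.4+). This is only a sketch of an alternative, not the method above; it still lists all file names on every worker, but Dataset.shard drops the other workers' files before any file contents are read:

import tensorflow as tf

dataset = (tf.data.Dataset.list_files(input_pattern)
           .shard(num_workers, task_index)     # keep every num_workers-th file
           .interleave(tf.data.TextLineDataset, cycle_length=4)
           .map(lambda line: tf.decode_csv(line,
                                           record_defaults=[[1], [1]],
                                           field_delim="\t"))
           .shuffle(10 * batch_size)
           .repeat(num_epochs)
           .batch(batch_size))
train_inputs, train_labels = dataset.make_one_shot_iterator().get_next()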
Good lecture notes on input pipelines in TensorFlow: http://web.stanford.edu/class/cs20si/lectures/notes_09.pdf