Problem description
The TF map function supports parallel calls, but I'm seeing no improvement when passing num_parallel_calls to map: with num_parallel_calls=1 and with num_parallel_calls=10, the run time is the same. Here is a simple code example:
import time
import tensorflow as tf

def test_two_custom_function_parallelism(num_parallel_calls=1, batch=False,
                                         batch_size=1, repeat=1, num_iterations=10):
    tf.reset_default_graph()
    start = time.time()
    # Two input pipelines, each calling back into Python via tf.py_func.
    # (squarer is a plain Python function defined elsewhere.)
    dataset_x = tf.data.Dataset.range(1000).map(
        lambda x: tf.py_func(squarer, [x], [tf.int64]),
        num_parallel_calls=num_parallel_calls).repeat(repeat)
    if batch:
        dataset_x = dataset_x.batch(batch_size)
    dataset_y = tf.data.Dataset.range(1000).map(
        lambda x: tf.py_func(squarer, [x], [tf.int64]),
        num_parallel_calls=num_parallel_calls).repeat(repeat)
    if batch:
        dataset_y = dataset_y.batch(batch_size)
    X = dataset_x.make_one_shot_iterator().get_next()
    Y = dataset_y.make_one_shot_iterator().get_next()
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        i = 0
        while True:
            try:
                res = sess.run([X, Y])
                i += 1
                if i == num_iterations:
                    break
            except tf.errors.OutOfRangeError:
                pass
The timings are here:
%timeit test_two_custom_function_parallelism(num_iterations=1000, num_parallel_calls=2, batch_size=2, batch=True)
370ms
%timeit test_two_custom_function_parallelism(num_iterations=1000, num_parallel_calls=5, batch_size=2, batch=True)
372ms
%timeit test_two_custom_function_parallelism(num_iterations=1000, num_parallel_calls=10, batch_size=2, batch=True)
384ms
I used %timeit in a Jupyter notebook. What am I doing wrong?
Recommended answer
The problem here is that the only operation in the Dataset.map() function is a tf.py_func() op. This op calls back into the local Python interpreter to run a function in the same process. Increasing num_parallel_calls will increase the number of TensorFlow threads that attempt to call back into Python concurrently. However, Python has something called the "Global Interpreter Lock" that prevents more than one thread from executing code at once. As a result, all but one of these multiple parallel calls will be blocked waiting to acquire the Global Interpreter Lock, and there will be almost no parallel speedup (and perhaps even a slight slowdown).
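For illustration only, squarer() was presumably something like the plain Python function below; its definition is not in the question, so this is an assumption. Because it runs as ordinary Python code under tf.py_func, every call has to acquire the GIL:

def squarer(x):
    # Ordinary Python bytecode: executes while holding the Global Interpreter
    # Lock, so the map threads calling into it effectively take turns.
    return x ** 2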
Your code example didn't include the definition of the squarer() function, but it might be possible to replace tf.py_func() with pure TensorFlow ops, which are implemented in C++ and can execute in parallel. For example (just guessing by the name), you could replace it with an invocation of tf.square(x), and you might then enjoy some parallel speedup.
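As a rough sketch of that replacement (assuming squarer() really does just square its input, which is a guess rather than something the question states), the first pipeline could become:

dataset_x = tf.data.Dataset.range(1000).map(
    lambda x: tf.square(x),  # native TensorFlow op, runs in C++ and is not bound by the GIL
    num_parallel_calls=num_parallel_calls).repeat(repeat)

With no tf.py_func() round trip into the interpreter, the threads created by num_parallel_calls can actually run the per-element work concurrently.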
Note, however, that if the amount of work in the function is small, like squaring a single integer, the speedup might not be very large. Parallel Dataset.map() is more useful for heavier operations, like parsing a TFRecord with tf.parse_single_example() or performing some image distortions as part of a data augmentation pipeline.
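For example, a parallel map over a TFRecord file might look like the sketch below. The file name and the feature keys ("image" and "label") are placeholders for illustration, not anything taken from the question:

def parse_and_augment(serialized):
    # Parsing and decoding are implemented as C++ ops, so this work can
    # genuinely overlap across the threads created by num_parallel_calls.
    features = tf.parse_single_example(
        serialized,
        features={
            "image": tf.FixedLenFeature([], tf.string),
            "label": tf.FixedLenFeature([], tf.int64),
        })
    image = tf.image.decode_jpeg(features["image"], channels=3)
    image = tf.image.random_flip_left_right(image)  # simple augmentation step
    return image, features["label"]

dataset = (tf.data.TFRecordDataset("train.tfrecords")  # placeholder file name
           .map(parse_and_augment, num_parallel_calls=8)
           .batch(32))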