我试图让一些类似于 keras 的“fit_generator”方法的工作。基本上,我有一个(非常)大的小批量数据文件,我想让我的 CPU 抓取小批量并填充与我的 GPU 平行的队列,从队列中取出小批量并对其进行训练。通过让 CPU 与 GPU 并行工作(而不是让 CPU 抓取一个批次并使 GPU 在对该批次进行训练之前等待 CPU),我应该能够将我的训练时间减少大约一半。我已经对 CPU 获取 mini-batch 所需的时间进行了基准测试,它花费的时间与我的 GPU 训练一个 mini-batch 所需的时间相当,因此并行化 CPU 和 GPU 应该可以正常工作。我还没有在 pytorch 中找到一个内置的方法来做到这一点,如果有的话,请告诉我。
所以我尝试使用 torch.multiprocessing 模块来做我想做的事情,但我无法完成培训,因为我总是在培训完成之前遇到某种错误。 torch.multiprocessing 模块应该是一个包装器,它具有与常规 multiprocessing 模块基本相同的所有功能,除了它允许在进程之间共享 pytorch 张量。基本上,我已经将我的代码设置为具有 2 个函数,一个加载器函数和一个训练器函数,如下所示:
def data_gen(que,PATH,epochs,steps_per_epoch,batch_size=32):
for epoch in range(epochs):
for j in range(steps_per_epoch):
with h5py.File(PATH,'r') as f:
X = f['X'][j*batch_size:(j+1)*batch_size]
Y = f['Y'][j*batch_size:(j+1)*batch_size]
X = autograd.Variable(torch.Tensor(X).resize_(batch_size,256,25).cpu())
Y = autograd.Variable(torch.Tensor(Y).cpu())
que.put((X,Y))
que.put('stop')
que.close()
return
def train_network(que,net,optimizer,epochs):
print('Training for %s epochs...' %epochs)
for epoch in range(epochs):
while(True):
data = que.get()
if(data == 'stop'):
break
net.zero_grad()
net.hid = net.init_hid()
inp,labels = data
inp = inp.cuda()
labels = labels.cuda()
out,hid = net(inp)
loss = F.binary_cross_entropy(out,labels)
loss.backward()
optimizer.step()
print('Epoch end reached')
return
然后我像这样并行运行这两个进程:
if __name__ == '__main__':
tmp.set_start_method('spawn')
que = tmp.Queue(maxsize=10)
loader = tmp.Process(target=data_gen, args=(que,PATH,epochs,steps), kwargs={'batch_size':batch_size})
loader.start()
trainer = tmp.Process(target=train_network, args=(que,net,optimizer,epochs,steps))
trainer.start()
loader.join()
trainer.join()
我在每个 epoch 结束时将 que 放入一个“停止”值,这样我就可以跳出训练器中的循环并进入下一个 epoch。这种“毒丸”方法似乎有效,因为代码运行了多个 epoch,并且训练器实际上打印了 epoch 验证消息的结尾。代码运行了,它似乎确实加快了训练过程(我一直试图在一小部分数据上对这段代码进行原型(prototype)设计,所以有时很难说我得到了多少加速),但在训练结束(并且总是在最后,无论我指定多少个时期),我总是收到错误消息:
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/media/digitalstorm/Storage/RNN_Prototype/Lazuli_rnnprototype.py", line 307, in train_network
data = que.get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 113, in get
return _ForkingPickler.loads(res)
File "/usr/local/lib/python3.6/dist-packages/torch/multiprocessing/reductions.py", line 70, in rebuild_storage_fd
fd = df.detach()
File "/usr/lib/python3.6/multiprocessing/resource_sharer.py", line 57, in detach
with _resource_sharer.get_connection(self._id) as conn:
File "/usr/lib/python3.6/multiprocessing/resource_sharer.py", line 87, in get_connection
c = Client(address, authkey=process.current_process().authkey)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 487, in Client
c = SocketClient(address)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 614, in SocketClient
s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
或者,如果我在各种选项中混为一谈,我有时会收到这样的错误:
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/media/digitalstorm/Storage/RNN_Prototype/Lazuli_rnnprototype.py", line 306, in train_network
data = que.get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 113, in get
return _ForkingPickler.loads(res)
File "/usr/local/lib/python3.6/dist-packages/torch/multiprocessing/reductions.py", line 70, in rebuild_storage_fd
fd = df.detach()
File "/usr/lib/python3.6/multiprocessing/resource_sharer.py", line 58, in detach
return reduction.recv_handle(conn)
File "/usr/lib/python3.6/multiprocessing/reduction.py", line 182, in recv_handle
return recvfds(s, 1)[0]
File "/usr/lib/python3.6/multiprocessing/reduction.py", line 153, in recvfds
msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
ConnectionResetError: [Errno 104] Connection reset by peer
我不知道我哪里出错了。诚然,我是多处理方面的新手,所以我很难调试到底出了什么问题。任何帮助将不胜感激,谢谢!
最佳答案
由于这个问题没有任何进展,我将发布我自己的解决方法来解决这个问题。基本上,加载器进程在完成处理和对示例进行排队后关闭队列。它没有等待训练器进程完成,所以当训练器进程要获取下一个 minibatch 时,它找不到它。我不太明白为什么加载进程过早地关闭了队列,que.close()
的文档说这应该只告诉队列没有更多的对象被发送到队列,但它实际上不应该关闭队列。此外,删除 que.close()
并没有解决问题,所以我认为错误与该命令无关。为我解决这个问题的是在 time.sleep(2)
命令之后放置一个 que.close()
命令。这会强制 que 在完成将所有内容放入 que 后休眠几秒钟,并允许程序完成并退出而不会出错。
关于Python (Pytorch) 多处理抛出错误 : Connection reset by peer and File Not Found,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47762973/