我试图让一些类似于 keras 的“fit_generator”方法的工作。基本上,我有一个(非常)大的小批量数据文件,我想让我的 CPU 抓取小批量并填充与我的 GPU 平行的队列,从队列中取出小批量并对其进行训练。通过让 CPU 与 GPU 并行工作(而不是让 CPU 抓取一个批次并使 GPU 在对该批次进行训练之前等待 CPU),我应该能够将我的训练时间减少大约一半。我已经对 CPU 获取 mini-batch 所需的时间进行了基准测试,它花费的时间与我的 GPU 训练一个 mini-batch 所需的时间相当,因此并行化 CPU 和 GPU 应该可以正常工作。我还没有在 pytorch 中找到一个内置的方法来做到这一点,如果有的话,请告诉我。

所以我尝试使用 torch.multiprocessing 模块来做我想做的事情,但我无法完成培训,因为我总是在培训完成之前遇到某种错误。 torch.multiprocessing 模块应该是一个包装器,它具有与常规 multiprocessing 模块基本相同的所有功能,除了它允许在进程之间共享 pytorch 张量。基本上,我已经将我的代码设置为具有 2 个函数,一个加载器函数和一个训练器函数,如下所示:

    def data_gen(que,PATH,epochs,steps_per_epoch,batch_size=32):
        for epoch in range(epochs):
            for j in range(steps_per_epoch):
                with h5py.File(PATH,'r') as f:
                    X = f['X'][j*batch_size:(j+1)*batch_size]
                    Y = f['Y'][j*batch_size:(j+1)*batch_size]

                    X = autograd.Variable(torch.Tensor(X).resize_(batch_size,256,25).cpu())
                    Y = autograd.Variable(torch.Tensor(Y).cpu())

                    que.put((X,Y))
            que.put('stop')
        que.close()
        return

    def train_network(que,net,optimizer,epochs):
        print('Training for %s epochs...' %epochs)
        for epoch in range(epochs):
            while(True):
                data = que.get()
                if(data == 'stop'):
                    break
                net.zero_grad()
                net.hid = net.init_hid()
                inp,labels = data
                inp   = inp.cuda()
                labels = labels.cuda()
                out,hid = net(inp)
                loss = F.binary_cross_entropy(out,labels)
                loss.backward()
                optimizer.step()
            print('Epoch end reached')
        return

然后我像这样并行运行这两个进程:
    if __name__ == '__main__':
        tmp.set_start_method('spawn')
        que = tmp.Queue(maxsize=10)
        loader = tmp.Process(target=data_gen, args=(que,PATH,epochs,steps), kwargs={'batch_size':batch_size})
        loader.start()
        trainer = tmp.Process(target=train_network, args=(que,net,optimizer,epochs,steps))
        trainer.start()
        loader.join()
        trainer.join()

我在每个 epoch 结束时将 que 放入一个“停止”值,这样我就可以跳出训练器中的循环并进入下一个 epoch。这种“毒丸”方法似乎有效,因为代码运行了多个 epoch,并且训练器实际上打印了 epoch 验证消息的结尾。代码运行了,它似乎确实加快了训练过程(我一直试图在一小部分数据上对这段代码进行原型(prototype)设计,所以有时很难说我得到了多少加速),但在训练结束(并且总是在最后,无论我指定多少个时期),我总是收到错误消息:
    Process Process-2:
    Traceback (most recent call last):
      File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
      File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
        self._target(*self._args, **self._kwargs)
      File "/media/digitalstorm/Storage/RNN_Prototype/Lazuli_rnnprototype.py", line 307, in train_network
        data = que.get()
      File "/usr/lib/python3.6/multiprocessing/queues.py", line 113, in get
        return _ForkingPickler.loads(res)
      File "/usr/local/lib/python3.6/dist-packages/torch/multiprocessing/reductions.py", line 70, in rebuild_storage_fd
        fd = df.detach()
      File "/usr/lib/python3.6/multiprocessing/resource_sharer.py", line 57, in detach
        with _resource_sharer.get_connection(self._id) as conn:
      File "/usr/lib/python3.6/multiprocessing/resource_sharer.py", line 87, in get_connection
        c = Client(address, authkey=process.current_process().authkey)
      File "/usr/lib/python3.6/multiprocessing/connection.py", line 487, in Client
        c = SocketClient(address)
      File "/usr/lib/python3.6/multiprocessing/connection.py", line 614, in SocketClient
        s.connect(address)
    FileNotFoundError: [Errno 2] No such file or directory

或者,如果我在各种选项中混为一谈,我有时会收到这样的错误:
    Process Process-2:
    Traceback (most recent call last):
      File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
      File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
        self._target(*self._args, **self._kwargs)
      File "/media/digitalstorm/Storage/RNN_Prototype/Lazuli_rnnprototype.py", line 306, in train_network
        data = que.get()
      File "/usr/lib/python3.6/multiprocessing/queues.py", line 113, in get
        return _ForkingPickler.loads(res)
      File "/usr/local/lib/python3.6/dist-packages/torch/multiprocessing/reductions.py", line 70, in rebuild_storage_fd
        fd = df.detach()
      File "/usr/lib/python3.6/multiprocessing/resource_sharer.py", line 58, in detach
        return reduction.recv_handle(conn)
      File "/usr/lib/python3.6/multiprocessing/reduction.py", line 182, in recv_handle
        return recvfds(s, 1)[0]
      File "/usr/lib/python3.6/multiprocessing/reduction.py", line 153, in recvfds
        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
    ConnectionResetError: [Errno 104] Connection reset by peer

我不知道我哪里出错了。诚然,我是多处理方面的新手,所以我很难调试到底出了什么问题。任何帮助将不胜感激,谢谢!

最佳答案

由于这个问题没有任何进展,我将发布我自己的解决方法来解决这个问题。基本上,加载器进程在完成处理和对示例进行排队后关闭队列。它没有等待训练器进程完成,所以当训练器进程要获取下一个 minibatch 时,它找不到它。我不太明白为什么加载进程过早地关闭了队列,que.close() 的文档说这应该只告诉队列没有更多的对象被发送到队列,但它实际上不应该关闭队列。此外,删除 que.close() 并没有解决问题,所以我认为错误与该命令无关。为我解决这个问题的是在 time.sleep(2) 命令之后放置一个 que.close() 命令。这会强制 que 在完成将所有内容放入 que 后休眠几秒钟,并允许程序完成并退出而不会出错。

关于Python (Pytorch) 多处理抛出错误 : Connection reset by peer and File Not Found,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47762973/

10-10 20:55