我像这样编写客户端 - 服务器应用程序:
客户端(c#) 服务器(扭曲;ftp 代理和附加功能) ftp 服务器

服务器有两个类:我自己的类协议(protocol)继承自 LineReceiever 协议(protocol)和 FTPClient 来自twisted.protocols.ftp。

但是当客户端发送或获取大文件 (10 Gb - 20 Gb) 时,服务器会捕获 MemoryError。我的代码中没有使用任何缓冲区。当调用 transport.write(data) 数据附加到 react 器编写器的内部缓冲区后(如果我错了,请纠正我)会发生这种情况。

我应该用什么来避免这个问题?或者我应该改变解决问题的方法?

我发现对于大流,我应该使用 IConsumer 和 IProducer 接口(interface)。但最终会调用transfer.write方法,效果是一样的。还是我错了?

UPD:

这是文件下载/上传的逻辑(从 ftp 通过 Twisted 服务器到 Windows 上的客户端):

客户端向 Twisted 服务器发送一些 header ,然后开始发送文件。 Twisted 服务器接收 header ,然后(如果需要)调用 setRawMode() ,打开 ftp 连接并从/向客户端接收/发送字节,然后关闭连接。这是上传文件的部分代码:

FTPManager 类

def _ftpCWDSuccees(self, protocol, fileName):
        self._ftpClientAsync.retrieveFile(fileName, FileReceiver(protocol))



class FileReceiver(Protocol):
    def __init__(self, proto):
        self.__proto = proto

    def dataReceived(self, data):
        self.__proto.transport.write(data)

    def connectionLost(self, why = connectionDone):
        self.__proto.connectionLost(why)

主要代理服务器类:
class SSDMProtocol(LineReceiver)
...

在 SSDMProtocol 对象(调用 obSSDMProtocol )解析 header 后,它会调用打开 ftp 连接的方法(来自 FTPClienttwisted.protocols.ftp)并设置 FTPManager 字段的对象 _ftpClientAsync 并使用 _ftpCWDSuccees(self, protocol, fileName) 调用 protocol = obSSDMProtocol,当收到的文件字节调用 FileReceive_code 对象时。

并且当 dataReceived(self, data) 调用时,数据附加到内部缓冲区的速度比发送回客户端的速度快,因此内存耗尽。也许我可以在缓冲区达到一定大小时停止读取并在缓冲区全部发送到客户端后恢复读取?或类似的东西?

最佳答案

如果您将 20 GB(千兆位?)的字符串传递给 transport.write ,您将需要至少 20 GB(千兆位?)的内存 - 由于在处理字符串时需要额外的复制,可能更像是 40 或 60 Python。

即使您从未将单个字符串传递给 20 GB(千兆位?)的 transport.write,如果您以比您的网络可以处理的速度更快的速度重复使用短字符串调用 transport.write,发送缓冲区最终会变得太大而无法容纳在内存中,并且你会遇到一个 MemoryError

这两个问题的解决方案是生产者/消费者系统。使用 IProducerIConsumer 的好处是,您永远不会有 20 GB(千兆位?)的字符串,而且您永远不会用太多较短的字符串填满发送缓冲区。网络将受到限制,因此读取字节的速度不会超过您的应用程序可以处理它们并忘记它们的速度。您的字符串最终将达到 16kB - 64kB 的数量级,这应该很容易放入内存中。

您只需要调整对 FileReceiver 的使用,将传入连接注册为传出连接的生产者:

class FileReceiver(Protocol):
    def __init__(self, outgoing):
        self._outgoing = outgoing

    def connectionMade(self):
        self._outgoing.transport.registerProducer(self.transport, streaming=True)

    def dataReceived(self, data):
        self._outgoing.transport.write(data)

现在每当 self._outgoing.transport 的发送缓冲区填满时,它都会告诉 self.transport 暂停。一旦发送缓冲区清空,它会告诉 self.transport 恢复。 self.transport 现在介绍了如何在 TCP 级别执行这些操作,以便降低进入服务器的数据速度。

关于扭曲的大文件传输,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/12891192/

10-12 02:12