我对Python和Twisted还是很陌生,所以我可能只是不太了解某些东西,但似乎停留在需要帮助的地方。
我想做的是在SSL连接上使用ReconnectingClientFactory。我已经全部运行了,但是如果删除了连接,则发送到传输的write()方法的所有数据都将被删除,而不会出现任何错误。实际调用的方法是twisted.protocols.tls.TLSMemoryBIOProtocol.write()。
这是我想发生的事情(从有效的连接开始):
连接断开
使用一些数据调用write()方法(source code here)
self.disconnecting是False
,因此数据传递到_write() method
_write方法进入True
的_lostTLSConnection,然后仅运行return
重新获得连接,但未发送任何数据,因为未在任何地方缓冲
这是客户端的简化版本:
from OpenSSL import SSL
from twisted.internet.protocol import (Protocol, ReconnectingClientFactory)
from twisted.internet import (reactor, ssl)
import struct
class MetricsServer(Protocol):
streambuffer = bytearray()
def connectionMade(self):
self.transport.setTcpKeepAlive(True) # maintain the TCP connection
self.transport.setTcpNoDelay(False) # allow Nagle algorithm
print("connected to server")
def dataReceived(self, data):
print("from server:", data)
def connectionLost(self, reason):
self.connected = 0
print("server connection lost:", reason)
class MetricsServerFactory(ReconnectingClientFactory):
protocol = MetricsServer
maxDelay = 300 # maximum seconds between retries
factor = 1.6180339887498948
packet_sequence_number = 0
active_connection = None
def buildProtocol(self, addr):
self.resetDelay()
if self.active_connection == None:
self.active_connection = self.protocol()
return self.active_connection
def get_packet_sequence_number(self):
self.packet_sequence_number += 1
return self.packet_sequence_number
def send_data(self):
print ("sending ssl packet")
packet = struct.pack("!I", self.get_packet_sequence_number())
self.active_connection.transport.write(packet)
reactor.callLater(1.0, metrics_server.send_data)
class CtxFactory(ssl.ClientContextFactory):
def getContext(self):
self.method = SSL.TLSv1_METHOD
ctx = ssl.ClientContextFactory.getContext(self)
ctx.use_certificate_file('keys/client.crt')
ctx.use_privatekey_file('keys/client.key')
def verifyCallback(connection, x509, errnum, errdepth, ok):
return bool(ok)
ctx.set_verify(SSL.VERIFY_PEER, verifyCallback)
ctx.load_verify_locations("keys/ca.pem")
return ctx
if __name__ == "__main__":
metrics_server = MetricsServerFactory()
reactor.connectSSL('localhost', 8000, metrics_server, CtxFactory())
reactor.callLater(3.0, metrics_server.send_data)
reactor.run()
这是一个简单的服务器,输出接收到的数据:
from OpenSSL import SSL
from twisted.internet import ssl, reactor
from twisted.internet.protocol import Factory, Protocol
class Echo(Protocol):
sent_back_data = False
def dataReceived(self, data):
print(' '.join("{0:02x}".format(x) for x in data))
def verifyCallback(connection, x509, errnum, errdepth, ok):
return bool(ok)
if __name__ == '__main__':
factory = Factory()
factory.protocol = Echo
myContextFactory = ssl.DefaultOpenSSLContextFactory(
'keys/server.key', 'keys/server.crt'
)
ctx = myContextFactory.getContext()
ctx.set_verify(
SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
verifyCallback
)
ctx.load_verify_locations("keys/ca.pem")
reactor.listenSSL(8000, factory, myContextFactory)
reactor.run()
重新创建问题的过程:
首先,您需要生成自己的证书和CA才能正常工作
首先运行服务器
运行客户端代码
等待服务器端的一些输出,然后结束程序
注意客户端继续尝试发送数据
重新启动服务器端
注意服务器端将继续接收数据包,但是丢失连接时发送的数据包将被丢弃
作为一种变通办法,我尝试实现自己的缓冲区以在重新连接时发送数据,但是遇到了另一个问题。我希望它在重新建立连接时发送数据,而我能看到的唯一钩子是Protocol.connectionMade()。但是,该方法在TLS握手实际完成之前被调用,因此最终被exception handler in _write()捕获并放置到另一个缓冲区中,以便稍后发送。但是,该缓冲区only seems to be sent if data is received from the other end(在我的情况下这种情况很少发生,并且还意味着数据可能以错误的顺序到达另一端,因为可能在接收数据之前调用了write())。我还认为在接收数据之前再次断开连接也会导致该数据缓冲区被擦除。
编辑:添加了第一个问题的示例代码。我在工厂中有
active_connection
可能很奇怪,但我正在尝试使其作为单例工作。 最佳答案
好的,我发现了解决方法的问题...我正在传递要写入传输的bytearray
,然后立即清除它,直到意识到清除缓冲区之后才意识到写入被推迟了。因此,我通过了bytearray
的副本,它现在似乎可以正常工作。
似乎还不太正确,每次对写入的调用都必须进行检查以查看它是否已连接,因为ReconnectingClientFactory
的整个想法是它会为您维护连接。另外,我认为可能会丢失该if
语句与实际运行write()
的连接,因此仍然有可能丢失数据。
from OpenSSL import SSL
from twisted.internet.protocol import (Protocol, ReconnectingClientFactory)
from twisted.internet import (reactor, ssl)
import struct
class MetricsServer(Protocol):
streambuffer = bytearray()
def connectionMade(self):
self.transport.setTcpKeepAlive(True) # maintain the TCP connection
self.transport.setTcpNoDelay(False) # allow Nagle algorithm
print("connected to server")
if len(self.transport.factory.wrappedFactory.send_buffer) > 0:
self.transport.write(bytes(self.transport.factory.wrappedFactory.send_buffer))
self.transport.factory.wrappedFactory.send_buffer.clear()
def dataReceived(self, data):
print("from server:", data)
def connectionLost(self, reason):
self.connected = 0
print("server connection lost:", reason)
class MetricsServerFactory(ReconnectingClientFactory):
protocol = MetricsServer
maxDelay = 300 # maximum seconds between retries
factor = 1.6180339887498948
packet_sequence_number = 0
active_connection = None
send_buffer = bytearray()
def buildProtocol(self, addr):
self.resetDelay()
if self.active_connection == None:
self.active_connection = self.protocol()
return self.active_connection
def get_packet_sequence_number(self):
self.packet_sequence_number += 1
return self.packet_sequence_number
def send_data(self):
print ("sending ssl packet")
packet = struct.pack("!I", self.get_packet_sequence_number())
if self.active_connection and self.active_connection.connected:
self.active_connection.transport.write(packet)
else:
self.send_buffer.extend(packet)
reactor.callLater(1.0, metrics_server.send_data)
class CtxFactory(ssl.ClientContextFactory):
def getContext(self):
self.method = SSL.TLSv1_METHOD
ctx = ssl.ClientContextFactory.getContext(self)
ctx.use_certificate_file('keys/client.crt')
ctx.use_privatekey_file('keys/client.key')
def verifyCallback(connection, x509, errnum, errdepth, ok):
return bool(ok)
ctx.set_verify(SSL.VERIFY_PEER, verifyCallback)
ctx.load_verify_locations("keys/ca.pem")
return ctx
if __name__ == "__main__":
metrics_server = MetricsServerFactory()
reactor.connectSSL('localhost', 8000, metrics_server, CtxFactory())
reactor.callLater(3.0, metrics_server.send_data)
reactor.run()