我目前正在使用服务器进程,该服务器进程旨在连接到Cassandra数据库并将信息转发给它。这个过程是作为一个类编写的,我的目标是为这个类创建一个Cassandra会话,它可以用来发送信息。但是,我遇到了一个问题。当我在类init方法中创建Cassandra会话,然后稍后尝试在另一种方法中使用该会话时,出现以下错误:errors={}, last_host=<server IP address>
。我目前可以通过在每次调用该方法时创建一个新的Cassandra会话来解决此问题,但这显然不是解决此问题的好方法。因此,我如何进行一个Cassandra会话,以便在整个课程中一致使用?
此代码不起作用:
from cassandra.cluster import Cluster
from multiprocessing import Process
class DataProcess(Process):
def __init__(self):
super(DataProcess,self).__init__()
# Do a few other irrelevant things ...
# Set up the Cassandra connection
self.cluster = Cluster(contact_points=[CASSANDRA_IP])
self.session = self.cluster.connect('some keyspace')
print "Connected to cassandra."
def callback(self,ch,method,props,body):
prepared_statement = self.session.prepare("Some CQL statement...")
bound_statement = prepared_statement.bind(some values)
self.session.execute(bound_statement)
Output:
"Connected to cassandra."
errors={}, last_host=<server IP address>
这段代码可以工作,但是这样做很愚蠢:
from cassandra.cluster import Cluster
from multiprocessing import Process
class DataProcess(Process):
def __init__(self):
super(DataProcess,self).__init__()
# Do a few irrelevant things ...
def callback(self,ch,method,props,body):
# Set up the Cassandra connection
cluster = Cluster(contact_points=[CASSANDRA_IP])
session = cluster.connect('some keyspace')
prepared_statement = session.prepare("Some CQL statement...")
bound_statement = prepared_statement.bind(some values)
session.execute(bound_statement)
其他相关信息:
使用python cassandra-driver版本2.5.1
Cassandra数据库版本2.1.8
编辑:答案
以下代码解决了该问题:
from cassandra.cluster import Cluster
from multiprocessing import Process
class DataProcess(Process):
def __init__(self):
super(DataProcess,self).__init__()
self.cluster = None
self.session = None
# Do a few irrelevant things ...
def callback(self,ch,method,props,body):
# Set up the Cassandra connection
cluster = Cluster(contact_points=[CASSANDRA_IP])
session = cluster.connect('some keyspace')
prepared_statement = session.prepare("Some CQL statement...")
bound_statement = prepared_statement.bind(some values)
session.execute(bound_statement)
def run(self):
self.cluster = Cluster(contact_points=[CASSANDRA_IP])
self.session = self.cluster.connect('some keyspace')
最佳答案
您是在派生之前创建集群和会话吗?这可能会引起类似您所看到的问题。关于如何使用python驱动程序在池中分配工作的信息,非常出色,写在here上。这可能正是您要寻找的。
如果不是这样,请给您更多有关您的流程运行方式的信息,因为在不知道您的流程生命周期的情况下很难复制。