本文介绍了多线程python psycopg2的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的 Python 程序中使用了多线程.我有3个队列.在其中之一中,我将数据插入 postgres 数据库.但在此之前,我需要检查数据库中是否已经存在具有特定域名的行.所以我有:

I'm using multi thread inside my program in python. I've got 3 queues. In one of them I'm inserting data to postgres database. But before, I need to check if in the database already exists row with specific domain name. So I've got:

class AnotherThread(threading.Thread):
    def __init__(self, another_queue):
        threading.Thread.__init__(self)
        self.another_queue = another_queue


    def run(self):
        while True:
            chunk = self.another_queue.get()
            if chunk is not '':
                dane = chunk[0].split(',',2)

                cur.execute("SELECT exists(SELECT 1 FROM global where domain = %s ) ", (domena,))
                jest = cur.fetchone()
                print(jest)

这是我的第三个队列的代码的一部分.我在这里连接到数据库(在 main() 函数中):

It's a part of code my third queue. I'm connecting to database here (in main() function):

queue = Queue.Queue()
out_queue = Queue.Queue()
another_queue = Queue.Queue()

for i in range(50):
    t = ThreadUrl(queue, out_queue)
    t.setDaemon(True)
    t.start()

for host in hosts:
    queue.put(host)

for i in range(50):
    dt = DatamineThread(out_queue,another_queue)
    dt.setDaemon(True)
    dt.start()

conn_str = "dbname='{db}' user='user' host='localhost' password='pass'"
conn = psycopg2.connect(conn_str.format(db='test'))
conn.autocommit = True
cur = conn.cursor()

for i in range(50):
    dt = AnotherThread(another_queue)
    dt.setDaemon(True)
    dt.start()



queue.join()
out_queue.join()
another_queue.join()

cur.close()
conn.close()

当我运行我的脚本时:

(False,)
(False,)
(False,)
(False,)
(False,)
(False,)
(False,)
(False,)
(False,)
Exception in thread Thread-128:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "domains.py", line 242, in run
    jest = cur.fetchone()
ProgrammingError: no results to fetch

Exception in thread Thread-127:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "domains.py", line 242, in run
    jest = cur.fetchone()
ProgrammingError: no results to fetch

(False,)
(False,)
(False,)

为什么其中一些我收到错误消息?

Why for some of them I'm getting an error?

推荐答案

这可能与所有线程共享相同的连接和游标有关.我可以想象这样一种情况,其中 cur.execute() 运行,然后 cur.fetchone() 由另一个线程运行,然后 cur.fetchone()> 再次由(另一个或相同或前一个)线程执行,中间没有 cur.execute.Python GIL 将在每行(语句)的线程之间切换.因此,第二次运行 fetchone() 时,不再有结果:最初只有一行要获取,现在已经用完了.
您可能想要隔离每个游标,或者以某种方式使 cur.execute(...);cur.fetchone() 命令原子.

This may have to do with the fact that all threads share the same connection and cursor. I could imagine a case where cur.execute() is run, then cur.fetchone() by another thread, then cur.fetchone() again by (yet another or the same or the previous) thread, with no cur.execute in between. The Python GIL would switch between threads per line (statement). Thus, that second time fetchone() is run, there are no results anymore: there's only one row to fetch initially, and that's now been exhausted.
You probably want to isolate each cursor, or somehow make the cur.execute(...); cur.fetchone() commands atomic.

问题的答案是 postgresql 中通过 psycopg2 每个游标或每个连接的事务(DBA StackExchange 链接)提到事务是每个连接,所以隔离游标可能不会帮助你.

The answer to the question are transactions in postgresql via psycopg2 per cursor or per connection (DBA StackExchange link) mentions transactions are per connection, so isolating cursors probably won't help you.

这篇关于多线程python psycopg2的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-12 22:32