In my use case I have to read the rows from MySQL that have status = 0.

Table schema:

CREATE TABLE IF NOT EXISTS in_out_analytics(
                id INT AUTO_INCREMENT PRIMARY KEY,
                file_name VARCHAR(255),
                start_time BIGINT,
                end_time BIGINT,
                duration INT,
                in_count INT,
                out_count INT,
                status INT
            )


I am using the following code to read the data from MySQL.

sistence.py

import mysql.connector
import conf

class DatabaseManager(object):
    # Class-level attribute that holds the single shared database connection
    connection = None

    def __init__(self):
        self.ip = conf.db_ip
        self.user_name = conf.db_user
        self.password = conf.db_password
        self.db_name = conf.db_name

        # Initialize database only one time in application
        if not DatabaseManager.connection:
            self.connect()
        self.cursor = DatabaseManager.connection.cursor()
        self.create_schema()

    def connect(self):
        try:
            DatabaseManager.connection = mysql.connector.connect(
                    host= self.ip,
                    database = self.db_name,
                    user = self.user_name,
                    password = self.password
            )
            print(f"Successfully connected to { self.ip } ")
        except mysql.connector.Error as e:
            print(str(e))

    def create_schema(self):
        # Create database
        # sql = f"CREATE DATABASE IF NOT EXISTS {self.db_name}"
        # self.cursor.execute(sql)

        # Create table
        sql = """
            CREATE TABLE IF NOT EXISTS in_out_analytics(
                id INT AUTO_INCREMENT PRIMARY KEY,
                file_name VARCHAR(255),
                start_time BIGINT,
                end_time BIGINT,
                duration INT,
                in_count INT,
                out_count INT,
                status INT
            )"""
        self.cursor.execute(sql)

    def read_unprocessed_rows(self):
        sql = "SELECT id, start_time, end_time FROM in_out_analytics WHERE status=0;"
        self.cursor.execute(sql)
        result_set = self.cursor.fetchall()
        rows = []
        for row in result_set:
            id = row[0]
            start_time = row[1]
            end_time = row[2]
            details = {
                'id' : id,
                'start_time' : start_time,
                'end_time' : end_time
            }
            rows.append(details)
        return rows


test.py

import time
from persistance import DatabaseManager


if __name__ == "__main__":

    # Rows which are inserted after application is started do not get processed if
    # 'DatabaseManager' is defined here
    # dm = DatabaseManager()

    while True:
        # Rows which are inserted after application is started do get processed if
        # 'DatabaseManager' is defined here
        dm = DatabaseManager()

        unprocessed_rows = dm.read_unprocessed_rows()
        print(f"unprocessed_rows: { unprocessed_rows }")
        time.sleep(2)


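Problem:

When I define the database object dm = DatabaseManager() above the while loop, rows inserted after the application has started are never processed. When I define dm = DatabaseManager() inside the while loop, rows inserted after the application has started are processed as well.

What is wrong with the code above?

Ideally we should create only one DatabaseManager object, since this class creates the connection to MySQL. Establishing a single connection to a given database should be the ideal case.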

Best answer

I'm making an assumption here, since I can't test this myself.

tl;dr: add DatabaseManager.connection.commit() to your read_unprocessed_rows.
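A minimal sketch of that change, untested against a real server. To keep it self-contained it is written as a standalone function that takes the connection as a parameter (an assumption for illustration; in the question it would be a method using DatabaseManager.connection):

```python
def read_unprocessed_rows(connection):
    """Return the rows with status = 0, then end the read transaction.

    `connection` is assumed to be a DB-API connection such as the one
    returned by mysql.connector.connect().
    """
    cursor = connection.cursor()
    try:
        cursor.execute(
            "SELECT id, start_time, end_time "
            "FROM in_out_analytics WHERE status = 0"
        )
        rows = [
            {"id": row[0], "start_time": row[1], "end_time": row[2]}
            for row in cursor.fetchall()
        ]
    finally:
        cursor.close()
    # Committing ends the implicit transaction, so the next SELECT on this
    # connection starts a new one and sees rows committed in the meantime.
    connection.commit()
    return rows
```

Nothing is written here; the commit is only used to close the transaction that the SELECT opened.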

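When the SELECT statement is executed, a transaction is implicitly started with the default isolation level, REPEATABLE READ (MySQL Connector/Python leaves autocommit disabled by default, so the transaction stays open until you commit or roll back). The first read takes a snapshot of the database at that point in time, and all subsequent reads in the same transaction read from that snapshot. The effects of the different isolation levels are described in the MySQL documentation on transaction isolation levels. To refresh the snapshot under REPEATABLE READ, commit the current transaction before executing the next statement.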
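As an alternative to committing after every read, the session could be switched to READ COMMITTED, where each consistent read gets its own fresh snapshot. This is not part of the original suggestion, and the helper name below is hypothetical; it is only a sketch that issues the standard MySQL statement through a DB-API cursor:

```python
def use_read_committed(connection):
    """Switch the session to READ COMMITTED for later transactions.

    Hypothetical helper; `connection` is assumed to be a DB-API connection
    such as the one returned by mysql.connector.connect().
    """
    cursor = connection.cursor()
    try:
        # Applies to transactions started afterwards in this session.
        cursor.execute(
            "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED"
        )
    finally:
        cursor.close()
```

It would be called once, right after connecting and before the first SELECT. Whether a weaker isolation level is acceptable depends on what else the application does on that connection.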

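So when you instantiate DatabaseManager inside the loop, each SELECT starts a new transaction on a new connection and therefore gets a new snapshot every time. When you instantiate DatabaseManager outside the loop, the transaction created by the first SELECT keeps the same snapshot for all subsequent SELECTs, and updates made outside that transaction remain invisible.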
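The behaviour described above can be mimicked with a toy in-memory model. This is deliberately not MySQL: ToyTable and ToyConnection are made-up classes that only imitate "the first read fixes the snapshot, commit drops it", to make the reasoning concrete:

```python
class ToyTable:
    """In-memory stand-in for a table (illustration only, not MySQL)."""

    def __init__(self):
        self.committed_rows = []


class ToyConnection:
    """Imitates one REPEATABLE READ session reading from a ToyTable."""

    def __init__(self, table):
        self.table = table
        self.snapshot = None  # None means no transaction is open

    def select_all(self):
        # The first read of a transaction fixes the snapshot.
        if self.snapshot is None:
            self.snapshot = list(self.table.committed_rows)
        return list(self.snapshot)

    def commit(self):
        # Ending the transaction discards the snapshot.
        self.snapshot = None


table = ToyTable()
shared = ToyConnection(table)

first = shared.select_all()                 # snapshot taken on an empty table
table.committed_rows.append({"id": 1, "status": 0})  # another session inserts

stale = shared.select_all()                 # same transaction, old snapshot
fresh = ToyConnection(table).select_all()   # new connection, new snapshot

shared.commit()
after_commit = shared.select_all()          # new transaction, new snapshot
```

In this model `stale` stays empty while `fresh` and `after_commit` both contain the inserted row, which matches the two cases in the question: one long-lived connection without commits versus a new connection per iteration.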
