在下面的代码片段中,我尝试查找,删除和创建相同的项目,但是要在2个不同的线程中进行2个不同的事务。

在线程1中,我创建事务1,找到该项目并将其删除。

完成后,我将允许线程2创建事务2,并尝试查找该项目。由于我使用了 Find() 选项,因此FOR UPDATE方法在此处阻止。

回到线程1,重新创建该项目并提交事务1,这使线程2中的Find()完成。这里是出现的问题:

如果我使用隔离级别“ReadCommitted”,则会收到未找到的错误-对我来说这没有意义,因为我认为ReadCommitted事务可以看到其他人应用的更新。

如果使用隔离级别“Serializable”,则会收到错误:pq: could not serialize access due to concurrent update

为什么我会看到这种行为?我认为在第二次查找后,它应该为我提供最新行。

我如何做到这一点,以便当一行正在被修改时,其他任何读取都将被锁定,并在其他线程完成后解锁以返回最新数据?

db, err := gorm.Open("postgres", "host=localHost port=5432 user=postgres dbname=test-rm password=postgres sslmode=disable")
if err != nil { panic("failed to connect database") }
db.SingularTable(true)
db.DropTableIfExists(&Product{})
db.AutoMigrate(&Product{})

db.Create(&Product{Code: "A", Price: 1000})
// SQL: INSERT  INTO "product" ("code","price") VALUES ('A',1000) RETURNING "products"."id"

txOpt := &sql.TxOptions{Isolation: sql.LevelSerializable}

doneTrans1 := make(chan struct{})

go func(){
    item1 := &Product{}

    tx1 := db.BeginTx(context.Background(), txOpt)

    err = tx1.Set("gorm:query_option", "FOR UPDATE").Find(item1, "code = ?", "A").Error
    // SQL: SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE

    item1.Price = 3000

    err = tx1.Delete(item1).Error
    // SQL: DELETE FROM "product"  WHERE "product"."id" = 1

    doneTrans1<-struct{}{}
    time.Sleep(time.Second * 3)

    err = tx1.Create(item1).Error
    // SQL: INSERT  INTO "product" ("id","code","price") VALUES (1,'A',3000) RETURNING "product"."id"

    tx1.Commit()
}()

// Ensure other trans work started
<-doneTrans1
time.Sleep(time.Second * 2)

item2 := &Product{}

tx2 := db.BeginTx(context.Background(), txOpt)

err = tx2.Set("gorm:query_option", "FOR UPDATE").Find(item2, "code = ?", "A").Error
// SQL: SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE
// ERROR occurs here

item2.Price = 5000
err = tx2.Delete(item2).Error
err = tx2.Create(item2).Error
tx2.Commit()
time.Sleep(time.Second * 5)

最佳答案

为了回答这个问题,我认为最好消除goroutine的复杂性(并且实际上消除所有复杂性),并专注于SQL。以下是SQL语句的运行顺序(错误发生后,我已经忽略了所有内容,因为这几乎无关紧要,执行顺序变得复杂/可变!)。

在主例程中

INSERT  INTO "product" ("code","price") VALUES ('A',1000) RETURNING "products"."id"

在GoRoutine中
BEGIN TX1
SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE
DELETE FROM "product"  WHERE "product"."id" = 1

在主例程中
BEGIN TX2
SELECT * FROM "product"  WHERE (code = 'A') FOR UPDATE -- ERROR occurs here

对您的问题。

问题1



Read Committed Isolation Level的文档中:



因此TX2中的SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE将等待TX1完成。此时,TX1已删除产品A,因此该行将被忽略并且不返回任何结果。现在,我知道TX1也会重新创建产品A,但请记住,“SELECT查询(没有FOR UPDATE/SHARE子句)只能看到在查询开始之前提交的数据;”由于选择是在TX1重新创建记录之前开始的,因此将不会显示。

问题2



Repeatable Read Isolation Level的文档中(可序列化是一个较高的级别,因此这些规则以及一些更严格的规则适用):



在您的代码中,TX1更新产品A,这意味着TX2中的查询将被延迟,直到TX1提交,此时它会因错误而中止(如果TX1回滚,它将继续)。

如何进行第二次更新?*

维护事务完整性是一个难题,而PostgreSQL中的功能是一些非常聪明的人进行大量工作的结果。如果您发现自己在与数据库进行斗争,通常最好退一步,考虑是否需要更改方法(或者您认为存在的难题是一个真正的问题)。

在您的示例中,您有两个例程可以删除并重新创建相同的记录。我无法预料到您希望两笔交易都进行的情况。在可能的实际系统中,您不会精心安排计时器来确保首先开始一个事务。这意味着事务完成后数据库的状态将取决于首先到达SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE的状态。因此,实际上,是否失败并不重要(因为在任何情况下结果都是随机的)。它实际上是更好的结果,因为您可以建议用户(谁可以检查记录并在需要时重新运行任务)。

因此,在阅读其余内容之前,我建议您首先考虑一下这是否是一个问题(我没有您要完成的工作的背景,因此很难评论)。

如果您确实要确保更新能够继续进行,则有几种选择:
  • 如果使用“Serializable”,则需要检测故障并重试事务(如果那是业务逻辑的要求)
  • 如果使用“读已提交”,则用UPDATE替换DELETE/INSERT(在这种情况下,当第一个事务锁释放时,PostgreSQL将重新评估WHERE子句)。

  • 但是,我认为一种更好的方法是消除其中的大部分,并尝试在一个步骤中执行这样的更新(这可能意味着绕过ORM)。如果要最大程度地减少此类问题的发生,那么最小化锁的数量/持续时间非常重要,并且一步执行操作将大有帮助。对于复杂的操作,使用存储过程可以加快处理速度,但是仍然(减少)了与其他同时运行的操作发生冲突的机会。

    您可能还想看看Optimistic Locking,因为在某些情况下这更有意义(例如,您在哪里读取信息,将其显示给用户并等待更改,但与此同时另一位用户可能已经进行了更改)。

    关于postgresql - 维护同一行的并发更新的完整性,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/60331946/

    10-13 05:40