在下面的代码片段中,我尝试查找,删除和创建相同的项目,但是要在2个不同的线程中进行2个不同的事务。
在线程1中,我创建事务1,找到该项目并将其删除。
完成后,我将允许线程2创建事务2,并尝试查找该项目。由于我使用了 Find()
选项,因此FOR UPDATE
方法在此处阻止。
回到线程1,重新创建该项目并提交事务1,这使线程2中的Find()
完成。这里是出现的问题:
如果我使用隔离级别“ReadCommitted”,则会收到未找到的错误-对我来说这没有意义,因为我认为ReadCommitted事务可以看到其他人应用的更新。
如果使用隔离级别“Serializable”,则会收到错误:pq: could not serialize access due to concurrent update
。
为什么我会看到这种行为?我认为在第二次查找后,它应该为我提供最新行。
我如何做到这一点,以便当一行正在被修改时,其他任何读取都将被锁定,并在其他线程完成后解锁以返回最新数据?
db, err := gorm.Open("postgres", "host=localHost port=5432 user=postgres dbname=test-rm password=postgres sslmode=disable")
if err != nil { panic("failed to connect database") }
db.SingularTable(true)
db.DropTableIfExists(&Product{})
db.AutoMigrate(&Product{})
db.Create(&Product{Code: "A", Price: 1000})
// SQL: INSERT INTO "product" ("code","price") VALUES ('A',1000) RETURNING "products"."id"
txOpt := &sql.TxOptions{Isolation: sql.LevelSerializable}
doneTrans1 := make(chan struct{})
go func(){
item1 := &Product{}
tx1 := db.BeginTx(context.Background(), txOpt)
err = tx1.Set("gorm:query_option", "FOR UPDATE").Find(item1, "code = ?", "A").Error
// SQL: SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE
item1.Price = 3000
err = tx1.Delete(item1).Error
// SQL: DELETE FROM "product" WHERE "product"."id" = 1
doneTrans1<-struct{}{}
time.Sleep(time.Second * 3)
err = tx1.Create(item1).Error
// SQL: INSERT INTO "product" ("id","code","price") VALUES (1,'A',3000) RETURNING "product"."id"
tx1.Commit()
}()
// Ensure other trans work started
<-doneTrans1
time.Sleep(time.Second * 2)
item2 := &Product{}
tx2 := db.BeginTx(context.Background(), txOpt)
err = tx2.Set("gorm:query_option", "FOR UPDATE").Find(item2, "code = ?", "A").Error
// SQL: SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE
// ERROR occurs here
item2.Price = 5000
err = tx2.Delete(item2).Error
err = tx2.Create(item2).Error
tx2.Commit()
time.Sleep(time.Second * 5)
最佳答案
为了回答这个问题,我认为最好消除goroutine的复杂性(并且实际上消除所有复杂性),并专注于SQL。以下是SQL语句的运行顺序(错误发生后,我已经忽略了所有内容,因为这几乎无关紧要,执行顺序变得复杂/可变!)。
在主例程中
INSERT INTO "product" ("code","price") VALUES ('A',1000) RETURNING "products"."id"
在GoRoutine中
BEGIN TX1
SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE
DELETE FROM "product" WHERE "product"."id" = 1
在主例程中
BEGIN TX2
SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE -- ERROR occurs here
对您的问题。
问题1
从Read Committed Isolation Level的文档中:
因此TX2中的
SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE
将等待TX1完成。此时,TX1已删除产品A,因此该行将被忽略并且不返回任何结果。现在,我知道TX1也会重新创建产品A,但请记住,“SELECT查询(没有FOR UPDATE/SHARE子句)只能看到在查询开始之前提交的数据;”由于选择是在TX1重新创建记录之前开始的,因此将不会显示。问题2
从Repeatable Read Isolation Level的文档中(可序列化是一个较高的级别,因此这些规则以及一些更严格的规则适用):
在您的代码中,TX1更新产品A,这意味着TX2中的查询将被延迟,直到TX1提交,此时它会因错误而中止(如果TX1回滚,它将继续)。
如何进行第二次更新?*
维护事务完整性是一个难题,而PostgreSQL中的功能是一些非常聪明的人进行大量工作的结果。如果您发现自己在与数据库进行斗争,通常最好退一步,考虑是否需要更改方法(或者您认为存在的难题是一个真正的问题)。
在您的示例中,您有两个例程可以删除并重新创建相同的记录。我无法预料到您希望两笔交易都进行的情况。在可能的实际系统中,您不会精心安排计时器来确保首先开始一个事务。这意味着事务完成后数据库的状态将取决于首先到达
SELECT * FROM "product" WHERE (code = 'A') FOR UPDATE
的状态。因此,实际上,是否失败并不重要(因为在任何情况下结果都是随机的)。它实际上是更好的结果,因为您可以建议用户(谁可以检查记录并在需要时重新运行任务)。因此,在阅读其余内容之前,我建议您首先考虑一下这是否是一个问题(我没有您要完成的工作的背景,因此很难评论)。
如果您确实要确保更新能够继续进行,则有几种选择:
但是,我认为一种更好的方法是消除其中的大部分,并尝试在一个步骤中执行这样的更新(这可能意味着绕过ORM)。如果要最大程度地减少此类问题的发生,那么最小化锁的数量/持续时间非常重要,并且一步执行操作将大有帮助。对于复杂的操作,使用存储过程可以加快处理速度,但是仍然(减少)了与其他同时运行的操作发生冲突的机会。
您可能还想看看Optimistic Locking,因为在某些情况下这更有意义(例如,您在哪里读取信息,将其显示给用户并等待更改,但与此同时另一位用户可能已经进行了更改)。
关于postgresql - 维护同一行的并发更新的完整性,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/60331946/