我正在使用etaty redisscala(https://github.com/etaty/rediscala)客户端。这是我的功能
private def getVersionTime(db: RedisClient, interval: Long)(implicit ec: ExecutionContext): Future[Long] = {
import akka.util.ByteString
import redis.ByteStringFormatter
implicit val byteStringLongFormatter = new ByteStringFormatter[Long] {
def serialize(data: Long): ByteString = ByteString(data.toString.getBytes)
def deserialize(bs: ByteString): Long = bs.utf8String.toLong
}
db.get[Long]("versionTime").map {
case Some(v) => loggerF.info(s"Retrieved version time ${v}")
v
case None => val current = System.currentTimeMillis()
db.setex[Long]("versionTime", (current / 1000) + interval, current)
loggerF.info(s"set version time ${current}")
current
}
}
这是我的测试。这个测试调用上面的方法
it("check with multiple tasks"){
val target = 10
val latch = new java.util.concurrent.CountDownLatch(target)
(1 to target).map{t =>
getVersionTime(prodDb, 10).map{r => print("\n" + r); latch.countDown()}
}
assert(latch.await(10, TimeUnit.SECONDS))
}
测试输出
14:52:46.692 [pool-1-thread-12] INFO EndToEndITTests-设置版本时间1548062566687
14:52:46.693 [pool-1-thread-6] INFO EndToEndITTests-设置版本时间1548062566687
14:52:46.693 [pool-1-thread-20] INFO EndToEndITTests-设置版本时间1548062566687
14:52:46.692 [pool-1-thread-2] INFO EndToEndITTests-设置版本时间1548062566686
14:52:46.692 [pool-1-thread-10] INFO EndToEndITTests-设置版本时间1548062566687
14:52:46.693 [pool-1-thread-8] INFO EndToEndITTests-设置版本时间1548062566687
14:52:46.692 [pool-1-thread-4] INFO EndToEndITTests-设置版本时间1548062566686
14:52:46.692 [pool-1-thread-11] INFO EndToEndITTests-设置版本时间1548062566687
14:52:46.692 [pool-1-thread-9] INFO EndToEndITTests-设置版本时间1548062566687
14:52:46.692 [pool-1-thread-7] INFO EndToEndITTests-设置版本时间1548062566687
预期的行为是-设置版本时间应该一次出现,其余线程应该打印检索到的版本时间。我想我需要在这里使用事务,以便将get和setex包裹在watch和exec中
private def getVersionTimeTrans(db: RedisClient, interval: Long): Long = {
import akka.util.ByteString
import redis.ByteStringFormatter
implicit val byteStringLongFormatter = new ByteStringFormatter[Long] {
def serialize(data: Long): ByteString = ByteString(data.toString.getBytes)
def deserialize(bs: ByteString): Long = bs.utf8String.toLong
}
val redisTransaction = db.transaction()
redisTransaction.watch("versionTime")
val result: Future[Long] = redisTransaction.get[Long]("versionTime").map {
case Some(v) => loggerF.info(s"Retrieved version time ${v}")
v
case None => val current = System.currentTimeMillis()
redisTransaction.setex[Long]("versionTime", (current / 1000) + interval, current)
loggerF.info(s"set version time ${current}")
current
}
redisTransaction.exec()
val r = for {
i <- result
} yield {
i
}
Await.result(r, 10 seconds)
}
测试
it("check with multiple threads "){
val target = 10
val latch = new java.util.concurrent.CountDownLatch(target)
(1 to target).map{t =>
Future(getVersionTimeTrans(prodDb, 10)).map{r => latch.countDown()}
}
assert(latch.await(10, TimeUnit.SECONDS))
}
对于此测试,输出也相同。我不知道如何将其正确包装在事务中。请帮忙。
最佳答案
在rediscala实现中,您似乎无法使用我在原始答案中建议的乐观锁定(请参阅下文),因为在rediscala中,TransactionBuilder
直到WATCH
命令才会发送EXEC
命令,这使其变得毫无用处。有一个旧的封闭bug on GitHub正是在这种情况下引用了另一个SO question,答案是
几个月后
自2014年以来,似乎处于同一状态,我认为它永远不会改变。
原始答案
我没有活着的Redis对其进行测试,但是看起来Redis transaction docs看起来Redis不支持SQL风格的事务,就像您想像的那样。它支持原子操作,但是您不能执行“启动事务获取数据检查可能的修改提交”循环。在最终EXEC
命令到达之前,所有“获取数据”命令都将排队。这意味着您不能在同一事务内的get和set之间进行任何检查。
如果您查看该文档的“使用检查和设置进行乐观锁定”部分,则可以看到实现行为的正确方法是:
watch
为密钥UNWATCH
)关于scala - Redis获取并设置交易的 key 到期时间,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/54286920/