如何使从数据存储区获取和设置属性的操作安全,线程安全?

当前,我有将任务放入队列中的代码,每个任务执行一个任务,然后更新一个名为numberOfTasks的属性,该属性的类型为int。它基本上会获取此属性的当前值并对其进行递增。

但是,当任务在队列中执行时,由于线程问题,最终值将不正确。有时,两个任务试图同时更新属性,因此有时增量未完成。

任何人都可以帮助正确完成此工作吗?

数据存储区属性获取器方法:

private String doGet(String rowId) throws EntityNotFoundException {
    Key egsKey = KeyFactory.createKey(DATASTORE_KIND, rowId);

    Entity egsEntity = datastore.get(egsKey);

    // schema changed from String to Text type. Transparently handle that here.
    Object propertyValue = egsEntity.getProperty(PROPERTY_KEY);

    if (propertyValue instanceof String) {
        return (String) propertyValue;
    }

    Text text = (Text) propertyValue;

    return text.getValue();
}


数据存储区属性的设置方法:

private void doPut(String rowId, List<String> list) {
    Entity entity = new Entity(DATASTORE_KIND, rowId);
    entity.setProperty(PROPERTY_KEY, list);

    datastore.put(entity);
}


设置方法和获取方法:

public synchronized int getPendingUsersForProcessing() {
    String pendingUsersForProcessingAsString = null;
    try {
        pendingUsersForProcessingAsString = doGet(PENDING_USERS_FOR_PROCESSING);
        return Integer.valueOf(pendingUsersForProcessingAsString);
    } catch (NumberFormatException e) {
        throw new IllegalStateException("The num of last batches processed in Datastore is not a number: "
                + pendingUsersForProcessingAsString);
    } catch (EntityNotFoundException e) {
        return DEFAULT_PENDING_USERS_FOR_PROCESSING;
    }
}

/** {@inheritDoc } */
@Override
public synchronized void setPendingUsersForProcessing(int pendingUsersForProcessing) {
    doPut(PENDING_USERS_FOR_PROCESSING, String.valueOf(pendingUsersForProcessing));
    LOG.info("Number of Pending Users For Processing is set to : " + pendingUsersForProcessing);
}


我尝试更新属性的代码:

int pendingUsers = appProperties.getPendingUsersForProcessing();
int requestUsers = request.getUserKeys().size();
appProperties.setPendingUsersForProcessing(pendingUsers + requestUsers);

最佳答案

这并非完全是线程问题,因为您可能有多个应用程序实例在执行任务,并且这些实例彼此之间并不了解。因此,这是一个争用情况。

您有几种解决方法。


使用sharding for your counters
不用不断更新同一实体,而是使用一个任务完成的时间作为ID,为每个完成的任务创建一个新的实体。这种方法的优势在于,它创建了一个审计跟踪,并且您始终可以获取统计信息,例如今天,过去一小时内完成的任务数等。要计算实体数,您可以使用仅键查询,即几乎免费而且非常快。缺点是编写这些实体的成本较高-如果要完成的任务很多,这不是解决方案。
代替计算任务,而是计算这些任务的结果。例如,如果任务更新了用户状态,则可以使用免费且快速的仅键查询来计算处于“待定”状态的用户数。如果您已经拥有一个索引属性,可以将其用作标记来计算已完成任务的数量,那么这是一种非常好的方法。

07-25 22:51