我有一个休眠课程,需要3个不同的会话。它目前使用2个会话,并且运行良好。第一个会话用于从外部数据库读取数据。第二个会话用于将数据保存到我们的内部数据库。我要添加第三个会话,因为无论主事务是否成功(XXXXUpdate对象),我们都需要跟踪事务。我的问题是新会话挂在tx.commit()上。

private synchronized void executeUpdate(Long manualUpdateTagIndex) throws Exception {
    LogPersistenceLoggingContext ctx = new LogPersistenceThreadContext().getLogPersistenceLoggingContext();

    DateTime minTriggerDate = parseDateTimeIfNotNull(minTriggerTime);
    DateTime maxTriggerDate = parseDateTimeIfNotNull(maxTriggerTime);
    Session webdataSession = null;
    Session XXXXUpdateSession = null;
    XXXXUpdate update = new XXXXUpdate();
    update.setExecutedAt(new DateTime());
    update.setStatus(WebdataUpdateStatus.Success);

    boolean commit = true;
    int tagCount = 0;
    List<Period> tagPeriods = new ArrayList<>();
    Map<Long, DateTime> tagIndexes = new LinkedHashMap<>();

    try {

        XXXXUpdateSession = accountingService.openUnmanagedSession();
        XXXXUpdateSession.getTransaction().begin();
        XXXXUpdateSession.save(update);

        HierarchicalLogContext logCtx = new HierarchicalLogContext(String.valueOf(update.getId()));
        ctx.pushLoggingContext(logCtx);

        ctx.log(logger, Level.INFO, new XXXXLogMarker(), "Executing XXXX data transfer", new Object[]{});
        if (webdataSessionFactory == null){
            throw new Exception("Failed to obtain webdata session factory. See earlier log entries");
        }
        try {
            webdataSession = webdataSessionFactory.openSession();
        } catch (Exception ex) {
            update.setStatus(WebdataUpdateStatus.ConnectionError);
            throw new Exception("Failed to obtain webdata connection", ex);
        }

        webdataSession.getTransaction().begin();

        if (manualUpdateTagIndex == null) { // automatic tags update

            XXXXUpdate lastUpdate = (XXXXUpdate) HibernateUtil.getCurrentSpringManagedSession()
                    .createCriteria(XXXXUpdate.class)
                    .add(Restrictions.isNotNull("latestTriggerTimestamp"))
                    .add(Restrictions.eq("status", WebdataUpdateStatus.Success))
                    .add(Restrictions.eq("manualUpdate", false))
                    .addOrder(Order.desc("latestTriggerTimestamp"))
                    .setMaxResults(1).uniqueResult();

            DateTime lastUpdatedDate = Period.defaultEffectiveInstant;
            if (minTriggerDate != null) {
                lastUpdatedDate = minTriggerDate;
            }

            if (lastUpdate != null && lastUpdate.getLatestTriggerTimestamp() != null) {
                lastUpdatedDate = lastUpdate.getLatestTriggerTimestamp();
                ctx.log(logger, Level.INFO, new XXXXLogMarker(),
                        "Querying for tag event triggers newer than last update timestamp [" + lastUpdate.getLatestTriggerTimestamp() + "]", new Object[]{});
            } else {
                ctx.log(logger, Level.INFO, new XXXXLogMarker(), "Update has never run. Catching up with history", new Object[]{});
            }

            @SuppressWarnings("unchecked")
            List<XXXXProcessedTagRequest> processedReqs = HibernateUtil.getCurrentSpringManagedSession()
                    .createCriteria(XXXXProcessedTagRequest.class).list();

            Query triggerQuery = webdataSession.createQuery(
                    "select trigger, "
                            + "trigger.TagIndex,"
                            + "req  "
                            + "from XXXXTagEventTrigger as trigger "
                            + "join trigger.req as req "
                            + "where trigger.EventType in (:eventTypes) "
                            + "and trigger.timestamp > :lastUpdateMinusDelta "
                            + (maxTriggerDate != null?"and trigger.timestamp < :maxDate ":"")
                            + "and req.CurrentState = :currentState "
                            + "order by trigger.timestamp,trigger.reqIndex");

            triggerQuery.setParameterList("eventTypes", new Object[]{5, 9});
            triggerQuery.setParameter("lastUpdateMinusDelta", lastUpdatedDate.minusHours(hoursToKeepProcessedReqs) );
            if (maxTriggerDate != null){
                triggerQuery.setParameter("maxDate", maxTriggerDate);
            }
            triggerQuery.setParameter("currentState", 2);

            @SuppressWarnings("unchecked")
            List<Object[]> allTriggers = triggerQuery.list();

            List<Object[]> unprocessedTriggers = removeProcessedTags(new ArrayList<Object[]>(allTriggers),processedReqs,ctx);

            for (Object[] row : unprocessedTriggers) {
                XXXXTagEventTrigger trigger = (XXXXTagEventTrigger) row[0];

                if (lastUpdatedDate == null || lastUpdatedDate.isBefore(trigger.getTimestamp().getMillis())) {
                    lastUpdatedDate = new DateTime(trigger.getTimestamp());
                }

                tagIndexes.put((Long) row[1], new DateTime(trigger.getTimestamp()));

                XXXXProcessedTagRequest processedReq = new XXXXProcessedTagRequest();
                processedReq.setReqIndex(((XXXXTagReq)row[2]).getReqIndex());
                processedReq.setTimestamp(trigger.getTimestamp());

                HibernateUtil.getCurrentSpringManagedSession().save(processedReq);
            }

            ctx.log(logger, Level.INFO, new XXXXLogMarker(),
                    "Found [" + unprocessedTriggers.size() + "] tag event triggers on [" + tagIndexes.size() + "] tags", new Object[]{});

            update.setLatestTriggerTimestamp(lastUpdatedDate);
        } else { // manual tag update
            ctx.log(logger, Level.INFO, new XXXXLogMarker(), "Executing manual update for tag index [" + manualUpdateTagIndex + "]", new Object[]{});

            DateTime now = new DateTime();
            tagIndexes.put(manualUpdateTagIndex, now);
            update.setLatestTriggerTimestamp(now);
            update.setManualUpdate(true);
        }

        if (tagIndexes.size() > 0) {

            int totalTagCount = tagIndexes.size();

            while (!tagIndexes.isEmpty()) {
                List<Long> batchIndexes = new ArrayList<>();
                Iterator<Map.Entry<Long, DateTime>> indexIt = tagIndexes.entrySet().iterator();

                while (indexIt.hasNext() && batchIndexes.size() < tagBatchSize) {
                    batchIndexes.add(indexIt.next().getKey());
                    indexIt.remove();
                }

                Map<Long, LocalTag> existingTags = new HashMap<>();
                @SuppressWarnings("unchecked")
                List<LocalTag> existingTagIds = HibernateUtil.getCurrentSpringManagedSession()
                        .createCriteria(LocalTag.class)
                        .add(Restrictions.in("tagIndex", batchIndexes))
                        .add(Restrictions.eq("currentVersion", true)).list();

                for (LocalTag lt : existingTagIds) {
                    existingTags.put(lt.getTagIndex(), lt);
                }

                ctx.log(logger, Level.INFO, new XXXXLogMarker(),
                        "Processing tag updates [" + tagCount + "-" + (tagCount + batchIndexes.size()) + "] of [" + totalTagCount + "]", new Object[]{});

                Criteria tagCriteria = webdataSession.createCriteria(XXXXTag.class);
                tagCriteria.add(Restrictions.in("TagIndex", batchIndexes));
                if (!includeTestTags) {
                    tagCriteria.add(Restrictions.eq("TestTag", "0"));
                }
                tagCriteria.setFetchMode("XXXXTagMS", FetchMode.JOIN);
                tagCriteria.setFetchMode("XXXXTagPS", FetchMode.JOIN);
                tagCriteria.setFetchMode("XXXXTagCCList", FetchMode.JOIN);
                tagCriteria.setFetchMode("XXXXTagTA", FetchMode.JOIN);
                tagCriteria.setFetchMode("XXXXTagCP", FetchMode.JOIN);
                tagCriteria.setResultTransformer(CriteriaSpecification.DISTINCT_ROOT_ENTITY);

                @SuppressWarnings("unchecked")
                List<XXXXTag> tags = tagCriteria.list();

                if (manualUpdateTagIndex != null && tags.isEmpty()) {
                    throw new ValidationException("No tag found for manual update tag index [" + manualUpdateTagIndex + "]");
                }

                for (XXXXTag tag : tags) {
                    update.getProcessedTags().add(updateTag(tag, tagIndexes.get(tag.getTagIndex()), existingTags));
                    tagCount++;
                    if (fireEventLastActions.contains(tag.getLastAction().trim())) {
                        tagPeriods.add(new Period(tag.getStartTime().getMillis(), tag.getStopTime().getMillis()));
                    }
                }

                HibernateUtil.getCurrentSpringManagedSession().flush();
                HibernateUtil.getCurrentSpringManagedSession().clear();

                webdataSession.clear();
            }
        } else {
            ctx.log(logger, Level.INFO, new XXXXLogMarker(), "No updates found", new Object[]{});
        }

        HibernateUtil.getCurrentSpringManagedSession()
        .createQuery("delete XXXXUpdate where executedAt < :purgeDate")
        .setParameter("purgeDate", new DateTime().minusDays(daysToKeepUpdateHistory))
        .executeUpdate();

        HibernateUtil.getCurrentSpringManagedSession()
        .createQuery("delete XXXXProcessedTagRequest where timestamp < :purgeDate")
        .setParameter("purgeDate", new DateTime().minusHours(hoursToKeepProcessedReqs))
        .executeUpdate();

        update.setStatus(WebdataUpdateStatus.Success);
        update.setTagCount(update.getProcessedTags().size());

        tagPeriods = Period.merge(tagPeriods);

        for (Period p : tagPeriods) {
            XXXXUpdatePeriod oup = new XXXXUpdatePeriod();
            oup.setXXXXUpdate(update);
            oup.setStartDate(p.getStart());
            oup.setEndDate(p.getEnd());
            update.getPeriods().add(oup);
        }

        HibernateUtil.getCurrentSpringManagedSession().flush();

        ctx.log(logger, Level.INFO, new XXXXLogMarker(), "XXXX data transfer complete. Transferred [" + tagCount + "] tag updates", new Object[]{});

        ctx.popLoggingContext(logCtx);
    } catch (Exception ex) {
        HibernateUtil.getCurrentSpringManagedSession().clear();
        update.getProcessedTags().clear();
        update.setTagCount(0);
        update.setStatus(WebdataUpdateStatus.TransferError);
        commit = false;
        ctx.log(logger, Level.ERROR, new XXXXLogMarker(), "XXXX data transfer failed", new Object[]{}, ex);
        throw new Exception("XXXX data transfer failed", ex);
    } finally {
        try {

            XXXXUpdateSession.saveOrUpdate(update);
            XXXXUpdateSession.getTransaction().commit();

        } catch (Exception ex) {
            commit = false;
            ctx.log(logger, Level.ERROR, new XXXXLogMarker(), "Failed to save XXXX transfer update record", new Object[]{}, ex);
            throw new Exception("Failed to save XXXX transfer update record", ex);
        } finally {
            if (!commit) {
                webdataSession.getTransaction().rollback();
            } else {
                webdataSession.getTransaction().commit();
            }
            ResourceDisposer.dispose(webdataSession);
        }

    }

}


新的会话是XXXXUpdateSession。唯一的新代码是与此会话相关的代码。这是某种时序问题,因为当我使用休眠调试日志记录时,tx提交没有问题。当我尝试调试休眠的commit()时,它也会提交。我没有冬眠的丰富经验,因此,我可能缺少明显的东西。任何帮助将不胜感激。谢谢。

最佳答案

您已经打开了导致问题的两个事务webdataSession.getTransaction().begin();(以上代码中的20和37行)。

您可以在提交第一笔交易后打开第二笔交易。

另外,最好不要采用长方法,如调试问题将变得非常困难,并成为维护/支持项目的噩梦。

10-07 19:00
查看更多