我正在以一个在多线程环境中用户定义的批处理大小写入内存中的分布式数据库。但是我想限制写入ex的行数。 1000行/秒产生此要求的原因是我的生产者写得太快,而使用者却遇到了叶子内存错误。在对记录进行批处理时,是否有任何标准实践来执行节流。

dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
      val dbRecords = recordSet.map(m => (m, Events.transform(m)))
      dbRecords.map { record =>
        try {
          Events.setValues(eventInsert, record._2)
          eventInsert.addBatch
        } catch {
          case e: Exception =>
            logger.error(s"error adding batch: ${e.getMessage}")
            val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
            logger.error(s"event: $error_event")
        }
      }

      // Bulk Commit Records
      try {
        eventInsert.executeBatch
      } catch {
        case e: java.sql.BatchUpdateException =>
          val updates = e.getUpdateCounts
          logger.error(s"failed commit: ${updates.toString}")
          updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
            val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
            logger.error(s"insert error: $error")
            logger.error(e.getMessage)
          }
      }
      finally {
        connection.commit
        eventInsert.clearBatch
        logger.debug(s"committed: ${dbRecords.length.toString}")
      }
    }

我希望是否可以将用户定义的参数作为throttleMax传递,并且如果每个线程写入的总记录达到throttleMax,将调用thread.sleep()1秒钟。但这会使整个过程非常缓慢。可以使用其他有效方法将数据加载速度限制为1000行/秒吗?

最佳答案

正如其他人所建议的那样(请参阅问题注释),与在这里进行节流相比,您有更好的选择。但是,您可以使用一些简单的代码来限制Java中的操作,如下所示:

/**
 * Given an Iterator `inner`, returns a new Iterator which will emit items upon
 * request, but throttled to at most one item every `minDelayMs` milliseconds.
 */
public static <T> Iterator<T> throttledIterator(Iterator<T> inner, int minDelayMs) {
    return new Iterator<T>() {
        private long lastEmittedMillis = System.currentTimeMillis() - minDelayMs;

        @Override
        public boolean hasNext() {
            return inner.hasNext();
        }

        @Override
        public T next() {
            long now = System.currentTimeMillis();
            long requiredDelayMs = now - lastEmittedMillis;
            if (requiredDelayMs > 0) {
                try {
                    Thread.sleep(requiredDelayMs);
                } catch (InterruptedException e) {
                    // resume
                }
            }
            lastEmittedMillis = now;

            return inner.next();
        }
    };
}

上面的代码使用Thread.sleep,因此不适合在Reactive系统中使用。在这种情况下,您可能想使用该系统中提供的Throttle实现,例如 throttle in Akka

09-28 03:15