This article looks at an unresponsive actor system in which the ThreadPoolExecutor dispatcher only creates the core thread pool and apparently ignores the maximum pool size. It may be a useful reference for anyone hitting the same problem.

Problem description


Update: I've found that my program remains responsive if I set the ThreadPoolExecutor's core pool size to be the same as the max pool size (29 threads). However, if I set the core pool size to 11 and the max pool size to 29 then the actor system only ever creates 11 threads. How can I configure the ActorSystem / ThreadPoolExecutor to continue to create threads to exceed the core thread count and stay within the max thread count? I would prefer not to set the core thread count to the max thread count, as I only need the extra threads for a job cancellation (which should be a rare event).


I have a batch program running against an Oracle database, implemented using Java/Akka typed actors with the following actors:

  1. BatchManager is in charge of talking to the REST controller. It manages a Queue of uninitialized batch jobs; when an uninitialized batch job is polled from the queue then it is turned into a JobManager actor and executed.
  2. JobManager maintains a queue of stored procedures and a pool of Workers; it initializes each Worker with a stored procedure, and when a Worker finishes it sends the procedure's result to the JobManager, and the JobManager sends another stored procedure to the Worker. The batch terminates when the job queue is empty and all Workers are idle, at which point the JobManager reports its results to the BatchManager, shuts down its workers (via TypedActor.context().stop()), and then shuts itself down. The JobManager has a Promise<Status> completion that is completed when the job successfully finishes or else when the job is terminated due to cancellation or a fatal exception.
  3. Worker executes a stored procedure. It creates the OracleConnection and a CallableStatement used to execute the stored procedures, and registers an onFailure callback with JobManager.completion to abort the connection and cancel the statement. This callback doesn't use the actor system's execution context, instead it uses an execution context created from a cached executor service created in BatchManager.
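The callback wiring in step 3 can be sketched as follows. This is a minimal sketch, using `CompletableFuture` and a plain `ExecutorService` as stand-ins for the Akka `Promise<Status>` and the cached executor created in `BatchManager`; the names here are hypothetical, not taken from the original code:

```java
import java.util.concurrent.*;

public class WorkerCancellation {
    /**
     * Register a cancellation callback on the batch's completion future,
     * run on a separate executor so it never ties up actor threads.
     * Returns a future that yields true once the failure path has run
     * (in the real code, this is where the connection would be aborted).
     */
    static CompletableFuture<Boolean> registerAbort(
            CompletableFuture<String> completion, Executor callbackPool) {
        return completion.handleAsync(
                (status, failure) -> failure != null, callbackPool);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> completion = new CompletableFuture<>();
        ExecutorService callbackPool = Executors.newCachedThreadPool();

        CompletableFuture<Boolean> aborted = registerAbort(completion, callbackPool);

        // Cancelling the batch fails the promise, which fires the callbacks.
        completion.completeExceptionally(new CancellationException("Batch cancelled"));
        System.out.println("abort ran: " + aborted.get(5, TimeUnit.SECONDS));
        callbackPool.shutdown();
    }
}
```

Running the abort callbacks on a dedicated pool, as the question describes, keeps slow connection-teardown work off the dispatcher's threads.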

My config is

{"akka" : { "actor" : { "default-dispatcher" : {
    "type" : "Dispatcher",
    "executor" : "default-executor",
    "throughput" : "1",
    "default-executor" : { "fallback" : "thread-pool-executor" }
    "thread-pool-executor" : {
        "keep-alive-time" : "60s",
        "core-pool-size-min" : coreActorCount,
        "core-pool-size-max" : coreActorCount,
        "max-pool-size-min" : maxActorCount,
        "max-pool-size-max" : maxActorCount,
        "task-queue-size" : "-1",
        "task-queue-type" : "linked",
        "allow-core-timeout" : "on"
}}}}}

The number of workers is configured elsewhere, currently workerCount = 8; coreActorCount is workerCount + 3 while maxActorCount is workerCount * 3 + 5. I'm testing this on a Macbook Pro 10 with two cores and 8GB of memory; the production server is considerably larger. The database I'm talking to is behind an extremely slow VPN. I'm running all of this using Oracle's JavaSE 1.8 JVM. The local server is Tomcat 7. The Oracle JDBC drivers are version 10.2 (I might be able to convince the powers that be to use a newer version). All methods either return void or Future<> and ought to be non-blocking.

When one batch terminates successfully then there is no issue - the next batch starts immediately with a full complement of workers. However, if I terminate the current batch via JobManager#completion.tryFailure(new CancellationException("Batch cancelled")) then the onFailure callbacks registered by the Workers fire off, and then the system becomes unresponsive. Debug printlns indicate that the new batch starts with three out of eight functioning workers, and the BatchManager becomes completely unresponsive (I added a Future<String> ping command that just returns Futures.successful("ping"), and this also times out). The onFailure callbacks are executing on a separate thread pool, and even if they were on the actor system's thread pool I should have a high enough max-pool-size to accommodate the original JobManager, its Workers, its onFailure callbacks, and a second JobManager and its Workers. Instead I seem to be accommodating the original JobManager and its Workers, the new JobManager and less than half of its Workers, and nothing left over for the BatchManager. The computer I'm running this on is short on resources, but it seems like it ought to be able to run more than a dozen threads.

Is this a configuration issue? Is this due to a JVM-imposed limit and/or a Tomcat-imposed limit? Is this due to a problem with how I'm handling blocking IO? There are probably several other things I could be doing wrong, these are just what came to mind.

Gist of CancellableStatement where the CallableStatement and OracleConnection are cancelled

Gist of Immutable where CancellableStatements are created

Gist of JobManager's cleanup code

Config dump obtained via System.out.println(mergedConfig.toString());


Edit: I believe that I've narrowed down the problem to the actor system (either its configuration or its interaction with blocking database calls). I eliminated the Worker actors and moved their workload to Runnables that execute on a fixed-size ThreadPoolExecutor, where each JobManager creates its own ThreadPoolExecutor and shuts it down when the batch completes (shutdown() on normal termination, shutdownNow() on exceptional termination). Cancellation runs on a cached thread pool instantiated in the BatchManager. The actor system's dispatcher is still a ThreadPoolExecutor but with only a half dozen threads allocated to it. Using this alternate setup, cancellation executes as expected - the workers terminate when their database connections are aborted, and the new JobManager executes immediately with a full complement of worker threads. This indicates to me that this is not a hardware/JVM/Tomcat issue.


Update: I did a thread dump using Eclipse's Memory Analyzer. I discovered that the cancellation threads were hanging on CallableStatement.close(), so I reordered the cancellation so that OracleConnection.abort() preceded CallableStatement.cancel() and this cleared up the problem - the cancellations all (apparently) executed correctly. The Worker threads continued to hang on their statements, though - I suspect that my VPN may be partially or totally to blame for this.

PerformanceAsync-akka.actor.default-dispatcher-19
  at java.net.SocketInputStream.socketRead0(Ljava/io/FileDescriptor;[BIII)I (Native Method)
  at java.net.SocketInputStream.read([BIII)I (SocketInputStream.java:150)
  at java.net.SocketInputStream.read([BII)I (SocketInputStream.java:121)
  at oracle.net.ns.Packet.receive()V (Unknown Source)
  at oracle.net.ns.DataPacket.receive()V (Unknown Source)
  at oracle.net.ns.NetInputStream.getNextPacket()V (Unknown Source)
  at oracle.net.ns.NetInputStream.read([BII)I (Unknown Source)
  at oracle.net.ns.NetInputStream.read([B)I (Unknown Source)
  at oracle.net.ns.NetInputStream.read()I (Unknown Source)
  at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1()S (T4CMAREngine.java:1109)
  at oracle.jdbc.driver.T4CMAREngine.unmarshalSB1()B (T4CMAREngine.java:1080)
  at oracle.jdbc.driver.T4C8Oall.receive()V (T4C8Oall.java:485)
  at oracle.jdbc.driver.T4CCallableStatement.doOall8(ZZZZ)V (T4CCallableStatement.java:218)
  at oracle.jdbc.driver.T4CCallableStatement.executeForRows(Z)V (T4CCallableStatement.java:971)
  at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout()V (OracleStatement.java:1192)
  at oracle.jdbc.driver.OraclePreparedStatement.executeInternal()I (OraclePreparedStatement.java:3415)
  at oracle.jdbc.driver.OraclePreparedStatement.execute()Z (OraclePreparedStatement.java:3521)
  at oracle.jdbc.driver.OracleCallableStatement.execute()Z (OracleCallableStatement.java:4612)
  at com.util.CPProcExecutor.execute(Loracle/jdbc/OracleConnection;Ljava/sql/CallableStatement;Lcom/controller/BaseJobRequest;)V (CPProcExecutor.java:57)
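The reordered teardown from the update above (abort the connection before cancelling the statement, and swallow errors in each step so one cannot block the other) can be sketched with hypothetical recorder stand-ins for OracleConnection and CallableStatement; the real objects come from the JDBC driver:

```java
import java.util.ArrayList;
import java.util.List;

public class CancellationOrder {
    // Hypothetical minimal stand-ins for the JDBC objects being torn down.
    interface Abortable { void abort() throws Exception; }
    interface Cancellable { void cancel() throws Exception; }

    /**
     * Abort the connection *first*, so that a cancel() that would otherwise
     * block waiting on the dead socket can return. Exceptions from either
     * step are suppressed so one failure cannot prevent the other step.
     */
    static void tearDown(Abortable connection, Cancellable statement) {
        try { connection.abort(); } catch (Exception ignored) {}
        try { statement.cancel(); } catch (Exception ignored) {}
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        tearDown(() -> calls.add("abort"), () -> calls.add("cancel"));
        System.out.println(calls);  // [abort, cancel]
    }
}
```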

However, even after fixing the cancellation order I still have the problem where the actor system isn't creating enough threads: I'm still only getting three out of eight workers in the new batch, with new workers being added as the cancelled workers have their network connections time out. In total I've got 11 threads - my core pool size, out of 29 threads - my max pool size. Apparently the actor system is ignoring my max pool size parameter, or I'm not configuring the max pool size correctly.

Solution

(Disclaimer: I don't know Akka)

Based on your configuration below, with task-queue-size = -1, I believe the task queue is unbounded.

  "task-queue-size": "-1",
  "task-queue-type": "linked"

A ThreadPoolExecutor will not spawn threads beyond the core pool size unless the work queue is full and the task cannot be queued. Only when the task queue is full will it start spawning additional threads, up to the maximum pool size.
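This behaviour is easy to demonstrate directly: with an unbounded LinkedBlockingQueue, a pool with core size 2 and max size 8 never grows past 2, while with a bounded queue it grows once the queue fills. The pool sizes and task counts below are arbitrary:

```java
import java.util.concurrent.*;

public class PoolGrowthDemo {
    /**
     * Submit `tasks` blocking tasks to a 2-core / 8-max pool backed by
     * `queue`, then report the resulting pool size.
     */
    static int poolSizeAfter(BlockingQueue<Runnable> queue, int tasks) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 8, 60, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try { release.await(); } catch (InterruptedException ignored) {}
            });
        }
        int size = pool.getPoolSize();
        release.countDown();   // let the blocked tasks finish
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) throws Exception {
        // Unbounded queue (what task-queue-size = -1 gives you): extra tasks
        // queue up and the pool never grows past the core size of 2.
        System.out.println(poolSizeAfter(new LinkedBlockingQueue<>(), 6));   // prints 2

        // Bounded queue of capacity 1: once the queue is full, each further
        // submission forces a new thread, so the pool grows toward the max.
        System.out.println(poolSizeAfter(new ArrayBlockingQueue<>(1), 6));   // prints 5
    }
}
```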

Please check whether configuring a bounded queue size makes the thread count increase toward the maximum. Thanks.
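For example, a bounded array-backed task queue could be configured like this (the queue size of 100 is an arbitrary illustration, not a recommendation):

```json
{"akka" : { "actor" : { "default-dispatcher" : {
    "thread-pool-executor" : {
        "core-pool-size-min" : coreActorCount,
        "core-pool-size-max" : coreActorCount,
        "max-pool-size-min" : maxActorCount,
        "max-pool-size-max" : maxActorCount,
        "task-queue-size" : "100",
        "task-queue-type" : "array"
}}}}}
```

Note the trade-off: with a bounded queue, a burst of submissions larger than the maximum pool size plus the queue capacity will be rejected by the executor, so the queue size has to be chosen with the expected dispatch rate in mind.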
