问题描述
我试图排队线程池中的任务,一旦工人变得免费,我已经找到了各种例子,但是在所有情况下,这些例子都已经设置为每个人都使用一个新的Worker实例工作,我想要持久的工作。
我试图做一个ftp备份工具,我有它的工作,但由于单个连接的限制,它很慢。我最想做的事情是建立一个单一的连接来扫描目录并建立一个文件列表,然后由四名工作人员来下载这些文件。
这是一个我的FTP的例子worker:
public class Worker实现Runnable {
protected FTPClient _ftp;
//连接详情
保护字符串_host =;
保护字符串_user =;
protected String _pass =;
//工作状态
保护布尔_working = false;
public Worker(String host,String user,String pass){
this._host = host;
this._user = user;
this._pass = pass;
}
//检查工作人员是否正在使用
public boolean inUse(){
return this._working;
$ b @Override
public void run(){
this._ftp = new FTPClient();
this._connect();
从ftp服务器下载文件
public boolean download(String base,String path,String file){
this._working = true;
布尔结果= true;
//创建目录(如果不存在)
文件pathDir = new File(base + path);
if(!pathDir.exists()){
pathDir.mkdirs();
}
//下载文件
尝试{
OutputStream output = new FileOutputStream(base + path + file);
this._ftp.retrieveFile(file,output);
output.close();
} catch(Exception e){
outcome = false;
} finally {
this._working = false;
返回结果;
//连接到服务器
protected boolean _connect(){
try {
this._ftp.connect( this._host);
this._ftp.login(this._user,this._pass);
} catch(Exception e){
return false;
}
返回this._ftp.isConnected();
断开与服务器的连接
protected void _disconnect(){
try {
this._ftp.disconnect();
} catch(Exception e){/ * do nothing * /}
}
}
每当工作人员可用而无需创建时,我希望能够为队列中的每个任务调用 Worker.download(...)
每次下载都有一个到ftp服务器的新连接。
任何帮助都会被赞赏,因为我之前从未使用过线程,现在我正在绕圈。
这是一个常见的问题,有几个不同的解决方案。你想要的是每个线程的 ,而不是每个 Runnable
或 Callable
提交给 ExecutorService
。
一个选项是使用 ThreadLocal
这会创建你的
ftp
实例。这不是最优的,因为当线程终止时,没有简单的方法来关闭ftp连接。然后,你可以通过限制线程池中运行的线程数来限制连接数。
我认为更好的解决方案是使用 ExecutorService
来分叉你的工作线程。为每个工作人员注入一个 BlockingQueue
,他们都使用它们去出队并执行他们需要做的任务。这与 ExecutorService
内部使用的队列是分开的。然后,您可以将任务添加到您的队列中,而不是添加到 ExecutorService
本身。
private static final BlockingQueue< FtpTask> taskQueue
= new ArrayBlockingQueue< FtpTask>();
所以你的任务对象应该是这样的:
public static class FtpTask {
String base;
字符串路径;
字符串文件;
然后 run()$ c $在你的
Worker
类中的c>方法会做类似的事情:
public void run(){
//使我们的永久ftp实例
this._ftp = new FTPClient();
//将它连接到此线程的生命
this._connect();
尝试{
//循环获取任务,直到被中断为止
//也可以使用volatile boolean!shutdown($!Thread.currentThread()。isInterrupted()){
FtpTask task = taskQueue.take();
//如果你正在使用毒丸
if(task == SHUTDOWN_TASK){
break;
}
//在这里下载
下载(task.base,task.path,task.file);
}
}最后{
this._disconnect();
$ b 同样,通过限制线程池中运行的线程数。
我会有一个 Executors.newFixedThreadPool(5);
并添加一个执行扫描/构建的线程和4个正在下载的工作线程。当工作线程从同一个队列中取出时,扫描线程将放入 BlockingQueue
中。
I'm trying to queue up tasks in a thread pool to be executed as soon as a worker becomes free, i have found various examples of this but in all cases the examples have been setup to use a new Worker instance for each job, i want persistent workers.
I'm trying to make a ftp backup tool, i have it working but because of the limitations of a single connection it is slow. What i ideally want to do is have a single connection for scanning directories and building up a file list then four workers to download said files.
Here is an example of my FTP worker:
public class Worker implements Runnable {
protected FTPClient _ftp;
// Connection details
protected String _host = "";
protected String _user = "";
protected String _pass = "";
// worker status
protected boolean _working = false;
public Worker(String host, String user, String pass) {
this._host = host;
this._user = user;
this._pass = pass;
}
// Check if the worker is in use
public boolean inUse() {
return this._working;
}
@Override
public void run() {
this._ftp = new FTPClient();
this._connect();
}
// Download a file from the ftp server
public boolean download(String base, String path, String file) {
this._working = true;
boolean outcome = true;
//create directory if not exists
File pathDir = new File(base + path);
if (!pathDir.exists()) {
pathDir.mkdirs();
}
//download file
try {
OutputStream output = new FileOutputStream(base + path + file);
this._ftp.retrieveFile(file, output);
output.close();
} catch (Exception e) {
outcome = false;
} finally {
this._working = false;
return outcome;
}
}
// Connect to the server
protected boolean _connect() {
try {
this._ftp.connect(this._host);
this._ftp.login(this._user, this._pass);
} catch (Exception e) {
return false;
}
return this._ftp.isConnected();
}
// Disconnect from the server
protected void _disconnect() {
try {
this._ftp.disconnect();
} catch (Exception e) { /* do nothing */ }
}
}
I want to be able to call Worker.download(...)
for each task in a queue whenever a worker becomes available without having to create a new connection to the ftp server for each download.
Any help would be appreciated as I've never used threads before and I'm going round in circles at the moment.
解决方案 This is a common question with a couple of different solutions. What you want is some context per thread as opposed to per Runnable
or Callable
that would be submitting to an ExecutorService
.
One option would be to have a ThreadLocal
which would create your ftp
instances. This is not optimal because there would be no easy way to shutdown the ftp connection when the thread is terminated. You would then limit the number of connections by limiting the number of threads running in your thread-pool.
I think a better solution would be to use the ExecutorService
only to fork your worker threads. For each worker, inject into them a BlockingQueue
that they all use to dequeue and perform the tasks they need to do. This is separate from the queue used internally by the ExecutorService
. You would then add the tasks to your queue and not to the ExecutorService
itself.
private static final BlockingQueue<FtpTask> taskQueue
= new ArrayBlockingQueue<FtpTask>();
So your task object would have something like:
public static class FtpTask {
String base;
String path;
String file;
}
Then the run()
method in your Worker
class would do something like:
public void run() {
// make our permanent ftp instance
this._ftp = new FTPClient();
// connect it for the life of this thread
this._connect();
try {
// loop getting tasks until we are interrupted
// could also use volatile boolean !shutdown
while (!Thread.currentThread().isInterrupted()) {
FtpTask task = taskQueue.take();
// if you are using a poison pill
if (task == SHUTDOWN_TASK) {
break;
}
// do the download here
download(task.base, task.path, task.file);
}
} finally {
this._disconnect();
}
}
Again, you limit the number of connections by limiting the number of threads running in your thread-pool.
I would have a Executors.newFixedThreadPool(5);
and add one thread which does the scanning/building and 4 worker threads that are doing the downloading. The scanning thread would be putting to the BlockingQueue
while the worker threads are taking from the same queue.
这篇关于具有持久工作者实例的线程池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!