具有持久工作者实例的线程池

具有持久工作者实例的线程池

本文介绍了具有持久工作者实例的线程池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图排队线程池中的任务,一旦工人变得免费,我已经找到了各种例子,但是在所有情况下,这些例子都已经设置为每个人都使用一个新的Worker实例工作,我想要持久的工作。



我试图做一个ftp备份工具,我有它的工作,但由于单个连接的限制,它很慢。我最想做的事情是建立一个单一的连接来扫描目录并建立一个文件列表,然后由四名工作人员来下载这些文件。



这是一个我的FTP的例子worker:

  public class Worker实现Runnable {
protected FTPClient _ftp;

//连接详情
保护字符串_host =;
保护字符串_user =;
protected String _pass =;

//工作状态
保护布尔_working = false;

public Worker(String host,String user,String pass){
this._host = host;
this._user = user;
this._pass = pass;
}

//检查工作人员是否正在使用
public boolean inUse(){
return this._working;

$ b @Override
public void run(){
this._ftp = new FTPClient();
this._connect();


从ftp服务器下载文件
public boolean download(String base,String path,String file){
this._working = true;
布尔结果= true;

//创建目录(如果不存在)
文件pathDir = new File(base + path);
if(!pathDir.exists()){
pathDir.mkdirs();
}

//下载文件
尝试{
OutputStream output = new FileOutputStream(base + path + file);
this._ftp.retrieveFile(file,output);
output.close();
} catch(Exception e){
outcome = false;
} finally {
this._working = false;
返回结果;



//连接到服务器
protected boolean _connect(){
try {
this._ftp.connect( this._host);
this._ftp.login(this._user,this._pass);
} catch(Exception e){
return false;
}
返回this._ftp.isConnected();


断开与服务器的连接
protected void _disconnect(){
try {
this._ftp.disconnect();
} catch(Exception e){/ * do nothing * /}
}
}

每当工作人员可用而无需创建时,我希望能够为队列中的每个任务调用 Worker.download(...)每次下载都有一个到ftp服务器的新连接。

任何帮助都会被赞赏,因为我之前从未使用过线程,现在我正在绕圈。

解决方案

这是一个常见的问题,有几个不同的解决方案。你想要的是每个线程的 ,而不是每个 Runnable Callable 提交给 ExecutorService



一个选项是使用 ThreadLocal 这会创建你的 ftp 实例。这不是最优的,因为当线程终止时,没有简单的方法来关闭ftp连接。然后,你可以通过限制线程池中运行的线程数来限制连接数。



我认为更好的解决方案是使用 ExecutorService 来分叉你的工作线程。为每个工作人员注入一个 BlockingQueue ,他们都使用它们去出队并执行他们需要做的任务。这与 ExecutorService 内部使用的队列是分开的。然后,您可以将任务添加到您的队列中,而不是添加到 ExecutorService 本身。

  private static final BlockingQueue< FtpTask> taskQueue 
= new ArrayBlockingQueue< FtpTask>();

所以你的任务对象应该是这样的:

  public static class FtpTask {
String base;
字符串路径;
字符串文件;

然后 run() Worker 类中的c>方法会做类似的事情:

  public void run(){
//使我们的永久ftp实例
this._ftp = new FTPClient();
//将它连接到此线程的生命
this._connect();
尝试{
//循环获取任务,直到被中断为止
//也可以使用volatile boolean!shutdown($!Thread.currentThread()。isInterrupted()){
FtpTask task = taskQueue.take();
//如果你正在使用毒丸
if(task == SHUTDOWN_TASK){
break;
}
//在这里下载
下载(task.base,task.path,task.file);
}
}最后{
this._disconnect();






$ b

同样,通过限制线程池中运行的线程数。

我会有一个 Executors.newFixedThreadPool(5); 并添加一个执行扫描/构建的线程和4个正在下载的工作线程。当工作线程从同一个队列中取出时,扫描线程将放入 BlockingQueue 中。


I'm trying to queue up tasks in a thread pool to be executed as soon as a worker becomes free, i have found various examples of this but in all cases the examples have been setup to use a new Worker instance for each job, i want persistent workers.

I'm trying to make a ftp backup tool, i have it working but because of the limitations of a single connection it is slow. What i ideally want to do is have a single connection for scanning directories and building up a file list then four workers to download said files.

Here is an example of my FTP worker:

public class Worker implements Runnable {
  protected FTPClient _ftp;

  // Connection details
  protected String _host = "";
  protected String _user = "";
  protected String _pass = "";

  // worker status
  protected boolean _working = false;

  public Worker(String host, String user, String pass) {
    this._host = host;
    this._user = user;
    this._pass = pass;
  }

   // Check if the worker is in use
  public boolean inUse() {
    return this._working;
  }

  @Override
  public void run() {
    this._ftp = new FTPClient();
    this._connect();
  }

  // Download a file from the ftp server
  public boolean download(String base, String path, String file) {
    this._working   = true;
    boolean outcome = true;

    //create directory if not exists
    File pathDir = new File(base + path);
    if (!pathDir.exists()) {
      pathDir.mkdirs();
    }

    //download file
    try {
      OutputStream output = new FileOutputStream(base + path + file);
      this._ftp.retrieveFile(file, output);
      output.close();
    } catch (Exception e) {
      outcome = false;
    } finally {
      this._working = false;
      return outcome;
    }
  }

  // Connect to the server
  protected boolean _connect() {
    try {
      this._ftp.connect(this._host);
      this._ftp.login(this._user, this._pass);
    } catch (Exception e) {
      return false;
    }
    return this._ftp.isConnected();
  }

  // Disconnect from the server
  protected void _disconnect() {
    try {
      this._ftp.disconnect();
    } catch (Exception e) { /* do nothing */ }
  }
}

I want to be able to call Worker.download(...) for each task in a queue whenever a worker becomes available without having to create a new connection to the ftp server for each download.

Any help would be appreciated as I've never used threads before and I'm going round in circles at the moment.

解决方案

This is a common question with a couple of different solutions. What you want is some context per thread as opposed to per Runnable or Callable that would be submitting to an ExecutorService.

One option would be to have a ThreadLocal which would create your ftp instances. This is not optimal because there would be no easy way to shutdown the ftp connection when the thread is terminated. You would then limit the number of connections by limiting the number of threads running in your thread-pool.

I think a better solution would be to use the ExecutorService only to fork your worker threads. For each worker, inject into them a BlockingQueue that they all use to dequeue and perform the tasks they need to do. This is separate from the queue used internally by the ExecutorService. You would then add the tasks to your queue and not to the ExecutorService itself.

private static final BlockingQueue<FtpTask> taskQueue
        = new ArrayBlockingQueue<FtpTask>();

So your task object would have something like:

public static class FtpTask {
     String base;
     String path;
     String file;
}

Then the run() method in your Worker class would do something like:

public void run() {
    // make our permanent ftp instance
    this._ftp = new FTPClient();
    // connect it for the life of this thread
    this._connect();
    try {
        // loop getting tasks until we are interrupted
        // could also use volatile boolean !shutdown
        while (!Thread.currentThread().isInterrupted()) {
            FtpTask task = taskQueue.take();
            // if you are using a poison pill
            if (task == SHUTDOWN_TASK) {
                break;
            }
            // do the download here
            download(task.base, task.path, task.file);
        }
    } finally {
        this._disconnect();
    }
}

Again, you limit the number of connections by limiting the number of threads running in your thread-pool.

I would have a Executors.newFixedThreadPool(5); and add one thread which does the scanning/building and 4 worker threads that are doing the downloading. The scanning thread would be putting to the BlockingQueue while the worker threads are taking from the same queue.

这篇关于具有持久工作者实例的线程池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-01 20:39
查看更多