eue与PipedOutputStream和PipedInput

eue与PipedOutputStream和PipedInput

本文介绍了BlockingQueue与PipedOutputStream和PipedInputStream的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想知道使用BlockingQueue代替(PipedOutputStreamPipedInputStream)的优点

I want to know Advantages to use BlockingQueue instead of (PipedOutputStream and PipedInputStream)

import java.io.*;
import java.util.concurrent.*;


public class PipedStreamVsBlocking {

  public static void main(String... args) {

    BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
    ExecutorService executor = Executors.newFixedThreadPool(4);
    Runnable producerTask = () -> {
      try {
        while (true) {
          int value = ThreadLocalRandom.current().nextInt(0, 1000);
          blockingQueue.put(value);
          System.out.println("BlockingQueue.Produced " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    };
    Runnable consumerTask = () -> {
      try {
        while (true) {
          int value = blockingQueue.take();
          System.out.println("BlockingQueue.Consume " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    };

    PipedOutputStream pipedSrc = new PipedOutputStream();
    PipedInputStream pipedSnk = new PipedInputStream();
    try {
      pipedSnk.connect(pipedSrc);
    } catch (IOException e) {
      e.printStackTrace();
    }

    Runnable runnablePut2 = () -> {
      try {
        ObjectOutputStream oos = new ObjectOutputStream(pipedSrc);
        while (true) {
          int value = ThreadLocalRandom.current().nextInt(0, 1000);
          oos.writeInt(value);
          oos.flush();
          System.out.println("PipedStream.Produced " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    };

    Runnable runnableGet2 = () -> {
      try {
        ObjectInputStream ois = new ObjectInputStream(pipedSnk);
        while (true) {
          int value = ois.readInt();
          System.out.println("PipedStream.Consume " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    };
    executor.execute(producerTask);
    executor.execute(consumerTask);
    executor.execute(runnablePut2);
    executor.execute(runnableGet2);
    executor.shutdown();
  }

}

此代码的输出为:

BlockingQueue.Consume 298
BlockingQueue.Produced 298
PipedStream.Produced 510
PipedStream.Consume 510
BlockingQueue.Produced 536
BlockingQueue.Consume 536
PipedStream.Produced 751
PipedStream.Consume 751
PipedStream.Produced 619
BlockingQueue.Produced 584
BlockingQueue.Consume 584
PipedStream.Consume 619
BlockingQueue.Produced 327
PipedStream.Produced 72
BlockingQueue.Consume 327
PipedStream.Consume 72
BlockingQueue.Produced 823
BlockingQueue.Consume 823
PipedStream.Produced 544
PipedStream.Consume 544
BlockingQueue.Produced 352
BlockingQueue.Consume 352
PipedStream.Produced 134
PipedStream.Consume 134

我认为使用PipedStream(PipedOutputStreamPipedInputStream)具有优势,我知道何时直接生成/处理数据.

I think that to use PipedStream (PipedOutputStream and PipedInputStream) have advantages, I know when the data is produced/Processed directly.

可能是我错了,并且此建议使用BlockingQueue而不是Pipe.

May be I wrong, And this recommendation to use BlockingQueue instead of Pipe.

但是,在文档中找不到您的评论/建议.因此,我需要知道我错过了什么.

But, your comments/recommendations are not found in the documentation.For this reason, I need to know what I missed.

为什么我应该使用BlockingQueue而不是Piped?

推荐答案

像任何Java Collection一样,BlockingQueue存储对对象的引用,因此线程从中检索对象接收完全相同的运行时对象,将生成线程放入其中.

Like any Java Collection, a BlockingQueue stores references to objects, so the thread(s) retrieving objects from it receive precisely the same runtime objects, the producing thread(s) put into it.

相反,序列化将持久性格式存储到字节流中,该格式仅适用于Serializable对象,并将导致在接收端创建副本.在某些情况下,这些对象之后可能会被规范对象替换,但是,整个过程比仅传输引用要昂贵得多.

In contrast, Serialization stores a persistent form into the byte stream, which only works for Serializable objects and will lead to the creation of copies at the receiving end. In some cases, the objects may get replaced by canonical objects afterwards, still, the entire procedure is significantly more expensive than just transferring references.

在您的示例情况下,在您传输int值的情况下,对象标识并不重要,但是对Integer实例进行装箱,序列化,反序列化和取消装箱的开销更为可疑.

In your example case, where you transfer int values, the object identity doesn’t matter, but the overhead of boxing, serializing, deserializing, and unboxing Integer instances is even more questionable.

如果您不使用序列化,而是直接将int值作为四个byte数量传输,则使用PipedOutputStreamPipedInputStream有一点,因为它是传输大量原始数据的好工具.它还具有通过关闭管道来标记数据结束的内在支持.

If you didn’t use Serialization, but transferred int values as four byte quantities directly, using PipedOutputStream and PipedInputStream had a point, as it is a good tool for transferring large quantities of primitive data. It also has an intrinsic support for marking the end of data by closing the pipe.

这些管道对于那些对于流程甚至运行生产者或消费者的计算机都应该是不可知的软件来说,也是正确的工具,例如,当您希望能够在管道之间实际使用相同的软件时,甚至是网络连接.这也可以证明使用序列化(就像JMX连接一样).

These pipes would also be the right tool for software that ought to be agnostic regarding processes or even the computers running the producer or consumer, i.e. when you want to be able to use the same software when the pipe is actually between processes or even a network connection. That would also justify using Serialization (as JMX connections do).

但是,除非您真正传输的单个字节在被撕裂时仍保留其含义,否则存在一个固有的限制,即只有一个生产者可以写入管道,只有一个使用者可以读取数据.

But unless you’re truly transferring single bytes that retain their meaning when being torn apart, there’s the intrinsic limitation that only one producer can write into a pipe and only one consumer can read the data.

这篇关于BlockingQueue与PipedOutputStream和PipedInputStream的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-28 06:50