This article looks at why an Akka-Streams implementation can turn out slower than a plain single-threaded one. The question and the recommended answer below should be a useful reference for anyone hitting the same problem.

Problem Description


Update, 2015-10-30:

Based on Roland Kuhn's answer:

Another misconception is that saying "stream" implies parallelism: in your code all computation runs sequentially in a single Actor (the map stage), so no benefit can be expected over the primitive single-threaded solution.

In order to benefit from the parallelism afforded by Akka Streams you need to have multiple processing stages that each perform tasks of […]
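
As an illustration of that advice, here is a minimal sketch (my own, not part of the original post; it assumes an ExecutionContext such as the global one is in scope and the name factorialAsync is made up): mapAsync runs up to the given number of Futures concurrently while still emitting results in order.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Sketch: up to four factorials are computed concurrently on the
// ExecutionContext; element order is preserved downstream.
val factorialAsync = Flow[(Long, String, Int, Float)]
  .mapAsync(4)(p => Future(SharedFunctions.transform2(SharedFunctions.transform(p))))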


I made some changes. My code now looks like this:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Balance, Flow, FlowGraph, Merge, Sink, Source}
import org.joda.time.{DateTime, Duration}

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgress = 0

  // RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478L, "name", 48, 23.09f)))

  // single-stage variant: both transformations fused into one map
  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)

  val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

  // fan the tuples out over four parallel lanes, then merge the results
  val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder =>
    import FlowGraph.Implicits._

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4))
    val mergeEvents = builder.add(Merge[Int](4))

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0)
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1)
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2)
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3)

    (dispatchTuple.in, mergeEvents.out)
  }

  val sink = Sink.foreach[Int] { v =>
    counter += 1
    oldProgress = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, counter,
      DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if (counter == SharedFunctions.maxEventCount) endAkka()
  }

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)
    actorSystem.shutdown
    actorSystem.awaitTermination
    System.exit(-1)
  }

  def main(args: Array[String]) {
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink)
    // in.via(eventChef).runWith(sink)
  }
}

I'm not sure whether I've got something totally wrong, but my implementation with akka-streams is still much slower (now even slower than before). What I did find out is this: if I increase the work, for example by doing some division, the akka-streams implementation gets faster relative to the other one. So if I understand correctly (correct me otherwise), there is too much per-element overhead in my example. So you only get a benefit from akka-streams if the code has to do heavy work?
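
If the per-element work really is that light, one way to amortize the overhead (a sketch of mine, not from the original post; the name batchedFlow and the batch size are made up) is to process elements in batches, so the stream machinery is paid once per batch instead of once per element:

// Sketch: grouped/mapConcat pays the per-element stream overhead
// once per 1000-element batch instead of once per element.
val batchedFlow = Flow[(Long, String, Int, Float)]
  .grouped(1000)
  .map(_.map(p => SharedFunctions.transform2(SharedFunctions.transform(p))))
  .mapConcat(identity)   // flatten the batches back into single Ints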

I'm relatively new to both Scala and akka-streams. I wrote a little test project which creates events until a counter reaches a specific number. For each event, the factorial of one of its fields is computed. I implemented this twice: once with akka-streams and once without (single-threaded), and compared the runtimes.

I didn't expect this: when I create a single event, the runtimes of both programs are nearly the same. But if I create 70,000,000 events, the implementation without akka-streams is much faster. Here are my results (the following data is based on 24 measurements):


  • Single event without akka-streams: 403 (±2) ms

  • Single event with akka-streams: 444 (±13) ms

  • 70 million events without akka-streams: 11778 (±70) ms

So my question is: what is going on here? Why is my akka-streams implementation slower?

Here is my code:

Implementation with Akka

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.joda.time.{DateTime, Duration}

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgress = 0

  // RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478L, "name", 48, 23.09f)))

  // both transformations run sequentially inside this single map stage
  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val sink = Sink.foreach[Int] { v =>
    counter += 1
    oldProgress = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, counter,
      DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if (counter == SharedFunctions.maxEventCount) endAkka()
  }

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)
    actorSystem.shutdown
    actorSystem.awaitTermination
    System.exit(-1)
  }

  def main(args: Array[String]) {
    import scala.concurrent.ExecutionContext.Implicits.global
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink).onComplete(_ => endAkka())
  }
}

Implementation without Akka

import org.joda.time.{DateTime, Duration}
import scala.annotation.tailrec

object SingleThread {

  def main(args: Array[String]) {
    println("SingleThread started at: " + SharedFunctions.startTime)
    println("0%")
    val i = createEvent(0)
    val duration = new Duration(SharedFunctions.startTime, DateTime.now())
    println("Time: " + duration.getMillis + " || Data: " + i)
  }

  // tail-recursive loop: builds one event per iteration and computes its factorial
  @tailrec
  def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = {
    if (count == SharedFunctions.maxEventCount) count
    else {
      val e = SharedFunctions.transform((randDate, name, age, myFloat))
      SharedFunctions.transform2(e)
      val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count,
        DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
      createEventWorker(p, count + 1, 1254785478L, "name", 48, 23.09f)
    }
  }

  def createEvent(count: Int): Int = {
    createEventWorker(0, count, 1254785478L, "name", 48, 23.09f)
  }
}

SharedFunctions

import org.joda.time.DateTime

object SharedFunctions {
  val maxEventCount = 70000000
  val startTime = DateTime.now

  def transform(t: (Long, String, Int, Float)): Event = new Event(t._1, t._2, t._3, t._4)
  def transform2(e: Event): Int = factorial(e.getAgeYrs)

  def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100) / totalValue)
  def printProgress(oldProgress: Int, fileSize: Long, currentSize: Int, t: Long) = {
    val cProgress = calculatePercentage(fileSize, currentSize)
    if (oldProgress != cProgress) println(s"$oldProgress% | $t ms")
    cProgress
  }

  // Note: an Int factorial overflows for n > 12, so factorial(48) wraps around;
  // the (meaningless) result still serves as pure CPU load for the benchmark.
  private def factorialWorker(n1: Int, n2: Int): Int = {
    if (n1 == 0) n2
    else factorialWorker(n1 - 1, n2 * n1)
  }
  def factorial(n: Int): Int = {
    factorialWorker(n, 1)
  }
}
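
Since factorial(48) wildly overflows Int, the computed value is garbage, which is fine here because only the CPU work matters. If a correct value were ever needed, a BigInt variant would be the fix; a minimal sketch (not part of the benchmark, the name factorialBig is made up):

// Sketch: exact factorial with BigInt, immune to Int overflow.
def factorialBig(n: Int): BigInt =
  (1 to n).foldLeft(BigInt(1))(_ * _)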

The Event implementation

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public long timestampMS;
  @Deprecated public CharSequence name;
  @Deprecated public int ageYrs;
  @Deprecated public float sizeCm;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public Event() {}

  /**
   * All-args constructor.
   */
  public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) {
    this.timestampMS = timestampMS;
    this.name = name;
    this.ageYrs = ageYrs;
    this.sizeCm = sizeCm;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public Object get(int field$) {
    switch (field$) {
    case 0: return timestampMS;
    case 1: return name;
    case 2: return ageYrs;
    case 3: return sizeCm;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, Object value$) {
    switch (field$) {
    case 0: timestampMS = (Long)value$; break;
    case 1: name = (CharSequence)value$; break;
    case 2: ageYrs = (Integer)value$; break;
    case 3: sizeCm = (Float)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'timestampMS' field.
   */
  public Long getTimestampMS() {
    return timestampMS;
  }

  /**
   * Sets the value of the 'timestampMS' field.
   * @param value the value to set.
   */
  public void setTimestampMS(Long value) {
    this.timestampMS = value;
  }

  /**
   * Gets the value of the 'name' field.
   */
  public CharSequence getName() {
    return name;
  }

  /**
   * Sets the value of the 'name' field.
   * @param value the value to set.
   */
  public void setName(CharSequence value) {
    this.name = value;
  }

  /**
   * Gets the value of the 'ageYrs' field.
   */
  public Integer getAgeYrs() {
    return ageYrs;
  }

  /**
   * Sets the value of the 'ageYrs' field.
   * @param value the value to set.
   */
  public void setAgeYrs(Integer value) {
    this.ageYrs = value;
  }

  /**
   * Gets the value of the 'sizeCm' field.
   */
  public Float getSizeCm() {
    return sizeCm;
  }

  /**
   * Sets the value of the 'sizeCm' field.
   * @param value the value to set.
   */
  public void setSizeCm(Float value) {
    this.sizeCm = value;
  }

  /** Creates a new Event RecordBuilder */
  public static Event.Builder newBuilder() {
    return new Event.Builder();
  }

  /** Creates a new Event RecordBuilder by copying an existing Builder */
  public static Event.Builder newBuilder(Event.Builder other) {
    return new Event.Builder(other);
  }

  /** Creates a new Event RecordBuilder by copying an existing Event instance */
  public static Event.Builder newBuilder(Event other) {
    return new Event.Builder(other);
  }

  /**
   * RecordBuilder for Event instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event>
    implements org.apache.avro.data.RecordBuilder<Event> {

    private long timestampMS;
    private CharSequence name;
    private int ageYrs;
    private float sizeCm;

    /** Creates a new Builder */
    private Builder() {
      super(Event.SCHEMA$);
    }

    /** Creates a Builder by copying an existing Builder */
    private Builder(Event.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.timestampMS)) {
        this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.name)) {
        this.name = data().deepCopy(fields()[1].schema(), other.name);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.ageYrs)) {
        this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.sizeCm)) {
        this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
        fieldSetFlags()[3] = true;
      }
    }

    /** Creates a Builder by copying an existing Event instance */
    private Builder(Event other) {
      super(Event.SCHEMA$);
      if (isValidValue(fields()[0], other.timestampMS)) {
        this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.name)) {
        this.name = data().deepCopy(fields()[1].schema(), other.name);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.ageYrs)) {
        this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.sizeCm)) {
        this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
        fieldSetFlags()[3] = true;
      }
    }

    /** Gets the value of the 'timestampMS' field */
    public Long getTimestampMS() {
      return timestampMS;
    }

    /** Sets the value of the 'timestampMS' field */
    public Event.Builder setTimestampMS(long value) {
      validate(fields()[0], value);
      this.timestampMS = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /** Checks whether the 'timestampMS' field has been set */
    public boolean hasTimestampMS() {
      return fieldSetFlags()[0];
    }

    /** Clears the value of the 'timestampMS' field */
    public Event.Builder clearTimestampMS() {
      fieldSetFlags()[0] = false;
      return this;
    }

    /** Gets the value of the 'name' field */
    public CharSequence getName() {
      return name;
    }

    /** Sets the value of the 'name' field */
    public Event.Builder setName(CharSequence value) {
      validate(fields()[1], value);
      this.name = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /** Checks whether the 'name' field has been set */
    public boolean hasName() {
      return fieldSetFlags()[1];
    }

    /** Clears the value of the 'name' field */
    public Event.Builder clearName() {
      name = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /** Gets the value of the 'ageYrs' field */
    public Integer getAgeYrs() {
      return ageYrs;
    }

    /** Sets the value of the 'ageYrs' field */
    public Event.Builder setAgeYrs(int value) {
      validate(fields()[2], value);
      this.ageYrs = value;
      fieldSetFlags()[2] = true;
      return this;
    }

    /** Checks whether the 'ageYrs' field has been set */
    public boolean hasAgeYrs() {
      return fieldSetFlags()[2];
    }

    /** Clears the value of the 'ageYrs' field */
    public Event.Builder clearAgeYrs() {
      fieldSetFlags()[2] = false;
      return this;
    }

    /** Gets the value of the 'sizeCm' field */
    public Float getSizeCm() {
      return sizeCm;
    }

    /** Sets the value of the 'sizeCm' field */
    public Event.Builder setSizeCm(float value) {
      validate(fields()[3], value);
      this.sizeCm = value;
      fieldSetFlags()[3] = true;
      return this;
    }

    /** Checks whether the 'sizeCm' field has been set */
    public boolean hasSizeCm() {
      return fieldSetFlags()[3];
    }

    /** Clears the value of the 'sizeCm' field */
    public Event.Builder clearSizeCm() {
      fieldSetFlags()[3] = false;
      return this;
    }

    @Override
    public Event build() {
      try {
        Event record = new Event();
        record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]);
        record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]);
        record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]);
        record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }
}


Recommended Answer

In addition to Roland's explanation, which I agree with fully, it should be understood that Akka Streams is not just a concurrent programming framework. Streams also provide backpressure, which means events are only generated by the Source when there is demand to process them in the Sink. This communication of demand adds some overhead at each processing step.
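
That demand signaling is batched through each stage's input buffer, so enlarging the buffers can amortize (though never eliminate) the overhead. A sketch of how that might look with the materializer settings of that era (assuming the akka-stream 1.x/2.x API and an implicit actorSystem in scope, as in MultiThread above):

import akka.stream.{ActorMaterializer, ActorMaterializerSettings}

// Sketch: bigger input buffers mean demand is signaled upstream in
// larger batches, so fewer control messages per processed element.
implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(actorSystem).withInputBuffer(initialSize = 64, maxSize = 64)
)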

Therefore your single-threaded and multi-threaded comparison is not "apples-to-apples".

If you want raw multi-threaded execution performance then Futures/Actors are a better way to go.
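
For comparison, a raw-Futures version might look like the following sketch (my own illustration, reusing SharedFunctions from above; the object name, worker count, and loop structure are all made up):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object MultiFuture {
  def main(args: Array[String]) {
    val workers = 4
    val perWorker = SharedFunctions.maxEventCount / workers
    // one Future per worker, each grinding through its share of the events
    val work = Future.traverse((1 to workers).toList) { _ =>
      Future {
        var i = 0
        while (i < perWorker) {
          SharedFunctions.transform2(SharedFunctions.transform((1254785478L, "name", 48, 23.09f)))
          i += 1
        }
        perWorker
      }
    }
    println("Processed: " + Await.result(work, Duration.Inf).sum)
  }
}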

That concludes this look at why the Akka-Streams implementation is slower than the single-threaded one. Hopefully the recommended answer above helps.
