Akka-Stream implementation slower than single-threaded implementation

Question

UPDATE 2015-10-30

Based on Roland Kuhn's answer:

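Akka Streams uses asynchronous message passing between Actors to implement stream processing stages. Passing data across an asynchronous boundary has a cost, and that is what you are seeing here: your computation seems to take only about 160ns per element (derived from the single-threaded measurement), while the streaming solution takes roughly 1µs per element, dominated by the message passing.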

Another misconception is that saying "stream" implies parallelism: in your code all computation runs sequentially in a single Actor (the map stage), so no benefit can be expected over the primitive single-threaded solution.

In order to benefit from the parallelism afforded by Akka Streams you need to have multiple processing stages that each perform tasks taking more than about 1µs per element; see also the docs.
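
As a rough check of the per-element figure, the single-threaded timing reported further down in the question (11778 ms for 70,000,000 events) can be divided out. The sketch below only does that arithmetic. The object name `PerElementCost` is made up for illustration, and the 1000 ns stream cost is the estimate from the answer, not something measured here.

```scala
object PerElementCost {
  // Figures quoted in the question: 70,000,000 events in 11778 ms (single-threaded).
  val events: Long = 70000000L
  val singleThreadMillis: Long = 11778L

  // Nanoseconds spent per element in the single-threaded run.
  val singleThreadNsPerElement: Double = singleThreadMillis * 1e6 / events

  // Assumption: roughly 1000 ns per element for the stream, as estimated in the answer.
  val streamNsPerElement: Double = 1000.0

  // Projected total for the stream at that per-element cost, in milliseconds.
  val projectedStreamMillis: Double = streamNsPerElement * events / 1e6

  def main(args: Array[String]): Unit = {
    println(f"single-threaded: $singleThreadNsPerElement%.0f ns per element")
    println(f"stream (projected): $projectedStreamMillis%.0f ms in total")
  }
}
```

This gives about 168 ns per element for the single-threaded run, in line with the "about 160ns" quoted above. At 1µs per element the same 70,000,000 events would take around 70 seconds. That second number is a projection from the estimate, not a measurement.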

I made some changes. My code now looks like this:

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)

  val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

  val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder =>
    import FlowGraph.Implicits._

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4))
    val mergeEvents = builder.add(Merge[Int](4))

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0)
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1)
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2)
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3)

    (dispatchTuple.in, mergeEvents.out)
  }

  val sink = Sink.foreach[Int]{
    v => counter += 1
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if(counter == SharedFunctions.maxEventCount) endAkka()
  }

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)
    actorSystem.shutdown
    actorSystem.awaitTermination
    System.exit(-1)
  }

  def main(args: Array[String]) {
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink)
   // in.via(eventChef).runWith(sink)
  }

}

I am not sure whether I got something totally wrong, but my implementation with akka-streams is still much slower (now even slower than before). What I found out is this: if I increase the work, for example by doing some division, the implementation with akka-streams gets faster. So if I understand it correctly (correct me otherwise), there is too much overhead in my example. So do you only get a benefit from akka-streams if the code has to do heavy work?

I'm relatively new to both Scala and akka-stream. I wrote a small test project that creates events until a counter reaches a specific number. For each event, the factorial of one field of the event is computed. I implemented this twice, once with akka-stream and once without (single-threaded), and compared the runtimes.

I did not expect the result: when I create a single event, the runtimes of both programs are nearly the same. But if I create 70,000,000 events, the implementation without akka-streams is much faster. Here are my results (based on 24 measurements):

  • Single event without akka-streams: 403 (+- 2) ms
  • Single event with akka-streams: 444 (+- 13) ms
  • 70Mio events without akka-streams: 11778 (+- 70) ms

So my question is: what is going on? Why is my implementation with akka-stream slower?

Here is my code:

Implementation with Akka

object MultiThread {
  implicit val actorSystem = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  var counter = 0
  var oldProgess = 0

  //RunnableFlow: in -> flow -> sink
  val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))

  val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

  val sink = Sink.foreach[Int]{
    v => counter += 1
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
    if(counter == SharedFunctions.maxEventCount) endAkka()
  }

  def endAkka() = {
    val duration = new Duration(SharedFunctions.startTime, DateTime.now)
    println("Time: " + duration.getMillis + " || Data: " + counter)
    actorSystem.shutdown
    actorSystem.awaitTermination
    System.exit(-1)
  }

  def main(args: Array[String]) {
    import scala.concurrent.ExecutionContext.Implicits.global
    println("MultiThread started: " + SharedFunctions.startTime)
    in.via(flow).runWith(sink).onComplete(_ => endAkka())
  }

}

Implementation without Akka

object SingleThread {

  def main(args: Array[String]) {
    println("SingleThread started at: " + SharedFunctions.startTime)
    println("0%")
    val i = createEvent(0)
    val duration = new Duration(SharedFunctions.startTime, DateTime.now());
    println("Time: " + duration.getMillis + " || Data: " + i)
  }

  def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = {
    if (count == SharedFunctions.maxEventCount) count
    else {
      val e = SharedFunctions.transform((randDate, name, age, myFloat))
      SharedFunctions.transform2(e)
      val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count,
        DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
      createEventWorker(p, count + 1, 1254785478l, "name", 48, 23.09f)
    }
  }

  def createEvent(count: Int): Int = {
    createEventWorker(0, count, 1254785478l, "name", 48, 23.09f)
  }
}

SharedFunctions

object SharedFunctions {
  val maxEventCount = 70000000
  val startTime = DateTime.now

  def transform(t : (Long, String, Int, Float)) : Event = new Event(t._1 ,t._2,t._3,t._4)
  def transform2(e : Event) : Int = factorial(e.getAgeYrs)

  def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100) / totalValue)
  def printProgress(oldProgress : Int, fileSize: Long, currentSize: Int, t: Long) = {
    val cProgress = calculatePercentage(fileSize, currentSize)
    if (oldProgress != cProgress) println(s"$oldProgress% | $t ms")
    cProgress
  }

  private def factorialWorker(n1: Int, n2: Int): Int = {
    if (n1 == 0) n2
    else factorialWorker(n1 -1, n2*n1)
  }
  def factorial (n : Int): Int = {
    factorialWorker(n, 1)
  }
}
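
One detail worth checking in `SharedFunctions`: `factorial` works on `Int`, and the benchmark always passes 48. The sketch below assumes the same recursion as above. The object name `FactorialCheck` and the `BigInt` variant are additions for comparison and are not part of the original code.

```scala
import scala.annotation.tailrec

object FactorialCheck {
  // Same recursion as SharedFunctions.factorial, with 32-bit Int arithmetic.
  @tailrec
  def factorialInt(n: Int, acc: Int = 1): Int =
    if (n == 0) acc else factorialInt(n - 1, acc * n)

  // Hypothetical comparison variant using arbitrary-precision arithmetic.
  @tailrec
  def factorialBig(n: Int, acc: BigInt = BigInt(1)): BigInt =
    if (n == 0) acc else factorialBig(n - 1, acc * n)

  def main(args: Array[String]): Unit = {
    println(factorialInt(12)) // largest factorial that still fits in an Int
    println(factorialInt(48)) // silently wraps around
    println(factorialBig(48)) // exact value
  }
}
```

With `Int` arithmetic `factorialInt(48)` evaluates to 0, because 48! contains more than 32 factors of two and the product wraps modulo 2^32. This does not affect the timing comparison, but it shows how little work each element carries: a few dozen integer multiplications.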

Implementation of Event

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public long timestampMS;
  @Deprecated public CharSequence name;
  @Deprecated public int ageYrs;
  @Deprecated public float sizeCm;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public Event() {}

  /**
   * All-args constructor.
   */
  public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) {
    this.timestampMS = timestampMS;
    this.name = name;
    this.ageYrs = ageYrs;
    this.sizeCm = sizeCm;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call.
  public Object get(int field$) {
    switch (field$) {
    case 0: return timestampMS;
    case 1: return name;
    case 2: return ageYrs;
    case 3: return sizeCm;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, Object value$) {
    switch (field$) {
    case 0: timestampMS = (Long)value$; break;
    case 1: name = (CharSequence)value$; break;
    case 2: ageYrs = (Integer)value$; break;
    case 3: sizeCm = (Float)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'timestampMS' field.
   */
  public Long getTimestampMS() {
    return timestampMS;
  }

  /**
   * Sets the value of the 'timestampMS' field.
   * @param value the value to set.
   */
  public void setTimestampMS(Long value) {
    this.timestampMS = value;
  }

  /**
   * Gets the value of the 'name' field.
   */
  public CharSequence getName() {
    return name;
  }

  /**
   * Sets the value of the 'name' field.
   * @param value the value to set.
   */
  public void setName(CharSequence value) {
    this.name = value;
  }

  /**
   * Gets the value of the 'ageYrs' field.
   */
  public Integer getAgeYrs() {
    return ageYrs;
  }

  /**
   * Sets the value of the 'ageYrs' field.
   * @param value the value to set.
   */
  public void setAgeYrs(Integer value) {
    this.ageYrs = value;
  }

  /**
   * Gets the value of the 'sizeCm' field.
   */
  public Float getSizeCm() {
    return sizeCm;
  }

  /**
   * Sets the value of the 'sizeCm' field.
   * @param value the value to set.
   */
  public void setSizeCm(Float value) {
    this.sizeCm = value;
  }

  /** Creates a new Event RecordBuilder */
  public static Event.Builder newBuilder() {
    return new Event.Builder();
  }

  /** Creates a new Event RecordBuilder by copying an existing Builder */
  public static Event.Builder newBuilder(Event.Builder other) {
    return new Event.Builder(other);
  }

  /** Creates a new Event RecordBuilder by copying an existing Event instance */
  public static Event.Builder newBuilder(Event other) {
    return new Event.Builder(other);
  }

  /**
   * RecordBuilder for Event instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event>
    implements org.apache.avro.data.RecordBuilder<Event> {

    private long timestampMS;
    private CharSequence name;
    private int ageYrs;
    private float sizeCm;

    /** Creates a new Builder */
    private Builder() {
      super(Event.SCHEMA$);
    }

    /** Creates a Builder by copying an existing Builder */
    private Builder(Event.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.timestampMS)) {
        this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.name)) {
        this.name = data().deepCopy(fields()[1].schema(), other.name);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.ageYrs)) {
        this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.sizeCm)) {
        this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
        fieldSetFlags()[3] = true;
      }
    }

    /** Creates a Builder by copying an existing Event instance */
    private Builder(Event other) {
            super(Event.SCHEMA$);
      if (isValidValue(fields()[0], other.timestampMS)) {
        this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.name)) {
        this.name = data().deepCopy(fields()[1].schema(), other.name);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.ageYrs)) {
        this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.sizeCm)) {
        this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
        fieldSetFlags()[3] = true;
      }
    }

    /** Gets the value of the 'timestampMS' field */
    public Long getTimestampMS() {
      return timestampMS;
    }

    /** Sets the value of the 'timestampMS' field */
    public Event.Builder setTimestampMS(long value) {
      validate(fields()[0], value);
      this.timestampMS = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /** Checks whether the 'timestampMS' field has been set */
    public boolean hasTimestampMS() {
      return fieldSetFlags()[0];
    }

    /** Clears the value of the 'timestampMS' field */
    public Event.Builder clearTimestampMS() {
      fieldSetFlags()[0] = false;
      return this;
    }

    /** Gets the value of the 'name' field */
    public CharSequence getName() {
      return name;
    }

    /** Sets the value of the 'name' field */
    public Event.Builder setName(CharSequence value) {
      validate(fields()[1], value);
      this.name = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /** Checks whether the 'name' field has been set */
    public boolean hasName() {
      return fieldSetFlags()[1];
    }

    /** Clears the value of the 'name' field */
    public Event.Builder clearName() {
      name = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /** Gets the value of the 'ageYrs' field */
    public Integer getAgeYrs() {
      return ageYrs;
    }

    /** Sets the value of the 'ageYrs' field */
    public Event.Builder setAgeYrs(int value) {
      validate(fields()[2], value);
      this.ageYrs = value;
      fieldSetFlags()[2] = true;
      return this;
    }

    /** Checks whether the 'ageYrs' field has been set */
    public boolean hasAgeYrs() {
      return fieldSetFlags()[2];
    }

    /** Clears the value of the 'ageYrs' field */
    public Event.Builder clearAgeYrs() {
      fieldSetFlags()[2] = false;
      return this;
    }

    /** Gets the value of the 'sizeCm' field */
    public Float getSizeCm() {
      return sizeCm;
    }

    /** Sets the value of the 'sizeCm' field */
    public Event.Builder setSizeCm(float value) {
      validate(fields()[3], value);
      this.sizeCm = value;
      fieldSetFlags()[3] = true;
      return this;
    }

    /** Checks whether the 'sizeCm' field has been set */
    public boolean hasSizeCm() {
      return fieldSetFlags()[3];
    }

    /** Clears the value of the 'sizeCm' field */
    public Event.Builder clearSizeCm() {
      fieldSetFlags()[3] = false;
      return this;
    }

    @Override
    public Event build() {
      try {
        Event record = new Event();
        record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]);
        record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]);
        record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]);
        record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }
}

Recommended answer

In addition to Roland's explanation, which I agree with fully, it should be understood that Akka Streams is not just a concurrent programming framework. Streams also provide back pressure, which means events are only generated by the Source when there is demand to process them in the Sink. This communication of demand adds some overhead at each processing step.

Therefore your single-thread and multi-thread comparison is not "apples-to-apples".

If you want raw multi-threaded execution performance, then Futures/Actors are a better way to go.
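
A minimal sketch of the Futures approach using only `scala.concurrent.Future`, assuming the work can be split into independent chunks. The names `FuturesSketch`, `processChunk` and `countInParallel`, and the chunking scheme itself, are illustrative and not from the original post. The factorial mirrors `SharedFunctions.factorial`.

```scala
import scala.annotation.tailrec
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object FuturesSketch {
  // Same Int-based recursion as SharedFunctions.factorial.
  @tailrec
  def factorial(n: Int, acc: Int = 1): Int =
    if (n == 0) acc else factorial(n - 1, acc * n)

  // Processes `count` events sequentially and returns how many were handled.
  def processChunk(count: Int): Int = {
    var handled = 0
    while (handled < count) {
      factorial(48) // result discarded, as in the single-threaded version
      handled += 1
    }
    handled
  }

  // Splits `total` events into `parallelism` chunks and runs one Future per chunk.
  def countInParallel(total: Int, parallelism: Int): Int = {
    val base = total / parallelism
    val remainder = total % parallelism
    val chunks = (0 until parallelism).map(i => if (i < remainder) base + 1 else base)
    val results = Future.sequence(chunks.map(n => Future(processChunk(n))))
    Await.result(results, Duration.Inf).sum
  }

  def main(args: Array[String]): Unit = {
    val start = System.nanoTime()
    val handled = countInParallel(total = 70000000, parallelism = 4)
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"Time: $elapsedMs ms || Data: $handled")
  }
}
```

Each chunk runs without any per-element messaging or demand signalling, which is the overhead the answers above attribute to the stream. Whether this actually beats the single-threaded loop depends on the hardware and on the JIT, so no timings are claimed here.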
