Update (2015-10-30)

Based on Roland Kuhn's answer:



I made some changes. My code now looks like this:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl._
    import org.joda.time.{DateTime, Duration}

    object MultiThread {
      implicit val actorSystem = ActorSystem("Sys")
      implicit val materializer = ActorMaterializer()

      var counter = 0
      var oldProgess = 0

      //RunnableFlow: in -> flow -> sink
      val in = Source(() => Iterator.continually((1254785478L, "name", 48, 23.09f)))

      // tuple -> Event -> factorial, both steps inside a single map stage
      val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))

      val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)

      val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)

      // Fan out to 4 branches with Balance, then fan back in with Merge
      val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder =>
        import FlowGraph.Implicits._

        val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4))
        val mergeEvents = builder.add(Merge[Int](4))

        dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0)
        dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1)
        dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2)
        dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3)

        (dispatchTuple.in, mergeEvents.out)
      }

      val sink = Sink.foreach[Int] { v =>
        counter += 1
        oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
          DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
        if (counter == SharedFunctions.maxEventCount) endAkka()
      }

      def endAkka() = {
        val duration = new Duration(SharedFunctions.startTime, DateTime.now)
        println("Time: " + duration.getMillis + " || Data: " + counter)
        actorSystem.shutdown
        actorSystem.awaitTermination
        System.exit(-1)
      }

      def main(args: Array[String]): Unit = {
        println("MultiThread started: " + SharedFunctions.startTime)
        in.via(flow).runWith(sink)
        // alternative: the 4-way Balance/Merge version
        // in.via(eventChef).runWith(sink)
      }

    }

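I'm not sure whether I made a fundamental mistake, but my akka-streams implementation is still much slower (now even slower than before). What I did find is this: if I increase the workload, for example by adding a division, akka-streams gets faster. So, if I understand this correctly (please correct me otherwise), my example seems to carry too much overhead. Does that mean you only profit from Akka Streams when the code has to do heavy work?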
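The intuition behind this question can be written as a back-of-the-envelope cost model. The sketch below is only an illustration under one assumption: every element pays a fixed hand-off cost `overheadNs` on top of `workNs` of useful work (the object name and the numbers in `main` are hypothetical, not measurements). The share of time lost to overhead is `overheadNs / (overheadNs + workNs)`, so it shrinks as the per-element work grows.

```scala
object OverheadModel {
  // Fraction of each element's time spent on a fixed hand-off overhead,
  // assuming the overhead is constant per element (a simplification).
  def overheadShare(overheadNs: Double, workNs: Double): Double = {
    require(overheadNs >= 0 && workNs >= 0 && overheadNs + workNs > 0)
    overheadNs / (overheadNs + workNs)
  }

  def main(args: Array[String]): Unit = {
    val overheadNs = 1000.0 // hypothetical per-element hand-off cost
    for (workNs <- Seq(100.0, 1000.0, 10000.0, 100000.0)) {
      val share = overheadShare(overheadNs, workNs)
      println(f"work = $workNs%8.0f ns -> overhead share = ${share * 100}%5.1f%%")
    }
  }
}
```

With these assumed numbers the overhead share drops from about 91% at 100 ns of work per element to about 1% at 100 µs.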



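I am relatively new to both Scala and akka-stream. I wrote a small test project that creates events until a counter reaches a specific number. For each event, the factorial of one of the event's fields is computed. I implemented this twice, once with akka-stream and once without it (single-threaded), and compared the run times.

What I did not expect: when I create a single event, both programs run in almost the same time. But when I create 70,000,000 events, the implementation without akka-streams is faster. Here are my results (based on 24 measurements):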

  • Single event without akka-streams: 403 (±2) ms
  • Single event with akka-streams: 444 (±13) ms

  • 70 million events without akka-streams: 11778 (±70) ms
  • 70 million events with akka-streams: 75424 (±2959) ms
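The 70-million-event measurements can be turned into an average cost per event, which makes the gap easier to reason about. This is plain arithmetic on the reported means (ignoring the ± spread), written as a small Scala object; the object and field names are illustrative.

```scala
object PerEventCost {
  val events = 70000000L         // 70 million events, as in the measurements
  val withoutStreamsMs = 11778.0 // mean run time without akka-streams
  val withStreamsMs = 75424.0    // mean run time with akka-streams

  // Average wall-clock time per event in nanoseconds (1 ms = 1e6 ns).
  def nsPerEvent(totalMs: Double): Double = totalMs * 1e6 / events

  val plainNs: Double = nsPerEvent(withoutStreamsMs)      // about 168 ns
  val streamNs: Double = nsPerEvent(withStreamsMs)        // about 1077 ns
  val extraNs: Double = streamNs - plainNs                // about 909 ns extra per event
  val slowdown: Double = withStreamsMs / withoutStreamsMs // about 6.4x

  def main(args: Array[String]): Unit =
    println(f"plain: $plainNs%.0f ns, stream: $streamNs%.0f ns, extra: $extraNs%.0f ns, slowdown: $slowdown%.1fx")
}
```

In other words, the streaming version adds roughly 0.9 µs per event on top of roughly 0.17 µs of actual work, so the fixed per-element cost dominates this workload.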


So my question is: what is going on here? Why is my akka-stream implementation slower?

Here is my code:

Implementation with Akka

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import org.joda.time.{DateTime, Duration}

    object MultiThread {
      implicit val actorSystem = ActorSystem("Sys")
      implicit val materializer = ActorMaterializer()
    
      var counter = 0
      var oldProgess = 0
    
      //RunnableFlow: in -> flow -> sink
      val in = Source(() => Iterator.continually((1254785478L, "name", 48, 23.09f)))
    
      // tuple -> Event -> factorial, both steps inside a single map stage
      val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))
    
      val sink = Sink.foreach[Int] { v =>
        counter += 1
        oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter,
          DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
        if (counter == SharedFunctions.maxEventCount) endAkka()
      }
    
      def endAkka() = {
        val duration = new Duration(SharedFunctions.startTime, DateTime.now)
        println("Time: " + duration.getMillis + " || Data: " + counter)
        actorSystem.shutdown
        actorSystem.awaitTermination
        System.exit(-1)
      }
    
      def main(args: Array[String]): Unit = {
        import scala.concurrent.ExecutionContext.Implicits.global
        println("MultiThread started: " + SharedFunctions.startTime)
        // the source is infinite, so in practice endAkka() is triggered by the sink
        in.via(flow).runWith(sink).onComplete(_ => endAkka())
      }
    
    }
    

Implementation without Akka

    import org.joda.time.{DateTime, Duration}
    import scala.annotation.tailrec

    object SingleThread {
      def main(args: Array[String]): Unit = {
        println("SingleThread started at: " + SharedFunctions.startTime)
        println("0%")
        val i = createEvent(0)
        val duration = new Duration(SharedFunctions.startTime, DateTime.now())
        println("Time: " + duration.getMillis + " || Data: " + i)
      }
    
      // @tailrec makes the compiler verify that this self-recursive call is compiled into a loop
      @tailrec
      def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = {
        if (count == SharedFunctions.maxEventCount) count
        else {
          val e = SharedFunctions.transform((randDate, name, age, myFloat))
          SharedFunctions.transform2(e) // result is discarded
          val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count,
            DateTime.now.getMillis - SharedFunctions.startTime.getMillis)
          createEventWorker(p, count + 1, 1254785478L, "name", 48, 23.09f)
        }
      }
    
      def createEvent(count: Int): Int = {
        createEventWorker(0, count, 1254785478L, "name", 48, 23.09f)
      }
    }
    

Shared functions

    import org.joda.time.DateTime
    import scala.annotation.tailrec

    object SharedFunctions {
      val maxEventCount = 70000000
      val startTime = DateTime.now
    
      def transform(t: (Long, String, Int, Float)): Event = new Event(t._1, t._2, t._3, t._4)
      def transform2(e: Event): Int = factorial(e.getAgeYrs)
    
      def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100) / totalValue)
      def printProgress(oldProgress: Int, fileSize: Long, currentSize: Int, t: Long) = {
        val cProgress = calculatePercentage(fileSize, currentSize)
        if (oldProgress != cProgress) println(s"$oldProgress% | $t ms")
        cProgress
      }
    
      @tailrec
      private def factorialWorker(n1: Int, n2: Int): Int = {
        if (n1 == 0) n2
        else factorialWorker(n1 - 1, n2 * n1)
      }
      // Note: Int overflows for n > 12, so factorial(48) wraps around (to 0);
      // the benchmark effectively measures the cost of 48 multiplications.
      def factorial(n: Int): Int = {
        factorialWorker(n, 1)
      }
    }
    

Event implementation (Java class generated by Avro)
    /**
     * Autogenerated by Avro
     *
     * DO NOT EDIT DIRECTLY
     */
    
    @SuppressWarnings("all")
    @org.apache.avro.specific.AvroGenerated
    public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
      public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}");
      public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
      @Deprecated public long timestampMS;
      @Deprecated public CharSequence name;
      @Deprecated public int ageYrs;
      @Deprecated public float sizeCm;
    
      /**
       * Default constructor.  Note that this does not initialize fields
       * to their default values from the schema.  If that is desired then
       * one should use <code>newBuilder()</code>.
       */
      public Event() {}
    
      /**
       * All-args constructor.
       */
      public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) {
        this.timestampMS = timestampMS;
        this.name = name;
        this.ageYrs = ageYrs;
        this.sizeCm = sizeCm;
      }
    
      public org.apache.avro.Schema getSchema() { return SCHEMA$; }
      // Used by DatumWriter.  Applications should not call.
      public Object get(int field$) {
        switch (field$) {
        case 0: return timestampMS;
        case 1: return name;
        case 2: return ageYrs;
        case 3: return sizeCm;
        default: throw new org.apache.avro.AvroRuntimeException("Bad index");
        }
      }
      // Used by DatumReader.  Applications should not call.
      @SuppressWarnings(value="unchecked")
      public void put(int field$, Object value$) {
        switch (field$) {
        case 0: timestampMS = (Long)value$; break;
        case 1: name = (CharSequence)value$; break;
        case 2: ageYrs = (Integer)value$; break;
        case 3: sizeCm = (Float)value$; break;
        default: throw new org.apache.avro.AvroRuntimeException("Bad index");
        }
      }
    
      /**
       * Gets the value of the 'timestampMS' field.
       */
      public Long getTimestampMS() {
        return timestampMS;
      }
    
      /**
       * Sets the value of the 'timestampMS' field.
       * @param value the value to set.
       */
      public void setTimestampMS(Long value) {
        this.timestampMS = value;
      }
    
      /**
       * Gets the value of the 'name' field.
       */
      public CharSequence getName() {
        return name;
      }
    
      /**
       * Sets the value of the 'name' field.
       * @param value the value to set.
       */
      public void setName(CharSequence value) {
        this.name = value;
      }
    
      /**
       * Gets the value of the 'ageYrs' field.
       */
      public Integer getAgeYrs() {
        return ageYrs;
      }
    
      /**
       * Sets the value of the 'ageYrs' field.
       * @param value the value to set.
       */
      public void setAgeYrs(Integer value) {
        this.ageYrs = value;
      }
    
      /**
       * Gets the value of the 'sizeCm' field.
       */
      public Float getSizeCm() {
        return sizeCm;
      }
    
      /**
       * Sets the value of the 'sizeCm' field.
       * @param value the value to set.
       */
      public void setSizeCm(Float value) {
        this.sizeCm = value;
      }
    
      /** Creates a new Event RecordBuilder */
      public static Event.Builder newBuilder() {
        return new Event.Builder();
      }
    
      /** Creates a new Event RecordBuilder by copying an existing Builder */
      public static Event.Builder newBuilder(Event.Builder other) {
        return new Event.Builder(other);
      }
    
      /** Creates a new Event RecordBuilder by copying an existing Event instance */
      public static Event.Builder newBuilder(Event other) {
        return new Event.Builder(other);
      }
    
      /**
       * RecordBuilder for Event instances.
       */
      public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event>
        implements org.apache.avro.data.RecordBuilder<Event> {
    
        private long timestampMS;
        private CharSequence name;
        private int ageYrs;
        private float sizeCm;
    
        /** Creates a new Builder */
        private Builder() {
          super(Event.SCHEMA$);
        }
    
        /** Creates a Builder by copying an existing Builder */
        private Builder(Event.Builder other) {
          super(other);
          if (isValidValue(fields()[0], other.timestampMS)) {
            this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
            fieldSetFlags()[0] = true;
          }
          if (isValidValue(fields()[1], other.name)) {
            this.name = data().deepCopy(fields()[1].schema(), other.name);
            fieldSetFlags()[1] = true;
          }
          if (isValidValue(fields()[2], other.ageYrs)) {
            this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
            fieldSetFlags()[2] = true;
          }
          if (isValidValue(fields()[3], other.sizeCm)) {
            this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
            fieldSetFlags()[3] = true;
          }
        }
    
        /** Creates a Builder by copying an existing Event instance */
        private Builder(Event other) {
                super(Event.SCHEMA$);
          if (isValidValue(fields()[0], other.timestampMS)) {
            this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS);
            fieldSetFlags()[0] = true;
          }
          if (isValidValue(fields()[1], other.name)) {
            this.name = data().deepCopy(fields()[1].schema(), other.name);
            fieldSetFlags()[1] = true;
          }
          if (isValidValue(fields()[2], other.ageYrs)) {
            this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs);
            fieldSetFlags()[2] = true;
          }
          if (isValidValue(fields()[3], other.sizeCm)) {
            this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm);
            fieldSetFlags()[3] = true;
          }
        }
    
        /** Gets the value of the 'timestampMS' field */
        public Long getTimestampMS() {
          return timestampMS;
        }
    
        /** Sets the value of the 'timestampMS' field */
        public Event.Builder setTimestampMS(long value) {
          validate(fields()[0], value);
          this.timestampMS = value;
          fieldSetFlags()[0] = true;
          return this;
        }
    
        /** Checks whether the 'timestampMS' field has been set */
        public boolean hasTimestampMS() {
          return fieldSetFlags()[0];
        }
    
        /** Clears the value of the 'timestampMS' field */
        public Event.Builder clearTimestampMS() {
          fieldSetFlags()[0] = false;
          return this;
        }
    
        /** Gets the value of the 'name' field */
        public CharSequence getName() {
          return name;
        }
    
        /** Sets the value of the 'name' field */
        public Event.Builder setName(CharSequence value) {
          validate(fields()[1], value);
          this.name = value;
          fieldSetFlags()[1] = true;
          return this;
        }
    
        /** Checks whether the 'name' field has been set */
        public boolean hasName() {
          return fieldSetFlags()[1];
        }
    
        /** Clears the value of the 'name' field */
        public Event.Builder clearName() {
          name = null;
          fieldSetFlags()[1] = false;
          return this;
        }
    
        /** Gets the value of the 'ageYrs' field */
        public Integer getAgeYrs() {
          return ageYrs;
        }
    
        /** Sets the value of the 'ageYrs' field */
        public Event.Builder setAgeYrs(int value) {
          validate(fields()[2], value);
          this.ageYrs = value;
          fieldSetFlags()[2] = true;
          return this;
        }
    
        /** Checks whether the 'ageYrs' field has been set */
        public boolean hasAgeYrs() {
          return fieldSetFlags()[2];
        }
    
        /** Clears the value of the 'ageYrs' field */
        public Event.Builder clearAgeYrs() {
          fieldSetFlags()[2] = false;
          return this;
        }
    
        /** Gets the value of the 'sizeCm' field */
        public Float getSizeCm() {
          return sizeCm;
        }
    
        /** Sets the value of the 'sizeCm' field */
        public Event.Builder setSizeCm(float value) {
          validate(fields()[3], value);
          this.sizeCm = value;
          fieldSetFlags()[3] = true;
          return this;
        }
    
        /** Checks whether the 'sizeCm' field has been set */
        public boolean hasSizeCm() {
          return fieldSetFlags()[3];
        }
    
        /** Clears the value of the 'sizeCm' field */
        public Event.Builder clearSizeCm() {
          fieldSetFlags()[3] = false;
          return this;
        }
    
        @Override
        public Event build() {
          try {
            Event record = new Event();
            record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]);
            record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]);
            record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]);
            record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]);
            return record;
          } catch (Exception e) {
            throw new org.apache.avro.AvroRuntimeException(e);
          }
        }
      }
    }
    

Best answer

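In addition to Roland's explanation, which I fully agree with, it should be understood that Akka Streams is not just a concurrent programming framework. Streams also provide back pressure, which means that events are only generated by the Source when there is demand to process them in the Sink. This demand communication adds some overhead at each processing step.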
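To make demand signalling concrete, here is a deliberately simplified, single-threaded toy model. It is not how Akka Streams is implemented; it only counts how many demand signals a consumer would send if it requested `batch` elements at a time from an iterator. `DemandToy` and `run` are hypothetical names. With `batch = 1` every element costs an extra signal, while larger batches spread that cost over many elements.

```scala
// A toy, single-threaded model of demand signalling. It is NOT how
// Akka Streams is implemented; it only counts demand messages.
object DemandToy {
  // The consumer repeatedly signals "I can take `batch` more elements";
  // the producer then emits at most that many. Returns the number of
  // demand signals needed to drain the source.
  def run[A](source: Iterator[A], batch: Int)(consume: A => Unit): Long = {
    require(batch > 0)
    var signals = 0L
    while (source.hasNext) {
      signals += 1 // one demand message from consumer to producer
      var remaining = batch
      while (remaining > 0 && source.hasNext) {
        consume(source.next()) // producer never exceeds the outstanding demand
        remaining -= 1
      }
    }
    signals
  }

  def main(args: Array[String]): Unit = {
    for (batch <- Seq(1, 16, 1000)) {
      val signals = run(Iterator.range(0, 1000000), batch)(_ => ())
      println(s"batch = $batch -> $signals demand signals for 1000000 elements")
    }
  }
}
```

For one million elements this needs 1,000,000 signals with `batch = 1`, 62,500 with 16, and 1,000 with 1000.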

So your single-threaded vs. multi-threaded comparison is not "apples to apples".

If you want raw multithreaded execution performance, then Futures or Actors are a better choice.
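A minimal sketch of the Future-based alternative, using only `scala.concurrent.Future` from the standard library: the events are split into a few coarse chunks so that the cost of scheduling a task is paid once per chunk rather than once per event. `FutureChunks`, `run`, and the choice of one chunk per available processor are illustrative assumptions; the factorial helper mirrors `SharedFunctions.factorial` from the question.

```scala
import scala.annotation.tailrec
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object FutureChunks {
  @tailrec
  private def factorialWorker(n1: Int, acc: Int): Int =
    if (n1 == 0) acc else factorialWorker(n1 - 1, acc * n1)

  // Same Int-based factorial as SharedFunctions.factorial in the question.
  def factorial(n: Int): Int = factorialWorker(n, 1)

  // Processes `count` events in `chunks` coarse-grained Futures and
  // returns how many events were processed in total.
  def run(count: Int, chunks: Int, age: Int): Long = {
    require(count >= 0 && chunks > 0)
    val chunkSize = (count + chunks - 1) / chunks // ceiling division
    val futures = (0 until chunks).map { c =>
      val from = c * chunkSize
      val until = math.min(count, from + chunkSize)
      Future {
        var processed = 0L
        var i = from
        while (i < until) { // plain loop: no per-event scheduling or messaging
          factorial(age)    // result discarded, as in the single-threaded version
          processed += 1
          i += 1
        }
        processed
      }
    }
    Await.result(Future.sequence(futures), Duration.Inf).sum
  }

  def main(args: Array[String]): Unit = {
    val start = System.nanoTime()
    val n = run(count = 70000000, chunks = Runtime.getRuntime.availableProcessors, age = 48)
    println(s"processed $n events in ${(System.nanoTime() - start) / 1000000} ms")
  }
}
```

The design choice here is granularity: each Future does millions of cheap iterations, so scheduling overhead is negligible compared with a per-element hand-off.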

    09-05 19:40