import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Source}

object ScalaParallell extends App {

  implicit val system = ActorSystem("QuickStart")

  def longRunningComputation(x: Int): Int = {
    println("Computing ", x)
    Thread.sleep(10000)
    x
  }

  val processor: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // prepare graph elements
      val balance = b.add(Balance[Int](2))
      val merge = b.add(Merge[Int](2))
      val f = Flow[Int].map(longRunningComputation)


      // connect the graph
      balance.out(0) ~> f.async ~> merge.in(0)
      balance.out(1) ~> f.async ~> merge.in(1)

      // expose ports
      FlowShape(balance.in, merge.out)
    })


  // Wire it all up.

  val source: Source[Int, NotUsed] = Source(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

  source.via(processor).runForeach(println)


  Thread.sleep(5000)
}