Concatinating două Fluxuri în Akka stream

0

Problema

Am încercat să concat două Fluxuri și eu nu sunt în măsură să explice de ieșire a mea în aplicare.

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

Mă aștept următoarea ieșire din acest cod.

2
3
4
.
.
.
11
10
20
.
.
.
100

În schimb, văd doar "2" a fi tipărite. Puteți să vă rugăm să explicați ce este greșit în implmentation și cum ar trebui să se schimbe programul pentru a obține rezultatul dorit.

akka akka-stream scala
2021-10-21 17:29:00
2

Cel mai bun răspuns

3

De la Akka Stream API docs:

Concat:

Emite atunci când fluxul de curent are un element disponibile; dacă curentul de intrare completeaza, se încearcă următoarea

Broadcast:

Emite atunci când toate ieșirile se oprește backpressuring și există un element de intrare disponibile

Cei doi operatori nu va lucra în colaborare cât nu există un conflict în modul în care funcționează ... Concat incearca sa scoti toate elementele dintr-una din Broadcastmodelului ieșiri înainte de a trece la celălalt, întrucât Broadcast nu emit decât dacă există o cerere pentru TOATE rezultatele.

Pentru ce ai nevoie, ai putea înlănțui folosind concat cum a sugerat de către comentatori:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

sau echivalent, utilizare Source.combine ca mai jos:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
2021-10-21 22:34:04
0

Folosind GraphDSL, care este o versiune simplificată a punerii în aplicare de Sursă.combina:

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)
2021-10-26 19:23:56

În alte limbi

Această pagină este în alte limbi

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................