Akka Flux continuu consuma websocket

0

Problema

Im ceva nou pentru Scala și Akka Flux și im încercarea de a obține JSON String mesaje de la un websocket și împingeți-le la un Kafka subiect.

De acum eu sunt doar de lucru pe "primi mesaje de la un ws" parte.

Mesajele vin de la websocket pare ca acest lucru :

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

Vreau să împărțim acest JSON mesaj la mai multe mesaje :

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

Și apoi împingeți fiecare dintre aceste mesaje de la un kafka subiect.

Aici e ceea ce am realizat până acum :

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

E lucru im obtinerea de așteptat ieșire Json mesaj, dar mă întrebam dacă aș putea să scriu acest producător într-o mai "Akka-ish" stil, cum ar fi utilizarea GraphDSL. Deci am cateva intrebari :

  • Este posibil pentru a consuma continuu un WebSocket folosind un GraphDSL ? Dacă da, puteți să-mi arătați un exemplu te rog ?
  • Este o idee bună pentru a consuma WS folosind un GraphDSL ?
  • Ar trebui să mă descompun primite Json Mesaj ca fac înainte de a trimite-l la kafka ? Sau e mai bine să-l trimită ca este de latență mai mică ?
  • După producerea mesaj pentru Kafka, eu sunt de planificare să-l consume, folosind Apache Furtuna, este o idee bună ? Sau ar trebui să rămânem cu Akka ?

Vă mulțumim pentru citirea mine, cu stima, Arès

akka akka-stream apache-kafka scala
2021-11-20 14:01:02
1

Cel mai bun răspuns

1

Acest cod este o multime Akka-ish: scaladsl este doar ca Akka ca GraphDSL sau de punere în aplicare un obicei GraphStage. Singurul motiv pentru care, OMI/E, pentru a merge la GraphDSL dacă forma actuală a graficului nu este ușor exprimate în scaladsl.

Eu personal as merge pe traseul de definire a unui CoinPrice clasa pentru a face modelul explicit

case class CoinPrice(coin: String, price: BigDecimal)

Și apoi au o Flow[Message, CoinPrice, NotUsed] care interpreteaza 1 mesaj de intrare în zero sau mai multe CoinPrices. Ceva (folosind Juca JSON aici), cum ar fi:

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

S-ar putea, în funcție de ceea ce dimensiune de JSONs în mesaj sunt, să rupă în diferite stream etape, pentru a permite o asincron granița dintre JSON analiza și extragerea în CoinPrices. De exemplu,

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

În cele de mai sus, etapele pe fiecare parte a async limita va executa separat actori și, astfel, eventual, concomitent (dacă nu e suficient de nuclee CPU disponibile etc.), la costul de deasupra suplimentar pentru actori de a coordona și de a face schimb de mesaje. Acel plus de coordonare/comunicare aeriene (cf. Gunther Universal Scalabilitate Lege) este doar de gând să fie în valoare de ea, dacă obiecte JSON sunt suficient de mari și vine suficient de repede (în mod constant vin în fața anterioară a terminat de procesat).

Dacă intenția dumneavoastră este de a consuma websocket până când programul se oprește, s-ar putea găsi mai clară a folosi doar Source.never[Message].

2021-11-21 12:42:30

Multumesc pentru raspuns, e foarte clar, am o singură întrebare tho. Cum pot să-mi rupă de răspuns în diferite stream etape ? Poți să-mi arăți un exemplu, te rog ? Sau ma orientez la buna parte din documentația ?
Arès

În alte limbi

Această pagină este în alte limbi

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................