Im ceva nou pentru Scala și Akka Flux și im încercarea de a obține JSON String mesaje de la un websocket și împingeți-le la un Kafka subiect.
De acum eu sunt doar de lucru pe "primi mesaje de la un ws" parte.
Mesajele vin de la websocket pare ca acest lucru :
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
Vreau să împărțim acest JSON mesaj la mai multe mesaje :
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
Și apoi împingeți fiecare dintre aceste mesaje de la un kafka subiect.
Aici e ceea ce am realizat până acum :
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
E lucru im obtinerea de așteptat ieșire Json mesaj, dar mă întrebam dacă aș putea să scriu acest producător într-o mai "Akka-ish" stil, cum ar fi utilizarea GraphDSL. Deci am cateva intrebari :
- Este posibil pentru a consuma continuu un WebSocket folosind un GraphDSL ? Dacă da, puteți să-mi arătați un exemplu te rog ?
- Este o idee bună pentru a consuma WS folosind un GraphDSL ?
- Ar trebui să mă descompun primite Json Mesaj ca fac înainte de a trimite-l la kafka ? Sau e mai bine să-l trimită ca este de latență mai mică ?
- După producerea mesaj pentru Kafka, eu sunt de planificare să-l consume, folosind Apache Furtuna, este o idee bună ? Sau ar trebui să rămânem cu Akka ?
Vă mulțumim pentru citirea mine, cu stima, Arès