Akka fluxul de Intrare ("În") ca Ieșire ("Out")

0

Problema

Am încercat să scrie o bucata de cod care are următorul text:-

  1. Citește o mare fișier csv de la distanță de sursa ca s3.
  2. Procesul de înregistrare a fișierului de înregistrare.
  3. Trimite o notificare pentru utilizator
  4. Scrie de ieșire de la o locație la distanță

Proba de înregistrare de intrare în csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

Mea de intrare caz de clasă care reprezintă un record de intrare în csv:

case class InputRecord(recordId: String, name: String, salary: Long)

Proba de înregistrare în ieșire csv (care trebuie să fie scrise):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

Mea ieșire în caz de clasă care reprezintă un record de intrare în csv:

case class OutputRecord(recordId: String, name: String, designation: String)

Citind o înregistrare folosind akka flux csv (foloseste Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

Acum am o funcție a procesului de înregistrări:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

Funcție pentru a scrie OutputRecord ca csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

Funcție pentru a trimite e-mail de notificare:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Coasere pe toate împreună

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

Pe Linia 15 si 16 sunt obtinerea o eroare, sunt fie în măsură să adăugați Linia 15 sau Linia 16, dar nu atât de atât notify & writeOutput are nevoie de outputRecord. Odată notifica este numit eu pierd outputRecord.

Există o modalitate pot adăuga atât notify și writeOutput la același grafic?

Eu nu sunt în căutarea pentru execuția în paralel ca vreau la primul apel notify și atunci numai writeOutput. Deci, acest lucru nu este de ajutor: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

În caz de utilizare pare a fi foarte simplu pentru mine, dar cum unii nu sunt în stare să găsească o soluție curată.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

Cel mai bun răspuns

1

Producția de notify este un PushResultdar la intrare de writeOutput este ByteString. Odată ce ați schimba asta se va compila. În cazul în care aveți nevoie ByteString, pentru a primi la fel de OutputRecord.

BTW, în codul de probă pe care le-ați furnizat, o eroare similară există în readCSV și process.

2021-11-24 03:36:16

În alte limbi

Această pagină este în alte limbi

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................