Am încercat să scrie o bucata de cod care are următorul text:-
- Citește o mare fișier csv de la distanță de sursa ca s3.
- Procesul de înregistrare a fișierului de înregistrare.
- Trimite o notificare pentru utilizator
- Scrie de ieșire de la o locație la distanță
Proba de înregistrare de intrare în csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
Mea de intrare caz de clasă care reprezintă un record de intrare în csv:
case class InputRecord(recordId: String, name: String, salary: Long)
Proba de înregistrare în ieșire csv (care trebuie să fie scrise):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
Mea ieșire în caz de clasă care reprezintă un record de intrare în csv:
case class OutputRecord(recordId: String, name: String, designation: String)
Citind o înregistrare folosind akka flux csv (foloseste Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
Acum am o funcție a procesului de înregistrări:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
Funcție pentru a scrie OutputRecord ca csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
Funcție pentru a trimite e-mail de notificare:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Coasere pe toate împreună
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
Pe Linia 15 si 16 sunt obtinerea o eroare, sunt fie în măsură să adăugați Linia 15 sau Linia 16, dar nu atât de atât notify
& writeOutput
are nevoie de outputRecord
. Odată notifica este numit eu pierd outputRecord
.
Există o modalitate pot adăuga atât notify
și writeOutput
la același grafic?
Eu nu sunt în căutarea pentru execuția în paralel ca vreau la primul apel notify
și atunci numai writeOutput
. Deci, acest lucru nu este de ajutor: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
În caz de utilizare pare a fi foarte simplu pentru mine, dar cum unii nu sunt în stare să găsească o soluție curată.