@@ -27,7 +27,8 @@ class AtmosphereEventHandler @Inject()(
2727 testTypeService : TestTypeService ,
2828 genotypeDataRepository : GenotypeDataRepository ,
2929 populationBreakdownRepository : PopulationBreakdownRepository ,
30- haplogroupReconciliationRepository : HaplogroupReconciliationRepository
30+ haplogroupReconciliationRepository : HaplogroupReconciliationRepository ,
31+ instrumentObservationRepository : InstrumentObservationRepository
3132 )(implicit ec : ExecutionContext ) extends Logging {
3233
3334 def handle (event : FirehoseEvent ): Future [FirehoseResult ] = {
@@ -39,7 +40,7 @@ class AtmosphereEventHandler @Inject()(
3940 case e : GenotypeEvent => handleGenotype(e)
4041 case e : PopulationBreakdownEvent => handlePopulationBreakdown(e)
4142 case e : HaplogroupReconciliationEvent => handleHaplogroupReconciliation(e)
42- // Add other handlers as needed
43+ case e : InstrumentObservationEvent => handleInstrumentObservation(e)
4344 case _ =>
4445 logger.warn(s " Unhandled event type: ${event.getClass.getSimpleName} for ${event.atUri}" )
4546 Future .successful(FirehoseResult .Success (event.atUri, " " , None , " Ignored (Not Implemented)" ))
@@ -817,4 +818,84 @@ class AtmosphereEventHandler @Inject()(
817818 }
818819 }
819820
821+ // --- Instrument Observation Handling ---
822+
823+ private def handleInstrumentObservation (event : InstrumentObservationEvent ): Future [FirehoseResult ] = {
824+ event.action match {
825+ case FirehoseAction .Create => createInstrumentObservation(event)
826+ case FirehoseAction .Update => updateInstrumentObservation(event)
827+ case FirehoseAction .Delete => deleteInstrumentObservation(event)
828+ }
829+ }
830+
831+ private def createInstrumentObservation (event : InstrumentObservationEvent ): Future [FirehoseResult ] = {
832+ event.payload match {
833+ case Some (record) =>
834+ instrumentObservationRepository.findByAtUri(record.atUri).flatMap {
835+ case Some (_) =>
836+ Future .successful(FirehoseResult .Conflict (event.atUri, " Instrument observation already exists" ))
837+ case None =>
838+ val observation = InstrumentObservation (
839+ atUri = record.atUri,
840+ atCid = event.atCid,
841+ instrumentId = record.instrumentId,
842+ labName = record.labName,
843+ biosampleRef = record.biosampleRef,
844+ sequenceRunRef = record.sequenceRunRef,
845+ platform = record.platform,
846+ instrumentModel = record.instrumentModel,
847+ flowcellId = record.flowcellId,
848+ runDate = record.runDate.map(i => LocalDateTime .ofInstant(i, ZoneId .systemDefault())),
849+ confidence = record.confidence
850+ .map(ObservationConfidence .fromString)
851+ .getOrElse(ObservationConfidence .Inferred )
852+ )
853+ instrumentObservationRepository.create(observation).map { created =>
854+ logger.info(s " Created instrument observation for instrument ${record.instrumentId} at lab ${record.labName}" )
855+ FirehoseResult .Success (event.atUri, UUID .randomUUID().toString, None , " Instrument observation created" )
856+ }
857+ }
858+ case None =>
859+ Future .successful(FirehoseResult .ValidationError (event.atUri, " Missing payload for InstrumentObservationEvent" ))
860+ }
861+ }
862+
863+ private def updateInstrumentObservation (event : InstrumentObservationEvent ): Future [FirehoseResult ] = {
864+ event.payload match {
865+ case Some (record) =>
866+ instrumentObservationRepository.findByAtUri(event.atUri).flatMap {
867+ case Some (existing) =>
868+ val updated = existing.copy(
869+ atCid = event.atCid,
870+ instrumentId = record.instrumentId,
871+ labName = record.labName,
872+ biosampleRef = record.biosampleRef,
873+ sequenceRunRef = record.sequenceRunRef,
874+ platform = record.platform,
875+ instrumentModel = record.instrumentModel,
876+ flowcellId = record.flowcellId,
877+ runDate = record.runDate.map(i => LocalDateTime .ofInstant(i, ZoneId .systemDefault())),
878+ confidence = record.confidence
879+ .map(ObservationConfidence .fromString)
880+ .getOrElse(existing.confidence)
881+ )
882+ instrumentObservationRepository.update(updated).map { success =>
883+ if (success) FirehoseResult .Success (event.atUri, UUID .randomUUID().toString, None , " Instrument observation updated" )
884+ else FirehoseResult .Error (event.atUri, " Failed to update instrument observation" )
885+ }
886+ case None =>
887+ Future .successful(FirehoseResult .NotFound (event.atUri))
888+ }
889+ case None =>
890+ Future .successful(FirehoseResult .ValidationError (event.atUri, " Missing payload for InstrumentObservationEvent" ))
891+ }
892+ }
893+
894+ private def deleteInstrumentObservation (event : InstrumentObservationEvent ): Future [FirehoseResult ] = {
895+ instrumentObservationRepository.deleteByAtUri(event.atUri).map { deleted =>
896+ if (deleted) FirehoseResult .Success (event.atUri, " " , None , " Instrument observation deleted" )
897+ else FirehoseResult .NotFound (event.atUri)
898+ }
899+ }
900+
820901}
0 commit comments