From 44f5589289a503a3aad74930087637a294d235e2 Mon Sep 17 00:00:00 2001
From: Marjan Kalanaki
Date: Fri, 31 Jan 2025 18:43:03 +0000
Subject: [PATCH] support zip for output of transcription service

this is as a result of a change in transcription service in
https://github.com/guardian/transcription-service/pull/122
---
 .../ExternalTranscriptionExtractor.scala      | 16 ++-----
 .../ExternalTranscriptionWorker.scala         | 48 +++++++++++++------
 2 files changed, 38 insertions(+), 26 deletions(-)

diff --git a/backend/app/extraction/ExternalTranscriptionExtractor.scala b/backend/app/extraction/ExternalTranscriptionExtractor.scala
index 99f753ea..134e34ca 100644
--- a/backend/app/extraction/ExternalTranscriptionExtractor.scala
+++ b/backend/app/extraction/ExternalTranscriptionExtractor.scala
@@ -19,8 +19,8 @@ case class SignedUrl(url: String, key: String)
 object SignedUrl {
   implicit val formats = Json.format[SignedUrl]
 }
-case class OutputBucketUrls(text: SignedUrl, srt: SignedUrl, json: SignedUrl)
-case class OutputBucketKeys(text: String, srt: String, json: String)
+case class OutputBucketUrls(zip: SignedUrl)
+case class OutputBucketKeys(zip: String)
 case class TranscriptionJob(id: String, originalFilename: String, inputSignedUrl: String, sentTimestamp: String,
                             userEmail: String, transcriptDestinationService: String, outputBucketUrls: OutputBucketUrls,
                             languageCode: String, translate: Boolean, translationOutputBucketUrls: OutputBucketUrls,
@@ -118,19 +118,13 @@ class ExternalTranscriptionExtractor(index: Index, transcribeConfig: TranscribeC
   override def priority = 2
 
   private def getOutputBucketUrls(blobUri: String): Either[Failure, OutputBucketUrls] = {
-    val srtKey = s"srt/$blobUri.srt"
-    val jsonKey = s"json/$blobUri.json"
-    val textKey = s"text/$blobUri.txt"
+    val zipKey = s"zip/$blobUri.zip"
 
     val bucketUrls = for {
-      srt <- outputStorage.getUploadSignedUrl(srtKey)
-      json <- outputStorage.getUploadSignedUrl(jsonKey)
-      text <- outputStorage.getUploadSignedUrl(textKey)
+      zip <- outputStorage.getUploadSignedUrl(zipKey)
     } yield {
       OutputBucketUrls(
-        SignedUrl(text, textKey),
-        SignedUrl(srt, srtKey),
-        SignedUrl(json, jsonKey)
+        SignedUrl(zip, zipKey),
       )
     }
 
diff --git a/backend/app/extraction/ExternalTranscriptionWorker.scala b/backend/app/extraction/ExternalTranscriptionWorker.scala
index c6245472..75dfedd8 100644
--- a/backend/app/extraction/ExternalTranscriptionWorker.scala
+++ b/backend/app/extraction/ExternalTranscriptionWorker.scala
@@ -11,10 +11,12 @@ import services.{ObjectStorage, TranscribeConfig}
 import utils.Logging
 import utils.attempt.{DocumentUpdateFailure, ExternalTranscriptionOutputFailure, Failure, JsonParseFailure}
 
+import java.io.InputStream
 import java.nio.charset.StandardCharsets
+import java.util.zip.ZipInputStream
 import scala.concurrent.ExecutionContext
 import scala.jdk.CollectionConverters.CollectionHasAsScala
-import scala.util.Try
+import scala.util.{Success, Try, Using, Failure => ScalaFailure}
 
 case class TranscriptionMessageAttribute(receiveCount: Int, messageGroupId: String)
 case class TranscriptionTexts(transcript: String, translation: Option[String])
@@ -83,20 +85,36 @@ class ExternalTranscriptionWorker(manifest: WorkerManifest, amazonSQSClient: Ama
   }
 
   private def getTranscriptionTexts(transcriptionOutput: TranscriptionOutputSuccess): Either[Failure, TranscriptionTexts] = {
-    val transcript = blobStorage.get(transcriptionOutput.outputBucketKeys.text)
-
-    transcript.flatMap { transcriptStream =>
-      val transcriptText = new String(transcriptStream.readAllBytes(), StandardCharsets.UTF_8)
-
-      transcriptionOutput.translationOutputBucketKeys match {
-        case Some(keys) =>
-          val translation = blobStorage.get(keys.text)
-          translation.map { translationStream =>
-            val text = new String(translationStream.readAllBytes(), StandardCharsets.UTF_8)
-            TranscriptionTexts(transcriptText, Some(text))
-          }
-        case None => Right(TranscriptionTexts(transcriptText, None))
-      }
+    // Fetch one output archive from blob storage and pull the transcript text out of it
+    def readTranscript(zipKey: String): Either[Failure, String] =
+      blobStorage.get(zipKey).flatMap(extractFileFromZip(_, "transcript.txt"))
+
+    for {
+      transcriptText <- readTranscript(transcriptionOutput.outputBucketKeys.zip)
+      // Generators run in order, so the translation zip is only fetched once the transcript has been read
+      translationText <- transcriptionOutput.translationOutputBucketKeys match {
+        case Some(keys) => readTranscript(keys.zip).map(Some(_)) // Wrap in Some to indicate translation exists
+        case None => Right(None) // No translation zip, return None
+      }
+    } yield TranscriptionTexts(transcriptText, translationText)
+  }
+
+  /**
+    * Reads the file called `fileName` out of a ZIP archive and returns its contents decoded as UTF-8.
+    * The stream is closed before this method returns, whether or not extraction succeeds.
+    */
+  private def extractFileFromZip(zipStream: InputStream, fileName: String): Either[Failure, String] = {
+    // Using closes the ZipInputStream, and with it the wrapped input stream, on every path
+    Using(new ZipInputStream(zipStream, StandardCharsets.UTF_8)) { zipInputStream =>
+      Iterator.continually(zipInputStream.getNextEntry)
+        .takeWhile(_ != null) // Process entries until we hit null
+        .find(entry => !entry.isDirectory && entry.getName == fileName)
+        // Reads on a ZipInputStream end at the current entry, so this returns just the matched file
+        .map(_ => new String(zipInputStream.readAllBytes(), StandardCharsets.UTF_8))
+        .toRight(ExternalTranscriptionOutputFailure(s"File '$fileName' not found in the ZIP archive"))
+    } match {
+      case Success(result) => result
+      case ScalaFailure(exception) => Left(ExternalTranscriptionOutputFailure(s"Failed to extract '$fileName': ${exception.getMessage}"))
     }
   }
 