Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/guardian/grid into mk-chequerboard-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
paperboyo committed Mar 1, 2021
2 parents f419f8f + 37ad1bf commit bf704bb
Show file tree
Hide file tree
Showing 32 changed files with 455 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ class BatchIndexLambdaHandler {
threshold = sys.env.get("LATENCY_THRESHOLD").map(t => Integer.parseInt(t)),
maxSize = sys.env("MAX_SIZE").toInt,
startState = IndexInputCreation.get(sys.env("START_STATE").toInt),
checkerStartState = IndexInputCreation.get(sys.env("CHECKER_START_STATE").toInt)
checkerStartState = IndexInputCreation.get(sys.env("CHECKER_START_STATE").toInt),
domainRoot = sys.env("DOMAIN_ROOT"),
)

private val batchIndex = new BatchIndexHandler(cfg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ import com.amazonaws.services.dynamodbv2.document.utils.ValueMap
import com.gu.mediaservice.indexing.IndexInputCreation._
import com.gu.mediaservice.indexing.ProduceProgress
import com.gu.mediaservice.lib.aws.UpdateMessage
import com.gu.mediaservice.lib.config.{ServiceHosts, Services}
import com.gu.mediaservice.model.Image
import com.typesafe.scalalogging.LazyLogging
import net.logstash.logback.marker.Markers
import play.api.libs.json.{JsObject, Json}

import scala.collection.JavaConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

import scala.collection.JavaConverters._

case class BatchIndexHandlerConfig(
Expand All @@ -35,7 +34,8 @@ case class BatchIndexHandlerConfig(
threshold: Option[Integer],
startState: ProduceProgress,
checkerStartState: ProduceProgress,
maxSize: Int
maxSize: Int,
domainRoot: String
)

/**
 * Result value carrying two image counts, a progress-history string and a projection duration.
 *
 * NOTE(review): field meanings below are inferred from the field names only — confirm against the code that builds this.
 *
 * @param foundImagesCount     presumably the number of images that were found
 * @param notFoundImagesCount  presumably the number of images that were not found
 * @param progressHistory      presumably a textual record of the progress states passed through
 * @param projectionTookInSec  presumably how long projection took, in seconds
 */
case class SuccessResult(foundImagesCount: Int, notFoundImagesCount: Int, progressHistory: String, projectionTookInSec: Long)
Expand All @@ -54,9 +54,10 @@ class BatchIndexHandler(cfg: BatchIndexHandlerConfig) extends LoggingWithMarkers
private val GetIdsTimeout = new FiniteDuration(20, TimeUnit.SECONDS)
private val GlobalTimeout = new FiniteDuration(MainProcessingTimeoutInSec, TimeUnit.SECONDS)
private val ImagesProjectionTimeout = new FiniteDuration(ProjectionTimeoutInSec, TimeUnit.MINUTES)
private val gridClient = GridClient(maxIdleConnections, debugHttpResponse = false)
val services = new Services(domainRoot, ServiceHosts.guardianPrefixes, Set.empty)
private val gridClient = GridClient(apiKey, services, maxIdleConnections, debugHttpResponse = false)

private val ImagesBatchProjector = new ImagesBatchProjection(apiKey, projectionEndpoint, ImagesProjectionTimeout, gridClient, maxSize)
private val ImagesBatchProjector = new ImagesBatchProjection(apiKey, ImagesProjectionTimeout, gridClient, maxSize)
private val InputIdsStore = new InputIdsStore(AwsHelpers.buildDynamoTableClient(dynamoTableName), batchSize)

import ImagesBatchProjector._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import java.net.URL

import com.gu.mediaservice.lib.config.{ServiceHosts, Services}
import com.gu.mediaservice.model._
import com.gu.mediaservice.model.leases.LeasesByMedia
import com.gu.mediaservice.model.usage.Usage
import com.typesafe.scalalogging.LazyLogging
import org.joda.time.DateTime
import play.api.libs.json._
Expand All @@ -14,19 +12,19 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object ImageDataMergerConfig {
private val gridClient = GridClient(maxIdleConnections = 5)

def apply(apiKey: String, domainRoot: String, imageLoaderEndpointOpt: Option[String]): ImageDataMergerConfig = {
val services = new Services(domainRoot, ServiceHosts.guardianPrefixes, Set.empty)
val imageLoaderEndpoint = imageLoaderEndpointOpt match {
case Some(uri) => uri
case None => services.loaderBaseUri
}
new ImageDataMergerConfig(apiKey, services, gridClient, imageLoaderEndpoint)
new ImageDataMergerConfig(apiKey, services, imageLoaderEndpoint)
}
}

case class ImageDataMergerConfig(apiKey: String, services: Services, gridClient: GridClient, imageLoaderEndpoint: String) {
case class ImageDataMergerConfig(apiKey: String, services: Services, imageLoaderEndpoint: String) {
val gridClient = GridClient(apiKey, services, maxIdleConnections = 5)
def isValidApiKey(): Boolean = {
// Make an API key authenticated request to the leases API as a way of validating the API key.
// A 200 indicates a valid key.
Expand Down Expand Up @@ -138,7 +136,6 @@ class ImageDataMerger(config: ImageDataMergerConfig) extends LazyLogging {
import config._
import services._


def getMergedImageData(mediaId: String)(implicit ec: ExecutionContext): FullImageProjectionResult = {
try {
val maybeImageFuture = getMergedImageDataInternal(mediaId)
Expand All @@ -151,7 +148,7 @@ class ImageDataMerger(config: ImageDataMergerConfig) extends LazyLogging {
}

private def getMergedImageDataInternal(mediaId: String)(implicit ec: ExecutionContext): Future[Option[Image]] = {
val maybeImage: Option[Image] = getImageLoaderProjection(mediaId)
val maybeImage: Option[Image] = gridClient.getImageLoaderProjection(mediaId, imageLoaderEndpoint)
maybeImage match {
case Some(img) =>
aggregate(img).map { aggImg =>
Expand All @@ -166,10 +163,10 @@ class ImageDataMerger(config: ImageDataMergerConfig) extends LazyLogging {
val mediaId = image.id
for {
collections <- getCollectionsResponse(mediaId)
edits <- getEdits(mediaId)
leases <- getLeases(mediaId)
usages <- getUsages(mediaId)
crops <- getCrops(mediaId)
edits <- gridClient.getEdits(mediaId)
leases <- gridClient.getLeases(mediaId)
usages <- gridClient.getUsages(mediaId)
crops <- gridClient.getCrops(mediaId)
} yield image.copy(
collections = collections,
userMetadata = edits,
Expand All @@ -179,15 +176,6 @@ class ImageDataMerger(config: ImageDataMergerConfig) extends LazyLogging {
)
}

/**
 * Requests the projection of a single image from image-loader using a synchronous GET.
 *
 * The request goes to `$imageLoaderEndpoint/images/project/$mediaId` with `apiKey`, and the
 * response is handed to `validateResponse` before its status is inspected.
 *
 * @param mediaId id of the image to project
 * @return the response body read as an `Image` when the status code is 200, otherwise `None`
 */
private def getImageLoaderProjection(mediaId: String): Option[Image] = {
  logger.info("attempt to get image projection from image-loader")
  val url = new URL(s"$imageLoaderEndpoint/images/project/$mediaId")
  val res = gridClient.makeGetRequestSync(url, apiKey)
  validateResponse(res, url)
  // Braces are required to interpolate a member access: `$res.statusCode` would print
  // `res.toString` followed by the literal text ".statusCode".
  logger.info(s"got image projection from image-loader for $mediaId with status code ${res.statusCode}")
  if (res.statusCode == 200) Some(res.body.as[Image]) else None
}

private def getCollectionsResponse(mediaId: String)(implicit ec: ExecutionContext): Future[List[Collection]] = {
logger.info("attempt to get collections")
val url = new URL(s"$collectionsBaseUri/images/$mediaId")
Expand All @@ -197,49 +185,6 @@ class ImageDataMerger(config: ImageDataMergerConfig) extends LazyLogging {
}
}

/**
 * Asynchronously fetches the edits for an image from `$metadataBaseUri/edits/$mediaId`.
 *
 * The response is passed to `validateResponse` first.
 *
 * @param mediaId id of the image whose edits are requested
 * @return the `data` field of the body read as `Edits` when the status code is 200, otherwise `None`
 */
private def getEdits(mediaId: String)(implicit ec: ExecutionContext): Future[Option[Edits]] = {
  logger.info("attempt to get edits")
  val editsUrl = new URL(s"$metadataBaseUri/edits/$mediaId")
  val eventualResponse = gridClient.makeGetRequestAsync(editsUrl, apiKey)
  eventualResponse.map { response =>
    validateResponse(response, editsUrl)
    if (response.statusCode != 200) None
    else Some((response.body \ "data").as[Edits])
  }
}

/**
 * Asynchronously fetches the crops for an image from `$cropperBaseUri/crops/$mediaId`.
 *
 * The response is passed to `validateResponse` first.
 *
 * @param mediaId id of the image whose crops are requested
 * @return the `data` field of the body read as a list of `Crop` when the status code is 200,
 *         otherwise an empty list
 */
private def getCrops(mediaId: String)(implicit ec: ExecutionContext): Future[List[Crop]] = {
  logger.info("attempt to get crops")
  val cropsUrl = new URL(s"$cropperBaseUri/crops/$mediaId")
  val eventualResponse = gridClient.makeGetRequestAsync(cropsUrl, apiKey)
  eventualResponse.map { response =>
    validateResponse(response, cropsUrl)
    if (response.statusCode != 200) Nil
    else (response.body \ "data").as[List[Crop]]
  }
}

/**
 * Asynchronously fetches the leases for an image from `$leasesBaseUri/leases/media/$mediaId`.
 *
 * The response is passed to `validateResponse` first.
 *
 * @param mediaId id of the image whose leases are requested
 * @return the `data` field of the body read as `LeasesByMedia` when the status code is 200,
 *         otherwise `LeasesByMedia.empty`
 */
private def getLeases(mediaId: String)(implicit ec: ExecutionContext): Future[LeasesByMedia] = {
  logger.info("attempt to get leases")
  val leasesUrl = new URL(s"$leasesBaseUri/leases/media/$mediaId")
  val eventualResponse = gridClient.makeGetRequestAsync(leasesUrl, apiKey)
  eventualResponse.map { response =>
    validateResponse(response, leasesUrl)
    if (response.statusCode != 200) LeasesByMedia.empty
    else (response.body \ "data").as[LeasesByMedia]
  }
}

/**
 * Asynchronously fetches the usages for an image from `$usageBaseUri/usages/media/$mediaId`.
 *
 * The response is passed to `validateResponse` first. On a 200 the body's top-level `data`
 * array is read; each element is treated as an object whose own `data` field is the usage.
 *
 * @param mediaId id of the image whose usages are requested
 * @return the unpacked usages when the status code is 200, otherwise an empty list
 */
private def getUsages(mediaId: String)(implicit ec: ExecutionContext): Future[List[Usage]] = {
  logger.info("attempt to get usages")

  // Unwraps the per-entity envelope: body.data[i].data for every element of the array.
  def unpackUsagesFromEntityResponse(resBody: JsValue): List[JsValue] = {
    val entities = (resBody \ "data").as[JsArray].value
    entities.toList.map { entity =>
      (entity.as[JsObject] \ "data").as[JsValue]
    }
  }

  val usagesUrl = new URL(s"$usageBaseUri/usages/media/$mediaId")
  val eventualResponse = gridClient.makeGetRequestAsync(usagesUrl, apiKey)
  eventualResponse.map { response =>
    validateResponse(response, usagesUrl)
    if (response.statusCode != 200) Nil
    else {
      val rawUsages = unpackUsagesFromEntityResponse(response.body)
      rawUsages.map(_.as[Usage])
    }
  }
}

private def validateResponse(res: ResponseWrapper, url: URL): Unit = {
import res._
if (statusCode != 200 && statusCode != 404) {
Expand All @@ -260,6 +205,3 @@ class ImageDataMerger(config: ImageDataMergerConfig) extends LazyLogging {
}
}

/**
 * An `IllegalStateException` that additionally carries a message from a downstream API.
 *
 * @param message           the exception message, passed to `IllegalStateException`
 * @param downstreamMessage the downstream message, exposed via [[getDownstreamMessage]]
 */
class DownstreamApiInBadStateException(message: String, downstreamMessage: String) extends IllegalStateException(message) {
  /** @return the downstream message supplied at construction */
  def getDownstreamMessage: String = downstreamMessage
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.gu.mediaservice.model.Image
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

class ImagesBatchProjection(apiKey: String, domainRoot: String, timeout: Duration, gridClient: GridClient, maxSize: Int) {
class ImagesBatchProjection(apiKey: String, timeout: Duration, gridClient: GridClient, maxSize: Int) {

private implicit val ThrottledExecutionContext = ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(5))

Expand Down
42 changes: 26 additions & 16 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ val commonSettings = Seq(
publishArtifact in (Compile, packageDoc) := false
)

// Projects common to all organizations; org-specific projects are added separately where this list is used.
lazy val commonProjects: Seq[sbt.ProjectReference] = Seq(commonLib, restLib, auth, collections, cropper, imageLoader, leases, thrall, kahuna, metadataEditor, usage, mediaApi, adminToolsLambda, adminToolsScripts, adminToolsDev)

lazy val root = project("grid", path = Some("."))
.aggregate(commonLib, auth, collections, cropper, imageLoader, leases, thrall, kahuna, metadataEditor, usage, mediaApi, adminToolsLambda, adminToolsScripts, adminToolsDev)
.aggregate((maybeBBCLib.toList ++ commonProjects):_*)
.enablePlugins(RiffRaffArtifact)
.settings(
riffRaffManifestProjectName := s"media-service::grid::all",
Expand Down Expand Up @@ -67,20 +70,16 @@ val okHttpVersion = "3.12.1"

// True only when the BUILD_ORG environment variable is set to exactly "bbc".
val bbcBuildProcess: Boolean = sys.env.get("BUILD_ORG").contains("bbc")

// Selects the pan-domain-auth version by build organisation: the BBC build uses the
// 0.9.2-SNAPSHOT artifact, every other build uses 0.8.2.
val bbcCommonLibSettings: SettingsDefinition = {
  val panDomainAuthVersion = if (bbcBuildProcess) "0.9.2-SNAPSHOT" else "0.8.2"
  Seq(
    libraryDependencies ++= Seq("com.gu" %% "pan-domain-auth-play_2-6" % panDomainAuthVersion)
  )
}
// BBC-specific project; it is only referenced by the build (via maybeBBCLib) when bbcBuildProcess is true.
lazy val bbcProject = project("bbc").dependsOn(restLib)

// Some(bbcProject) for BBC builds, None otherwise. The lazy bbcProject is only forced in the BBC case.
val maybeBBCLib: Option[sbt.ProjectReference] = if(bbcBuildProcess) Some(bbcProject) else None

lazy val commonLib = project("common-lib").settings(
libraryDependencies ++= Seq(
// also exists in plugins.sbt, TODO deduplicate this
"com.gu" %% "editorial-permissions-client" % "2.0",
"com.gu" %% "pan-domain-auth-play_2-6" % "0.8.2",
"com.amazonaws" % "aws-java-sdk-iam" % awsSdkVersion,
"com.amazonaws" % "aws-java-sdk-s3" % awsSdkVersion,
"com.amazonaws" % "aws-java-sdk-ec2" % awsSdkVersion,
Expand Down Expand Up @@ -110,10 +109,11 @@ lazy val commonLib = project("common-lib").settings(
"org.codehaus.janino" % "janino" % "3.0.6",
"com.typesafe.play" %% "play-json-joda" % "2.6.9",
"com.gu" %% "scanamo" % "1.0.0-M8",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.9.10.7"
"com.fasterxml.jackson.core" % "jackson-databind" % "2.9.10.7",
"com.squareup.okhttp3" % "okhttp" % okHttpVersion
),
dependencyOverrides += "org.apache.thrift" % "libthrift" % "0.9.1"
).settings(bbcCommonLibSettings)
)

lazy val restLib = project("rest-lib").settings(
libraryDependencies ++= Seq(
Expand All @@ -134,7 +134,6 @@ lazy val cropper = playProject("cropper", 9006)

lazy val imageLoader = playProject("image-loader", 9003).settings {
libraryDependencies ++= Seq(
"com.squareup.okhttp3" % "okhttp" % okHttpVersion,
"org.apache.tika" % "tika-core" % "1.20",
"com.drewnoakes" % "metadata-extractor" % "2.15.0"
)
Expand Down Expand Up @@ -180,7 +179,6 @@ lazy val adminToolsLib = project("admin-tools-lib", Some("admin-tools/lib"))
ExclusionRule("com.gu", "kinesis-logback-appender")
),
libraryDependencies ++= Seq(
"com.squareup.okhttp3" % "okhttp" % okHttpVersion,
"com.typesafe.play" %% "play-json" % "2.6.9",
"com.typesafe.play" %% "play-json-joda" % "2.6.9",
"com.typesafe.play" %% "play-functional" % "2.6.9",
Expand Down Expand Up @@ -247,6 +245,15 @@ lazy val usage = playProject("usage", 9009).settings(

// Scripts project: depends on commonLib and enables the JavaAppPackaging and UniversalPlugin plugins.
lazy val scripts = project("scripts")
.dependsOn(commonLib)
.enablePlugins(JavaAppPackaging, UniversalPlugin)
.settings(
libraryDependencies ++= Seq(
// V2 of the AWS SDK as it's easier to use for scripts and won't leak to the rest of the project from here
"software.amazon.awssdk" % "s3" % "2.15.81",
// bump jcommander explicitly as AWS SDK is pulling in a vulnerable version
"com.beust" % "jcommander" % "1.75"
)
)

lazy val migration = project("migration")
.dependsOn(commonLib).
Expand Down Expand Up @@ -281,8 +288,8 @@ val buildInfo = Seq(
)
)

def playProject(projectName: String, port: Int, path: Option[String] = None): Project =
project(projectName, path)
def playProject(projectName: String, port: Int, path: Option[String] = None): Project = {
val commonProject = project(projectName, path)
.enablePlugins(PlayScala, JDebPackaging, SystemdPlugin, BuildInfoPlugin)
.dependsOn(restLib)
.settings(commonSettings ++ buildInfo ++ Seq(
Expand All @@ -309,3 +316,6 @@ def playProject(projectName: String, port: Int, path: Option[String] = None): Pr
"-J-XX:GCLogFileSize=2M"
)
))
//Add the BBC library dependency if defined
maybeBBCLib.fold(commonProject){commonProject.dependsOn(_)}
}
Loading

0 comments on commit bf704bb

Please sign in to comment.