Skip to content

Commit

Permalink
Allow keywords to be specified at point of file creation that can be …
Browse files Browse the repository at this point in the history
…used

to search for files.

This is useful because currently users need to encode this type of
information in the filename.
  • Loading branch information
shinyhappydan committed Jan 9, 2024
1 parent fe86350 commit b5d41bb
Show file tree
Hide file tree
Showing 41 changed files with 429 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
"prefLabel": "http://www.w3.org/2004/02/skos/core#prefLabel",
"name": "http://schema.org/name",
"label": "http://www.w3.org/2000/01/rdf-schema#label",
"description": "http://schema.org/description"
"description": "http://schema.org/description",
"keywords": "https://schema.org/keywords",
"@vocab": "https://bluebrain.github.io/nexus/vocabulary/"
},
"@id": "https://bluebrain.github.io/nexus/contexts/elasticsearch-indexing.json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@
}
}
},
"keywords": {
"type": "object",
"dynamic": true
},
"_storage": {
"properties": {
"_rev": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import io.circe.syntax._
/**
* Enumeration type for all possible bulk operations
*/
sealed trait ElasticSearchBulk extends Product with Serializable {
sealed trait ElasticSearchAction extends Product with Serializable {

/**
* @return
Expand All @@ -30,22 +30,22 @@ sealed trait ElasticSearchBulk extends Product with Serializable {
Json.obj("_index" -> index.value.asJson, "_id" -> id.asJson)
}

object ElasticSearchBulk {
object ElasticSearchAction {

private val newLine = System.lineSeparator()

final case class Index(index: IndexLabel, id: String, content: Json) extends ElasticSearchBulk {
final case class Index(index: IndexLabel, id: String, content: Json) extends ElasticSearchAction {
def payload: String = Json.obj("index" -> json).noSpaces + newLine + content.noSpaces
}
final case class Create(index: IndexLabel, id: String, content: Json) extends ElasticSearchBulk {
final case class Create(index: IndexLabel, id: String, content: Json) extends ElasticSearchAction {
def payload: String = Json.obj("create" -> json).noSpaces + newLine + content.noSpaces
}
final case class Update(index: IndexLabel, id: String, content: Json, retry: Int = 0) extends ElasticSearchBulk {
final case class Update(index: IndexLabel, id: String, content: Json, retry: Int = 0) extends ElasticSearchAction {
val modified = if (retry > 0) json deepMerge Json.obj("retry_on_conflict" -> retry.asJson) else json

def payload: String = Json.obj("update" -> modified).noSpaces + newLine + content.asJson.noSpaces
}
final case class Delete(index: IndexLabel, id: String) extends ElasticSearchBulk {
final case class Delete(index: IndexLabel, id: String) extends ElasticSearchAction {
def payload: String = Json.obj("delete" -> json).noSpaces + newLine
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength:
* @param refresh
* the value for the `refresh` Elasticsearch parameter
*/
def bulk(ops: Seq[ElasticSearchBulk], refresh: Refresh = Refresh.False): IO[BulkResponse] = {
def bulk(ops: Seq[ElasticSearchAction], refresh: Refresh = Refresh.False): IO[BulkResponse] = {
if (ops.isEmpty) IO.pure(BulkResponse.Success)
else {
val bulkEndpoint = (endpoint / bulkPath).withQuery(Query(refreshParam -> refresh.value))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ final case class QueryBuilder private[client] (private val query: JsonObject) {
range(nxv.createdAt.prefix, params.createdAt) ++
params.updatedBy.map(term(nxv.updatedBy.prefix, _)) ++
range(nxv.updatedAt.prefix, params.updatedAt) ++
params.tag.map(term(nxv.tags.prefix, _)),
params.tag.map(term(nxv.tags.prefix, _)) ++
params.fileUserMetadata.toList.flatMap(_.keywords).map { case (key, value) =>
term(s"keywords.$key", value)
},
mustNotTerms = typesTerms(params.typeOperator.negate, excludeTypes),
withScore = params.q.isDefined
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.syntax.kamonSyntax
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchAction.{Delete, Index}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.BulkResponse.{MixedOutcomes, Success}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.{BulkResponse, Refresh}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchBulk, ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchAction, ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem
Expand Down Expand Up @@ -49,20 +50,20 @@ final class ElasticSearchSink private (
KamonMetricComponent(ElasticSearchViews.entityType.value)

override def apply(elements: Chunk[Elem[Json]]): IO[Chunk[Elem[Unit]]] = {
val bulk = elements.foldLeft(Vector.empty[ElasticSearchBulk]) {
case (acc, successElem @ Elem.SuccessElem(_, _, _, _, _, json, _)) =>
val actions = elements.foldLeft(Vector.empty[ElasticSearchAction]) {
case (actions, successElem @ Elem.SuccessElem(_, _, _, _, _, json, _)) =>
if (json.isEmpty()) {
acc :+ ElasticSearchBulk.Delete(index, documentId(successElem))
actions :+ Delete(index, documentId(successElem))
} else
acc :+ ElasticSearchBulk.Index(index, documentId(successElem), json)
case (acc, droppedElem: Elem.DroppedElem) =>
acc :+ ElasticSearchBulk.Delete(index, documentId(droppedElem))
case (acc, _: Elem.FailedElem) => acc
actions :+ Index(index, documentId(successElem), json)
case (actions, droppedElem: Elem.DroppedElem) =>
actions :+ Delete(index, documentId(droppedElem))
case (actions, _: Elem.FailedElem) => actions
}

if (bulk.nonEmpty) {
if (actions.nonEmpty) {
client
.bulk(bulk, refresh)
.bulk(actions, refresh)
.map(ElasticSearchSink.markElems(_, elements, documentId))
} else {
IO.pure(elements.map(_.void))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model

import akka.http.scaladsl.unmarshalling.{FromStringUnmarshaller, Unmarshaller}
import ch.epfl.bluebrain.nexus.delta.kernel.search.TimeRange
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ResourcesSearchParams.{Type, TypeOperator}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ResourcesSearchParams.{FileUserMetadata, Type, TypeOperator}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.QueryParamsUnmarshalling.{iriFromStringUnmarshaller, iriVocabFromStringUnmarshaller => iriUnmarshaller}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectContext
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import io.circe.{parser, Decoder}
import io.circe.generic.semiauto.deriveDecoder

import scala.util.{Failure, Success}

/**
* Search parameters for any generic resource type.
Expand Down Expand Up @@ -49,6 +53,7 @@ final case class ResourcesSearchParams(
updatedAt: TimeRange = TimeRange.Anytime,
types: List[Type] = List.empty,
typeOperator: TypeOperator = TypeOperator.Or,
fileUserMetadata: Option[FileUserMetadata] = None,
schema: Option[ResourceRef] = None,
q: Option[String] = None,
tag: Option[UserTag] = None
Expand All @@ -62,6 +67,20 @@ final case class ResourcesSearchParams(

object ResourcesSearchParams {

/**
  * Custom file metadata supplied by the user as a search filter.
  *
  * @param keywords
  *   the key/value pairs to filter on
  */
final case class FileUserMetadata(keywords: Map[String, String])

object FileUserMetadata {

  /** Decoder derived from the case class fields, i.e. it expects a JSON object with a `keywords` field. */
  implicit val decoder: Decoder[FileUserMetadata] = deriveDecoder[FileUserMetadata]

  /**
    * Reads a [[FileUserMetadata]] from a string containing its JSON representation. The string is first parsed as
    * JSON and then decoded; a parsing or decoding failure is rethrown as an exception.
    */
  implicit val fromStringUnmarshaller: FromStringUnmarshaller[FileUserMetadata] =
    Unmarshaller.strict[String, FileUserMetadata] { str =>
      parser.parse(str).flatMap(_.as[FileUserMetadata]).toTry match {
        case Success(value)     => value
        case Failure(exception) => throw exception
      }
    }
}

sealed trait TypeOperator extends Product with Serializable {

/** Turn `And` into `Or` and vice-versa */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.http.scaladsl.server.{Directive, Directive0, Directive1, MalformedQu
import akka.http.scaladsl.unmarshalling.{FromStringUnmarshaller, Unmarshaller}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ResourcesSearchParams
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ResourcesSearchParams.TypeOperator.Or
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ResourcesSearchParams.{Type, TypeOperator}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ResourcesSearchParams.{FileUserMetadata, Type, TypeOperator}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{DeltaSchemeDirectives, UriDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.QueryParamsUnmarshalling.IriBase
Expand All @@ -29,6 +29,9 @@ trait ElasticSearchViewsDirectives extends UriDirectives {
/**
  * Extracts every occurrence of the `type` query parameter as a list of [[Type]]. The extracted values are
  * reversed before being returned (presumably to restore the order of the request - to be confirmed).
  */
private def types(implicit um: FromStringUnmarshaller[Type]): Directive1[List[Type]] = {
  val repeatedTypes = parameter("type".as[Type].*)
  repeatedTypes.map(values => values.toList.reverse)
}

/** Extracts the optional `metadata` query parameter, unmarshalled as a [[FileUserMetadata]]. */
private def userFileMetadata: Directive1[Option[FileUserMetadata]] = {
  val optionalMetadata = "metadata".as[FileUserMetadata].?
  parameter(optionalMetadata)
}

/** Extracts the `typeOperator` query parameter, falling back to `Or` when it is not provided. */
private def typeOperator(implicit um: FromStringUnmarshaller[TypeOperator]): Directive1[TypeOperator] =
  parameter("typeOperator".as[TypeOperator].?[TypeOperator](Or))
Expand Down Expand Up @@ -68,7 +71,9 @@ trait ElasticSearchViewsDirectives extends UriDirectives {
baseUri: BaseUri,
pc: ProjectContext
): Directive1[ResourcesSearchParams] = {
(searchParams & createdAt & updatedAt & types & typeOperator & schema & id & locate & parameter("q".?) & tagParam)
(searchParams & createdAt & updatedAt & types & typeOperator & userFileMetadata & schema & id & locate & parameter(
"q".?
) & tagParam)
.tmap {
case (
deprecated,
Expand All @@ -79,6 +84,7 @@ trait ElasticSearchViewsDirectives extends UriDirectives {
updatedAt,
types,
typeOperator,
fileUserMetadata,
schema,
id,
locate,
Expand All @@ -97,6 +103,7 @@ trait ElasticSearchViewsDirectives extends UriDirectives {
updatedAt,
types,
typeOperator,
fileUserMetadata,
schema,
qq,
tag
Expand All @@ -111,7 +118,9 @@ trait ElasticSearchViewsDirectives extends UriDirectives {
implicit val baseIriUm: FromStringUnmarshaller[IriBase] =
DeltaSchemeDirectives.iriBaseFromStringUnmarshallerNoExpansion

(searchParams & createdAt & updatedAt & types & typeOperator & schema & id & locate & parameter("q".?) & tagParam)
(searchParams & createdAt & updatedAt & types & typeOperator & userFileMetadata & schema & id & locate & parameter(
"q".?
) & tagParam)
.tmap {
case (
deprecated,
Expand All @@ -122,6 +131,7 @@ trait ElasticSearchViewsDirectives extends UriDirectives {
updatedAt,
types,
typeOperator,
keywords,
schema,
id,
locate,
Expand All @@ -140,6 +150,7 @@ trait ElasticSearchViewsDirectives extends UriDirectives {
updatedAt,
types,
typeOperator,
keywords,
schema,
qq,
tag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViewsQuerySuite.Sample
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchBulk
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchAction
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.{DifferentElasticSearchViewType, ProjectContextRejection, ViewIsDeprecated, ViewNotFound}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.{AggregateElasticSearchViewValue, IndexingElasticSearchViewValue}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId, permissions, ElasticSearchViewRejection, ElasticSearchViewType}
Expand Down Expand Up @@ -268,7 +268,7 @@ class ElasticSearchViewsQuerySuite
bulk <- allResources.traverse { r =>
r.asDocument(ref).map { d =>
// We create a unique id across all indices
ElasticSearchBulk.Index(view.index, genString(), d)
ElasticSearchAction.Index(view.index, genString(), d)
}
}
_ <- client.bulk(bulk)
Expand Down
Loading

0 comments on commit b5d41bb

Please sign in to comment.