Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Field cap fix (#422)
Browse files Browse the repository at this point in the history
  • Loading branch information
thalurur authored Mar 30, 2021
1 parent f1e59ad commit a669d4c
Show file tree
Hide file tree
Showing 7 changed files with 474 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.transport.TransportInterceptor
import org.elasticsearch.watcher.ResourceWatcherService
import java.util.function.Supplier
import org.elasticsearch.common.component.Lifecycle
import org.elasticsearch.common.component.LifecycleComponent
import org.elasticsearch.common.component.LifecycleListener
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.transport.RemoteClusterService
import org.elasticsearch.transport.TransportService

internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, Plugin() {

Expand All @@ -129,6 +135,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
lateinit var clusterService: ClusterService
lateinit var indexNameExpressionResolver: IndexNameExpressionResolver
lateinit var rollupInterceptor: RollupInterceptor
lateinit var fieldCapsFilter: FieldCapsFilter

companion object {
const val PLUGIN_NAME = "opendistro-im"
Expand All @@ -148,6 +155,10 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act

override fun getJobRunner(): ScheduledJobRunner = IndexManagementRunner

override fun getGuiceServiceClasses(): Collection<Class<out LifecycleComponent?>> {
return mutableListOf<Class<out LifecycleComponent?>>(GuiceHolder::class.java)
}

override fun getJobParser(): ScheduledJobParser {
return ScheduledJobParser { xcp, id, jobDocVersion ->
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
Expand Down Expand Up @@ -237,6 +248,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
.registerMetadataServices(RollupMetadataService(client, xContentRegistry))
.registerConsumers()
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
Expand Down Expand Up @@ -293,7 +305,8 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS,
RollupSettings.ROLLUP_INDEX,
RollupSettings.ROLLUP_ENABLED,
RollupSettings.ROLLUP_SEARCH_ENABLED
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_DASHBOARDS
)
}

Expand Down Expand Up @@ -326,6 +339,28 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
}

override fun getActionFilters(): List<ActionFilter> {
return listOf(FieldCapsFilter(clusterService, indexNameExpressionResolver))
return listOf(fieldCapsFilter)
}
}

class GuiceHolder @Inject constructor(
remoteClusterService: TransportService
) : LifecycleComponent {
override fun close() {}
override fun lifecycleState(): Lifecycle.State? {
return null
}

override fun addLifecycleListener(listener: LifecycleListener) {}
override fun removeLifecycleListener(listener: LifecycleListener) {}
override fun start() {}
override fun stop() {}

companion object {
lateinit var remoteClusterService: RemoteClusterService
}

init {
Companion.remoteClusterService = remoteClusterService.remoteClusterService
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.rollup.actionfilter

import com.amazon.opendistroforelasticsearch.indexmanagement.GuiceHolder
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.Rollup
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.RollupFieldMapping
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.settings.RollupSettings
Expand All @@ -33,57 +34,85 @@ import org.elasticsearch.action.support.ActionFilterChain
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.Strings
import org.elasticsearch.common.xcontent.DeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentFactory
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.tasks.Task
import org.elasticsearch.transport.RemoteClusterAware

private val logger = LogManager.getLogger(FieldCapsFilter::class.java)

@Suppress("UNCHECKED_CAST", "SpreadOperator", "TooManyFunctions")
@Suppress("UNCHECKED_CAST", "SpreadOperator", "TooManyFunctions", "ComplexMethod", "NestedBlockDepth")
class FieldCapsFilter(
val clusterService: ClusterService,
val indexNameExpressionResolver: IndexNameExpressionResolver
val settings: Settings,
private val indexNameExpressionResolver: IndexNameExpressionResolver
) : ActionFilter {

@Volatile private var shouldIntercept = RollupSettings.ROLLUP_DASHBOARDS.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_DASHBOARDS) {
flag -> shouldIntercept = flag
}
}

override fun <Request : ActionRequest?, Response : ActionResponse?> apply(
task: Task,
action: String,
request: Request,
listener: ActionListener<Response>,
chain: ActionFilterChain<Request, Response>
) {
if (request is FieldCapabilitiesRequest) {
if (request is FieldCapabilitiesRequest && shouldIntercept) {
val indices = request.indices().map { it.toString() }.toTypedArray()
val concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices)
val rollupIndices = mutableSetOf<String>()
val nonRollupIndices = mutableSetOf<String>()
for (index in concreteIndices) {
val isRollupIndex = RollupSettings.ROLLUP_INDEX.get(clusterService.state().metadata.index(index).settings)
if (isRollupIndex) {
rollupIndices.add(index)
} else {
nonRollupIndices.add(index)
val remoteClusterIndices = GuiceHolder.remoteClusterService.groupIndices(request.indicesOptions(), indices) {
idx: String? -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterService.state())
}
val localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)

localIndices?.let {
val concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), it)
for (index in concreteIndices) {
val isRollupIndex = RollupSettings.ROLLUP_INDEX.get(clusterService.state().metadata.index(index).settings)
if (isRollupIndex) {
rollupIndices.add(index)
} else {
nonRollupIndices.add(index)
}
}
}

remoteClusterIndices.entries.forEach {
val cluster = it.key
val clusterIndices = it.value
clusterIndices.indices().forEach { index ->
nonRollupIndices.add("$cluster${RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR}$index")
}
}
logger.debug("Resolved into rollup $rollupIndices and non rollup $nonRollupIndices indices")

if (rollupIndices.isEmpty()) {
return chain.proceed(task, action, request, listener)
}

if (nonRollupIndices.isEmpty()) {
val rewrittenResponse = rewriteResponse(mapOf(), arrayOf(), rollupIndices)
return listener.onResponse(rewrittenResponse as Response)
/**
* The request can be one of two cases:
* 1 Just rollup indices
* 2 Rollup + NonRollup indices
* If 1 we forward the request to chain and discard the whole response from chain when rewriting.
* If 2 we forward the request to chain with only non rollup indices and append rollup data to response when rewriting.
* We are calling with rollup indices in 1 instead of an empty request since empty is defaulted to returning all indices in cluster.
**/
if (nonRollupIndices.isNotEmpty()) {
request.indices(*nonRollupIndices.toTypedArray())
}

request.indices(*nonRollupIndices.toTypedArray())
chain.proceed(task, action, request, object : ActionListener<Response> {
override fun onResponse(response: Response) {
logger.info("Has rollup indices will rewrite field caps response")
response as FieldCapabilitiesResponse
val rewrittenResponse = rewriteResponse(response.get(), response.indices, rollupIndices)
val rewrittenResponse = rewriteResponse(response, rollupIndices, nonRollupIndices.isEmpty())
listener.onResponse(rewrittenResponse as Response)
}

Expand All @@ -96,30 +125,63 @@ class FieldCapsFilter(
}
}

/**
* The FieldCapabilitiesResponse can contain merged or unmerged data. The response will hold unmerged data if its a cross cluster search.
*
* There is a boolean available in the FieldCapabilitiesRequest `isMergeResults` which indicates if the response is merged/unmerged.
* Unfortunately this is package private and when rewriting we can't access it from request. Instead will be relying on the response.
* If response has indexResponses then its unmerged else merged.
*/
internal fun rewriteResponse(response: FieldCapabilitiesResponse, rollupIndices: Set<String>, shouldDiscardResponse: Boolean): ActionResponse {
val ismFieldCapabilitiesResponse = ISMFieldCapabilitiesResponse.fromFieldCapabilitiesResponse(response)
val isMergedResponse = ismFieldCapabilitiesResponse.indexResponses.isEmpty()

// if original response contained only rollup indices we should discard it
val fields = if (shouldDiscardResponse) mapOf() else response.get()
val indices = if (shouldDiscardResponse) arrayOf() else response.indices
val indexResponses = if (shouldDiscardResponse) listOf() else ismFieldCapabilitiesResponse.indexResponses

return if (isMergedResponse) {
rewriteResponse(indices, fields, rollupIndices)
} else {
val rollupIndexResponses = populateRollupIndexResponses(rollupIndices)
val mergedIndexResponses = indexResponses + rollupIndexResponses

val rewrittenISMResponse = ISMFieldCapabilitiesResponse(arrayOf(), mapOf(), mergedIndexResponses)
rewrittenISMResponse.toFieldCapabilitiesResponse()
}
}

private fun populateRollupIndexResponses(rollupIndices: Set<String>): List<ISMFieldCapabilitiesIndexResponse> {
val indexResponses = mutableListOf<ISMFieldCapabilitiesIndexResponse>()
rollupIndices.forEach { rollupIndex ->
val rollupIsmFieldCapabilities = mutableMapOf<String, ISMIndexFieldCapabilities>()
val rollupFieldMappings = populateSourceFieldMappingsForRollupIndex(rollupIndex)

rollupFieldMappings.forEach { rollupFieldMapping ->
val fieldName = rollupFieldMapping.fieldName
val type = rollupFieldMapping.sourceType!!
val isSearchable = rollupFieldMapping.fieldType == RollupFieldMapping.Companion.FieldType.DIMENSION
rollupIsmFieldCapabilities[fieldName] = ISMIndexFieldCapabilities(fieldName, type, isSearchable, true, mapOf())
}

indexResponses.add(ISMFieldCapabilitiesIndexResponse(rollupIndex, rollupIsmFieldCapabilities, true))
}

return indexResponses
}

private fun rewriteResponse(
fields: Map<String, Map<String, FieldCapabilities>>,
indices: Array<String>,
fields: Map<String, Map<String, FieldCapabilities>>,
rollupIndices: Set<String>
): ActionResponse {
val filteredIndicesFields = expandIndicesInFields(indices, fields)
val rollupIndicesFields = populateRollupIndicesFields(rollupIndices)
val mergedFields = mergeFields(filteredIndicesFields, rollupIndicesFields)
val mergedIndices = indices + rollupIndices.toTypedArray()

return buildFieldCapsResponse(mergedIndices, mergedFields)
}

private fun buildFieldCapsResponse(indices: Array<String>, fields: Map<String, Map<String, FieldCapabilities>>): ActionResponse {
val builder = XContentFactory.jsonBuilder().prettyPrint()
builder.startObject()
builder.field("indices", indices)
builder.field("fields", fields as Map<String, Any>?)
builder.endObject()

val parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler
.THROW_UNSUPPORTED_OPERATION, Strings.toString(builder))

return FieldCapabilitiesResponse.fromXContent(parser)
return FieldCapabilitiesResponse(mergedIndices, mergedFields)
}

private fun populateRollupIndicesFields(rollupIndices: Set<String>): Map<String, Map<String, FieldCapabilities>> {
Expand All @@ -134,7 +196,7 @@ class FieldCapsFilter(
}
val isSearchable = fieldMapping.fieldType == RollupFieldMapping.Companion.FieldType.DIMENSION
response[fieldName]!![type] = FieldCapabilities(fieldName, type, isSearchable, true, fieldMappingIndexMap.getValue(fieldMapping)
.toTypedArray(), null, null, mapOf<String, Set<String>>())
.toTypedArray(), null, null, mapOf<String, Set<String>>())
}

return response
Expand All @@ -156,14 +218,11 @@ class FieldCapsFilter(
return rollupFieldMappings
}

private fun populateSourceFieldMappingsForRollupIndex(rollupIndex: String): Map<String, Set<RollupFieldMapping>> {
val fieldMappings = mutableMapOf<String, MutableSet<RollupFieldMapping>>()
private fun populateSourceFieldMappingsForRollupIndex(rollupIndex: String): Set<RollupFieldMapping> {
val fieldMappings = mutableSetOf<RollupFieldMapping>()
val rollupJobs = clusterService.state().metadata.index(rollupIndex).getRollupJobs() ?: return fieldMappings
rollupJobs.forEach { rollup ->
if (fieldMappings[rollup.targetIndex] == null) {
fieldMappings[rollup.targetIndex] = mutableSetOf()
}
fieldMappings[rollup.targetIndex]!!.addAll(populateSourceFieldMappingsForRollupJob(rollup))
fieldMappings.addAll(populateSourceFieldMappingsForRollupJob(rollup))
}
return fieldMappings
}
Expand All @@ -174,13 +233,11 @@ class FieldCapsFilter(

rollupIndices.forEach { rollupIndex ->
val fieldMappings = populateSourceFieldMappingsForRollupIndex(rollupIndex)
fieldMappings.forEach { rollupIndexFieldMappings ->
rollupIndexFieldMappings.value.forEach { fieldMapping ->
if (fieldMappingsMap[fieldMapping] == null) {
fieldMappingsMap[fieldMapping] = mutableSetOf()
}
fieldMappingsMap[fieldMapping]!!.add(rollupIndexFieldMappings.key)
fieldMappings.forEach { fieldMapping ->
if (fieldMappingsMap[fieldMapping] == null) {
fieldMappingsMap[fieldMapping] = mutableSetOf()
}
fieldMappingsMap[fieldMapping]!!.add(rollupIndex)
}
}

Expand All @@ -205,7 +262,7 @@ class FieldCapsFilter(
val fieldCaps = fields.getValue(field).getValue(type)
val rewrittenIndices = if (fieldCaps.indices() != null && fieldCaps.indices().isNotEmpty()) fieldCaps.indices() else indices
expandedResponse[field]!![type] = FieldCapabilities(fieldCaps.name, fieldCaps.type, fieldCaps.isSearchable, fieldCaps
.isAggregatable, rewrittenIndices, fieldCaps.nonSearchableIndices(), fieldCaps.nonAggregatableIndices(), fieldCaps.meta())
.isAggregatable, rewrittenIndices, fieldCaps.nonSearchableIndices(), fieldCaps.nonAggregatableIndices(), fieldCaps.meta())
}
}

Expand Down Expand Up @@ -257,12 +314,12 @@ class FieldCapsFilter(
val nonAggregatableIndices = mergeNonAggregatableIndices(fc1, fc2)
val nonSearchableIndices = mergeNonSearchableIndices(fc1, fc2)
val meta = (fc1.meta().keys + fc2.meta().keys)
.associateWith {
val data = mutableSetOf<String>()
data.addAll(fc1.meta().getOrDefault(it, mutableSetOf()))
data.addAll(fc2.meta().getOrDefault(it, mutableSetOf()))
data
}
.associateWith {
val data = mutableSetOf<String>()
data.addAll(fc1.meta().getOrDefault(it, mutableSetOf()))
data.addAll(fc2.meta().getOrDefault(it, mutableSetOf()))
data
}

return FieldCapabilities(name, type, isSearchable, isAggregatable, indices, nonSearchableIndices, nonAggregatableIndices, meta)
}
Expand Down
Loading

0 comments on commit a669d4c

Please sign in to comment.