[ML] Report cause of data frame analytics inference failure (elastic#66203)

When we encounter an error that fails inference during a
data frame analytics job, we should try to unwrap the cause
of the error and report that instead, as it will be much more useful.
dimitris-athanasiou authored Dec 12, 2020
1 parent 7e6b52a commit 945ba2d
Showing 3 changed files with 20 additions and 5 deletions.
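The change in the third file follows the pattern sketched below: when the failure is an ElasticsearchException, surface its root cause instead of the generic wrapper. This standalone sketch uses a hypothetical reportFailure helper and assumes the ML ExceptionsHelper import; the authoritative version is the diff itself.

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

// Hypothetical helper, not part of this commit: build the exception to report,
// preferring the deepest cause's message over the outer wrapper's.
static ElasticsearchException reportFailure(String jobId, String modelId, Exception e) {
    if (e instanceof ElasticsearchException) {
        Throwable rootCause = ((ElasticsearchException) e).getRootCause();
        return new ElasticsearchException("[{}] failed running inference on model [{}]; cause was [{}]",
            rootCause, jobId, modelId, rootCause.getMessage());
    }
    return ExceptionsHelper.serverError("[{}] failed running inference on model [{}]; cause was [{}]",
        e, jobId, modelId, e.getMessage());
}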
@@ -8,6 +8,8 @@

 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.CheckedFunction;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -70,8 +72,9 @@ static <T> T inflate(String compressedString,

         if (streamSizeCause != null) {
             // The root cause is that the model is too big.
-            throw new IOException("Cannot parse model definition as the content is larger than the maximum stream size of ["
-                + streamSizeCause.getMaxBytes() + "] bytes. Max stream size is 10% of the JVM heap or 1GB whichever is smallest");
+            throw new CircuitBreakingException("Cannot parse model definition as the content is larger than the maximum stream size " +
+                "of [" + streamSizeCause.getMaxBytes() + "] bytes. Max stream size is 10% of the JVM heap or 1GB whichever is smallest",
+                CircuitBreaker.Durability.PERMANENT);
         } else {
             throw parseException;
         }
@@ -92,7 +95,8 @@ static InputStream inflate(String compressedString, long streamSize) throws IOException
         // If the compressed length is already too large, it makes sense that the inflated length would be as well
         // In the extremely small string case, the compressed data could actually be longer than the compressed stream
         if (compressedBytes.length > Math.max(100L, streamSize)) {
-            throw new IOException("compressed stream is longer than maximum allowed bytes [" + streamSize + "]");
+            throw new CircuitBreakingException("compressed stream is longer than maximum allowed bytes [" + streamSize + "]",
+                CircuitBreaker.Durability.PERMANENT);
         }
         InputStream gzipStream = new GZIPInputStream(new BytesArray(compressedBytes).streamInput(), BUFFER_SIZE);
         return new SimpleBoundedInputStream(gzipStream, streamSize);
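For context on the new exception type: CircuitBreakingException carries a CircuitBreaker.Durability flag, and PERMANENT signals a condition that will not clear on its own, so retrying the same request is pointless. A rough sketch of how a caller might branch on that flag; inflateModelDefinition, failRequest, and retryLater are illustrative names, not API from this commit:

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;

try {
    inflateModelDefinition(compressedString);  // hypothetical call into the code above
} catch (CircuitBreakingException e) {
    if (e.getDurability() == CircuitBreaker.Durability.PERMANENT) {
        failRequest(e);   // the model itself is too large; retrying cannot succeed
    } else {
        retryLater(e);    // a TRANSIENT trip may clear once memory pressure drops
    }
}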
@@ -5,6 +5,8 @@
  */
 package org.elasticsearch.xpack.core.ml.inference;

+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.test.ESTestCase;
@@ -65,13 +67,15 @@ public void testInflateParsingTooLargeStream() throws IOException {
         String compressedString = InferenceToXContentCompressor.deflate(definition);
         int max = compressedString.getBytes(StandardCharsets.UTF_8).length + 10;

-        IOException e = expectThrows(IOException.class, ()-> InferenceToXContentCompressor.inflate(compressedString,
+        CircuitBreakingException e = expectThrows(CircuitBreakingException.class, ()-> InferenceToXContentCompressor.inflate(
+            compressedString,
             parser -> TrainedModelDefinition.fromXContent(parser, true).build(),
             xContentRegistry(),
             max));

         assertThat(e.getMessage(), equalTo("Cannot parse model definition as the content is larger than the maximum stream size of ["
             + max + "] bytes. Max stream size is 10% of the JVM heap or 1GB whichever is smallest"));
+        assertThat(e.getDurability(), equalTo(CircuitBreaker.Durability.PERMANENT));
     }

     @Override
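Taken together with deflate, the happy-path round trip this test builds on looks roughly like the sketch below (the 100 MB cap is arbitrary and registry wiring is elided; argument order follows the test above):

// Compress a definition, then parse it back under a byte limit large enough
// that the bounded stream never trips.
String compressed = InferenceToXContentCompressor.deflate(definition);
TrainedModelDefinition restored = InferenceToXContentCompressor.inflate(
    compressed,
    parser -> TrainedModelDefinition.fromXContent(parser, true).build(),
    xContentRegistry(),
    100_000_000L);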
@@ -9,6 +9,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.index.IndexRequest;
@@ -89,7 +90,13 @@ public void run(String modelId) {
                 inferTestDocs(localModel, testDocsIterator);
             }
         } catch (Exception e) {
-            LOGGER.error(new ParameterizedMessage("[{}] Error during inference against model [{}]", config.getId(), modelId), e);
+            LOGGER.error(new ParameterizedMessage("[{}] Error running inference on model [{}]", config.getId(), modelId), e);
+
+            if (e instanceof ElasticsearchException) {
+                Throwable rootCause = ((ElasticsearchException) e).getRootCause();
+                throw new ElasticsearchException("[{}] failed running inference on model [{}]; cause was [{}]", rootCause, config.getId(),
+                    modelId, rootCause.getMessage());
+            }
+            throw ExceptionsHelper.serverError("[{}] failed running inference on model [{}]; cause was [{}]", e, config.getId(), modelId,
+                e.getMessage());
         }
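getRootCause() here is ElasticsearchException's accessor; conceptually it walks the cause chain to the deepest throwable, along the lines of this simplified sketch (an illustration, not the actual implementation):

// Follow getCause() until it stops producing a new throwable, guarding
// against self-referential cause loops.
static Throwable rootCauseOf(Throwable t) {
    Throwable cause = t;
    while (cause.getCause() != null && cause.getCause() != cause) {
        cause = cause.getCause();
    }
    return cause;
}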