Skip to content

Commit

Permalink
[msgpack#691] Improve memory usage via reuse InputStreamBufferInput
Browse files Browse the repository at this point in the history
  • Loading branch information
koo-taejin committed Jan 19, 2023
1 parent f1ac3dd commit 56ac516
Show file tree
Hide file tree
Showing 13 changed files with 218 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* MessageBufferInput adapter for byte arrays
*/
public class ArrayBufferInput
implements MessageBufferInput
implements MessageBufferInput<byte[]>
{
private MessageBuffer buffer;
private boolean isEmpty;
Expand Down Expand Up @@ -66,9 +66,14 @@ public MessageBuffer reset(MessageBuffer buf)
return old;
}

public void reset(byte[] arr)
@Override
public byte[] reset(byte[] arr)
{
reset(MessageBuffer.wrap(checkNotNull(arr, "input array is null")));
final MessageBuffer messageBuffer = reset(MessageBuffer.wrap(checkNotNull(arr, "input array is null")));
if (messageBuffer == null) {
return null;
}
return messageBuffer.array();
}

public void reset(byte[] arr, int offset, int len)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer}
*/
public class ByteBufferInput
implements MessageBufferInput
implements MessageBufferInput<ByteBuffer>
{
private ByteBuffer input;
private boolean isRead = false;
Expand All @@ -39,6 +39,7 @@ public ByteBufferInput(ByteBuffer input)
* @param input new buffer
* @return the old buffer
*/
@Override
public ByteBuffer reset(ByteBuffer input)
{
ByteBuffer old = this.input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* {@link MessageBufferInput} adapter for {@link java.nio.channels.ReadableByteChannel}
*/
public class ChannelBufferInput
implements MessageBufferInput
implements MessageBufferInput<ReadableByteChannel>
{
private ReadableByteChannel channel;
private final MessageBuffer buffer;
Expand All @@ -49,8 +49,8 @@ public ChannelBufferInput(ReadableByteChannel channel, int bufferSize)
* @param channel new channel
* @return the old resource
*/
@Override
public ReadableByteChannel reset(ReadableByteChannel channel)
throws IOException
{
ReadableByteChannel old = this.channel;
this.channel = channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* {@link MessageBufferInput} adapter for {@link InputStream}
*/
public class InputStreamBufferInput
implements MessageBufferInput
implements MessageBufferInput<InputStream>
{
private InputStream in;
private final byte[] buffer;
Expand Down Expand Up @@ -60,8 +60,8 @@ public InputStreamBufferInput(InputStream in, int bufferSize)
* @param in new stream
* @return the old resource
*/
@Override
public InputStream reset(InputStream in)
throws IOException
{
InputStream old = this.in;
this.in = in;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* A MessageBufferInput implementation has control of lifecycle of the memory so that it can reuse previously
* allocated memory, use memory pools, or use memory-mapped files.
*/
public interface MessageBufferInput
public interface MessageBufferInput<T>
extends Closeable
{
/**
Expand All @@ -40,6 +40,8 @@ public interface MessageBufferInput
MessageBuffer next()
throws IOException;

T reset(T input);

/**
* Closes the input.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* {@link MessageBufferInput} adapter for {@link MessageBufferInput} Enumeration
*/
public class SequenceMessageBufferInput
implements MessageBufferInput
implements MessageBufferInput<Void>
{
private Enumeration<? extends MessageBufferInput> sequence;
private MessageBufferInput input;
Expand Down Expand Up @@ -54,6 +54,11 @@ public MessageBuffer next() throws IOException
return buffer;
}

@Override
public Void reset(Void input) {
throw new UnsupportedOperationException("reset");
}

private void nextInput() throws IOException
{
if (input != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
import scala.util.Random

object MessageUnpackerTest {
class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput {
class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput[Void] {
var cursor = 0
override def next(): MessageBuffer = {
if (cursor < array.length) {
Expand All @@ -41,6 +41,9 @@ object MessageUnpackerTest {
}
}


override def reset(input: Void): Void = throw new UnsupportedOperationException("reset")

override def close(): Unit = {}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class ByteStringTest extends AirSpec {
private val byteString = ByteString(createMessagePackData(_.packString(unpackedString)))

private def unpackString(messageBuffer: MessageBuffer) = {
val input = new MessageBufferInput {
val input = new MessageBufferInput[Void] {

private var isRead = false

Expand All @@ -38,6 +38,8 @@ class ByteStringTest extends AirSpec {
messageBuffer
}
override def close(): Unit = {}

override def reset(input: Void): Void = throw new UnsupportedOperationException("reset")
}

MessagePack.newDefaultUnpacker(input).unpackString()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// MessagePack for Java
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.msgpack.jackson.dataformat;

import org.msgpack.core.buffer.MessageBufferInput;

public interface MessageBufferInputLocator
{
MessageBufferInput get(Class clazz);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// MessagePack for Java
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.msgpack.jackson.dataformat;

import org.msgpack.core.buffer.MessageBufferInput;

interface MessageBufferInputProvider
{
MessageBufferInput provide();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// MessagePack for Java
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.msgpack.jackson.dataformat;

import org.msgpack.core.buffer.MessageBufferInput;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MessageBufferInputRegistry implements MessageBufferInputLocator
{
private final Map<Class, MessageBufferInput> messageBufferInputMap = new HashMap<>(1);

@Override
public MessageBufferInput get(Class clazz)
{
return messageBufferInputMap.get(clazz);
}

public boolean register(Class clazz, MessageBufferInputProvider provider)
{
Objects.requireNonNull(clazz, "clazz");
Objects.requireNonNull(provider, "provider");

if (messageBufferInputMap.containsKey(clazz)) {
return false;
}

messageBufferInputMap.put(clazz, provider.provide());
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
public class MessagePackParser
extends ParserMinimalBase
{
private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder =
new ThreadLocal<Tuple<Object, MessageUnpacker>>();
private static final ThreadLocal<Triple<Object, MessageUnpacker, MessageBufferInputLocator>> reuseObjectHolder =
new ThreadLocal<>();
private final MessageUnpacker messageUnpacker;

private static final BigInteger LONG_MIN = BigInteger.valueOf((long) Long.MIN_VALUE);
Expand Down Expand Up @@ -126,11 +126,17 @@ public MessagePackParser(
IOContext ctxt,
int features,
ObjectCodec objectCodec,
InputStream in,
final InputStream in,
boolean reuseResourceInParser)
throws IOException
{
this(ctxt, features, new InputStreamBufferInput(in), objectCodec, in, reuseResourceInParser);
this(ctxt, features, new MessageBufferInputProvider() {
@Override
public MessageBufferInput provide()
{
return new InputStreamBufferInput(in);
}
}, objectCodec, in, reuseResourceInParser);
}

public MessagePackParser(IOContext ctxt, int features, ObjectCodec objectCodec, byte[] bytes)
Expand All @@ -143,16 +149,22 @@ public MessagePackParser(
IOContext ctxt,
int features,
ObjectCodec objectCodec,
byte[] bytes,
final byte[] bytes,
boolean reuseResourceInParser)
throws IOException
{
this(ctxt, features, new ArrayBufferInput(bytes), objectCodec, bytes, reuseResourceInParser);
this(ctxt, features, new MessageBufferInputProvider() {
@Override
public MessageBufferInput provide()
{
return new ArrayBufferInput(bytes);
}
}, objectCodec, bytes, reuseResourceInParser);
}

private MessagePackParser(IOContext ctxt,
int features,
MessageBufferInput input,
MessageBufferInputProvider bufferInputProvider,
ObjectCodec objectCodec,
Object src,
boolean reuseResourceInParser)
Expand All @@ -167,29 +179,48 @@ private MessagePackParser(IOContext ctxt,
parsingContext = JsonReadContext.createRootContext(dups);
this.reuseResourceInParser = reuseResourceInParser;
if (!reuseResourceInParser) {
this.messageUnpacker = MessagePack.newDefaultUnpacker(input);
this.messageUnpacker = MessagePack.newDefaultUnpacker(bufferInputProvider.provide());
return;
}
else {
this.messageUnpacker = null;
}

MessageUnpacker messageUnpacker;
Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get();
if (messageUnpackerTuple == null) {
messageUnpacker = MessagePack.newDefaultUnpacker(input);
MessageBufferInputLocator messageBufferInputLocator;
Triple<Object, MessageUnpacker, MessageBufferInputLocator> messageUnpackerTriple = reuseObjectHolder.get();
if (messageUnpackerTriple == null) {
final MessageBufferInputRegistry messageBufferInputRegistry = new MessageBufferInputRegistry();
messageBufferInputRegistry.register(src.getClass(), bufferInputProvider);
messageBufferInputLocator = messageBufferInputRegistry;
messageUnpacker = MessagePack.newDefaultUnpacker(messageBufferInputRegistry.get(src.getClass()));
}
else {
// Considering to reuse InputStream with JsonParser.Feature.AUTO_CLOSE_SOURCE,
// MessagePackParser needs to use the MessageUnpacker that has the same InputStream
// since it has buffer which has loaded the InputStream data ahead.
// However, it needs to call MessageUnpacker#reset when the source is different from the previous one.
if (isEnabled(JsonParser.Feature.AUTO_CLOSE_SOURCE) || messageUnpackerTuple.first() != src) {
messageUnpackerTuple.second().reset(input);
if (isEnabled(JsonParser.Feature.AUTO_CLOSE_SOURCE) || messageUnpackerTriple.first() != src) {
final MessageBufferInputLocator bufferInputLocator = messageUnpackerTriple.third();
MessageBufferInput messageBufferInput = bufferInputLocator.get(src.getClass());
if (messageBufferInput != null) {
messageBufferInput.reset(src);
}
else {
if (bufferInputLocator instanceof MessageBufferInputRegistry) {
((MessageBufferInputRegistry) bufferInputLocator).register(src.getClass(), bufferInputProvider);
messageBufferInput = bufferInputLocator.get(src.getClass());
}
else {
messageBufferInput = bufferInputProvider.provide();
}
}
messageUnpackerTriple.second().reset(messageBufferInput);
}
messageUnpacker = messageUnpackerTuple.second();
messageUnpacker = messageUnpackerTriple.second();
messageBufferInputLocator = messageUnpackerTriple.third();
}
messageUnpackerHolder.set(new Tuple<Object, MessageUnpacker>(src, messageUnpacker));
reuseObjectHolder.set(new Triple<Object, MessageUnpacker, MessageBufferInputLocator>(src, messageUnpacker, messageBufferInputLocator));
}

public void setExtensionTypeCustomDeserializers(ExtensionTypeCustomDeserializers extTypeCustomDesers)
Expand Down Expand Up @@ -690,7 +721,7 @@ private MessageUnpacker getMessageUnpacker()
return this.messageUnpacker;
}

Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get();
Triple<Object, MessageUnpacker, MessageBufferInputLocator> messageUnpackerTuple = reuseObjectHolder.get();
if (messageUnpackerTuple == null) {
throw new IllegalStateException("messageUnpacker is null");
}
Expand Down
Loading

0 comments on commit 56ac516

Please sign in to comment.