Skip to content

Commit

Permalink
avro: go back to using input stream when parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning committed May 17, 2024
1 parent 33cf12d commit 3f45326
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ trait AvroSupport {

Try {
if (bs.isEmpty) throw Unmarshaller.NoContentException
AvroInputStream.json[A].from(bs.toByteBuffer).build(schema).iterator.next()
val builder =
if (ByteStringInputStream.byteStringSupportsAsInputStream)
AvroInputStream.json[A].from(ByteStringInputStream(bs))
else
AvroInputStream.json[A].from(bs.toArrayUnsafe())
builder.build(schema).iterator.next()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.pjfanning.pekkohttpavro4s

import org.apache.pekko.util.ByteString
import org.apache.pekko.util.ByteString.ByteString1C

import java.io.{ ByteArrayInputStream, InputStream }
import java.lang.invoke.{ MethodHandles, MethodType }
import scala.util.Try

private[pekkohttpavro4s] object ByteStringInputStream {
private val byteStringInputStreamMethodTypeOpt = Try {
val lookup = MethodHandles.publicLookup()
val inputStreamMethodType = MethodType.methodType(classOf[InputStream])
lookup.findVirtual(classOf[ByteString], "asInputStream", inputStreamMethodType)
}.toOption

def apply(bs: ByteString): InputStream = bs match {
case cs: ByteString1C =>
getInputStreamUnsafe(cs)
case _ =>
if (byteStringInputStreamMethodTypeOpt.isDefined) {
byteStringInputStreamMethodTypeOpt.get.invoke(bs).asInstanceOf[InputStream]
} else {
getInputStreamUnsafe(bs)
}
}

val byteStringSupportsAsInputStream: Boolean = byteStringInputStreamMethodTypeOpt.isDefined

private def getInputStreamUnsafe(bs: ByteString): InputStream =
new ByteArrayInputStream(bs.toArrayUnsafe())
}

0 comments on commit 3f45326

Please sign in to comment.