-
Notifications
You must be signed in to change notification settings - Fork 641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AWS S3: Add getObjectByRanges to S3 API #2982
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,14 +4,12 @@ | |
|
||
package akka.stream.alpakka.s3.impl | ||
|
||
import java.net.InetSocketAddress | ||
import java.time.{Instant, ZoneOffset, ZonedDateTime} | ||
import scala.annotation.nowarn | ||
import akka.actor.ActorSystem | ||
import akka.annotation.InternalApi | ||
import akka.dispatch.ExecutionContexts | ||
import akka.http.scaladsl.Http.OutgoingConnection | ||
import akka.http.scaladsl.model.StatusCodes.{NoContent, NotFound, OK} | ||
import akka.http.scaladsl.model.headers.ByteRange.FromOffset | ||
import akka.http.scaladsl.model.headers._ | ||
import akka.http.scaladsl.model.{headers => http, _} | ||
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} | ||
|
@@ -26,7 +24,11 @@ import akka.util.ByteString | |
import akka.{Done, NotUsed} | ||
import software.amazon.awssdk.regions.Region | ||
|
||
import scala.collection.immutable | ||
import java.net.InetSocketAddress | ||
import java.time.{Instant, ZoneOffset, ZonedDateTime} | ||
import scala.annotation.{nowarn, tailrec} | ||
import scala.collection.mutable.ListBuffer | ||
import scala.collection.{immutable, mutable} | ||
import scala.concurrent.{Future, Promise} | ||
import scala.util.{Failure, Success, Try} | ||
|
||
|
@@ -37,6 +39,9 @@ import scala.util.{Failure, Success, Try} | |
BucketAndKey.validateObjectKey(key, conf) | ||
this | ||
} | ||
|
||
/**
 * Renders this location as an `s3://<bucket>/<key>` string, built from the
 * `bucket` and `key` values in scope of the enclosing definition.
 */
def mkString: String = {
  val rendered = s"s3://$bucket/$key"
  rendered
}
} | ||
|
||
/** Internal Api */ | ||
|
@@ -165,9 +170,12 @@ import scala.util.{Failure, Success, Try} | |
import Marshalling._ | ||
|
||
// Minimum chunk size: 5 * 1024 * 1024 bytes (5 MiB).
val MinChunkSize: Int = 5 * 1024 * 1024 //in bytes | ||
// Default value of the `rangeSize` parameter of getObjectByRanges: 8 * 1024 * 1024 bytes (8 MiB).
val DefaultByteRangeSize: Long = 8 * 1024 * 1024 | ||
// Flow that falls back to a single empty ByteString (via `orElse`) when upstream
// completes without having emitted any element.
val atLeastOneByteString: Flow[ByteString, ByteString, NotUsed] = | ||
Flow[ByteString].orElse(Source.single(ByteString.empty)) | ||
|
||
// In-band sentinel text that RangeMapConcat compares incoming ByteStrings against
// to detect that a byte range has been fully received.
private val RangeEndMarker = "$END$" | ||
|
||
// def because tokens can expire | ||
private def signingKey(implicit settings: S3Settings) = { | ||
val requestDate = ZonedDateTime.now(ZoneOffset.UTC) | ||
|
@@ -232,6 +240,128 @@ import scala.util.{Failure, Success, Try} | |
.mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.parasitic)) | ||
} | ||
|
||
/**
 * Streams the content of an S3 object by first looking up its metadata and then
 * delegating the download to `doGetByRanges`.
 *
 * The materialized future is completed with the object's metadata as soon as the
 * metadata lookup yields a value. It is failed with a `NoSuchElementException`
 * when the lookup yields no metadata, or with the stream's error if the stream
 * fails before the metadata was obtained.
 *
 * @param s3Location  bucket and key of the object to download
 * @param versionId   optional object version
 * @param s3Headers   headers forwarded to the underlying requests
 * @param rangeSize   size in bytes of each requested range
 * @param parallelism breadth passed on to `doGetByRanges`
 * @return the object's bytes; empty when the object's content length is 0
 */
def getObjectByRanges(
    s3Location: S3Location,
    versionId: Option[String],
    s3Headers: S3Headers,
    rangeSize: Long = DefaultByteRangeSize,
    parallelism: Int = 4
): Source[ByteString, Future[ObjectMetadata]] = {
  val deferred = Source.fromMaterializer { (_, _) =>
    val metadataPromise = Promise[ObjectMetadata]()

    val content = getObjectMetadata(s3Location.bucket, s3Location.key, versionId, s3Headers)
      .flatMapConcat {
        case None =>
          // No metadata means no object: fail both the stream and the materialized future.
          val notFound = new NoSuchElementException(s"Object does not exist at location [${s3Location.mkString}]")
          metadataPromise.failure(notFound)
          Source.failed[ByteString](notFound)
        case Some(metadata) =>
          metadataPromise.success(metadata)
          if (metadata.contentLength == 0)
            Source.empty[ByteString]
          else
            doGetByRanges(s3Location, versionId, s3Headers, metadata.contentLength, rangeSize, parallelism)
      }
      .mapError {
        case cause =>
          // tryFailure is a no-op when the promise was already completed above.
          metadataPromise.tryFailure(cause)
          cause
      }

    content.mapMaterializedValue(_ => metadataPromise.future)
  }

  // Flatten the Future[Future[ObjectMetadata]] produced by fromMaterializer.
  deferred.mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.parasitic))
}
|
||
/**
 * Builds the source that downloads an object of `contentLength` bytes in ranges of
 * `rangeSize` bytes.
 *
 * When `computeByteRanges` yields at most one range, the whole object is fetched with
 * a single `getObject` call without a range. Otherwise every range is fetched through
 * `flatMapMerge` with breadth `parallelism`, each chunk is tagged with the index of its
 * range, an end marker is appended after each range, and `RangeMapConcat` processes
 * the tagged chunks.
 */
private def doGetByRanges( | ||
s3Location: S3Location, | ||
versionId: Option[String], | ||
s3Headers: S3Headers, | ||
contentLength: Long, | ||
rangeSize: Long, | ||
parallelism: Int | ||
): Source[ByteString, Any] = { | ||
val byteRanges = computeByteRanges(contentLength, rangeSize) | ||
// computeByteRanges returns Nil when contentLength <= rangeSize, so this branch is the plain download.
if (byteRanges.size <= 1) { | ||
getObject(s3Location, None, versionId, s3Headers) | ||
} else { | ||
Source(byteRanges) | ||
.zipWithIndex | ||
// Chunks of different ranges may interleave here; the index attached below identifies their range.
.flatMapMerge(parallelism, brToIdx => { | ||
val (br, idx) = brToIdx | ||
// NOTE(review): this literal duplicates the RangeEndMarker constant that RangeMapConcat
// compares against; the two must stay identical, so consider referencing the constant here.
val endMarker = Source.single(ByteString("$END$")) | ||
getObject(s3Location, Some(br), versionId, s3Headers).concat(endMarker).map(_ -> idx) | ||
}) | ||
.statefulMapConcat(RangeMapConcat) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Composing each range-source with a buffer to allow parallel fetching, and then concatenating the resulting streams to get the bytes out in the right order, seems like it would achieve the same result but much more simply. Am I missing something clever that this does?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
If I understood correctly, you are thinking about `getObject(s3Location, Some(br), versionId, s3Headers).conflate(_ ++ _).concat(endMarker).map(_ -> idx)`. As [inline code missing from the extracted text], how can I order them back without [inline code missing from the extracted text]? Note that I am not trying to buffer the "next" range: if bytes of the "next" range are pushed, I'll push them directly downstream, as buffering those bytes is useless (?). Also, regarding buffers, I was not sure whether it was useful to "hard pull" upstreams until [inline code missing from the extracted text]:
//...
.statefulMapConcat(RangeMapConcat)
// Might be useful to consume elements of all flatMapMerge materialized upstreams
.batchWeighted(parallelism * rangeSize, _.size, identity)(_ ++ _) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking something like
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But ofc that may not be good enough with buffer sized in chunks instead of bytes, we don't have a buffer with weighted size calculation though, maybe batchWeighted could do, not sure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm can't make it work: Tried with: Source(byteRanges)
.mapAsync(parallelism)(br => Future.successful(
getObject(s3Location, Some(br), versionId, s3Headers).batchWeighted(rangeSize, _.size, identity)(_ ++ _)
))
.flatMapConcat(identity) and Source(byteRanges)
.mapAsync(parallelism)(br => Future.successful(
Source.fromMaterializer { case (mat, _) =>
getObject(s3Location, Some(br), versionId, s3Headers)
.preMaterialize()(mat)
._2
.batchWeighted(rangeSize, _.size, identity)(_ ++ _)
}
))
.flatMapConcat(identity)
But in both situations, ranges are fetched one by one and download perf looks like [content missing from the extracted text].
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Ah, of course: they aren't materialized, so they can't start consuming bytes until they are flatMapConcat:ed; I didn't think of that. Pre-materialization creates a running source, but the downstream is not materialized until it is used, so you would need to put the batching before preMaterialize.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
For the record, I created an upstream issue with an idea that could make this kind of thing easier: akka/akka#31958 (continue with the current solution here, though). |
||
} | ||
} | ||
|
||
/**
 * Splits an object of `contentLength` bytes into consecutive byte ranges of at most
 * `rangeSize` bytes.
 *
 * Returns `Nil` when the whole object fits into a single range
 * (`contentLength <= rangeSize`). Otherwise every range but the last is a closed
 * slice of exactly `rangeSize` bytes, and the last one is an open-ended range
 * starting at its offset.
 *
 * @throws IllegalArgumentException if `contentLength` is negative or `rangeSize` is not positive
 */
private def computeByteRanges(contentLength: Long, rangeSize: Long): Seq[ByteRange] = {
  require(contentLength >= 0, s"contentLength ($contentLength) must be >= 0")
  require(rangeSize > 0, s"rangeSize ($rangeSize) must be > 0")

  val fitsInSingleRange = contentLength <= rangeSize
  if (fitsInSingleRange) {
    Nil
  } else {
    val offsets = 0L until contentLength by rangeSize
    offsets.map { offset =>
      val reachesEnd = (offset + rangeSize) >= contentLength
      val range: ByteRange =
        if (reachesEnd) FromOffset(offset)
        else ByteRange(offset, offset + rangeSize - 1)
      range
    }.toList
  }
}
|
||
/**
 * Factory of the stateful function given to `statefulMapConcat` to restore range order.
 *
 * Input elements are `(chunk, rangeIndex)` pairs, where a chunk equal to
 * `RangeEndMarker` signals that the range with that index is complete. Per element:
 *  - a chunk of the current range is emitted immediately;
 *  - a chunk of any other range is buffered under its range index;
 *  - an end marker records the range as completed, then emits, as one concatenated
 *    `ByteString`, everything buffered for the current range and for each following
 *    range reached by advancing past completed ranges.
 *
 * Each invocation of the factory creates fresh state, so the state is private to one
 * materialization.
 *
 * NOTE(review): the end marker is in-band. A payload chunk whose bytes equal
 * `RangeEndMarker` is indistinguishable from the marker and would be dropped and
 * treated as end-of-range.
 */
private val RangeMapConcat: () => ((ByteString, Long)) => IterableOnce[ByteString] = () => {
  // Index of the range whose bytes may currently be emitted downstream.
  var currentRangeIdx = 0L
  // Ranges at or after the current one whose end marker has been seen.
  var completedRanges = Set.empty[Long]
  // Chunks received ahead of time, keyed by range index, in arrival order.
  var bufferByRangeIdx = Map.empty[Long, Vector[ByteString]]

  def isEndMarker(bs: ByteString): Boolean =
    bs.size == RangeEndMarker.length && bs.utf8String == RangeEndMarker

  // Flushes the buffer of the current range into `acc`, then advances while the
  // current range is completed. A completed range with no buffered bytes is skipped
  // too, so ranges buffered behind it are not left waiting for a later end marker.
  @tailrec
  def drainReadyRanges(acc: Option[ByteString]): Option[ByteString] = {
    val flushed = bufferByRangeIdx.get(currentRangeIdx) match {
      case Some(chunks) => Some(chunks.foldLeft(acc.getOrElse(ByteString.empty))(_ ++ _))
      case None => acc
    }
    bufferByRangeIdx -= currentRangeIdx
    if (completedRanges.contains(currentRangeIdx)) {
      // The index is never consulted again once passed; forget it to keep the set small.
      completedRanges -= currentRangeIdx
      currentRangeIdx += 1
      drainReadyRanges(flushed)
    } else {
      flushed
    }
  }

  {
    case (bs, idx) =>
      if (isEndMarker(bs)) {
        completedRanges += idx
        drainReadyRanges(None).toList
      } else if (idx == currentRangeIdx) {
        bs :: Nil
      } else {
        // Key by the full Long index; narrowing it to Int could make distinct ranges collide.
        val buffered = bufferByRangeIdx.getOrElse(idx, Vector.empty[ByteString])
        bufferByRangeIdx = bufferByRangeIdx.updated(idx, buffered :+ bs)
        Nil
      }
  }
}
|
||
/** | ||
* An ADT that represents the current state of pagination | ||
*/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will close the downstream ASAP, do you want to defer it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It happens after the `getObjectMetadata` result has been pulled, so I am not sure what that implies in this context. The idea is that no `ObjectMetadata` means no S3 object, so I think the source should fail, as well as the materialized `Future`.