Skip to content

Commit

Permalink
Merge pull request #38 from scalableminds/fix-get-multiple-keys-pagin…
Browse files Browse the repository at this point in the history
…ation

Fix pagination for GetMultipleKeys, introducing startAfterKey
  • Loading branch information
fm3 authored May 8, 2023
2 parents 7477ec1 + e928433 commit 18a119e
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 47 deletions.
9 changes: 9 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Changelog

## Breaking Changes

- The `GetMultipleKeys` call now takes a `startAfterKey` instead of a `key` for pagination. The returned list will only start *after* this key. [#38](https://github.com/scalableminds/fossildb/pull/38)

## Fixes

- Fixed a bug where the pagination for `GetMultipleKeys` could lead to an endless loop if some keys are prefixes of others. [#38](https://github.com/scalableminds/fossildb/pull/38)
2 changes: 1 addition & 1 deletion src/main/protobuf/fossildbapi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ message GetMultipleVersionsReply {

message GetMultipleKeysRequest {
required string collection = 1;
required string key = 2;
optional string startAfterKey = 2;
optional string prefix = 3;
optional uint64 version = 4;
optional uint32 limit = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class FossilDBGrpcImpl(storeManager: StoreManager)

override def getMultipleKeys(req: GetMultipleKeysRequest): Future[GetMultipleKeysReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
val (keys, values, versions) = store.getMultipleKeys(req.key, req.prefix, req.version, req.limit)
val (keys, values, versions) = store.getMultipleKeys(req.startAfterKey, req.prefix, req.version, req.limit)
GetMultipleKeysReply(success = true, None, keys, values.map(ByteString.copyFrom), versions)
} { errorMsg => GetMultipleKeysReply(success = false, errorMsg) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import scala.util.Try


case class VersionedKey(key: String, version: Long) {
override def toString: String = s"$key@${(~version).toHexString.toUpperCase}@$version"
override def toString: String = s"$key${VersionedKey.versionSeparator}${(~version).toHexString.toUpperCase}${VersionedKey.versionSeparator}$version"
}

object VersionedKey {

val versionSeparator: Char = '@'

def apply(key: String): Option[VersionedKey] = {
val parts = key.split('@')
val parts = key.split(versionSeparator)
for {
key <- parts.headOption
versionString <- parts.lastOption
Expand Down Expand Up @@ -91,8 +93,6 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
def get(key: String, version: Option[Long] = None): Option[VersionedKeyValuePair[Array[Byte]]] =
scanVersionValuePairs(key, version).toStream.headOption

def getUnderlying: RocksDBStore = underlying

def getMultipleVersions(key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): (List[Array[Byte]], List[Long]) = {

@tailrec
Expand All @@ -113,33 +113,52 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {

private def scanVersionValuePairs(key: String, version: Option[Long] = None): Iterator[VersionedKeyValuePair[Array[Byte]]] = {
requireValidKey(key)
val prefix = s"$key@"
val prefix = s"$key${VersionedKey.versionSeparator}"
underlying.scan(version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { pair =>
VersionedKey(pair.key).map(VersionedKeyValuePair(_, pair.value))
}
}

private def scanVersionsOnly(key: String, version: Option[Long] = None): Iterator[VersionedKey] = {
requireValidKey(key)
val prefix = s"$key@"
val prefix = s"$key${VersionedKey.versionSeparator}"
underlying.scanKeysOnly(version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { key =>
VersionedKey(key)
}
}

def getMultipleKeys(key: String, prefix: Option[String] = None, version: Option[Long] = None, limit: Option[Int]): (Seq[String], Seq[Array[Byte]], Seq[Long]) = {
requireValidKey(key)
def getMultipleKeys(startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None, limit: Option[Int]): (Seq[String], Seq[Array[Byte]], Seq[Long]) = {
startAfterKey.foreach(requireValidKey)
prefix.foreach{ p => requireValidKey(p)}
val iterator: VersionFilterIterator = scanKeys(key, prefix, version)
val asSequence = iterator.take(limit.getOrElse(Int.MaxValue)).toSeq
val keys = asSequence.map(_.key)
val values = asSequence.map(_.value)
val versions = asSequence.map(_.version)
val iterator: VersionFilterIterator = scanKeys(startAfterKey, prefix, version)

/*
Note that seek in the underlying iterators either hits precisely or goes to the
lexicographically *next* key. To achieve correct behavior with startAfterKey,
we have to advance once in case of the exact hit.
*/
val firstItemOpt: Option[VersionedKeyValuePair[Array[Byte]]] = if (iterator.hasNext) {
val firstItem = iterator.next()
if (startAfterKey.contains(firstItem.key)) {
None
} else {
Some(firstItem)
}
} else None

val limitPadded = limit.map(_ + 1).getOrElse(Int.MaxValue)
val asVector = iterator.take(limitPadded).toVector
val asSequenceAdvancedIfNeeded = firstItemOpt.map(_ +: asVector).getOrElse(asVector).take(limit.getOrElse(Int.MaxValue))
val keys = asSequenceAdvancedIfNeeded.map(_.key)
val values = asSequenceAdvancedIfNeeded.map(_.value)
val versions = asSequenceAdvancedIfNeeded.map(_.version)
(keys, values, versions)
}

private def scanKeys(key: String, prefix: Option[String] = None, version: Option[Long] = None): VersionFilterIterator =
new VersionFilterIterator(underlying.scan(key, prefix), version)
private def scanKeys(startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None): VersionFilterIterator = {
val fullKey = startAfterKey.map(key => s"$key${VersionedKey.versionSeparator}").orElse(prefix).getOrElse("")
new VersionFilterIterator(underlying.scan(fullKey, prefix), version)
}

def deleteMultipleVersions(key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): Unit = {
@tailrec
Expand Down Expand Up @@ -178,6 +197,6 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
}

private def requireValidKey(key: String): Unit = {
require(!(key contains "@"), "keys cannot contain the char @")
require(!key.contains(VersionedKey.versionSeparator), s"keys cannot contain the char ${VersionedKey.versionSeparator}")
}
}
116 changes: 87 additions & 29 deletions src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
private val testData3 = ByteString.copyFromUtf8("testData3")

private val aKey = "aKey"
private val anotherKey = "anotherKey"
private val aNotherKey = "aNotherKey"
private val aThirdKey = "aThirdKey"

override def beforeEach: Unit = {
Expand Down Expand Up @@ -114,7 +114,7 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
}

it should "fail after Put with other key" in {
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
val reply = client.get(GetRequest(collectionA, aKey))
assert(!reply.success)
}
Expand All @@ -136,24 +136,24 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
"ListKeys" should "list all keys of a collection" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
client.put(PutRequest(collectionA, anotherKey, Some(4), testData2))
client.put(PutRequest(collectionA, aNotherKey, Some(4), testData2))
client.put(PutRequest(collectionB, aThirdKey, Some(1), testData1))
val reply = client.listKeys(ListKeysRequest(collectionA))
assert(reply.keys.contains(aKey))
assert(reply.keys.contains(anotherKey))
assert(reply.keys.contains(aNotherKey))
assert(reply.keys.length == 2)
}

it should "support pagination with startAfterKey" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
client.put(PutRequest(collectionA, anotherKey, Some(4), testData2))
client.put(PutRequest(collectionA, aNotherKey, Some(4), testData2))
client.put(PutRequest(collectionB, aThirdKey, Some(1), testData1))
val reply = client.listKeys(ListKeysRequest(collectionA, Some(1)))
assert(reply.keys.length == 1)
assert(reply.keys.contains(aKey))
val reply2 = client.listKeys(ListKeysRequest(collectionA, Some(1), Some(reply.keys.last)))
assert(reply2.keys.contains(anotherKey))
assert(reply2.keys.contains(aNotherKey))
assert(reply2.keys.length == 1)
}

Expand All @@ -173,7 +173,7 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
val reply = client.getMultipleVersions(GetMultipleVersionsRequest(collectionA, aKey))
assert(reply.versions(0) == 2)
assert(reply.versions(1) == 1)
Expand All @@ -191,7 +191,7 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
client.put(PutRequest(collectionA, aKey, Some(3), testData3))
client.put(PutRequest(collectionA, aKey, Some(4), testData1))
client.put(PutRequest(collectionA, aKey, Some(5), testData1))
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))

val reply = client.getMultipleVersions(GetMultipleVersionsRequest(collectionA, aKey, Some(4), Some(2)))
assert(reply.versions(0) == 4)
Expand All @@ -202,68 +202,126 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
assert(reply.values.length == 2)
}

"GetMultipleKeys" should "return keys starting with initial one (no prefix)" in {
"GetMultipleKeys" should "return all keys" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, anotherKey, Some(0), testData2))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData2))
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData3))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, aThirdKey))
assert(reply.keys.length == 2)
assert(reply.keys.contains(anotherKey))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA))
assert(reply.keys.length == 3)
assert(reply.keys.contains(aNotherKey))
assert(reply.keys.contains(aThirdKey))
assert(reply.values.length == 2)
assert(reply.values.length == 3)
assert(reply.values.contains(testData2))
assert(reply.values.contains(testData3))
assert(reply.actualVersions.length == 2)
assert(reply.actualVersions.length == 3)
assert(reply.actualVersions.contains(0))
}

it should "return keys of matching version (sorted alphabetically)" in {
it should "return keys of matching version" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
client.put(PutRequest(collectionA, anotherKey, Some(1), testData2))
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
client.put(PutRequest(collectionA, anotherKey, Some(2), testData3))
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, aKey, None, Some(1)))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, None, None, Some(1)))
assert(reply.keys.length == 3)
assert(reply.values.contains(testData2))
}

it should "return keys of matching version, matching prefix (sorted alphabetically)" in {
it should "return keys of matching version, matching prefix" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
client.put(PutRequest(collectionA, anotherKey, Some(1), testData2))
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
client.put(PutRequest(collectionA, anotherKey, Some(2), testData3))
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, aKey, Some("aK"), Some(1)))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, None, Some("aN"), Some(1)))
assert(reply.keys.length == 1)
assert(reply.keys.contains(aNotherKey))
assert(reply.values.contains(testData2))
assert(reply.actualVersions.contains(1))
}

it should "return keys of matching version, matching prefix even if it is exact match" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, None, Some(aNotherKey), Some(1)))
assert(reply.keys.length == 1)
assert(reply.keys.contains(aNotherKey))
assert(reply.values.contains(testData2))
assert(reply.actualVersions.contains(1))
}

it should "with limit return only the first n keys of matching version " in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
client.put(PutRequest(collectionA, anotherKey, Some(1), testData2))
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
client.put(PutRequest(collectionA, anotherKey, Some(2), testData3))
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, aKey, None, Some(1), Some(2)))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, None, None, Some(1), Some(2)))
assert(reply.keys.length == 2)
assert(reply.values.contains(testData2))
assert(reply.actualVersions.contains(1))
}

it should "support pagination with startAfterKey" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, Some(aKey), None, None, Some(2)))
assert(reply.keys.length == 2)
assert(reply.values.contains(testData1))
assert(reply.actualVersions.contains(0))
}

it should "support pagination with startAfterKey, with prefix and version" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, Some(aKey), Some("a"), Some(1), Some(1)))
assert(reply.keys.length == 1)
assert(reply.values.contains(testData2))
assert(reply.actualVersions.contains(1))
}

it should "support pagination with startAfterKey, with prefix and version where no keys match the prefix" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, Some(aKey), Some("BogusPrefix"), Some(1), Some(2)))
assert(reply.keys.isEmpty)
}

"Backup" should "create non-empty backup directory" in {
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
client.backup(BackupRequest())
Expand Down

0 comments on commit 18a119e

Please sign in to comment.