Skip to content

Commit

Permalink
Include sharding key if it is available in the document (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
pitagoras3 authored Oct 23, 2024
1 parent 0ff06aa commit 0a7ce3b
Showing 1 changed file with 24 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,37 @@ internal data class InsertReplaceChangeEvent(

fun fromMongoChangeStreamDocument(
changeStreamDocument: ChangeStreamDocument<BsonDocument>,
shardingKey: String?
): InsertReplaceChangeEvent =
InsertReplaceChangeEvent(
changeStreamDocument.operationType,
upsertKey(changeStreamDocument.documentKey!!, shardingKey),
shardingKey: String?,
): InsertReplaceChangeEvent {
val documentFromEvent =
changeStreamDocument.fullDocument?.toBsonDocument(
BsonDocument::class.java,
codecRegistry
codecRegistry,
)

return InsertReplaceChangeEvent(
changeStreamDocument.operationType,
upsertKey(changeStreamDocument.documentKey!!, shardingKey, documentFromEvent),
documentFromEvent,
)
}

// This addresses "Failed to target upsert by query :: could not extract exact shard key" error for sharded destination collections
// https://www.mongodb.com/docs/manual/reference/method/db.collection.update/#sharded-collections
private fun upsertKey(documentKey: BsonDocument, shardingKey: String?): BsonDocument {
if (shardingKey == null) return documentKey // When no shardingKey, don't change documentKey
if (documentKey.containsKey(shardingKey)) return documentKey // When sharding key is already in documentKey, don't change documentKey
return documentKey.append(shardingKey, BsonNull.VALUE) // When there is no shardingKey, add "shardingKey: null" to documentKey
private fun upsertKey(
documentKey: BsonDocument,
shardingKey: String?,
documentFromEvent: BsonDocument?,
): BsonDocument {
// When no shardingKey, don't change documentKey
if (shardingKey == null) return documentKey
// When sharding key is already in documentKey, don't change documentKey
if (documentKey.containsKey(shardingKey)) return documentKey
// When destination collection is sharded, and there is no shardingKey, add "shardingKey: <value>" to documentKey
return documentKey.append(
shardingKey,
documentFromEvent?.get(shardingKey, BsonNull.VALUE) ?: BsonNull.VALUE,
)
}
}

Expand Down

0 comments on commit 0a7ce3b

Please sign in to comment.