Bulk enhancement with dynamic congestion control #32567
base: main
Conversation
API change check: APIView has identified API-level changes in this PR and created the following API reviews.
@@ -558,6 +582,9 @@ export const Constants: {
  SDKVersion: string;
  CosmosDbDiagnosticLevelEnvVarName: string;
  DefaultMaxBulkRequestBodySizeInBytes: number;
  MaxBulkOperationsCount: number;
Any reason for adding "Bulk" in the names? Can't it just be an operations count?
No specific reason; we are using these constants for bulk, that's why. Should we change them? In .NET, these are named MaxOperationsInDirectModeBatchRequest and DefaultMaxBulkRequestBodySizeInBytes.
} finally {
  for (const [key, streamer] of this.streamersByPartitionKeyRangeId.entries()) {
    streamer.disposeTimers();
    this.limitersByPartitionKeyRangeId.delete(key);
can't we just delete the entire map in one go?
addressed
// Licensed under the MIT License.
import semaphore from "semaphore";

/**
 * Semaphores and locks for execution of Bulk
can you add a better description? Can it be used for other flows too like changefeed, query?
The methods (isStopped and stopDispatch) are specific to bulk only. They are used to block requests in case of a 410. If similar requirements come up elsewhere, this could be reused.
I'll add more description.
addressed
private async reBatchOperation(operation: ItemBulkOperation): Promise<void> {
  const partitionKeyRangeId = await this.resolvePartitionKeyRangeId(operation.operationInput);
  operation.operationContext.reRouteOperation(partitionKeyRangeId);
Instead of reRouteOperation, can it just be updatePartitionKeyRangeId?
addressed
  return limiter;
}

private getOrCreateStreamerForPKRange(pkRangeId: string): BulkStreamerPerPartition {
rename to getStreamerForPKRange
addressed
 * @param operation - operation to add
 */
add(operation: ItemBulkOperation): void {
  let toDispatch: BulkBatcher | null = null;
why is this initialized with null
we could keep it as undefined also. I'll update.
addressed
    this.options,
    this.diagnosticNode,
  );
  if (response.statusCode === 0) {
add a log about significance of 0 status code
addressed
this.lock.take(() => {
  try {
    // attempt to add operation until it fits in the current batch for the streamer
    while (!this.currentBatcher.tryAdd(operation)) {
If there's a backend issue where the backend keeps timing out and, after some retries, returns an error, the dispatcher might get stuck. In such cases, does it really make sense to keep a thread busy while adding the batcher? Wouldn't it be better to make this method async and wait for tryAdd to complete? Maybe the batcher can handle the issue internally.
This lock is only for adding an operation to the batch. The tryAdd method attempts to add the operation to the current batch and only returns false when that batch is full. In that case, the full batch is assigned to toDispatch and a new batch is created, so on the next iteration of the loop the operation is added to the new batch.
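A minimal sketch of this add/dispatch loop, using simplified, hypothetical shapes for BulkBatcher and ItemBulkOperation rather than the PR's actual types:

```ts
// Hypothetical, simplified shapes to illustrate the add/dispatch loop described above.
interface ItemBulkOperation {
  id: string;
}

class BulkBatcher {
  private operations: ItemBulkOperation[] = [];
  constructor(private readonly maxCount: number) {}

  // Returns false only when the batch is already full.
  tryAdd(operation: ItemBulkOperation): boolean {
    if (this.operations.length >= this.maxCount) {
      return false;
    }
    this.operations.push(operation);
    return true;
  }

  dispatch(): void {
    // fire-and-forget dispatch of the accumulated operations
    console.log(`dispatching ${this.operations.length} operations`);
  }
}

class StreamerSketch {
  private currentBatcher = new BulkBatcher(100);

  add(operation: ItemBulkOperation): void {
    let toDispatch: BulkBatcher | undefined;
    // Keep trying until the operation fits in the current batch.
    while (!this.currentBatcher.tryAdd(operation)) {
      // Current batch is full: hand it off and start a fresh one.
      toDispatch = this.currentBatcher;
      this.currentBatcher = new BulkBatcher(100);
    }
    if (toDispatch) {
      toDispatch.dispatch();
    }
  }
}
```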
if (toDispatch) {
  // dispatch with fire and forget. No need to wait for the dispatch to complete.
  toDispatch.dispatch(this.partitionMetric);
We are calling dispatch with every add operation. I thought it was controlled by a timer.
We have both mechanisms for dispatching a batch. The add operation checks the operation-count and batch-size conditions and dispatches the batch if it is full. The timer mechanism ensures timely dispatch so that a batch doesn't wait too long when there aren't enough operations to fill it.
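A rough sketch of how the two dispatch triggers can coexist; the batch limit and flush interval below are illustrative values, not the PR's constants:

```ts
// Illustration only: a batch is dispatched either when it fills up (checked on add)
// or when a periodic timer fires, whichever happens first.
class TimedBatcher<T> {
  private items: T[] = [];
  private readonly timer: ReturnType<typeof setInterval>;

  constructor(
    private readonly maxItems: number,
    flushIntervalMs: number,
    private readonly dispatch: (items: T[]) => void,
  ) {
    this.timer = setInterval(() => this.flush(), flushIntervalMs);
  }

  add(item: T): void {
    this.items.push(item);
    if (this.items.length >= this.maxItems) {
      this.flush(); // size-based dispatch
    }
  }

  private flush(): void {
    if (this.items.length === 0) return;
    const batch = this.items;
    this.items = [];
    this.dispatch(batch); // reached either by the timer or by a full batch
  }

  dispose(): void {
    clearInterval(this.timer);
  }
}
```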
  Constants.BulkMaxDegreeOfConcurrency
) {
  if (this.limiter.current() > 0) {
    this.limiter.leave(this.congestionIncreaseFactor);
Help me understand this: how does updating the limiter help increase concurrency?
The semaphore is initialized with a maximum of 50 slots. We start with a degree of concurrency of 1 and block 49 slots. According to the algorithm, we take or release slots. Suppose the current degree of concurrency is 1 and we didn't get any throttles: the algorithm runs and releases one more slot to increase concurrency, so a maximum of 2 requests can now be sent in parallel.
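A simplified, self-contained sketch of that slot bookkeeping. The capacity, starting concurrency, and adjustment factors here are illustrative, and this hypothetical class stands in for the PR's semaphore-based limiter:

```ts
// Illustration of the congestion-control idea: a fixed capacity of slots where all
// but `initialConcurrency` slots are blocked up front, then slots are released
// (to raise concurrency) or taken back (to lower it) by the congestion algorithm.
class ConcurrencyController {
  private blockedSlots: number;

  constructor(
    private readonly capacity: number = 50,
    initialConcurrency: number = 1,
  ) {
    // Start with most slots blocked so only `initialConcurrency` requests run in parallel.
    this.blockedSlots = capacity - initialConcurrency;
  }

  get degreeOfConcurrency(): number {
    return this.capacity - this.blockedSlots;
  }

  // No throttles observed: release slots to allow more requests in parallel.
  increase(factor: number = 1): void {
    this.blockedSlots -= Math.min(factor, this.blockedSlots);
  }

  // Throttles observed: take slots back to reduce parallelism (never below 1).
  decrease(factor: number = 1): void {
    const maxTake = this.capacity - this.blockedSlots - 1;
    this.blockedSlots += Math.min(factor, Math.max(maxTake, 0));
  }
}

// Starting at concurrency 1; after one no-throttle adjustment, up to 2 requests can run in parallel.
const controller = new ConcurrencyController();
controller.increase();
console.log(controller.degreeOfConcurrency); // 2
```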
 * @hidden
 */

export class ItemBulkOperation {
can it be an interface?
addressed
/**
 * Represents a result for a specific operation that was part of a batch request
 */
export class BulkOperationResult {
Convert it to an interface.
addressed
// API can return 413 which means the response is bigger than 4Mb.
// Operations that exceed the 4Mb limit are returned as 413/3402, while the operations within the 4Mb limit will be 200
if (
  err.code === StatusCodes.RequestEntityTooLarge &&
We are already validating the batch size; do we really need this retry?
We get this status code and substatus code when the response size from the backend exceeds 4 MB. I was not able to replicate this error to see the exact behaviour from the backend, but I included it since it was present in the .NET SDK's bulk retry policy.
There is a possibility that the addition of headers might push the payload over the size limit.
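A sketch of the kind of check being discussed, with hypothetical constant names for the status and substatus codes (the SDK's actual enums may differ):

```ts
// Hypothetical sketch of the retry decision under discussion: retry an individual
// operation when the service reports the response exceeded the size limit
// (413 with the "response size exceeded" substatus), since headers can push an
// otherwise-valid payload over the limit.
const REQUEST_ENTITY_TOO_LARGE = 413;
const RESPONSE_SIZE_EXCEEDED_SUBSTATUS = 3402; // from the review discussion above

function shouldRetryForResponseSize(err: { code?: number; substatus?: number }): boolean {
  return (
    err.code === REQUEST_ENTITY_TOO_LARGE &&
    err.substatus === RESPONSE_SIZE_EXCEEDED_SUBSTATUS
  );
}
```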
  shouldRetryForThrottle = Array.isArray(retryResult) ? retryResult[0] : retryResult;
}
if (shouldRetryForThrottle) {
  await sleep(this.nextRetryPolicy.retryAfterInMs);
Isn't it already covered here: await sleep(retryPolicy.retryAfterInMs);
The bulk retry policy applies on top of the existing retry policies since we apply retries at the operation level, so it will not take this path.
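A sketch of what retrying at the operation level, layered over an inner throttling policy, might look like; the names and structure here are illustrative, not the PR's implementation:

```ts
// Illustrative only: a per-operation bulk retry wrapper that delegates the
// throttling decision (and retry-after delay) to an inner policy, as discussed above.
interface RetryPolicy {
  shouldRetry(err: { code?: number }): Promise<boolean>;
  retryAfterInMs: number;
}

async function executeWithBulkRetry<T>(
  operation: () => Promise<T>,
  throttlingPolicy: RetryPolicy,
  maxAttempts = 5,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation();
    } catch (err: any) {
      if (attempt >= maxAttempts || !(await throttlingPolicy.shouldRetry(err))) {
        throw err;
      }
      // Honor the back-off suggested by the throttling policy before retrying this operation.
      await new Promise((resolve) => setTimeout(resolve, throttlingPolicy.retryAfterInMs));
    }
  }
}
```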
check comments
addressed comments
operations.forEach((operation) => bulkStreamer.addOperations(operation));
const response = await bulkStreamer.endStream();
// Create
response.forEach((res, index) =>
add a check for number of responses
addressed
  assert.strictEqual(res.statusCode, 201, `Status should be 201 for operation ${index}`),
);
});
it("Check case when cumulative size of all operations is greater than threshold - payload size is 25x threshold", async function () {
how about one operation having size greater than threshold?
In that case, it will send the request anyway and throw the 413 error. I'll add a test.
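A rough sketch of the kind of test being promised, reusing the bulkStreamer addOperations/endStream pattern from the tests above; the operation shape, the payload size, and how the 413 is surfaced are assumptions:

```ts
// Hypothetical test sketch: a single operation larger than the request-size
// threshold is still dispatched, and the 413 from the service is surfaced.
// Assumes the same bulkStreamer setup as the surrounding tests; whether the 413
// is thrown or returned per operation may differ in the actual implementation.
it("surfaces 413 when a single operation exceeds the size threshold", async function () {
  const oversizedDoc = { id: "oversized", value: "x".repeat(5 * 1024 * 1024) }; // ~5 MB payload
  bulkStreamer.addOperations({ operationType: "Create", resourceBody: oversizedDoc });
  const response = await bulkStreamer.endStream();
  assert.strictEqual(response[0].statusCode, 413);
});
```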
addressed
assert.equal(response[4].resourceBody.name, "nice");
assert.equal(response[4].statusCode, 200);
});
it("Check case when cumulative size of all operations is less than threshold", async function () {
Only the operation input changes in the test; can we combine these tests?
addressed
 *
 * @internal
 */
// TODO: Add a public reset() method which will utilize this to reset the state of the streamer to use it again.
is it future TODO?
yes
 */
async endStream(): Promise<BulkStreamerResponse> {
  if (this.streamEnded) {
    throw new ErrorResponse("Bulk streamer has already ended.");
can we attach some status code too? some 4xx code conveying bad request
That would be an HTTP-like status code. Should we set that when throwing an SDK error?
if (this.streamEnded) {
  throw new ErrorResponse("Bulk streamer has already ended.");
}
this.streamEnded = true;
add it after line 136
Wouldn't this get stuck if the user keeps adding operations after calling endStream while promises are settling? We were trying to avoid this case with the streamEnded flag.
Packages impacted by this PR
@azure/cosmos
Issues associated with this PR
#29100
#30376
Describe the problem that is addressed by this PR
This PR enhances the flow of the Bulk API. It implements a custom retry policy for bulk and dynamically controls the degree of concurrency for each partition based on throttling and the number of items processed.
What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen?
https://excalidraw.com/#room=ac45daa258e67c6fd89d,jkOfy1GkGAKwRUYZkTGgog
Are there test cases added in this PR? (If not, why?)
yes
Provide a list of related PRs (if any)
Command used to generate this PR: (Applicable only to SDK release request PRs)
Checklists