
Bulk enhancement with dynamic congestion control #32567

Open · wants to merge 45 commits into base: main

Conversation

aditishree1
Member

Packages impacted by this PR

@azure/cosmos

Issues associated with this PR

#29100
#30376

Describe the problem that is addressed by this PR

This PR enhances the Bulk API flow. It implements a custom retry policy for bulk operations and dynamically controls the degree of concurrency for each partition based on throttling and the number of items processed.
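
For context, a minimal usage sketch of the streaming bulk flow, pieced together from the tests later in this thread; the accessor used to obtain the streamer (getBulkStreamer) and the connection details are assumptions, not the finalized public API:

import { CosmosClient, BulkOperationType, OperationInput } from "@azure/cosmos";

async function runBulk(): Promise<void> {
  const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);
  const container = client.database("testdb").container("testcontainer");

  // Hypothetical accessor; the tests in this PR only show an already-created `bulkStreamer`.
  const bulkStreamer = (container.items as any).getBulkStreamer();

  const operations: OperationInput[] = [
    { operationType: BulkOperationType.Create, resourceBody: { id: "1", key: "a" } },
    { operationType: BulkOperationType.Create, resourceBody: { id: "2", key: "b" } },
  ];

  // Operations are grouped per partition key range; batches are dispatched as they
  // fill up (or on a timer), with per-partition concurrency adjusted on throttling.
  operations.forEach((operation) => bulkStreamer.addOperations(operation));
  const response = await bulkStreamer.endStream();
  response.forEach((res: any, index: number) =>
    console.log(`operation ${index}: ${res.statusCode}`),
  );
}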

What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen?

https://excalidraw.com/#room=ac45daa258e67c6fd89d,jkOfy1GkGAKwRUYZkTGgog

Are there test cases added in this PR? (If not, why?)

yes

Provide a list of related PRs (if any)

Command used to generate this PR: (Applicable only to SDK release request PRs)

Checklists

  • Added impacted package name to the issue description
  • Does this PR need any fixes in the SDK Generator? (If so, create an Issue in the Autorest/typescript repository and link it here)
  • Added a changelog (if necessary)

@azure-sdk
Collaborator

API change check

APIView has identified API level changes in this PR and created the following API reviews.

@azure/cosmos

@aditishree1 aditishree1 marked this pull request as ready for review January 21, 2025 03:08
@@ -558,6 +582,9 @@ export const Constants: {
SDKVersion: string;
CosmosDbDiagnosticLevelEnvVarName: string;
DefaultMaxBulkRequestBodySizeInBytes: number;
MaxBulkOperationsCount: number;
Member

@topshot99 topshot99 Jan 27, 2025

Any reason for adding "bulk" in the names? Can't it just be an operations count?

Member Author

No specific reason; we are using these constants for bulk, that's why. Should we change them? In .NET, these are named MaxOperationsInDirectModeBatchRequest and DefaultMaxBulkRequestBodySizeInBytes.

} finally {
for (const [key, streamer] of this.streamersByPartitionKeyRangeId.entries()) {
streamer.disposeTimers();
this.limitersByPartitionKeyRangeId.delete(key);
Member

can't we just delete the entire map in one go?

Member Author

addressed

// Licensed under the MIT License.
import semaphore from "semaphore";
/**
* Semaphores and locks for execution of Bulk
Member

@topshot99 topshot99 Jan 28, 2025

Can you add a better description? Can it be used for other flows too, like change feed or query?

Member Author

The methods (isStopped and stopDispatch) are specific to bulk only. They are used to block requests in case of a 410. If there are similar requirements, this could be reused.
I'll add more description.

Member Author

addressed
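
For readers of this thread, a minimal sketch of the kind of limiter wrapper being described, assuming the semaphore package shown in the diff; the field names and capacity handling are illustrative, not the exact PR code:

import semaphore from "semaphore";

/**
 * Sketch of a per-partition limiter: a semaphore that bounds concurrent batch
 * dispatches, plus a stop flag used to hold back requests after a 410
 * (partition gone/split) until the affected operations are re-resolved.
 */
class LimiterSketch {
  private semaphore: ReturnType<typeof semaphore>;
  private stopped = false;

  constructor(capacity: number) {
    this.semaphore = semaphore(capacity);
  }

  take(n: number, callback: () => void): void {
    this.semaphore.take(n, callback);
  }

  leave(n?: number): void {
    this.semaphore.leave(n);
  }

  /** Number of slots currently taken. */
  current(): number {
    return this.semaphore.current;
  }

  /** True once dispatch has been stopped for this partition key range. */
  isStopped(): boolean {
    return this.stopped;
  }

  /** Called on a 410 so queued batches are re-batched instead of dispatched. */
  stopDispatch(): void {
    this.stopped = true;
  }
}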


private async reBatchOperation(operation: ItemBulkOperation): Promise<void> {
const partitionKeyRangeId = await this.resolvePartitionKeyRangeId(operation.operationInput);
operation.operationContext.reRouteOperation(partitionKeyRangeId);
Member

Instead of reRouteOperation, can it just be updatePartitionKeyRangeID?

Member Author

addressed

return limiter;
}

private getOrCreateStreamerForPKRange(pkRangeId: string): BulkStreamerPerPartition {
Member

rename to getStreamerForPKRange

Member Author

addressed

* @param operation - operation to add
*/
add(operation: ItemBulkOperation): void {
let toDispatch: BulkBatcher | null = null;
Member

Why is this initialized with null?

Member Author

we could keep it as undefined also. I'll update.

Member Author

addressed

this.options,
this.diagnosticNode,
);
if (response.statusCode === 0) {
Member

add a log about significance of 0 status code

Member Author

addressed

this.lock.take(() => {
try {
// attempt to add operation until it fits in the current batch for the streamer
while (!this.currentBatcher.tryAdd(operation)) {
Member

If there's a backend issue where the backend keeps timing out and, after some retries, returns an error, the dispatcher might get stuck. In such cases, does it really make sense to keep a thread busy while adding the batcher? Wouldn't it be better to make this method async and wait for tryAdd to complete? Maybe the batcher can handle the issue internally.

Member Author

This lock is only for adding an operation to the batch. The tryAdd method tries to add the operation to the current batch and only returns false when the batch is full. In that case, the full batch is assigned to toDispatch and a new batch is created, so on the next iteration of the loop the operation is added to the new batch.
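
To make that flow concrete, a rough reconstruction of the add path on the per-partition streamer; member names not visible in the diff hunks (createBulkBatcher, partitionMetric) are assumptions:

add(operation: ItemBulkOperation): void {
  let toDispatch: BulkBatcher | undefined;
  this.lock.take(() => {
    try {
      // tryAdd only returns false when the current batch has hit its
      // operation-count or byte-size limit.
      while (!this.currentBatcher.tryAdd(operation)) {
        // Current batch is full: hand it off for dispatch, start a new one,
        // then retry adding the operation to the fresh batch.
        toDispatch = this.currentBatcher;
        this.currentBatcher = this.createBulkBatcher();
      }
    } finally {
      this.lock.leave();
    }
  });
  if (toDispatch) {
    // Fire and forget; results surface through each operation's own promise.
    void toDispatch.dispatch(this.partitionMetric);
  }
}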


if (toDispatch) {
// dispatch with fire and forget. No need to wait for the dispatch to complete.
toDispatch.dispatch(this.partitionMetric);
Member

We are calling dispatch with every add operation. I thought it was controlled by a timer.

Member Author

We have both mechanisms for dispatching a batch. The add operation checks the operation-count and batch-size conditions and dispatches the batch if it is full. The timer mechanism ensures timely dispatch, so a batch doesn't wait too long when there aren't enough operations to fill it.
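
A small sketch of the timer path described here; the interval value and helper names (dispatchTimer, isEmpty, createBulkBatcher) are assumptions:

private runDispatchTimer(): void {
  this.dispatchTimer = setInterval(() => {
    let toDispatch: BulkBatcher | undefined;
    this.lock.take(() => {
      try {
        // Flush a partially filled batch so operations don't sit waiting
        // when there aren't enough adds to fill it.
        if (!this.currentBatcher.isEmpty()) {
          toDispatch = this.currentBatcher;
          this.currentBatcher = this.createBulkBatcher();
        }
      } finally {
        this.lock.leave();
      }
    });
    if (toDispatch) {
      void toDispatch.dispatch(this.partitionMetric);
    }
  }, 100); // illustrative interval; the actual value in the PR may differ
}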

Constants.BulkMaxDegreeOfConcurrency
) {
if (this.limiter.current() > 0) {
this.limiter.leave(this.congestionIncreaseFactor);
Member

Help me understand this: how does updating the limiter help increase concurrency?

Member Author

The semaphore is initialized with a maximum of 50 slots. We start with a degree of concurrency of 1 and block 49 slots. According to the algorithm, we take or release slots. Suppose the current degree of concurrency is 1 and we didn't get any throttles: the algorithm runs and we release one more slot to increase concurrency, so now a maximum of 2 requests can be sent in parallel.
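
A sketch of that congestion-control step; fields and constants other than those visible in the diff (limiter.current(), congestionIncreaseFactor, Constants.BulkMaxDegreeOfConcurrency) are assumptions:

// The semaphore is created with 50 slots and 49 are taken up front, so the
// number of released slots equals the allowed degree of concurrency.
private adjustDegreeOfConcurrency(): void {
  if (this.partitionMetric.numberOfThrottles > 0) {
    // Throttled in the last window: take slots back to shrink concurrency,
    // but never drop below a degree of 1.
    const decrease = Math.min(
      this.congestionDecreaseFactor,
      this.currentDegreeOfConcurrency - 1,
    );
    if (decrease > 0) {
      this.limiter.take(decrease, () => {});
      this.currentDegreeOfConcurrency -= decrease;
    }
  } else if (
    this.currentDegreeOfConcurrency + this.congestionIncreaseFactor <=
    Constants.BulkMaxDegreeOfConcurrency
  ) {
    // No throttles: release more slots so more requests can go out in parallel.
    if (this.limiter.current() > 0) {
      this.limiter.leave(this.congestionIncreaseFactor);
      this.currentDegreeOfConcurrency += this.congestionIncreaseFactor;
    }
  }
}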

* @hidden
*/

export class ItemBulkOperation {
Member

can it be an interface?

Member Author

addressed

/**
* Represents a result for a specific operation that was part of a batch request
*/
export class BulkOperationResult {
Member

convert it to an interface

Member Author

addressed

// API can return 413 which means the response is bigger than 4Mb.
// Operations that exceed the 4Mb limit are returned as 413/3402, while the operations within the 4Mb limit will be 200
if (
err.code === StatusCodes.RequestEntityTooLarge &&
Member

we are already validating the batch size, do we really need this retry?

Member Author

We get this status code and sub-status code when the response size from the backend exceeds 4 MB. I was not able to replicate this error to see the exact backend behaviour, but I included it since it is present in the .NET SDK's bulk retry policy.

Member

There is a possibility that the addition of headers might exceed the size limit.
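
A minimal sketch of the 413 check being discussed, using the 3402 sub-status from the code comment in the diff; the surrounding retry-policy shape is assumed:

import { ErrorResponse, StatusCodes } from "@azure/cosmos";

// Returns true when the *response* exceeded 4 MB (413 with sub-status 3402);
// the operations that fit were already returned as 200, so only the oversized
// remainder is retried and comes back in smaller responses.
function shouldRetryOnResponseTooLarge(err: ErrorResponse): boolean {
  return (
    err.code === StatusCodes.RequestEntityTooLarge && Number(err.substatus) === 3402
  );
}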

shouldRetryForThrottle = Array.isArray(retryResult) ? retryResult[0] : retryResult;
}
if (shouldRetryForThrottle) {
await sleep(this.nextRetryPolicy.retryAfterInMs);
Member

isn't it already covered:

await sleep(retryPolicy.retryAfterInMs);

Member Author

The bulk retry policy applies on top of the current retry policies, since we apply retries at the operation level. So it will not take this path.
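
A sketch of the delegation described in this reply: the bulk retry policy wraps the existing throttle retry policy and consults it only for 429s, mirroring the diff lines above; the method shape and the Gone handling are assumptions:

async shouldRetry(err: ErrorResponse, diagnosticNode: any): Promise<boolean> {
  if (err.code === StatusCodes.Gone) {
    // Partition split/merge: stop dispatch for the stale range and re-batch
    // the operation against the newly resolved partition key range.
    return true;
  }
  if (err.code === StatusCodes.TooManyRequests) {
    // Defer to the nested throttle policy for retry count and back-off window.
    const retryResult = await this.nextRetryPolicy.shouldRetry(err, diagnosticNode);
    const shouldRetryForThrottle = Array.isArray(retryResult) ? retryResult[0] : retryResult;
    if (shouldRetryForThrottle) {
      await sleep(this.nextRetryPolicy.retryAfterInMs);
    }
    return shouldRetryForThrottle;
  }
  return false;
}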

Member

@topshot99 topshot99 left a comment

check comments

@aditishree1
Member Author

check comments

addressed comments

operations.forEach((operation) => bulkStreamer.addOperations(operation));
const response = await bulkStreamer.endStream();
// Create
response.forEach((res, index) =>
Member

add a check for number of responses

Member Author

addressed

assert.strictEqual(res.statusCode, 201, `Status should be 201 for operation ${index}`),
);
});
it("Check case when cumulative size of all operations is greater than threshold - payload size is 25x threshold", async function () {
Member

How about one operation having a size greater than the threshold?

Member Author

@aditishree1 aditishree1 Jan 29, 2025

In that case, it will send the request anyway and throw the 413 error. I'll add a test.

Member Author

addressed
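
A sketch of the kind of test promised here, following the style of the surrounding tests; the container helper, the streamer accessor, and the exact way the 413 surfaces are assumptions:

it("returns 413 when a single operation exceeds the size threshold", async function () {
  const container = await getTestContainer("bulk streamer oversized operation");
  const bulkStreamer = (container.items as any).getBulkStreamer();
  // ~3 MB body, above the per-item limit, so the batch is sent anyway
  // and the backend rejects this operation with 413.
  bulkStreamer.addOperations({
    operationType: BulkOperationType.Create,
    resourceBody: { id: "oversized", key: "A", payload: "x".repeat(3 * 1024 * 1024) },
  });
  const response = await bulkStreamer.endStream();
  assert.strictEqual(response[0].statusCode, 413);
});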

assert.equal(response[4].resourceBody.name, "nice");
assert.equal(response[4].statusCode, 200);
});
it("Check case when cumulative size of all operations is less than threshold", async function () {
Member

Only the operation input changes in these tests; can we combine them?

Member Author

addressed

*
* @internal
*/
// TODO: Add a public reset() method which will utilize this to reset the state of the streamer to use it again.
Member

Is it a future TODO?

Member Author

yes

*/
async endStream(): Promise<BulkStreamerResponse> {
if (this.streamEnded) {
throw new ErrorResponse("Bulk streamer has already ended.");
Member

Can we attach some status code too? Some 4xx code conveying a bad request.

Member Author

That would be an HTTP-like status code. Should we set that when throwing an SDK error?
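
If a 4xx is attached, it could look roughly like this, assuming ErrorResponse's code field accepts an HTTP-style status code:

const error = new ErrorResponse("Bulk streamer has already ended.");
error.code = StatusCodes.BadRequest; // 400: signals misuse of the streamer by the caller
throw error;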

if (this.streamEnded) {
throw new ErrorResponse("Bulk streamer has already ended.");
}
this.streamEnded = true;
Member

add it after line 136

Member Author

Wouldn't this get stuck if the user keeps adding operations after calling endStream while the promises are settling? We were trying to avoid this case with the streamEnded flag.
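
A sketch of the guard being described, with the flag flipped before awaiting so late adds are rejected; names beyond those in the snippet above (operationPromises, the addOperation internals) are assumptions:

addOperations(operation: OperationInput): void {
  if (this.streamEnded) {
    // Reject late additions instead of silently queueing them while
    // endStream() is waiting for in-flight promises to settle.
    throw new ErrorResponse("Cannot add operations after the bulk streamer has ended.");
  }
  this.addOperation(operation);
}

async endStream(): Promise<BulkStreamerResponse> {
  if (this.streamEnded) {
    throw new ErrorResponse("Bulk streamer has already ended.");
  }
  // Set the flag up front so no new operations can race in while we await.
  this.streamEnded = true;
  const settled = await Promise.all(this.operationPromises);
  return settled as BulkStreamerResponse;
}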
