Skip to content

Commit

Permalink
refactor names and add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Aditishree . committed Jan 28, 2025
1 parent 7f82e21 commit 0c3aaff
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
14 changes: 7 additions & 7 deletions sdk/cosmosdb/cosmos/src/bulk/BulkStreamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export class BulkStreamer {
throw new ErrorResponse("Operation is required.");
}
const partitionKeyRangeId = await this.resolvePartitionKeyRangeId(operation);
const streamerForPartition = this.getOrCreateStreamerForPKRange(partitionKeyRangeId);
const streamerForPartition = this.getStreamerForPKRange(partitionKeyRangeId);
const retryPolicy = this.getRetryPolicy();
const context = new ItemBulkOperationContext(partitionKeyRangeId, retryPolicy);
const itemOperation = new ItemBulkOperation(index, operation, context);
Expand Down Expand Up @@ -192,7 +192,7 @@ export class BulkStreamer {
): Promise<BulkResponse> {
if (!operations.length) return;
const pkRangeId = operations[0].operationContext.pkRangeId;
const limiter = this.getOrCreateLimiterForPKRange(pkRangeId);
const limiter = this.getLimiterForPKRange(pkRangeId);
const path = getPathFromLink(this.container.url, ResourceType.item);
const requestBody: Operation[] = [];
const partitionDefinition = await readPartitionKeyDefinition(diagnosticNode, this.container);
Expand Down Expand Up @@ -239,12 +239,12 @@ export class BulkStreamer {

private async reBatchOperation(operation: ItemBulkOperation): Promise<void> {
const partitionKeyRangeId = await this.resolvePartitionKeyRangeId(operation.operationInput);
operation.operationContext.reRouteOperation(partitionKeyRangeId);
const streamer = this.getOrCreateStreamerForPKRange(partitionKeyRangeId);
operation.operationContext.updatePKRangeId(partitionKeyRangeId);
const streamer = this.getStreamerForPKRange(partitionKeyRangeId);
streamer.add(operation);
}

private getOrCreateLimiterForPKRange(pkRangeId: string): Limiter {
private getLimiterForPKRange(pkRangeId: string): Limiter {
let limiter = this.limitersByPartitionKeyRangeId.get(pkRangeId);
if (!limiter) {
limiter = new Limiter(Constants.BulkMaxDegreeOfConcurrency);
Expand All @@ -257,11 +257,11 @@ export class BulkStreamer {
return limiter;
}

private getOrCreateStreamerForPKRange(pkRangeId: string): BulkStreamerPerPartition {
private getStreamerForPKRange(pkRangeId: string): BulkStreamerPerPartition {
if (this.streamersByPartitionKeyRangeId.has(pkRangeId)) {
return this.streamersByPartitionKeyRangeId.get(pkRangeId);
}
const limiter = this.getOrCreateLimiterForPKRange(pkRangeId);
const limiter = this.getLimiterForPKRange(pkRangeId);
const newStreamer = new BulkStreamerPerPartition(
this.executeRequest,
this.reBatchOperation,
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmosdb/cosmos/src/bulk/ItemBulkOperationContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class ItemBulkOperationContext {
return this.taskCompletionSource.task;
}

reRouteOperation(pkRangeId: string): void {
updatePKRangeId(pkRangeId: string): void {
this.pkRangeId = pkRangeId;
}

Expand Down
28 changes: 27 additions & 1 deletion sdk/cosmosdb/cosmos/src/bulk/Limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,73 @@
// Licensed under the MIT License.
import semaphore from "semaphore";
/**
* Semaphores and locks for execution of Bulk
* Semaphores and locks for execution of Bulk.
* This class controls number of concurrent requests for bulk per partition
* and helps in blocking requests to a partition upon encountering 410 error.
* @hidden
*/
export class Limiter {
// semaphore to control number of concurrent requests for a partition.
private limiter: semaphore.Semaphore;
// flag indicating whether dispatch has been stopped due to 410 error
private dispatchStopped: boolean = false;
// read write lock to safely control access to `dispatchStopped` flag
private readWriteLock: ReadWriteLock;

constructor(capacity: number) {
this.limiter = semaphore(capacity);
this.readWriteLock = new ReadWriteLock();
}

/**
* acquires a slot to execute specified callback
* @param callback - callback function to take the slot
*/
take(callback: () => void): void {
this.limiter.take(() => {
callback();
});
}

/**
* @returns number of currently acquired slots
*/
current(): number {
return this.limiter.current;
}

/**
* releases the specified number of slots
* @param number - number of slots to release
*/
leave(number?: number): void {
this.limiter.leave(number);
}

/**
* checks if we have encountered 410 error during bulk and stopped dispatch
* @returns true if dispatch is stopped
*/
async isStopped(): Promise<boolean> {
await this.readWriteLock.acquireRead();
const stopDispatch = this.dispatchStopped;
this.readWriteLock.releaseRead();
return stopDispatch;
}

/**
* stops dispatching by setting the `dispatchStopped` flag to `true`.
*/
async stopDispatch(): Promise<void> {
await this.readWriteLock.acquireWrite();
this.dispatchStopped = true;
this.readWriteLock.releaseWrite();
}
}

/**
* ReadWriteLock class to manage read and write locks
*/
export class ReadWriteLock {
private readers = 0; // Count of active readers
private writer = false; // Indicates if a writer is active
Expand Down

0 comments on commit 0c3aaff

Please sign in to comment.