Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk enhancement with dynamic congestion control #32567

Open
wants to merge 45 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
bfbfa4b
modified bulk implementation
Jan 3, 2025
5b0432c
remove logs
Jan 3, 2025
19a940a
add bulkExecutionRetryPolicy and update response
Jan 5, 2025
f3cfd89
remove errors
Jan 5, 2025
a8d50fb
rush format
Jan 5, 2025
cc88b23
add congestion control logic
Jan 6, 2025
9984c3c
remove internal tag
Jan 6, 2025
3cfb4da
format
Jan 6, 2025
12f231c
remove executor cache usage
Jan 6, 2025
c4aca61
delete resources from executor and add checks for semaphore
Jan 7, 2025
eb9b281
modify public methods
Jan 11, 2025
2b1a23d
format
Jan 11, 2025
9a31b46
add option to pass a list of operations
Jan 13, 2025
ffc2a07
format
Jan 13, 2025
831336d
fix container leaking in tests
Jan 13, 2025
68bc101
add bulk request handler and fix operation Index
Jan 14, 2025
0206c2e
fix retry error and degree of concurrency
Jan 15, 2025
42a9d5d
Add limiter to stop batches on 410
amanrao23 Jan 15, 2025
2dd3ea4
fix bulk retry
Jan 15, 2025
f10ec1f
refactor addBulkOperations
Jan 15, 2025
9b1cd73
add ReadWriteLock test cases
amanrao23 Jan 15, 2025
4444062
skip bulk split tests temporarily
amanrao23 Jan 15, 2025
d70b07a
fix algo and add unit test
Jan 15, 2025
2e03115
add test cases
Jan 16, 2025
52c0c3e
format
Jan 16, 2025
d11fde7
remove bulkOptions
Jan 16, 2025
81cc3f6
modify api names, fix errors
Jan 19, 2025
1fea1e1
add unit test for bulk execution retry policy
Jan 20, 2025
6a55eb2
add lock on partition metric add method
Jan 20, 2025
b6fc416
reset wait time when increasing concurrency
Jan 20, 2025
4d47f1e
remove congestion timer
Jan 20, 2025
8d38448
modify limiter
Jan 21, 2025
f4e24ca
add doc comments and samples
Jan 21, 2025
ad1b6ca
add comment to response class
Jan 22, 2025
a5aa756
remove bulkStreamer cache
Jan 23, 2025
a0b62e8
refactor names and modify comment
Jan 27, 2025
7f82e21
remove separate operation index, add checks and test
Jan 28, 2025
0c3aaff
refactor names and add comments
Jan 28, 2025
6e68420
address comments
Jan 29, 2025
63ebbbd
add lock on operationIndex
Jan 29, 2025
8c8eb80
add comment for TODO
Jan 29, 2025
a25b5e8
remove redundant tests
Jan 30, 2025
9d89d0c
Merge branch 'main' into bulk-congestion-control
Jan 30, 2025
f415870
format
Jan 30, 2025
028d437
extend operation response and add lock to degree of concurrency
Jan 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmosdb/cosmos/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
"EntraAuth.ts",
"AlterQueryThroughput.ts",
"Bulk.ts",
"BulkStreamer.ts",
"BulkUpdateWithSproc.ts",
"ChangeFeed.ts",
"ContainerManagement.ts",
Expand Down
22 changes: 22 additions & 0 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ export type BulkPatchOperation = OperationBase & {
id: string;
};

// @public
export class BulkStreamer {
addOperations(operationInput: OperationInput | OperationInput[]): void;
endStream(): Promise<BulkStreamerResponse>;
}

// Warning: (ae-forgotten-export) The symbol "BulkOperationResult" needs to be exported by the entry point index.d.ts
//
// @public
export type BulkStreamerResponse = BulkOperationResult[] & {
diagnostics: CosmosDiagnostics;
};

// @public
export class ChangeFeedIterator<T> {
fetchNext(): Promise<ChangeFeedResponse<Array<T & Resource>>>;
Expand Down Expand Up @@ -564,6 +577,9 @@ export const Constants: {
SDKVersion: string;
CosmosDbDiagnosticLevelEnvVarName: string;
DefaultMaxBulkRequestBodySizeInBytes: number;
MaxBulkOperationsCount: number;
Copy link
Member

@topshot99 topshot99 Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason for adding bulk in the names? Can't it be just Operations count?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not any specific reason. we are using these constants for bulk, that's why. Should we change these? In .Net, these are named as MaxOperationsInDirectModeBatchRequest and DefaultMaxBulkRequestBodySizeInBytes

BulkTimeoutInMs: number;
BulkMaxDegreeOfConcurrency: number;
Quota: {
CollectionSize: string;
};
Expand Down Expand Up @@ -997,6 +1013,7 @@ export interface ErrorBody {

// @public (undocumented)
export class ErrorResponse extends Error {
constructor(message?: string, code?: number, substatus?: number);
// (undocumented)
[key: string]: any;
// (undocumented)
Expand Down Expand Up @@ -1286,6 +1303,7 @@ export class Items {
// (undocumented)
readonly container: Container;
create<T extends ItemDefinition = any>(body: T, options?: RequestOptions): Promise<ItemResponse<T>>;
getBulkStreamer(options?: RequestOptions): BulkStreamer;
getChangeFeedIterator<T>(changeFeedIteratorOptions?: ChangeFeedIteratorOptions): ChangeFeedPullModelIterator<T>;
query(query: string | SqlQuerySpec, options?: FeedOptions): QueryIterator<any>;
query<T>(query: string | SqlQuerySpec, options?: FeedOptions): QueryIterator<T>;
Expand Down Expand Up @@ -2271,6 +2289,8 @@ export interface StatusCodesType {
// (undocumented)
ENOTFOUND: "ENOTFOUND";
// (undocumented)
FailedDependency: 424;
// (undocumented)
Forbidden: 403;
// (undocumented)
Gone: 410;
Expand All @@ -2279,6 +2299,8 @@ export interface StatusCodesType {
// (undocumented)
MethodNotAllowed: 405;
// (undocumented)
MultiStatus: 207;
// (undocumented)
NoContent: 204;
// (undocumented)
NotFound: 404;
Expand Down
64 changes: 64 additions & 0 deletions sdk/cosmosdb/cosmos/samples-dev/BulkStreamer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

/**
* @summary Demonstrates example of bulk stream operations.
*/

import * as dotenv from "dotenv";
dotenv.config();

import { handleError, finish, logStep } from "./Shared/handleError";
import type { OperationInput } from "@azure/cosmos";
import { CosmosClient } from "@azure/cosmos";

const key = process.env.COSMOS_KEY || "<cosmos key>";
const endpoint = process.env.COSMOS_ENDPOINT || "<cosmos endpoint>";

async function run(): Promise<void> {
const containerId = "bulkStreamerContainer";
const client = new CosmosClient({
key: key,
endpoint: endpoint,
});
const { database } = await client.databases.create({ id: "bulkStreamer db" });
logStep(`Creating multi-partition container '${containerId}' with partition key /key`);
const { container } = await database.containers.create({
id: containerId,
partitionKey: {
paths: ["/key"],
version: 2,
},
throughput: 1000,
});

logStep("Preparing 10 'Create' operations");
const createOperations: OperationInput[] = Array.from({ length: 10 }, (_, index) => ({
operationType: "Create",
resourceBody: {
id: `doc${index + 1}`,
name: `sample${index + 1}`,
key: `${index + 1}`,
},
}));

logStep("Preparing a 'Read' operation for 'doc1'");
const readOperation: OperationInput = { operationType: "Read", id: "doc1", partitionKey: "1" };

logStep(`Getting a Bulk Streamer instance`);
const bulkStreamer = container.items.getBulkStreamer();

// an operation or a list of operations could be provided as input to addOperations
logStep("Adding the list of 'Create' operations to the Bulk Streamer");
bulkStreamer.addOperations(createOperations);
logStep("Adding a single 'Read' operation to the Bulk Streamer...");
bulkStreamer.addOperations(readOperation);

logStep("Ending the bulk stream");
const response = await bulkStreamer.endStream();
console.log("Bulk Response: ", response);

await finish();
}

run().catch(handleError);
62 changes: 62 additions & 0 deletions sdk/cosmosdb/cosmos/samples/v4/javascript/BulkStreamer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

/**
* @summary Demonstrates example of bulk stream operations.
*/

require("dotenv").config();

const { handleError, finish, logStep } = require("./Shared/handleError");
const { CosmosClient } = require("@azure/cosmos");

const key = process.env.COSMOS_KEY || "<cosmos key>";
const endpoint = process.env.COSMOS_ENDPOINT || "<cosmos endpoint>";

async function run() {
const containerId = "bulkStreamerContainer";
const client = new CosmosClient({
key: key,
endpoint: endpoint,
});
const { database } = await client.databases.create({ id: "bulkStreamer db" });
logStep(`Creating multi-partition container '${containerId}' with partition key /key`);
const { container } = await database.containers.create({
id: containerId,
partitionKey: {
paths: ["/key"],
version: 2,
},
throughput: 1000,
});

logStep("Preparing 10 'Create' operations");
const createOperations = Array.from({ length: 10 }, (_, index) => ({
operationType: "Create",
resourceBody: {
id: `doc${index + 1}`,
name: `sample${index + 1}`,
key: `${index + 1}`,
},
}));

logStep("Preparing a 'Read' operation for 'doc1'");
const readOperation = { operationType: "Read", id: "doc1", partitionKey: "1" };

logStep(`Getting a Bulk Streamer instance`);
const bulkStreamer = container.items.getBulkStreamer();

// an operation or a list of operations could be provided as input to addOperations
logStep("Adding the list of 'Create' operations to the Bulk Streamer");
bulkStreamer.addOperations(createOperations);
logStep("Adding a single 'Read' operation to the Bulk Streamer...");
bulkStreamer.addOperations(readOperation);

logStep("Ending the bulk stream");
const response = await bulkStreamer.endStream();
console.log("Bulk Response: ", response);

await finish();
}

run().catch(handleError);
4 changes: 4 additions & 0 deletions sdk/cosmosdb/cosmos/samples/v4/javascript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ These sample programs show how to use the JavaScript client libraries for Azure
| --------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------- |
| [AlterQueryThroughput.js][alterquerythroughput] | Updates a container offer to change query throughput. |
| [Bulk.js][bulk] | Shows a simple bulk call with each BulkOperation type. |
| [BulkStreamer.js][bulkstreamer] | Demonstrates example of bulk stream operations. |
| [BulkUpdateWithSproc.js][bulkupdatewithsproc] | Bulk Updates documents with a Stored Procedure. Prefer `container.items().bulk()` to this behavior. |
| [ChangeFeed.js][changefeed] | Demonstrates using a ChangeFeed. |
| [ChangeFeedIterator\ChangeFeedHierarchicalPartitionKey.js][changefeediterator_changefeedhierarchicalpartitionkey] | Demonstrates using a ChangeFeed for a partition key |
Expand All @@ -28,6 +29,7 @@ These sample programs show how to use the JavaScript client libraries for Azure
| [IndexManagement.js][indexmanagement] | Shows various ways to manage indexing items or changing container index policies. |
| [ItemManagement.js][itemmanagement] | Demonstrates item creation, read, delete and reading all items belonging to a container. |
| [QueryThroughput.js][querythroughput] | Demonstrates query throughput scenarios. |
| [Query\FullTextSearch.js][query_fulltextsearch] | Demonstrates full text search queries. |
| [SasTokenAuth.js][sastokenauth] | Demonstrates using SasTokens for granting scoped access to Cosmos resources. _Private feature_ |
| [ServerSideScripts.js][serversidescripts] | Demonstrates using stored procedures for server side run functions |

Expand Down Expand Up @@ -73,6 +75,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP

[alterquerythroughput]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/AlterQueryThroughput.js
[bulk]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/Bulk.js
[bulkstreamer]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/BulkStreamer.js
[bulkupdatewithsproc]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/BulkUpdateWithSproc.js
[changefeed]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ChangeFeed.js
[changefeediterator_changefeedhierarchicalpartitionkey]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ChangeFeedIterator/ChangeFeedHierarchicalPartitionKey.js
Expand All @@ -86,6 +89,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP
[indexmanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/IndexManagement.js
[itemmanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ItemManagement.js
[querythroughput]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/QueryThroughput.js
[query_fulltextsearch]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/Query/FullTextSearch.js
[sastokenauth]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/SasTokenAuth.js
[serversidescripts]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ServerSideScripts.js
[apiref]: https://learn.microsoft.com/javascript/api/@azure/cosmos
Expand Down
4 changes: 4 additions & 0 deletions sdk/cosmosdb/cosmos/samples/v4/typescript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ These sample programs show how to use the TypeScript client libraries for Azure
| --------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------- |
| [AlterQueryThroughput.ts][alterquerythroughput] | Updates a container offer to change query throughput. |
| [Bulk.ts][bulk] | Shows a simple bulk call with each BulkOperation type. |
| [BulkStreamer.ts][bulkstreamer] | Demonstrates example of bulk stream operations. |
| [BulkUpdateWithSproc.ts][bulkupdatewithsproc] | Bulk Updates documents with a Stored Procedure. Prefer `container.items().bulk()` to this behavior. |
| [ChangeFeed.ts][changefeed] | Demonstrates using a ChangeFeed. |
| [ChangeFeedIterator\ChangeFeedHierarchicalPartitionKey.ts][changefeediterator_changefeedhierarchicalpartitionkey] | Demonstrates using a ChangeFeed for a partition key |
Expand All @@ -28,6 +29,7 @@ These sample programs show how to use the TypeScript client libraries for Azure
| [IndexManagement.ts][indexmanagement] | Shows various ways to manage indexing items or changing container index policies. |
| [ItemManagement.ts][itemmanagement] | Demonstrates item creation, read, delete and reading all items belonging to a container. |
| [QueryThroughput.ts][querythroughput] | Demonstrates query throughput scenarios. |
| [Query\FullTextSearch.ts][query_fulltextsearch] | Demonstrates full text search queries. |
| [SasTokenAuth.ts][sastokenauth] | Demonstrates using SasTokens for granting scoped access to Cosmos resources. _Private feature_ |
| [ServerSideScripts.ts][serversidescripts] | Demonstrates using stored procedures for server side run functions |

Expand Down Expand Up @@ -85,6 +87,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP

[alterquerythroughput]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/AlterQueryThroughput.ts
[bulk]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/Bulk.ts
[bulkstreamer]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/BulkStreamer.ts
[bulkupdatewithsproc]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/BulkUpdateWithSproc.ts
[changefeed]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ChangeFeed.ts
[changefeediterator_changefeedhierarchicalpartitionkey]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ChangeFeedIterator/ChangeFeedHierarchicalPartitionKey.ts
Expand All @@ -98,6 +101,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP
[indexmanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/IndexManagement.ts
[itemmanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ItemManagement.ts
[querythroughput]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/QueryThroughput.ts
[query_fulltextsearch]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/Query/FullTextSearch.ts
[sastokenauth]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/SasTokenAuth.ts
[serversidescripts]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ServerSideScripts.ts
[apiref]: https://learn.microsoft.com/javascript/api/@azure/cosmos
Expand Down
69 changes: 69 additions & 0 deletions sdk/cosmosdb/cosmos/samples/v4/typescript/src/BulkStreamer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

/**
* @summary Demonstrates example of bulk stream operations.
*/

import * as dotenv from "dotenv";
dotenv.config();

import { handleError, finish, logStep } from "./Shared/handleError";
import type {
OperationInput,
} from "@azure/cosmos";
import {
CosmosClient
} from "@azure/cosmos";

const key = process.env.COSMOS_KEY || "<cosmos key>";
const endpoint = process.env.COSMOS_ENDPOINT || "<cosmos endpoint>";


async function run(): Promise<void> {
const containerId = "bulkStreamerContainer";
const client = new CosmosClient({
key: key,
endpoint: endpoint,
});
const { database } = await client.databases.create({ id: ("bulkStreamer db") });
logStep(`Creating multi-partition container '${containerId}' with partition key /key`);
const { container } = await database.containers.create({
id: containerId,
partitionKey: {
paths: ["/key"],
version: 2,
},
throughput: 1000,
});

logStep("Preparing 10 'Create' operations")
const createOperations: OperationInput[] = Array.from({ length: 10 }, (_, index) => ({
operationType: "Create",
resourceBody: {
id: `doc${index + 1}`,
name: `sample${index + 1}`,
key: `${index + 1}`,
},
}));

logStep("Preparing a 'Read' operation for 'doc1'")
const readOperation: OperationInput = { operationType: "Read", id: "doc1", partitionKey: "1" };

logStep(`Getting a Bulk Streamer instance`);
const bulkStreamer = container.items.getBulkStreamer();

// an operation or a list of operations could be provided as input to addOperations
logStep("Adding the list of 'Create' operations to the Bulk Streamer");
bulkStreamer.addOperations(createOperations);
logStep("Adding a single 'Read' operation to the Bulk Streamer...")
bulkStreamer.addOperations(readOperation);

logStep("Ending the bulk stream");
const response = await bulkStreamer.endStream();
console.log("Bulk Response: ", response);

await finish();
}

run().catch(handleError);
Loading
Loading