diff --git a/sdk/cosmosdb/cosmos/package.json b/sdk/cosmosdb/cosmos/package.json index c9d56e79809e..65fd12517bc2 100644 --- a/sdk/cosmosdb/cosmos/package.json +++ b/sdk/cosmosdb/cosmos/package.json @@ -131,6 +131,7 @@ "EntraAuth.ts", "AlterQueryThroughput.ts", "Bulk.ts", + "BulkStreamer.ts", "BulkUpdateWithSproc.ts", "ChangeFeed.ts", "ContainerManagement.ts", diff --git a/sdk/cosmosdb/cosmos/review/cosmos.api.md b/sdk/cosmosdb/cosmos/review/cosmos.api.md index 064e375b6e9c..035e79f2e38c 100644 --- a/sdk/cosmosdb/cosmos/review/cosmos.api.md +++ b/sdk/cosmosdb/cosmos/review/cosmos.api.md @@ -41,6 +41,26 @@ export type BulkOperationResponse = OperationResponse[] & { diagnostics: CosmosDiagnostics; }; +// @public +export interface BulkOperationResult extends OperationResponse { + activityId?: string; + diagnostics?: CosmosDiagnostics; + operationInput?: ItemOperation; + retryAfter?: number; + sessionToken?: string; + subStatusCode?: number; +} + +// @public +export class BulkOperations { + static getCreateItemOperation(partitionKey: PartitionKey, resourceBody: JSONObject): ItemOperation; + static getDeleteItemOperation(id: string, partitionKey: PartitionKey): ItemOperation; + static getPatchItemOperation(id: string, partitionKey: PartitionKey, resourceBody: PatchRequestBody): ItemOperation; + static getReadItemOperation(id: string, partitionKey: PartitionKey): ItemOperation; + static getReplaceItemOperation(id: string, partitionKey: PartitionKey, resourceBody: JSONObject): ItemOperation; + static getUpsertItemOperation(partitionKey: PartitionKey, resourceBody: JSONObject): ItemOperation; +} + // @public (undocumented) export const BulkOperationType: { readonly Create: "Create"; @@ -63,6 +83,12 @@ export type BulkPatchOperation = OperationBase & { id: string; }; +// @public +export class BulkStreamer { + dispose(): void; + execute(operationInput: ItemOperation[]): Promise[]; +} + // @public export class ChangeFeedIterator { fetchNext(): Promise>>; @@ -619,6 +645,9 @@ export const Constants: { SDKVersion: string; CosmosDbDiagnosticLevelEnvVarName: string; DefaultMaxBulkRequestBodySizeInBytes: number; + MaxBulkOperationsCount: number; + BulkTimeoutInMs: number; + BulkMaxDegreeOfConcurrency: number; Encryption: { DiagnosticsDecryptOperation: string; DiagnosticsDuration: string; @@ -1136,6 +1165,7 @@ export interface ErrorBody { // @public (undocumented) export class ErrorResponse extends Error { + constructor(message?: string, code?: number, substatus?: number); // (undocumented) [key: string]: any; // (undocumented) @@ -1403,6 +1433,14 @@ export interface ItemDefinition { ttl?: number; } +// @public +export interface ItemOperation { + id?: string; + operationType: string; + partitionKey: PartitionKey; + resourceBody?: JSONObject | PatchRequestBody; +} + // @public (undocumented) export class ItemResponse extends ResourceResponse { constructor(resource: T & Resource, headers: CosmosHeaders, statusCode: number, subsstatusCode: number, item: Item, diagnostics: CosmosDiagnostics); @@ -1425,6 +1463,7 @@ export class Items { // (undocumented) readonly container: Container; create(body: T, options?: RequestOptions): Promise>; + getBulkStreamer(options?: RequestOptions): BulkStreamer; getChangeFeedIterator(changeFeedIteratorOptions?: ChangeFeedIteratorOptions): ChangeFeedPullModelIterator; getEncryptionQueryIterator(queryBuilder: EncryptionQueryBuilder, options?: FeedOptions): Promise>; query(query: string | SqlQuerySpec, options?: FeedOptions): QueryIterator; @@ -2415,6 +2454,8 @@ export interface 
StatusCodesType { // (undocumented) ENOTFOUND: "ENOTFOUND"; // (undocumented) + FailedDependency: 424; + // (undocumented) Forbidden: 403; // (undocumented) Gone: 410; @@ -2423,6 +2464,8 @@ export interface StatusCodesType { // (undocumented) MethodNotAllowed: 405; // (undocumented) + MultiStatus: 207; + // (undocumented) NoContent: 204; // (undocumented) NotFound: 404; diff --git a/sdk/cosmosdb/cosmos/samples-dev/BulkStreamer.ts b/sdk/cosmosdb/cosmos/samples-dev/BulkStreamer.ts new file mode 100644 index 000000000000..774dbd40b840 --- /dev/null +++ b/sdk/cosmosdb/cosmos/samples-dev/BulkStreamer.ts @@ -0,0 +1,78 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +/** + * @summary Demonstrates an example of streamable bulk operation. + */ + +import * as dotenv from "dotenv"; +dotenv.config(); + +import { handleError, finish, logStep } from "./Shared/handleError"; +import { + CosmosClient, + BulkOperations, + ItemOperation, + BulkStreamer, + BulkOperationResult, +} from "@azure/cosmos"; +import assert from "assert"; + +const key = process.env.COSMOS_KEY || ""; +const endpoint = process.env.COSMOS_ENDPOINT || ""; + +async function run(): Promise { + const containerId = "bulkStreamerContainer"; + const client = new CosmosClient({ + key: key, + endpoint: endpoint, + }); + const { database } = await client.databases.create({ id: "bulkStreamer db" }); + logStep(`Creating container '${containerId}' with partition key /key`); + const { container } = await database.containers.create({ + id: containerId, + partitionKey: { + paths: ["/key"], + version: 2, + }, + throughput: 3000, + }); + + const totalOperations = 5000; + const chunkSize = 1000; + + logStep("Get instance of bulk streamer"); + const bulkStreamer: BulkStreamer = container.items.getBulkStreamer(); + let operationPromises: Promise[] = []; + + for (let i = 0; i < totalOperations; i += chunkSize) { + const operationsChunk: ItemOperation[] = Array.from({ length: chunkSize }, (_, j) => { + const index = i + j; + return BulkOperations.getCreateItemOperation(`${index + 1}`, { + id: `doc${index + 1}`, + name: `sample${index + 1}`, + key: `${index + 1}`, + }); + }); + logStep(`Adding chunk of ${chunkSize} operations to execute starting at document id ${i + 1}`); + + // Execute chunk and obtain list of promises for each operation. + const executePromises = bulkStreamer.execute(operationsChunk); + operationPromises.push(...executePromises); + // process operation result as it resolves + executePromises.forEach((result) => + result.then((result) => assert.equal(result.statusCode, 201)), + ); + } + + logStep("Wait for all operations to settle..."); + await Promise.allSettled(operationPromises); + + // make sure that all promises are settled before disposing of the bulk streamer + logStep("Dispose of bulk streamer..."); + bulkStreamer.dispose(); + await container.delete(); + await finish(); +} + +run().catch(handleError); diff --git a/sdk/cosmosdb/cosmos/samples/v4/javascript/BulkStreamer.js b/sdk/cosmosdb/cosmos/samples/v4/javascript/BulkStreamer.js new file mode 100644 index 000000000000..5a4389447a2e --- /dev/null +++ b/sdk/cosmosdb/cosmos/samples/v4/javascript/BulkStreamer.js @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +/** + * @summary Demonstrates an example of streamable bulk operation. 
+ */ + +require("dotenv").config(); + +const { handleError, finish, logStep } = require("./Shared/handleError"); +const { CosmosClient, BulkOperations } = require("@azure/cosmos"); +const assert = require("assert"); + +const key = process.env.COSMOS_KEY || ""; +const endpoint = process.env.COSMOS_ENDPOINT || ""; + +async function run() { + const containerId = "bulkStreamerContainer"; + const client = new CosmosClient({ + key: key, + endpoint: endpoint, + }); + const { database } = await client.databases.create({ id: "bulkStreamer db" }); + logStep(`Creating container '${containerId}' with partition key /key`); + const { container } = await database.containers.create({ + id: containerId, + partitionKey: { + paths: ["/key"], + version: 2, + }, + throughput: 3000, + }); + + const totalOperations = 5000; + const chunkSize = 1000; + + logStep("Get instance of bulk streamer"); + const bulkStreamer = container.items.getBulkStreamer(); + let operationPromises = []; + + for (let i = 0; i < totalOperations; i += chunkSize) { + const operationsChunk = Array.from({ length: chunkSize }, (_, j) => { + const index = i + j; + return BulkOperations.getCreateItemOperation(`${index + 1}`, { + id: `doc${index + 1}`, + name: `sample${index + 1}`, + key: `${index + 1}`, + }); + }); + logStep(`Adding chunk of ${chunkSize} operations to execute starting at document id ${i + 1}`); + + // Execute chunk and obtain list of promises for each operation. + const executePromises = bulkStreamer.execute(operationsChunk); + operationPromises.push(...executePromises); + // process operation result as it resolves + executePromises.forEach((result) => + result.then((result) => assert.equal(result.statusCode, 201)), + ); + } + + logStep("Wait for all operations to settle..."); + await Promise.allSettled(operationPromises); + + // make sure that all promises are settled before disposing of the bulk streamer + logStep("Dispose of bulk streamer..."); + bulkStreamer.dispose(); + await container.delete(); + await finish(); +} + +run().catch(handleError); diff --git a/sdk/cosmosdb/cosmos/samples/v4/javascript/ChangeFeed.js b/sdk/cosmosdb/cosmos/samples/v4/javascript/ChangeFeed.js index d58971b67899..af0095cfda70 100644 --- a/sdk/cosmosdb/cosmos/samples/v4/javascript/ChangeFeed.js +++ b/sdk/cosmosdb/cosmos/samples/v4/javascript/ChangeFeed.js @@ -48,20 +48,32 @@ async function run() { console.log(` ✨✨✨ Change Feed Samples ✨✨✨ + + + There are 4 scenarios for change feed: 1. Start from a specific continuation 2. Start from a specific point in time 3. Start from the beginning 4. Start from now + + + All 4 scenarios will eventually catch up to each other if read for long enough + + + In this sample, we expect the scenario to see the following items, by id: 1. [3] 2. [2, 3] 3. [1, 2, 3] 4. [] + + + After we've read to this point, if we insert a new item id 4, we expect all of them to see it, since they will all be caught up. 
`); diff --git a/sdk/cosmosdb/cosmos/samples/v4/javascript/README.md b/sdk/cosmosdb/cosmos/samples/v4/javascript/README.md index 34f99bfd7f00..4d1a46f17a43 100644 --- a/sdk/cosmosdb/cosmos/samples/v4/javascript/README.md +++ b/sdk/cosmosdb/cosmos/samples/v4/javascript/README.md @@ -15,6 +15,7 @@ These sample programs show how to use the JavaScript client libraries for Azure | --------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------- | | [AlterQueryThroughput.js][alterquerythroughput] | Updates a container offer to change query throughput. | | [Bulk.js][bulk] | Shows a simple bulk call with each BulkOperation type. | +| [BulkStreamer.js][bulkstreamer] | Demonstrates an example of streamable bulk operation. | | [BulkUpdateWithSproc.js][bulkupdatewithsproc] | Bulk Updates documents with a Stored Procedure. Prefer `container.items().bulk()` to this behavior. | | [ChangeFeed.js][changefeed] | Demonstrates using a ChangeFeed. | | [ChangeFeedIterator\ChangeFeedHierarchicalPartitionKey.js][changefeediterator_changefeedhierarchicalpartitionkey] | Demonstrates using a ChangeFeed for a partition key | @@ -29,6 +30,7 @@ These sample programs show how to use the JavaScript client libraries for Azure | [IndexManagement.js][indexmanagement] | Shows various ways to manage indexing items or changing container index policies. | | [ItemManagement.js][itemmanagement] | Demonstrates item creation, read, delete and reading all items belonging to a container. | | [QueryThroughput.js][querythroughput] | Demonstrates query throughput scenarios. | +| [Query\FullTextSearch.js][query_fulltextsearch] | Demonstrates full text search queries. | | [SasTokenAuth.js][sastokenauth] | Demonstrates using SasTokens for granting scoped access to Cosmos resources. 
_Private feature_ | | [ServerSideScripts.js][serversidescripts] | Demonstrates using stored procedures for server side run functions | @@ -74,11 +76,13 @@ Take a look at our [API Documentation][apiref] for more information about the AP [alterquerythroughput]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/AlterQueryThroughput.js [bulk]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/Bulk.js +[bulkstreamer]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/BulkStreamer.js [bulkupdatewithsproc]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/BulkUpdateWithSproc.js [changefeed]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ChangeFeed.js [changefeediterator_changefeedhierarchicalpartitionkey]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ChangeFeedIterator/ChangeFeedHierarchicalPartitionKey.js [changefeediterator_changefeediteratorallversionsanddeletes]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ChangeFeedIterator/ChangeFeedIteratorAllVersionsAndDeletes.js [changefeediterator_changefeediteratorlatestversion]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ChangeFeedIterator/ChangeFeedIteratorLatestVersion.js +[clientsideencryption]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ClientSideEncryption.js [containermanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ContainerManagement.js [databasemanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/DatabaseManagement.js [diagnostics]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/Diagnostics.js @@ -87,6 +91,7 @@ Take a look at our [API Documentation][apiref] for more information about the AP [indexmanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/IndexManagement.js [itemmanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ItemManagement.js [querythroughput]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/QueryThroughput.js +[query_fulltextsearch]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/Query/FullTextSearch.js [sastokenauth]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/SasTokenAuth.js [serversidescripts]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/javascript/ServerSideScripts.js [apiref]: https://learn.microsoft.com/javascript/api/@azure/cosmos diff --git a/sdk/cosmosdb/cosmos/samples/v4/typescript/README.md b/sdk/cosmosdb/cosmos/samples/v4/typescript/README.md index 5705aae52c9b..11f8f97530b1 100644 --- a/sdk/cosmosdb/cosmos/samples/v4/typescript/README.md +++ b/sdk/cosmosdb/cosmos/samples/v4/typescript/README.md @@ -15,6 +15,7 @@ These sample programs show how to use the TypeScript client libraries for Azure | --------------------------------------------------------------------------------------------------------------------------- | 
--------------------------------------------------------------------------------------------------------------------- | | [AlterQueryThroughput.ts][alterquerythroughput] | Updates a container offer to change query throughput. | | [Bulk.ts][bulk] | Shows a simple bulk call with each BulkOperation type. | +| [BulkStreamer.ts][bulkstreamer] | Demonstrates an example of streamable bulk operation. | | [BulkUpdateWithSproc.ts][bulkupdatewithsproc] | Bulk Updates documents with a Stored Procedure. Prefer `container.items().bulk()` to this behavior. | | [ChangeFeed.ts][changefeed] | Demonstrates using a ChangeFeed. | | [ChangeFeedIterator\ChangeFeedHierarchicalPartitionKey.ts][changefeediterator_changefeedhierarchicalpartitionkey] | Demonstrates using a ChangeFeed for a partition key | @@ -29,6 +30,7 @@ These sample programs show how to use the TypeScript client libraries for Azure | [IndexManagement.ts][indexmanagement] | Shows various ways to manage indexing items or changing container index policies. | | [ItemManagement.ts][itemmanagement] | Demonstrates item creation, read, delete and reading all items belonging to a container. | | [QueryThroughput.ts][querythroughput] | Demonstrates query throughput scenarios. | +| [Query\FullTextSearch.ts][query_fulltextsearch] | Demonstrates full text search queries. | | [SasTokenAuth.ts][sastokenauth] | Demonstrates using SasTokens for granting scoped access to Cosmos resources. _Private feature_ | | [ServerSideScripts.ts][serversidescripts] | Demonstrates using stored procedures for server side run functions | @@ -86,11 +88,13 @@ Take a look at our [API Documentation][apiref] for more information about the AP [alterquerythroughput]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/AlterQueryThroughput.ts [bulk]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/Bulk.ts +[bulkstreamer]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/BulkStreamer.ts [bulkupdatewithsproc]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/BulkUpdateWithSproc.ts [changefeed]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ChangeFeed.ts [changefeediterator_changefeedhierarchicalpartitionkey]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ChangeFeedIterator/ChangeFeedHierarchicalPartitionKey.ts [changefeediterator_changefeediteratorallversionsanddeletes]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ChangeFeedIterator/ChangeFeedIteratorAllVersionsAndDeletes.ts [changefeediterator_changefeediteratorlatestversion]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ChangeFeedIterator/ChangeFeedIteratorLatestVersion.ts +[clientsideencryption]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ClientSideEncryption.ts [containermanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ContainerManagement.ts [databasemanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/DatabaseManagement.ts [diagnostics]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/Diagnostics.ts @@ -99,6 +103,7 @@ 
Take a look at our [API Documentation][apiref] for more information about the AP [indexmanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/IndexManagement.ts [itemmanagement]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ItemManagement.ts [querythroughput]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/QueryThroughput.ts +[query_fulltextsearch]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/Query/FullTextSearch.ts [sastokenauth]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/SasTokenAuth.ts [serversidescripts]: https://github.com/Azure/azure-sdk-for-js/blob/main/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ServerSideScripts.ts [apiref]: https://learn.microsoft.com/javascript/api/@azure/cosmos diff --git a/sdk/cosmosdb/cosmos/samples/v4/typescript/src/BulkStreamer.ts b/sdk/cosmosdb/cosmos/samples/v4/typescript/src/BulkStreamer.ts new file mode 100644 index 000000000000..774dbd40b840 --- /dev/null +++ b/sdk/cosmosdb/cosmos/samples/v4/typescript/src/BulkStreamer.ts @@ -0,0 +1,78 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +/** + * @summary Demonstrates an example of streamable bulk operation. + */ + +import * as dotenv from "dotenv"; +dotenv.config(); + +import { handleError, finish, logStep } from "./Shared/handleError"; +import { + CosmosClient, + BulkOperations, + ItemOperation, + BulkStreamer, + BulkOperationResult, +} from "@azure/cosmos"; +import assert from "assert"; + +const key = process.env.COSMOS_KEY || ""; +const endpoint = process.env.COSMOS_ENDPOINT || ""; + +async function run(): Promise { + const containerId = "bulkStreamerContainer"; + const client = new CosmosClient({ + key: key, + endpoint: endpoint, + }); + const { database } = await client.databases.create({ id: "bulkStreamer db" }); + logStep(`Creating container '${containerId}' with partition key /key`); + const { container } = await database.containers.create({ + id: containerId, + partitionKey: { + paths: ["/key"], + version: 2, + }, + throughput: 3000, + }); + + const totalOperations = 5000; + const chunkSize = 1000; + + logStep("Get instance of bulk streamer"); + const bulkStreamer: BulkStreamer = container.items.getBulkStreamer(); + let operationPromises: Promise[] = []; + + for (let i = 0; i < totalOperations; i += chunkSize) { + const operationsChunk: ItemOperation[] = Array.from({ length: chunkSize }, (_, j) => { + const index = i + j; + return BulkOperations.getCreateItemOperation(`${index + 1}`, { + id: `doc${index + 1}`, + name: `sample${index + 1}`, + key: `${index + 1}`, + }); + }); + logStep(`Adding chunk of ${chunkSize} operations to execute starting at document id ${i + 1}`); + + // Execute chunk and obtain list of promises for each operation. 
+ const executePromises = bulkStreamer.execute(operationsChunk); + operationPromises.push(...executePromises); + // process operation result as it resolves + executePromises.forEach((result) => + result.then((result) => assert.equal(result.statusCode, 201)), + ); + } + + logStep("Wait for all operations to settle..."); + await Promise.allSettled(operationPromises); + + // make sure that all promises are settled before disposing of the bulk streamer + logStep("Dispose of bulk streamer..."); + bulkStreamer.dispose(); + await container.delete(); + await finish(); +} + +run().catch(handleError); diff --git a/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ClientSideEncryption.ts b/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ClientSideEncryption.ts index 58988d92a847..19d00f88fc0f 100644 --- a/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ClientSideEncryption.ts +++ b/sdk/cosmosdb/cosmos/samples/v4/typescript/src/ClientSideEncryption.ts @@ -49,7 +49,7 @@ async function run() { // We can set encryption key time to live in hours (EncryptionTimeToLive.FromHours), // minutes (EncryptionTimeToLive.FromMinutes), and with no ttl (EncryptiontimeToLive.NoTTL) encryptionKeyTimeToLive: EncryptionTimeToLive.FromMinutes(10), - } + }, }); logStep("Create database and client encryption key"); diff --git a/sdk/cosmosdb/cosmos/src/ClientContext.ts b/sdk/cosmosdb/cosmos/src/ClientContext.ts index 72b7fbd8db17..b0bb2fbcbd70 100644 --- a/sdk/cosmosdb/cosmos/src/ClientContext.ts +++ b/sdk/cosmosdb/cosmos/src/ClientContext.ts @@ -39,6 +39,7 @@ import { DefaultDiagnosticFormatter } from "./diagnostics/DiagnosticFormatter"; import { CosmosDbDiagnosticLevel } from "./diagnostics/CosmosDbDiagnosticLevel"; import { randomUUID } from "@azure/core-util"; import { getUserAgent } from "./common/platform"; +import type { RetryOptions } from "./retry/retryOptions"; const logger: AzureLogger = createClientLogger("ClientContext"); const QueryJsonContentType = "application/query+json"; @@ -91,6 +92,7 @@ export class ClientContext { } this.initializeDiagnosticSettings(diagnosticLevel); } + /** @hidden */ public async read({ path, @@ -1001,4 +1003,11 @@ export class ClientContext { this.cosmosClientOptions.defaultHeaders[Constants.HttpHeaders.CustomUserAgent] = updatedUserAgent; } + + /** + * @internal + */ + public getRetryOptions(): RetryOptions { + return this.connectionPolicy.retryOptions; + } } diff --git a/sdk/cosmosdb/cosmos/src/bulk/BulkBatcher.ts b/sdk/cosmosdb/cosmos/src/bulk/BulkBatcher.ts new file mode 100644 index 000000000000..5ba1cee3421a --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/bulk/BulkBatcher.ts @@ -0,0 +1,181 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +import { DiagnosticNodeInternal, DiagnosticNodeType } from "../diagnostics/DiagnosticNodeInternal"; +import type { RequestOptions } from "../request"; +import { ErrorResponse } from "../request"; +import { Constants, StatusCodes } from "../common"; +import type { ExecuteCallback, RetryCallback } from "../utils/batch"; +import { calculateObjectSizeInBytes, isSuccessStatusCode } from "../utils/batch"; +import type { BulkResponse } from "./BulkResponse"; +import type { ItemBulkOperation } from "./ItemBulkOperation"; +import type { BulkPartitionMetric } from "./BulkPartitionMetric"; +import { getCurrentTimestampInMs } from "../utils/time"; +import type { Limiter } from "./Limiter"; +import type { CosmosDbDiagnosticLevel } from "../diagnostics/CosmosDbDiagnosticLevel"; +import type { EncryptionProcessor } from "../encryption/EncryptionProcessor"; + +/** + * Maintains a batch of operations and dispatches it as a unit of work. + * Execution of the request is done by the @see {@link ExecuteCallback} and retry is done by the @see {@link RetryCallback}. + * @hidden + */ + +export class BulkBatcher { + private batchOperationsList: ItemBulkOperation[]; + private currentSize: number; + private toBeDispatched: boolean; + private readonly executor: ExecuteCallback; + private readonly retrier: RetryCallback; + private readonly options: RequestOptions; + private readonly diagnosticLevel: CosmosDbDiagnosticLevel; + private readonly encryptionEnabled: boolean; + private readonly encryptionProcessor: EncryptionProcessor; + + constructor( + private limiter: Limiter, + executor: ExecuteCallback, + retrier: RetryCallback, + options: RequestOptions, + diagnosticLevel: CosmosDbDiagnosticLevel, + encryptionEnabled: boolean, + encryptionProcessor: EncryptionProcessor, + ) { + this.batchOperationsList = []; + this.executor = executor; + this.retrier = retrier; + this.options = options; + this.diagnosticLevel = diagnosticLevel; + this.encryptionEnabled = encryptionEnabled; + this.encryptionProcessor = encryptionProcessor; + this.currentSize = 0; + this.toBeDispatched = false; + } + + /** + * Attempts to add an operation to the current batch. + * Returns false if the batch is full or already dispatched. + */ + public tryAdd(operation: ItemBulkOperation): boolean { + if (this.toBeDispatched) { + return false; + } + if (!operation) { + throw new ErrorResponse("Operation is not defined"); + } + if (!operation.operationContext) { + throw new ErrorResponse("Operation context is not defined"); + } + if (this.batchOperationsList.length === Constants.MaxBulkOperationsCount) { + return false; + } + const currentOperationSize = calculateObjectSizeInBytes(operation); + if ( + this.batchOperationsList.length > 0 && + this.currentSize + currentOperationSize > Constants.DefaultMaxBulkRequestBodySizeInBytes + ) { + return false; + } + + this.currentSize += currentOperationSize; + this.batchOperationsList.push(operation); + return true; + } + + public isEmpty(): boolean { + return this.batchOperationsList.length === 0; + } + + /** + * Dispatches the current batch of operations. + * Handles retries for failed operations and updates the ordered response. 
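+ * An empty response (statusCode 0, returned by the executor after a 410 Gone) short-circuits further
+ * processing; throttles (429) and splits (410) observed in the results feed the partition metrics and the limiter.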
+ */ + public async dispatch(partitionMetric: BulkPartitionMetric): Promise { + this.toBeDispatched = true; + const startTime = getCurrentTimestampInMs(); + const diagnosticNode = new DiagnosticNodeInternal( + this.diagnosticLevel, + DiagnosticNodeType.BATCH_REQUEST, + null, + ); + try { + const response: BulkResponse = await this.executor( + this.batchOperationsList, + this.options, + diagnosticNode, + ); + // status code of 0 represents an empty response, + // we are sending this back from executor in case of 410 error + if (response.statusCode === 0) { + return; + } + const hasThrottles = 1; + const noThrottle = 0; + const numThrottle = response.results.some( + (result) => result.statusCode === StatusCodes.TooManyRequests, + ) + ? hasThrottles + : noThrottle; + const splitOrMerge = response.results.some((result) => result.statusCode === StatusCodes.Gone) + ? true + : false; + if (splitOrMerge) { + await this.limiter.stopDispatch(); + } + partitionMetric.add( + this.batchOperationsList.length, + getCurrentTimestampInMs() - startTime, + numThrottle, + ); + + for (let i = 0; i < response.operations.length; i++) { + const operation = response.operations[i]; + const bulkOperationResult = response.results[i]; + if (!isSuccessStatusCode(bulkOperationResult.statusCode)) { + const errorResponse = new ErrorResponse( + null, + bulkOperationResult.statusCode, + bulkOperationResult.subStatusCode, + ); + errorResponse.retryAfterInMs = bulkOperationResult.retryAfter; + const shouldRetry = await operation.operationContext.retryPolicy.shouldRetry( + errorResponse, + diagnosticNode, + ); + if (shouldRetry) { + await this.retrier(operation, diagnosticNode); + continue; + } + } + operation.operationContext.addDiagnosticChild(diagnosticNode); + try { + if (this.encryptionEnabled && bulkOperationResult.resourceBody) { + bulkOperationResult.resourceBody = await this.encryptionProcessor.decrypt( + bulkOperationResult.resourceBody, + operation.operationContext.diagnosticNode, + ); + } + } catch (error) { + // if decryption fails after successful write operation, fail the operation with internal server error + if ( + bulkOperationResult.operationInput.operationType !== "Read" && + bulkOperationResult.operationInput.operationType !== "Delete" + ) { + const errorResponse = new ErrorResponse(error.message, StatusCodes.InternalServerError); + operation.operationContext.fail(errorResponse); + } + } + operation.operationContext.complete(bulkOperationResult); + } + } catch (error) { + // Mark all operations in the batch as failed + for (const operation of this.batchOperationsList) { + operation.operationContext.addDiagnosticChild(diagnosticNode); + operation.operationContext.fail(error); + } + } finally { + // Clean up batch state + this.batchOperationsList = []; + } + } +} diff --git a/sdk/cosmosdb/cosmos/src/bulk/BulkCongestionAlgorithm.ts b/sdk/cosmosdb/cosmos/src/bulk/BulkCongestionAlgorithm.ts new file mode 100644 index 000000000000..5016db1dc0b0 --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/bulk/BulkCongestionAlgorithm.ts @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +import { Constants } from "../common"; +import type { BulkPartitionMetric } from "./BulkPartitionMetric"; +import type { Limiter } from "./Limiter"; +/** + * This class implements a congestion control algorithm which dynamically adjusts the degree + * of concurrency based on the throttling and number of processed items. + * It uses the @see {@link BulkPartitionMetric} to capture the metrics. 
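+ * The degree of concurrency starts at 1; it is reduced when throttles (429s) were observed in the last
+ * window, and increased by one when items completed without throttling, capped at
+ * Constants.BulkMaxDegreeOfConcurrency. Each reduction also lengthens the adjustment window.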
+ * @hidden
+ */
+
+export class BulkCongestionAlgorithm {
+  // The semaphore to control the degree of concurrency.
+  private limiter: Limiter;
+  // captures metrics up to the previous requests for a partition.
+  private oldPartitionMetric: BulkPartitionMetric;
+  // captures metrics up to the current request for a partition.
+  private partitionMetric: BulkPartitionMetric;
+  // time to wait before adjusting the degree of concurrency.
+  private congestionWaitTimeInMs: number = 1000;
+  private congestionIncreaseFactor: number = 1;
+  private congestionDecreaseFactor: number = 5;
+  private currentDegreeOfConcurrency: number;
+
+  constructor(
+    limiter: Limiter,
+    partitionMetric: BulkPartitionMetric,
+    oldPartitionMetric: BulkPartitionMetric,
+  ) {
+    this.limiter = limiter;
+    this.oldPartitionMetric = oldPartitionMetric;
+    this.partitionMetric = partitionMetric;
+    this.currentDegreeOfConcurrency = 1;
+  }
+
+  run(): void {
+    const elapsedTimeInMs =
+      this.partitionMetric.timeTakenInMs - this.oldPartitionMetric.timeTakenInMs;
+    if (elapsedTimeInMs >= this.congestionWaitTimeInMs) {
+      const diffThrottle =
+        this.partitionMetric.numberOfThrottles - this.oldPartitionMetric.numberOfThrottles;
+      const changeItemsCount =
+        this.partitionMetric.numberOfItemsOperatedOn -
+        this.oldPartitionMetric.numberOfItemsOperatedOn;
+
+      this.oldPartitionMetric.add(changeItemsCount, elapsedTimeInMs, diffThrottle);
+      // if the number of throttles increased, decrease the degree of concurrency.
+      if (diffThrottle > 0) {
+        this.decreaseConcurrency();
+      }
+      // if there's no throttling and the number of items processed increased, increase the degree of concurrency.
+      if (changeItemsCount > 0 && diffThrottle === 0) {
+        this.increaseConcurrency();
+      }
+    }
+  }
+
+  private decreaseConcurrency(): void {
+    // decrease should not reduce the degree of concurrency to 0.
+    const decreaseCount = Math.min(
+      this.congestionDecreaseFactor,
+      Math.floor(this.currentDegreeOfConcurrency / 2),
+    );
+    // block permits to reduce the degree of concurrency.
+    for (let i = 0; i < decreaseCount; i++) {
+      this.limiter.take(() => {});
+    }
+
+    this.currentDegreeOfConcurrency -= decreaseCount;
+    // In case of throttling, increase the wait time before the degree of concurrency is adjusted again.
+    this.congestionWaitTimeInMs += 1000;
+  }
+
+  private increaseConcurrency(): void {
+    if (
+      this.currentDegreeOfConcurrency + this.congestionIncreaseFactor <=
+      Constants.BulkMaxDegreeOfConcurrency
+    ) {
+      if (this.limiter.current() > 0) {
+        this.limiter.leave(this.congestionIncreaseFactor);
+      }
+      this.currentDegreeOfConcurrency += this.congestionIncreaseFactor;
+    }
+  }
+}
diff --git a/sdk/cosmosdb/cosmos/src/bulk/BulkOperations.ts b/sdk/cosmosdb/cosmos/src/bulk/BulkOperations.ts
new file mode 100644
index 000000000000..a25daeaa2058
--- /dev/null
+++ b/sdk/cosmosdb/cosmos/src/bulk/BulkOperations.ts
@@ -0,0 +1,161 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+import type { PartitionKey } from "../documents";
+import type { JSONObject } from "../queryExecutionContext";
+import { ErrorResponse } from "../request/ErrorResponse";
+import { BulkOperationType } from "../utils/batch";
+import type { PatchRequestBody } from "../utils/patch";
+import type { ItemOperation } from "./ItemOperation";
+
+/**
+ * Utility class for creating bulk operations.
+ */
+// eslint-disable-next-line @typescript-eslint/no-extraneous-class
+export class BulkOperations {
+  /**
+   * Creates an input object for a create operation in bulk execution.
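+   *
+   * @example A minimal sketch; the document shape (`id`, `key`) is assumed from this PR's samples:
+   * ```typescript
+   * const createOp = BulkOperations.getCreateItemOperation("pk1", { id: "doc1", key: "pk1" });
+   * ```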
+ * + * @param resourceBody - The JSON object representing the resource to be created. + * @param partitionKey - The partition key associated with the resource. + * @returns An object representing the create operation input for bulk execution. + */ + static getCreateItemOperation( + partitionKey: PartitionKey, + resourceBody: JSONObject, + ): ItemOperation { + if (resourceBody == null) { + throw new ErrorResponse("resourceBody cannot be null or undefined"); + } + if (partitionKey == null) { + throw new ErrorResponse("partitionKey cannot be null or undefined"); + } + if (resourceBody.id == null) { + throw new ErrorResponse("resourceBody.id cannot be null or undefined"); + } + return { + operationType: BulkOperationType.Create, + partitionKey: partitionKey, + resourceBody: resourceBody, + }; + } + + /** + * Generates an input object for upsert operation in bulk operations. + * + * @param partitionKey - The partition key associated with the resource. + * @param resourceBody - The JSON object representing the resource to be upserted. + * @returns An object representing the upsert operation input. + */ + static getUpsertItemOperation( + partitionKey: PartitionKey, + resourceBody: JSONObject, + ): ItemOperation { + if (resourceBody == null) { + throw new ErrorResponse("resourceBody cannot be null or undefined"); + } + if (partitionKey == null) { + throw new ErrorResponse("partitionKey cannot be null or undefined"); + } + if (resourceBody.id == null) { + throw new ErrorResponse("resourceBody.id cannot be null or undefined"); + } + return { + operationType: BulkOperationType.Upsert, + partitionKey: partitionKey, + resourceBody: resourceBody, + }; + } + + /** + * Generates an input object for read operation in bulk operations. + * + * @param id - The ID of the resource to read. + * @param partitionKey - The partition key associated with the resource. + * @returns An object representing the read operation input. + */ + static getReadItemOperation(id: string, partitionKey: PartitionKey): ItemOperation { + return { + operationType: BulkOperationType.Read, + id: id, + partitionKey: partitionKey, + }; + } + + /** + * Generates an input object for delete operation in bulk operations. + * + * @param id - The ID of the resource to delete. + * @param partitionKey - The partition key associated with the resource. + * @returns An object representing the delete operation input. + */ + static getDeleteItemOperation(id: string, partitionKey: PartitionKey): ItemOperation { + if (partitionKey == null) { + throw new ErrorResponse("partitionKey cannot be null or undefined"); + } + if (id == null) { + throw new ErrorResponse("id cannot be null or undefined"); + } + return { + operationType: BulkOperationType.Delete, + id: id, + partitionKey: partitionKey, + }; + } + + /** + * Generates an input object for replace operation in bulk operations. + * + * @param id - The ID of the resource to replace. + * @param partitionKey - The partition key associated with the resource. + * @param resourceBody - The JSON object representing the resource to replace. + * @returns An object representing the replace operation input. 
+ */ + static getReplaceItemOperation( + id: string, + partitionKey: PartitionKey, + resourceBody: JSONObject, + ): ItemOperation { + if (resourceBody == null) { + throw new ErrorResponse("resourceBody cannot be null or undefined"); + } + if (partitionKey == null) { + throw new ErrorResponse("partitionKey cannot be null or undefined"); + } + if (resourceBody.id == null) { + throw new ErrorResponse("resourceBody.id cannot be null or undefined"); + } + return { + operationType: BulkOperationType.Replace, + id: id, + partitionKey: partitionKey, + resourceBody: resourceBody, + }; + } + + /** + * Generates an input object for patch operation in bulk operations. + * + * @param id - The ID of the resource to patch. + * @param partitionKey - The partition key associated with the resource. + * @param resourceBody - Patch request body @see {@link PatchRequestBody} + * @returns An object representing the patch operation input.**/ + static getPatchItemOperation( + id: string, + partitionKey: PartitionKey, + resourceBody: PatchRequestBody, + ): ItemOperation { + if (resourceBody == null) { + throw new ErrorResponse("patch request body cannot be null or undefined"); + } + if (partitionKey == null) { + throw new ErrorResponse("partitionKey cannot be null or undefined"); + } + return { + operationType: BulkOperationType.Patch, + id: id, + partitionKey: partitionKey, + resourceBody: resourceBody, + }; + } +} diff --git a/sdk/cosmosdb/cosmos/src/bulk/BulkPartitionMetric.ts b/sdk/cosmosdb/cosmos/src/bulk/BulkPartitionMetric.ts new file mode 100644 index 000000000000..837bec931f90 --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/bulk/BulkPartitionMetric.ts @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import semaphore from "semaphore"; + +/** + * Captures the metrics for the requests made for bulk. + */ +export class BulkPartitionMetric { + numberOfItemsOperatedOn: number; + timeTakenInMs: number; + numberOfThrottles: number; + private semaphore: semaphore.Semaphore; + + constructor() { + this.numberOfItemsOperatedOn = 0; + this.timeTakenInMs = 0; + this.numberOfThrottles = 0; + this.semaphore = semaphore(1); + } + + add(numberOfDoc: number, timeTakenInMs: number, numOfThrottles: number): void { + // these operations should be atomic as multiple dispatch could be updating these values + this.semaphore.take(() => { + try { + this.numberOfItemsOperatedOn += numberOfDoc; + this.timeTakenInMs += timeTakenInMs; + this.numberOfThrottles += numOfThrottles; + } finally { + this.semaphore.leave(); + } + }); + } +} diff --git a/sdk/cosmosdb/cosmos/src/bulk/BulkResponse.ts b/sdk/cosmosdb/cosmos/src/bulk/BulkResponse.ts new file mode 100644 index 000000000000..29661f55dd82 --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/bulk/BulkResponse.ts @@ -0,0 +1,164 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import type { ClientContext } from "../ClientContext"; +import { Constants, StatusCodes, SubStatusCodes } from "../common"; +import type { CosmosDiagnostics } from "../CosmosDiagnostics"; +import type { CosmosHeaders } from "../queryExecutionContext"; +import type { StatusCode, SubStatusCode, Response } from "../request"; +import type { BulkOperationResult } from "../utils/batch"; +import { isSuccessStatusCode } from "../utils/batch"; +import type { ItemBulkOperation } from "./ItemBulkOperation"; + +/** + * Represents a batch response for bulk request. 
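+ * Exposes per-operation results (@see {@link BulkOperationResult}) aligned with the order of the
+ * submitted operations, along with the overall status code, sub-status code, and response headers.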
+ * @hidden + */ + +export class BulkResponse { + statusCode: StatusCode; + subStatusCode: SubStatusCode; + headers: CosmosHeaders; + operations: ItemBulkOperation[]; + results: BulkOperationResult[] = []; + diagnostics: CosmosDiagnostics; + + constructor( + statusCode: StatusCode, + subStatusCode: SubStatusCode, + headers: CosmosHeaders, + operations: ItemBulkOperation[], + ) { + this.statusCode = statusCode; + this.subStatusCode = subStatusCode; + this.headers = headers; + this.operations = operations; + } + + /** + * Generate empty response object + */ + static createEmptyResponse( + operations: ItemBulkOperation[], + statusCode: StatusCode, + subStatusCode: SubStatusCode, + headers: CosmosHeaders, + ): BulkResponse { + const bulkResponse = new BulkResponse(statusCode, subStatusCode, headers, operations); + bulkResponse.createAndPopulateResults(operations, 0); + return bulkResponse; + } + + /** + * static method to create BulkResponse from Response object + */ + static fromResponseMessage( + responseMessage: Response, + operations: ItemBulkOperation[], + clientContext: ClientContext, + ): BulkResponse { + // Create and populate the response object + let bulkResponse = this.populateFromResponse(responseMessage, operations, clientContext); + + if (!bulkResponse.results || bulkResponse.results.length !== operations.length) { + // Server should be guaranteeing number of results equal to operations when + // batch request is successful - so fail as InternalServerError if this is not the case. + if (isSuccessStatusCode(responseMessage.code)) { + bulkResponse = new BulkResponse( + StatusCodes.InternalServerError, + SubStatusCodes.Unknown, + responseMessage.headers, + operations, + ); + } + + // When the overall response status code is TooManyRequests, propagate the RetryAfter into the individual operations. + let retryAfterMilliseconds = 0; + + if (responseMessage.code === StatusCodes.TooManyRequests) { + const retryAfter = responseMessage.headers?.[Constants.HttpHeaders.RetryAfterInMs]; + retryAfterMilliseconds = !retryAfter || isNaN(Number(retryAfter)) ? 0 : Number(retryAfter); + } + + bulkResponse.createAndPopulateResults(operations, retryAfterMilliseconds, clientContext); + } + + return bulkResponse; + } + + private static populateFromResponse( + responseMessage: Response, + operations: ItemBulkOperation[], + clientContext: ClientContext, + ): BulkResponse { + const results: BulkOperationResult[] = []; + + if (responseMessage.result) { + for (let i = 0; i < operations.length; i++) { + const itemResponse = responseMessage.result[i]; + const result: BulkOperationResult = { + statusCode: itemResponse?.statusCode, + subStatusCode: itemResponse?.subStatusCode ?? SubStatusCodes.Unknown, + eTag: itemResponse?.eTag, + retryAfter: itemResponse.retryAfterMilliseconds ?? 
0, + activityId: responseMessage.headers?.[Constants.HttpHeaders.ActivityId], + sessionToken: responseMessage.headers?.[Constants.HttpHeaders.SessionToken], + requestCharge: itemResponse?.requestCharge, + resourceBody: itemResponse?.resourceBody, + operationInput: operations[i].plainTextOperationInput, + diagnostics: operations[i].operationContext.diagnosticNode.toDiagnostic( + clientContext.getClientConfig(), + ), + }; + results.push(result); + } + } + let statusCode = responseMessage.code; + let subStatusCode = responseMessage.substatus; + + if (responseMessage.code === StatusCodes.MultiStatus) { + for (const result of results) { + if ( + result.statusCode !== StatusCodes.FailedDependency && + result.statusCode >= StatusCodes.BadRequest + ) { + statusCode = result.statusCode; + subStatusCode = result.subStatusCode; + break; + } + } + } + + const bulkResponse = new BulkResponse( + statusCode, + subStatusCode, + responseMessage.headers, + operations, + ); + bulkResponse.results = results; + return bulkResponse; + } + + private createAndPopulateResults( + operations: ItemBulkOperation[], + retryAfterInMs: number, + clientContext?: ClientContext, + ): void { + this.results = operations.map( + (operation): BulkOperationResult => ({ + statusCode: this.statusCode, + subStatusCode: this.subStatusCode, + eTag: this.headers?.[Constants.HttpHeaders.ETag], + retryAfter: retryAfterInMs, + activityId: this.headers?.[Constants.HttpHeaders.ActivityId], + sessionToken: this.headers?.[Constants.HttpHeaders.SessionToken], + requestCharge: this.headers?.[Constants.HttpHeaders.RequestCharge], + resourceBody: undefined, + operationInput: operation.plainTextOperationInput, + diagnostics: clientContext + ? operation.operationContext.diagnosticNode.toDiagnostic(clientContext.getClientConfig()) + : undefined, + }), + ); + } +} diff --git a/sdk/cosmosdb/cosmos/src/bulk/BulkStreamer.ts b/sdk/cosmosdb/cosmos/src/bulk/BulkStreamer.ts new file mode 100644 index 000000000000..fe2c3ad28334 --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/bulk/BulkStreamer.ts @@ -0,0 +1,358 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+
+import { readPartitionKeyDefinition } from "../client/ClientUtils";
+import type { Container } from "../client/Container";
+import type { ClientContext } from "../ClientContext";
+import type { DiagnosticNodeInternal } from "../diagnostics/DiagnosticNodeInternal";
+import { DiagnosticNodeType } from "../diagnostics/DiagnosticNodeInternal";
+import { ErrorResponse, type RequestOptions } from "../request";
+import type { PartitionKeyRangeCache } from "../routing";
+import type { BulkOperationResult, Operation } from "../utils/batch";
+import { BulkOperationType, isKeyInRange } from "../utils/batch";
+import { hashPartitionKey } from "../utils/hashing/hash";
+import { ResourceThrottleRetryPolicy } from "../retry";
+import { BulkStreamerPerPartition } from "./BulkStreamerPerPartition";
+import { ItemBulkOperationContext } from "./ItemBulkOperationContext";
+import { Constants, copyObject, getPathFromLink, ResourceType } from "../common";
+import { BulkResponse } from "./BulkResponse";
+import type { ItemBulkOperation } from "./ItemBulkOperation";
+import { addDignosticChild, withDiagnostics } from "../utils/diagnostics";
+import { BulkExecutionRetryPolicy } from "../retry/bulkExecutionRetryPolicy";
+import type { RetryPolicy } from "../retry/RetryPolicy";
+import { Limiter } from "./Limiter";
+import { convertToInternalPartitionKey, type PartitionKeyDefinition } from "../documents";
+import type { ItemOperation } from "./ItemOperation";
+import type { PatchOperation } from "../utils/patch";
+
+/**
+ * BulkStreamer for bulk operations in a container.
+ * It maintains one @see {@link BulkStreamerPerPartition} for each Partition Key Range, which allows independent execution of requests. Semaphores rate limit the operations
+ * at the Streamer / Partition Key Range level, so parallel and independent requests can be sent to different Partition Key Ranges, while requests to the same Range
+ * are limited. Two callback implementations define how a particular request should be executed and how operations should be retried. When the streamer dispatches a batch,
+ * the batch creates a request and calls the execute callback (executeRequest); if conditions are met, it may call the retry callback (reBatchOperation).
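+ *
+ * @example A usage sketch based on the samples added in this PR (container, ids, and partition keys are illustrative):
+ * ```typescript
+ * const bulkStreamer = container.items.getBulkStreamer();
+ * const operations: ItemOperation[] = [
+ *   BulkOperations.getCreateItemOperation("pk1", { id: "doc1", key: "pk1" }),
+ *   BulkOperations.getDeleteItemOperation("doc2", "pk2"),
+ * ];
+ * const resultPromises = bulkStreamer.execute(operations);
+ * await Promise.allSettled(resultPromises);
+ * bulkStreamer.dispose();
+ * ```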
+ * @hidden + */ + +export class BulkStreamer { + private readonly container: Container; + private readonly clientContext: ClientContext; + private readonly partitionKeyRangeCache: PartitionKeyRangeCache; + private readonly streamersByPartitionKeyRangeId: Map; + private readonly limitersByPartitionKeyRangeId: Map; + private options: RequestOptions; + private partitionKeyDefinition: PartitionKeyDefinition; + private partitionKeyDefinitionPromise: Promise; + private isCancelled: boolean; + + /** + * @internal + */ + constructor( + container: Container, + clientContext: ClientContext, + partitionKeyRangeCache: PartitionKeyRangeCache, + options: RequestOptions, + ) { + this.container = container; + this.clientContext = clientContext; + this.partitionKeyRangeCache = partitionKeyRangeCache; + this.streamersByPartitionKeyRangeId = new Map(); + this.limitersByPartitionKeyRangeId = new Map(); + this.options = options; + this.executeRequest = this.executeRequest.bind(this); + this.reBatchOperation = this.reBatchOperation.bind(this); + this.isCancelled = false; + } + + /** + * adds operation(s) to the streamer + * @param operationInput - bulk operation or list of bulk operations + */ + execute(operationInput: ItemOperation[]): Promise[] { + if (this.isCancelled) { + throw new ErrorResponse("Bulk execution cancelled due to a previous error."); + } + return operationInput.map((operation) => this.addOperation(operation)); + } + + /** + * dispose all the timers, streamers, and limiters + * @returns bulk response + */ + dispose(): void { + for (const streamer of this.streamersByPartitionKeyRangeId.values()) { + streamer.disposeTimers(); + } + this.streamersByPartitionKeyRangeId.clear(); + this.limitersByPartitionKeyRangeId.clear(); + } + + private async addOperation(operation: ItemOperation): Promise { + if (this.isCancelled) { + throw new ErrorResponse("Bulk execution cancelled due to a previous error."); + } + if (!operation) { + throw new ErrorResponse("Operation is required."); + } + return withDiagnostics( + async (diagnosticNode: DiagnosticNodeInternal) => { + if (!this.partitionKeyDefinition) { + if (!this.partitionKeyDefinitionPromise) { + this.partitionKeyDefinitionPromise = (async () => { + try { + const partitionKeyDefinition = await readPartitionKeyDefinition( + diagnosticNode, + this.container, + ); + this.partitionKeyDefinition = partitionKeyDefinition; + return partitionKeyDefinition; + } finally { + this.partitionKeyDefinitionPromise = null; + } + })(); + } + await this.partitionKeyDefinitionPromise; + } + const plainTextOperation = copyObject(operation); + // encrypt operations if encryption is enabled + let operationError: Error; + let partitionKeyRangeId: string; + try { + if (this.clientContext.enableEncryption) { + operation = copyObject(operation); + if (!this.container.isEncryptionInitialized) { + await this.container.initializeEncryption(); + } + this.options.containerRid = this.container._rid; + operation = await this.encryptionHelper(operation, diagnosticNode); + } + partitionKeyRangeId = await this.resolvePartitionKeyRangeId(operation, diagnosticNode); + } catch (error) { + operationError = error; + } + const streamerForPartition = this.getStreamerForPKRange(partitionKeyRangeId); + // TODO: change implementation to add just retry context instead of retry policy in operation context + const retryPolicy = this.getRetryPolicy(); + const context = new ItemBulkOperationContext( + partitionKeyRangeId, + retryPolicy, + diagnosticNode, + ); + const itemOperation: ItemBulkOperation = { + 
plainTextOperationInput: plainTextOperation, + operationInput: operation, + operationContext: context, + }; + // if there was an error during encryption or resolving pkRangeId, reject the operation + if (operationError) { + context.fail(operationError); + } else { + streamerForPartition.add(itemOperation); + } + return context.operationPromise; + }, + this.clientContext, + DiagnosticNodeType.CLIENT_REQUEST_NODE, + ); + } + + private async encryptionHelper( + operation: ItemOperation, + diagnosticNode: DiagnosticNodeInternal, + ): Promise { + const partitionKeyInternal = convertToInternalPartitionKey(operation.partitionKey); + operation.partitionKey = + await this.container.encryptionProcessor.getEncryptedPartitionKeyValue(partitionKeyInternal); + switch (operation.operationType) { + case BulkOperationType.Create: + case BulkOperationType.Upsert: + operation.resourceBody = await this.container.encryptionProcessor.encrypt( + operation.resourceBody, + diagnosticNode, + ); + break; + case BulkOperationType.Read: + case BulkOperationType.Delete: + operation.id = await this.container.encryptionProcessor.getEncryptedId(operation.id); + break; + case BulkOperationType.Replace: + operation.id = await this.container.encryptionProcessor.getEncryptedId(operation.id); + operation.resourceBody = await this.container.encryptionProcessor.encrypt( + operation.resourceBody, + diagnosticNode, + ); + break; + case BulkOperationType.Patch: { + operation.id = await this.container.encryptionProcessor.getEncryptedId(operation.id); + const body = operation.resourceBody; + const patchRequestBody = (Array.isArray(body) ? body : body.operations) as PatchOperation[]; + for (const patchOperation of patchRequestBody) { + if ("value" in patchOperation) { + patchOperation.value = await this.container.encryptionProcessor.encryptProperty( + patchOperation.path, + patchOperation.value, + ); + } + } + break; + } + } + return operation; + } + + private async resolvePartitionKeyRangeId( + operation: ItemOperation, + diagnosticNode: DiagnosticNodeInternal, + ): Promise { + try { + const partitionKeyRanges = ( + await this.partitionKeyRangeCache.onCollectionRoutingMap(this.container.url, diagnosticNode) + ).getOrderedParitionKeyRanges(); + + const partitionKey = convertToInternalPartitionKey(operation.partitionKey); + + const hashedKey = hashPartitionKey(partitionKey, this.partitionKeyDefinition); + + const matchingRange = partitionKeyRanges.find((range) => + isKeyInRange(range.minInclusive, range.maxExclusive, hashedKey), + ); + + if (!matchingRange) { + throw new Error("No matching partition key range found for the operation."); + } + return matchingRange.id; + } catch (error) { + console.error("Error determining partition key range ID:", error); + throw error; + } + } + + private getRetryPolicy(): RetryPolicy { + const nextRetryPolicy = new ResourceThrottleRetryPolicy(this.clientContext.getRetryOptions()); + return new BulkExecutionRetryPolicy( + this.container, + nextRetryPolicy, + this.partitionKeyRangeCache, + ); + } + + private async executeRequest( + operations: ItemBulkOperation[], + options: RequestOptions, + diagnosticNode: DiagnosticNodeInternal, + ): Promise { + if (this.isCancelled) { + throw new ErrorResponse("Bulk execution cancelled due to a previous error."); + } + if (!operations.length) return; + const pkRangeId = operations[0].operationContext.pkRangeId; + const limiter = this.getLimiterForPKRange(pkRangeId); + const path = getPathFromLink(this.container.url, ResourceType.item); + const requestBody: 
Operation[] = []; + for (const itemBulkOperation of operations) { + requestBody.push(this.prepareOperation(itemBulkOperation.operationInput)); + } + return new Promise((resolve, reject) => { + limiter.take(async () => { + try { + // Check if any split/merge has happened on other batches belonging to same partition. + // If so, don't send this request, and re-batch the operations. + const stopDispatch = await limiter.isStopped(); + if (stopDispatch) { + operations.map((operation) => { + this.reBatchOperation(operation, diagnosticNode); + }); + // Return empty response as the request is not sent. + return resolve(BulkResponse.createEmptyResponse(operations, 0, 0, {})); + } + const response = await addDignosticChild( + async (childNode: DiagnosticNodeInternal) => + this.clientContext.bulk({ + body: requestBody, + partitionKeyRangeId: pkRangeId, + path: path, + resourceId: this.container.url, + options: options, + diagnosticNode: childNode, + }), + diagnosticNode, + DiagnosticNodeType.BATCH_REQUEST, + ); + return resolve( + BulkResponse.fromResponseMessage(response, operations, this.clientContext), + ); + } catch (error) { + if (this.clientContext.enableEncryption) { + try { + await this.container.throwIfRequestNeedsARetryPostPolicyRefresh(error); + } catch (err) { + this.cancelExecution(); + return reject(err); + } + } + return resolve(BulkResponse.fromResponseMessage(error, operations, this.clientContext)); + } finally { + limiter.leave(); + } + }); + }); + } + + private prepareOperation(operationInput: ItemOperation): Operation { + operationInput.partitionKey = convertToInternalPartitionKey(operationInput.partitionKey); + return { + ...operationInput, + partitionKey: JSON.stringify(operationInput.partitionKey), + } as Operation; + } + + private async reBatchOperation( + operation: ItemBulkOperation, + diagnosticNode: DiagnosticNodeInternal, + ): Promise { + const partitionKeyRangeId = await this.resolvePartitionKeyRangeId( + operation.operationInput, + diagnosticNode, + ); + operation.operationContext.updatePKRangeId(partitionKeyRangeId); + const streamer = this.getStreamerForPKRange(partitionKeyRangeId); + streamer.add(operation); + } + + private getLimiterForPKRange(pkRangeId: string): Limiter { + let limiter = this.limitersByPartitionKeyRangeId.get(pkRangeId); + if (!limiter) { + limiter = new Limiter(Constants.BulkMaxDegreeOfConcurrency); + // starting with degree of concurrency as 1 + for (let i = 1; i < Constants.BulkMaxDegreeOfConcurrency; ++i) { + limiter.take(() => {}); + } + this.limitersByPartitionKeyRangeId.set(pkRangeId, limiter); + } + return limiter; + } + + private cancelExecution(): void { + this.isCancelled = true; + this.dispose(); + } + + private getStreamerForPKRange(pkRangeId: string): BulkStreamerPerPartition { + if (this.streamersByPartitionKeyRangeId.has(pkRangeId)) { + return this.streamersByPartitionKeyRangeId.get(pkRangeId); + } + const limiter = this.getLimiterForPKRange(pkRangeId); + const newStreamer = new BulkStreamerPerPartition( + this.executeRequest, + this.reBatchOperation, + limiter, + this.options, + this.clientContext.diagnosticLevel, + this.clientContext.enableEncryption, + this.container.encryptionProcessor, + ); + this.streamersByPartitionKeyRangeId.set(pkRangeId, newStreamer); + return newStreamer; + } +} diff --git a/sdk/cosmosdb/cosmos/src/bulk/BulkStreamerPerPartition.ts b/sdk/cosmosdb/cosmos/src/bulk/BulkStreamerPerPartition.ts new file mode 100644 index 000000000000..ac0771bee63d --- /dev/null +++ 
b/sdk/cosmosdb/cosmos/src/bulk/BulkStreamerPerPartition.ts @@ -0,0 +1,153 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import { Constants } from "../common"; +import type { ExecuteCallback, RetryCallback } from "../utils/batch"; +import { BulkBatcher } from "./BulkBatcher"; +import semaphore from "semaphore"; +import type { ItemBulkOperation } from "./ItemBulkOperation"; +import type { RequestOptions } from "../request/RequestOptions"; +import { BulkPartitionMetric } from "./BulkPartitionMetric"; +import { BulkCongestionAlgorithm } from "./BulkCongestionAlgorithm"; +import type { Limiter } from "./Limiter"; +import type { CosmosDbDiagnosticLevel } from "../diagnostics/CosmosDbDiagnosticLevel"; +import type { EncryptionProcessor } from "../encryption"; + +/** + * Handles operation queueing and dispatching. Fills batches efficiently and maintains a timer for early dispatching in case of partially-filled batches and to optimize for throughput. + * There is always one batch at a time being filled. Locking is in place to avoid concurrent threads trying to Add operations while the timer might be Dispatching the current batch. + * The current batch is dispatched and a new one is readied to be filled by new operations, the dispatched batch runs independently through a fire and forget pattern. + * @hidden + */ + +export class BulkStreamerPerPartition { + private readonly executor: ExecuteCallback; + private readonly retrier: RetryCallback; + private readonly options: RequestOptions; + + private currentBatcher: BulkBatcher; + private readonly lock: semaphore.Semaphore; + private dispatchTimer: NodeJS.Timeout; + private limiterSemaphore: Limiter; + + private readonly oldPartitionMetric: BulkPartitionMetric; + private readonly partitionMetric: BulkPartitionMetric; + private readonly congestionControlAlgorithm: BulkCongestionAlgorithm; + private congestionControlTimer: NodeJS.Timeout; + private readonly congestionControlDelayInMs: number = 100; + private readonly diagnosticLevel: CosmosDbDiagnosticLevel; + private readonly encryptionEnabled: boolean; + private readonly encryptionProcessor: EncryptionProcessor; + + constructor( + executor: ExecuteCallback, + retrier: RetryCallback, + limiter: Limiter, + options: RequestOptions, + diagnosticLevel: CosmosDbDiagnosticLevel, + encryptionEnabled: boolean, + encryptionProcessor: EncryptionProcessor, + ) { + this.executor = executor; + this.retrier = retrier; + this.limiterSemaphore = limiter; + this.options = options; + this.diagnosticLevel = diagnosticLevel; + this.encryptionEnabled = encryptionEnabled; + this.encryptionProcessor = encryptionProcessor; + this.oldPartitionMetric = new BulkPartitionMetric(); + this.partitionMetric = new BulkPartitionMetric(); + this.congestionControlAlgorithm = new BulkCongestionAlgorithm( + this.limiterSemaphore, + this.partitionMetric, + this.oldPartitionMetric, + ); + this.currentBatcher = this.createBulkBatcher(); + + this.lock = semaphore(1); + this.runDispatchTimer(); + this.runCongestionControlTimer(); + } + + /** + * adds a bulk operation to current batcher and dispatches if batch is full + * @param operation - operation to add + */ + add(operation: ItemBulkOperation): void { + let toDispatch: BulkBatcher; + this.lock.take(() => { + try { + // attempt to add operation until it fits in the current batch for the streamer + while (!this.currentBatcher.tryAdd(operation)) { + toDispatch = this.getBatchToDispatchAndCreate(); + } + } finally { + this.lock.leave(); + } + }); + + if 
(toDispatch) { + // dispatch with fire and forget. No need to wait for the dispatch to complete. + toDispatch.dispatch(this.partitionMetric); + } + } + + /** + * @returns the batch to be dispatched and creates a new one + */ + private getBatchToDispatchAndCreate(): BulkBatcher { + if (this.currentBatcher.isEmpty()) return null; + const previousBatcher = this.currentBatcher; + this.currentBatcher = this.createBulkBatcher(); + return previousBatcher; + } + + private createBulkBatcher(): BulkBatcher { + return new BulkBatcher( + this.limiterSemaphore, + this.executor, + this.retrier, + this.options, + this.diagnosticLevel, + this.encryptionEnabled, + this.encryptionProcessor, + ); + } + + /** + * Initializes a timer to periodically dispatch partially-filled batches. + */ + private runDispatchTimer(): void { + this.dispatchTimer = setInterval(() => { + let toDispatch: BulkBatcher; + try { + this.lock.take(() => { + toDispatch = this.getBatchToDispatchAndCreate(); + }); + } finally { + this.lock.leave(); + } + if (toDispatch) { + toDispatch.dispatch(this.partitionMetric); + } + }, Constants.BulkTimeoutInMs); + } + + private runCongestionControlTimer(): void { + this.congestionControlTimer = setInterval(() => { + this.congestionControlAlgorithm.run(); + }, this.congestionControlDelayInMs); + } + + /** + * Dispose the active timers after bulk is complete. + */ + disposeTimers(): void { + if (this.dispatchTimer) { + clearInterval(this.dispatchTimer); + } + if (this.congestionControlTimer) { + clearInterval(this.congestionControlTimer); + } + } +} diff --git a/sdk/cosmosdb/cosmos/src/bulk/ItemBulkOperation.ts b/sdk/cosmosdb/cosmos/src/bulk/ItemBulkOperation.ts new file mode 100644 index 000000000000..80aabd3a7b31 --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/bulk/ItemBulkOperation.ts @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import type { ItemBulkOperationContext } from "./ItemBulkOperationContext"; +import type { ItemOperation } from "./ItemOperation"; + +/** + * Represents an operation and its context on an item which will be executed as part of a batch request. + * @hidden + */ + +export interface ItemBulkOperation { + // stores the unencrypted operationInput to avoid decryption of operationInput in the bulk response. + plainTextOperationInput: ItemOperation; + operationInput: ItemOperation; + operationContext: ItemBulkOperationContext; +} diff --git a/sdk/cosmosdb/cosmos/src/bulk/ItemBulkOperationContext.ts b/sdk/cosmosdb/cosmos/src/bulk/ItemBulkOperationContext.ts new file mode 100644 index 000000000000..e226828bca2a --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/bulk/ItemBulkOperationContext.ts @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import { DiagnosticNodeInternal } from "../diagnostics/DiagnosticNodeInternal"; +import type { RetryPolicy } from "../retry/RetryPolicy"; +import type { BulkOperationResult } from "../utils/batch"; +import { TaskCompletionSource } from "../utils/batch"; + +/** + * Context for a particular @see {@link ItemBulkOperation}.
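Each operation handed to the streamer is wrapped in an `ItemBulkOperation` whose context (below) owns a `TaskCompletionSource`: `execute()` hands the caller the pending promise immediately, and the dispatcher settles it once the batch response or error arrives. A minimal sketch of that deferred-promise pattern, with an illustrative `Deferred` class standing in for the SDK's internal `TaskCompletionSource`:

```typescript
// Sketch only: an illustrative stand-in for the TaskCompletionSource used by ItemBulkOperationContext.
class Deferred<T> {
  public readonly promise: Promise<T>;
  private resolveFn!: (value: T) => void;
  private rejectFn!: (reason?: unknown) => void;

  constructor() {
    this.promise = new Promise<T>((resolve, reject) => {
      this.resolveFn = resolve;
      this.rejectFn = reject;
    });
  }
  setResult(value: T): void {
    this.resolveFn(value); // complete(result) on the context resolves the caller's promise
  }
  setException(error: Error): void {
    this.rejectFn(error); // fail(error) rejects it instead
  }
}

// Usage: hand out the promise right away, settle it when the batch response comes back.
const pending = new Deferred<{ statusCode: number }>();
const operationPromise = pending.promise; // this is what execute() returns per operation
operationPromise.then((r) => console.log("operation finished with", r.statusCode));
pending.setResult({ statusCode: 201 }); // called by the dispatcher once the server responds
```

An operation whose context is never completed keeps its promise pending, which is why the per-partition dispatch timer above keeps flushing partially-filled batches.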
+ * @hidden + */ +export class ItemBulkOperationContext { + pkRangeId: string; + retryPolicy: RetryPolicy; + diagnosticNode: DiagnosticNodeInternal; + private readonly taskCompletionSource: TaskCompletionSource; + + constructor(pkRangeId: string, retryPolicy: RetryPolicy, diagnosticNode: DiagnosticNodeInternal) { + this.pkRangeId = pkRangeId; + this.retryPolicy = retryPolicy; + this.diagnosticNode = diagnosticNode; + this.taskCompletionSource = new TaskCompletionSource(); + } + + public get operationPromise(): Promise { + return this.taskCompletionSource.task; + } + + addDiagnosticChild(diagnosticNode: DiagnosticNodeInternal): void { + this.diagnosticNode.children.push(diagnosticNode); + } + + updatePKRangeId(pkRangeId: string): void { + this.pkRangeId = pkRangeId; + } + + complete(result: BulkOperationResult): void { + this.taskCompletionSource.setResult(result); + } + + fail(error: Error): void { + this.taskCompletionSource.setException(error); + } +} diff --git a/sdk/cosmosdb/cosmos/src/bulk/ItemOperation.ts b/sdk/cosmosdb/cosmos/src/bulk/ItemOperation.ts new file mode 100644 index 000000000000..ae14414cbd9c --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/bulk/ItemOperation.ts @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import type { PartitionKey } from "../documents"; +import type { JSONObject } from "../queryExecutionContext"; +import type { PatchRequestBody } from "../utils/patch"; + +/** + * Represents an item operation in bulk execution. + */ +export interface ItemOperation { + /** + * The operation type. + */ + operationType: string; + /** + * The partition key associated with the resource. + */ + partitionKey: PartitionKey; + /** + * The ID of the resource. + */ + id?: string; + /** + * The resource body. + */ + resourceBody?: JSONObject | PatchRequestBody; +} diff --git a/sdk/cosmosdb/cosmos/src/bulk/Limiter.ts b/sdk/cosmosdb/cosmos/src/bulk/Limiter.ts new file mode 100644 index 000000000000..2f40c2d7508e --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/bulk/Limiter.ts @@ -0,0 +1,193 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +import semaphore from "semaphore"; +/** + * Semaphores and locks for execution of Bulk. + * This class controls number of concurrent requests for bulk per partition + * and helps in blocking requests to a partition upon encountering 410 error. + * @hidden + */ +export class Limiter { + // semaphore to control number of concurrent requests for a partition. 
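This `Limiter` wraps the `semaphore` package, and `getLimiterForPKRange` earlier in this diff creates it at `Constants.BulkMaxDegreeOfConcurrency` and immediately pre-fills all but one slot, so each partition starts at an effective degree of concurrency of 1 that congestion control can widen later. A rough sketch of that seeding pattern, assuming the same `semaphore` package; the request body inside `take` is only a placeholder:

```typescript
import semaphore from "semaphore";

// Create the semaphore at the maximum capacity, then occupy all but one slot so the
// effective degree of concurrency starts at 1.
const maxDegreeOfConcurrency = 50; // mirrors Constants.BulkMaxDegreeOfConcurrency in this PR
const limiter = semaphore(maxDegreeOfConcurrency);

for (let i = 1; i < maxDegreeOfConcurrency; ++i) {
  limiter.take(() => {}); // seed slots are held and never released here
}

// A batch request is dispatched only when a free slot is available.
limiter.take(async () => {
  try {
    // ... send one batch request for this partition key range ...
  } finally {
    limiter.leave(); // free the slot for the next batch
  }
});

// Congestion control can later raise concurrency by releasing held seed slots.
limiter.leave(1); // effective degree of concurrency is now 2
```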
+ private limiter: semaphore.Semaphore; + // flag indicating whether dispatch has been stopped due to 410 error + private dispatchStopped: boolean = false; + // read write lock to safely control access to `dispatchStopped` flag + private readWriteLock: ReadWriteLock; + + constructor(capacity: number) { + this.limiter = semaphore(capacity); + this.readWriteLock = new ReadWriteLock(); + } + + /** + * acquires a slot to execute specified callback + * @param callback - callback function to take the slot + */ + take(callback: () => void): void { + this.limiter.take(() => { + callback(); + }); + } + + /** + * @returns number of currently acquired slots + */ + current(): number { + return this.limiter.current; + } + + /** + * releases the specified number of slots + * @param number - number of slots to release + */ + leave(number?: number): void { + this.limiter.leave(number); + } + + /** + * checks if we have encountered 410 error during bulk and stopped dispatch + * @returns true if dispatch is stopped + */ + async isStopped(): Promise { + await this.readWriteLock.acquireRead(); + const stopDispatch = this.dispatchStopped; + this.readWriteLock.releaseRead(); + return stopDispatch; + } + + /** + * stops dispatching by setting the `dispatchStopped` flag to `true`. + */ + async stopDispatch(): Promise { + await this.readWriteLock.acquireWrite(); + this.dispatchStopped = true; + this.readWriteLock.releaseWrite(); + } +} + +/** + * ReadWriteLock class to manage read and write locks + */ +export class ReadWriteLock { + private readers = 0; // Count of active readers + private writer = false; // Indicates if a writer is active + private waitingWriters: Array<() => void> = []; // Queue for waiting writers + private waitingReaders: Array<() => void> = []; // Queue for waiting readers + private mutex = semaphore(1); + + /** + * Acquire a shared read lock. + * Allows multiple readers unless a writer is active or waiting. + */ + async acquireRead(): Promise { + return new Promise((resolve) => { + this.mutex.take(() => { + try { + if (!this.writer && this.waitingWriters.length === 0) { + // No writer active or waiting, proceed immediately + this.readers++; + resolve(); + } else { + // Queue this reader + this.waitingReaders.push(() => { + this.readers++; + resolve(); + }); + } + } finally { + this.mutex.leave(); + } + }); + }); + } + + /** + * Release a shared read lock. + */ + releaseRead(): void { + this.mutex.take(() => { + try { + if (this.readers <= 0) { + throw new Error("Cannot release read lock: No active read lock held."); + } + this.readers--; + if (this.readers === 0) { + // Process the next writer or queued readers + this._processNext(); + } + } finally { + this.mutex.leave(); + } + }); + } + + /** + * Acquire an exclusive write lock. + * Blocks all readers and writers until the lock is released. + */ + async acquireWrite(): Promise { + return new Promise((resolve) => { + this.mutex.take(() => { + try { + if (!this.writer && this.readers === 0) { + // No active readers or writers, proceed immediately + this.writer = true; + resolve(); + } else { + // Queue this writer + this.waitingWriters.push(() => { + this.writer = true; + resolve(); + }); + } + } finally { + this.mutex.leave(); + } + }); + }); + } + + /** + * Release an exclusive write lock. 
+ */ + releaseWrite(): void { + this.mutex.take(() => { + try { + if (!this.writer) { + this.mutex.leave(); + throw new Error("Cannot release write lock: No active write lock held."); + } + this.writer = false; + // Process the next writer or queued readers + this._processNext(); + } finally { + this.mutex.leave(); + } + }); + } + + /** + * Internal method to process the next lock request. + * Prioritizes writers over readers + */ + private _processNext(): void { + if (this.waitingWriters.length > 0) { + // Writers take priority + const resolveWriter = this.waitingWriters.shift(); + if (resolveWriter) { + this.writer = true; + resolveWriter(); + } + } else if (this.waitingReaders.length > 0) { + // Allow all queued readers to proceed + while (this.waitingReaders.length > 0) { + const resolveReader = this.waitingReaders.shift(); + if (resolveReader) { + this.readers++; + resolveReader(); + } + } + } + } +} diff --git a/sdk/cosmosdb/cosmos/src/bulk/index.ts b/sdk/cosmosdb/cosmos/src/bulk/index.ts new file mode 100644 index 000000000000..7ce0fd671b32 --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/bulk/index.ts @@ -0,0 +1,9 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +export { ItemBulkOperationContext } from "./ItemBulkOperationContext"; +export { ItemBulkOperation } from "./ItemBulkOperation"; +export { BulkResponse } from "./BulkResponse"; +export { BulkOperations } from "./BulkOperations"; +export { BulkStreamer } from "./BulkStreamer"; +export { ItemOperation } from "./ItemOperation"; diff --git a/sdk/cosmosdb/cosmos/src/client/Container/Containers.ts b/sdk/cosmosdb/cosmos/src/client/Container/Containers.ts index 8d0fe674af90..e89d3f15eac9 100644 --- a/sdk/cosmosdb/cosmos/src/client/Container/Containers.ts +++ b/sdk/cosmosdb/cosmos/src/client/Container/Containers.ts @@ -226,7 +226,6 @@ export class Containers { ); } } - const response = await this.clientContext.create({ body, path, @@ -235,13 +234,22 @@ export class Containers { diagnosticNode, options, }); + if (!response || !response.result) { + throw new ErrorResponse("Failed to create container with id: " + body.id); + } + let containerId = response.result.id; + // for AAD containers we need to extract the containerId from result.body + if (!containerId && "body" in response.result && response.result.body) { + containerId = (response.result as any).body.id; + } const ref = new Container( this.database, - response.result.id, + containerId, this.clientContext, this.encryptionManager, response.result._rid, ); + this.clientContext.partitionKeyDefinitionCache[ref.url] = response.result.partitionKey; return new ContainerResponse( response.result, response.headers, diff --git a/sdk/cosmosdb/cosmos/src/client/Item/Items.ts b/sdk/cosmosdb/cosmos/src/client/Item/Items.ts index 9387defddb1d..e8abaa26d092 100644 --- a/sdk/cosmosdb/cosmos/src/client/Item/Items.ts +++ b/sdk/cosmosdb/cosmos/src/client/Item/Items.ts @@ -47,7 +47,7 @@ import type { ChangeFeedIteratorOptions, } from "../../client/ChangeFeed"; import { validateChangeFeedIteratorOptions } from "../../client/ChangeFeed/changeFeedUtils"; -import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal"; +import { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal"; import { DiagnosticNodeType } from "../../diagnostics/DiagnosticNodeInternal"; import { getEmptyCosmosDiagnostics, @@ -62,6 +62,7 @@ import type { EncryptionSqlParameter } from "../../encryption/EncryptionQueryBui import type { Resource } from 
"../Resource"; import { TypeMarker } from "../../encryption/enums/TypeMarker"; import { EncryptionItemQueryIterator } from "../../encryption/EncryptionItemQueryIterator"; +import { BulkStreamer } from "../../bulk/BulkStreamer"; /** * @hidden @@ -554,6 +555,40 @@ export class Items { }, this.clientContext); } + /** + * provides streamer for bulk operations + * @param options - used for modifying the request + * @returns an instance of bulk streamer + * @example + * ```typescript + * const createOperations: OperationInput[] = [ + * { + * operationType: "Create", + * resourceBody: { id: "doc1", name: "sample", key: "A" } + * }, + * { + * operationType: "Create", + * resourceBody: { id: "doc2", name: "other", key: "A" + * } + * ]; + * const readOperation: OperationInput = { operationType: "Read", id: "doc1", partitionKey: "A" }; + * + * const bulkStreamer = container.items.getBulkStreamer(); + * bulkStreamer.add(createOperations); + * bulkStreamer.add(readOperation); + * const response = await bulkStreamer.endStream(); + * ``` + */ + public getBulkStreamer(options: RequestOptions = {}): BulkStreamer { + const bulkStreamer = new BulkStreamer( + this.container, + this.clientContext, + this.partitionKeyRangeCache, + options, + ); + return bulkStreamer; + } + /** * Execute bulk operations on items. * diff --git a/sdk/cosmosdb/cosmos/src/common/constants.ts b/sdk/cosmosdb/cosmos/src/common/constants.ts index 6ad6e12ddb14..661e3b377b90 100644 --- a/sdk/cosmosdb/cosmos/src/common/constants.ts +++ b/sdk/cosmosdb/cosmos/src/common/constants.ts @@ -226,6 +226,9 @@ export const Constants = { // Bulk Operations DefaultMaxBulkRequestBodySizeInBytes: 220201, + MaxBulkOperationsCount: 100, + BulkTimeoutInMs: 100, + BulkMaxDegreeOfConcurrency: 50, // Encryption Encryption: { diff --git a/sdk/cosmosdb/cosmos/src/common/statusCodes.ts b/sdk/cosmosdb/cosmos/src/common/statusCodes.ts index d94bb3db6b3e..96e11bea766a 100644 --- a/sdk/cosmosdb/cosmos/src/common/statusCodes.ts +++ b/sdk/cosmosdb/cosmos/src/common/statusCodes.ts @@ -10,6 +10,7 @@ export interface StatusCodesType { Created: 201; Accepted: 202; NoContent: 204; + MultiStatus: 207; NotModified: 304; // Client error @@ -23,6 +24,7 @@ export interface StatusCodesType { Gone: 410; PreconditionFailed: 412; RequestEntityTooLarge: 413; + FailedDependency: 424; TooManyRequests: 429; RetryWith: 449; @@ -47,6 +49,7 @@ export const StatusCodes: StatusCodesType = { Created: 201, Accepted: 202, NoContent: 204, + MultiStatus: 207, NotModified: 304, // Client error @@ -60,6 +63,7 @@ export const StatusCodes: StatusCodesType = { Gone: 410, PreconditionFailed: 412, RequestEntityTooLarge: 413, + FailedDependency: 424, TooManyRequests: 429, RetryWith: 449, @@ -89,6 +93,8 @@ export interface SubStatusCodesType { // 410: StatusCodeType_Gone: substatus PartitionKeyRangeGone: 1002; CompletingSplit: 1007; + CompletingPartitionMigration: 1008; + NameCacheIsStale: 1000; // 404: NotFound Substatus ReadSessionNotAvailable: 1002; @@ -96,6 +102,9 @@ export interface SubStatusCodesType { // 403: Forbidden Substatus WriteForbidden: 3; DatabaseAccountNotFound: 1008; + + // 413: Request Entity Too Large Substatus + ResponseSizeExceeded: 3402; } /** @@ -112,10 +121,16 @@ export const SubStatusCodes: SubStatusCodesType = { // 410: StatusCodeType_Gone: substatus PartitionKeyRangeGone: 1002, CompletingSplit: 1007, + CompletingPartitionMigration: 1008, + NameCacheIsStale: 1000, + // 404: NotFound Substatus ReadSessionNotAvailable: 1002, // 403: Forbidden Substatus WriteForbidden: 3, 
DatabaseAccountNotFound: 1008, + + // 413: Request Entity Too Large Substatus + ResponseSizeExceeded: 3402, }; diff --git a/sdk/cosmosdb/cosmos/src/index.ts b/sdk/cosmosdb/cosmos/src/index.ts index c1f9b731651c..1ae8d251f948 100644 --- a/sdk/cosmosdb/cosmos/src/index.ts +++ b/sdk/cosmosdb/cosmos/src/index.ts @@ -24,6 +24,7 @@ export { DeleteOperationInput, PatchOperationInput, BulkPatchOperation, + BulkOperationResult, } from "./utils/batch"; export { PatchOperation, @@ -158,3 +159,4 @@ export { EncryptionTimeToLive, EncryptionPolicy, } from "./encryption"; +export { BulkStreamer, BulkOperations, ItemOperation } from "./bulk"; diff --git a/sdk/cosmosdb/cosmos/src/request/ErrorResponse.ts b/sdk/cosmosdb/cosmos/src/request/ErrorResponse.ts index 944928f4462f..5d5405904490 100644 --- a/sdk/cosmosdb/cosmos/src/request/ErrorResponse.ts +++ b/sdk/cosmosdb/cosmos/src/request/ErrorResponse.ts @@ -100,4 +100,10 @@ export class ErrorResponse extends Error { retryAfterInMilliseconds?: number; [key: string]: any; diagnostics?: CosmosDiagnostics; + + constructor(message?: string, code?: number, substatus?: number) { + super(message); + this.code = code; + this.substatus = substatus; + } } diff --git a/sdk/cosmosdb/cosmos/src/retry/bulkExecutionRetryPolicy.ts b/sdk/cosmosdb/cosmos/src/retry/bulkExecutionRetryPolicy.ts new file mode 100644 index 000000000000..7b7aecfdd070 --- /dev/null +++ b/sdk/cosmosdb/cosmos/src/retry/bulkExecutionRetryPolicy.ts @@ -0,0 +1,84 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +import type { Container } from "../client"; +import { sleep, StatusCodes, SubStatusCodes } from "../common"; +import type { DiagnosticNodeInternal } from "../diagnostics/DiagnosticNodeInternal"; +import type { ErrorResponse } from "../request"; +import type { PartitionKeyRangeCache } from "../routing"; +import type { RetryPolicy } from "./RetryPolicy"; + +/** + * This class implements the retry policy for bulk operations. + * @hidden + */ +export class BulkExecutionRetryPolicy implements RetryPolicy { + retryAfterInMs: number; + private retriesOn410: number; + private readonly MaxRetriesOn410 = 10; + private readonly SubstatusCodeBatchResponseSizeExceeded = 3402; + nextRetryPolicy: RetryPolicy; + private container: Container; + private partitionKeyRangeCache: PartitionKeyRangeCache; + + constructor( + container: Container, + nextRetryPolicy: RetryPolicy, + partitionKeyRangeCache: PartitionKeyRangeCache, + ) { + this.container = container; + this.nextRetryPolicy = nextRetryPolicy; + this.partitionKeyRangeCache = partitionKeyRangeCache; + this.retriesOn410 = 0; + } + + public async shouldRetry( + err: ErrorResponse, + diagnosticNode: DiagnosticNodeInternal, + ): Promise { + if (!err) { + return false; + } + if (err.code === StatusCodes.Gone) { + this.retriesOn410++; + + if (this.retriesOn410 > this.MaxRetriesOn410) { + return false; + } + if ( + err.substatus === SubStatusCodes.PartitionKeyRangeGone || + err.substatus === SubStatusCodes.CompletingSplit || + err.substatus === SubStatusCodes.CompletingPartitionMigration + ) { + await this.partitionKeyRangeCache.onCollectionRoutingMap( + this.container.url, + diagnosticNode, + true, + ); + return true; + } + if (err.substatus === SubStatusCodes.NameCacheIsStale) { + return true; + } + } + + // API can return 413 which means the response is bigger than 4Mb. 
+ // Operations that exceed the 4Mb limit are returned as 413/3402, while the operations within the 4Mb limit will be 200 + // TODO: better way to handle this error + if ( + err.code === StatusCodes.RequestEntityTooLarge && + err.substatus === this.SubstatusCodeBatchResponseSizeExceeded + ) { + return true; + } + // check for 429 error + let shouldRetryForThrottle = false; + if (err.code === StatusCodes.TooManyRequests) { + const retryResult = await this.nextRetryPolicy.shouldRetry(err, diagnosticNode); + shouldRetryForThrottle = Array.isArray(retryResult) ? retryResult[0] : retryResult; + } + if (shouldRetryForThrottle) { + await sleep(this.nextRetryPolicy.retryAfterInMs); + } + return shouldRetryForThrottle; + } +} diff --git a/sdk/cosmosdb/cosmos/src/retry/retryUtility.ts b/sdk/cosmosdb/cosmos/src/retry/retryUtility.ts index e5149e5cfb9d..797c7a8b8f99 100644 --- a/sdk/cosmosdb/cosmos/src/retry/retryUtility.ts +++ b/sdk/cosmosdb/cosmos/src/retry/retryUtility.ts @@ -129,7 +129,7 @@ export async function execute({ err.substatus === SubStatusCodes.WriteForbidden)) ) { retryPolicy = retryPolicies.endpointDiscoveryRetryPolicy; - } else if (err.code === StatusCodes.TooManyRequests) { + } else if (err.code === StatusCodes.TooManyRequests && !isBulkRequest(requestContext)) { retryPolicy = retryPolicies.resourceThrottleRetryPolicy; } else if ( err.code === StatusCodes.NotFound && @@ -183,3 +183,13 @@ export async function execute({ DiagnosticNodeType.HTTP_REQUEST, ); } + +/** + * @hidden + */ +function isBulkRequest(requestContext: RequestContext): boolean { + return ( + requestContext.operationType === "batch" && + !requestContext.headers[Constants.HttpHeaders.IsBatchAtomic] + ); +} diff --git a/sdk/cosmosdb/cosmos/src/utils/batch.ts b/sdk/cosmosdb/cosmos/src/utils/batch.ts index cb8bdcf68c00..08932bd868b8 100644 --- a/sdk/cosmosdb/cosmos/src/utils/batch.ts +++ b/sdk/cosmosdb/cosmos/src/utils/batch.ts @@ -3,7 +3,7 @@ import type { JSONObject } from "../queryExecutionContext"; import { extractPartitionKeys, undefinedPartitionKey } from "../extractPartitionKey"; -import type { CosmosDiagnostics, RequestOptions } from ".."; +import type { CosmosDiagnostics, DiagnosticNodeInternal, RequestOptions, StatusCode } from ".."; import type { PartitionKey, PartitionKeyDefinition, @@ -15,6 +15,8 @@ import { assertNotUndefined } from "./typeChecks"; import { bodyFromData } from "../request/request"; import { Constants } from "../common/constants"; import { randomUUID } from "@azure/core-util"; +import type { BulkResponse, ItemBulkOperation } from "../bulk"; +import type { ItemOperation } from "../bulk/ItemOperation"; export type Operation = | CreateOperation @@ -34,6 +36,34 @@ export interface Batch { export type BulkOperationResponse = OperationResponse[] & { diagnostics: CosmosDiagnostics }; +/** + * response for a specific operation in streamed bulk operation + */ +export interface BulkOperationResult extends OperationResponse { + /** + * details of status of an operation + */ + subStatusCode?: number; + /** + * activity id related to the operation + */ + activityId?: string; + /** + * session Token assigned to the result + */ + sessionToken?: string; + /** + * in case the operation is rate limited, indicates the time post which a retry can be attempted. 
+ */ + retryAfter?: number; + /** + * represents operation details + */ + operationInput?: ItemOperation; + /** diagnostic details associated with operation */ + diagnostics?: CosmosDiagnostics; +} + export interface OperationResponse { statusCode: number; requestCharge: number; @@ -282,7 +312,8 @@ export function splitBatchBasedOnBodySize(originalBatch: Batch): Batch[] { * @hidden */ export function calculateObjectSizeInBytes(obj: unknown): number { - return new TextEncoder().encode(bodyFromData(obj as any)).length; + return new TextEncoder().encode(bodyFromData(sanitizeObject(obj)) as any).length; + // return new TextEncoder().encode(bodyFromData(obj as any)).length; } export function decorateBatchOperation( @@ -302,3 +333,62 @@ export function decorateBatchOperation( } return operation as Operation; } + +export function isSuccessStatusCode(statusCode: StatusCode): boolean { + return statusCode >= 200 && statusCode <= 299; +} + +export type ExecuteCallback = ( + operations: ItemBulkOperation[], + options: RequestOptions, + diagnosticNode: DiagnosticNodeInternal, +) => Promise; +export type RetryCallback = ( + operation: ItemBulkOperation, + diagnosticNode: DiagnosticNodeInternal, +) => Promise; + +export class TaskCompletionSource { + private readonly promise: Promise; + private resolveFn!: (value: T) => void; + private rejectFn!: (reason?: any) => void; + + constructor() { + this.promise = new Promise((resolve, reject) => { + this.resolveFn = resolve; + this.rejectFn = reject; + }); + } + + public get task(): Promise { + return this.promise; + } + + public setResult(value: T): void { + this.resolveFn(value); + } + + public setException(error: Error): void { + this.rejectFn(error); + } +} + +/** + * Removes circular references and unnecessary properties from the object. + * workaround for TypeError: Converting circular structure to JSON + * @internal + */ +function sanitizeObject(obj: any): any { + const seen = new WeakSet(); + return JSON.parse( + JSON.stringify(obj, (key, value) => { + if (typeof value === "object" && value !== null) { + if (seen.has(value)) { + return undefined; // Remove circular references + } + seen.add(value); + } + return key === "diagnosticNode" || key === "retryPolicy" ? undefined : value; // Exclude unnecessary properties + }), + ); +} diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/bulkCongestionAlgorithm.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/bulkCongestionAlgorithm.spec.ts new file mode 100644 index 000000000000..2b346f663ddf --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/internal/unit/bulkCongestionAlgorithm.spec.ts @@ -0,0 +1,81 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
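The congestion tests that follow encode a simple adjustment rule: grow the degree of concurrency by 1 when a full congestion window (1000 ms or more) passes without throttles, shrink it by min(5, half the current degree) when throttles are observed, and never drop below 1. A standalone, illustrative sketch of that arithmetic; the real `BulkCongestionAlgorithm` applies it by taking and releasing `Limiter` slots rather than returning a number, and it also widens its wait window after a throttle:

```typescript
// Illustrative only: the concurrency adjustment the tests below expect.
function adjustConcurrency(current: number, throttlesInWindow: number): number {
  if (throttlesInWindow === 0) {
    return current + 1; // no throttling in the window: increase by 1
  }
  const decrease = Math.min(5, Math.floor(current / 2)); // throttled: back off
  return Math.max(1, current - decrease); // but never below 1
}

console.log(adjustConcurrency(1, 0)); // 2  (increase factor 1)
console.log(adjustConcurrency(12, 2)); // 7  (decrease factor 5)
console.log(adjustConcurrency(5, 2)); // 3  (decrease factor 2)
console.log(adjustConcurrency(1, 2)); // 1  (never drops below 1)
```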
+ +import assert from "assert"; +import { BulkCongestionAlgorithm } from "../../../src/bulk/BulkCongestionAlgorithm"; +import { BulkPartitionMetric } from "../../../src/bulk/BulkPartitionMetric"; +import { Limiter } from "../../../src/bulk/Limiter"; + +describe("BulkCongestionAlgorithm", () => { + let limiter: Limiter; + let oldPartitionMetric: BulkPartitionMetric; + let partitionMetric: BulkPartitionMetric; + + beforeEach(() => { + limiter = new Limiter(50); + oldPartitionMetric = new BulkPartitionMetric(); + partitionMetric = new BulkPartitionMetric(); + }); + it("should increase concurrency by 1 when there is no throttling and items are processed", () => { + const itemsCount = 10; // 10 items processed + const timeTakenInMs = 1100; // should be greater than congestionWaitTimeInMs (1000 ms) + const numberOfThrottles = 0; // no throttling + const algorithm = new BulkCongestionAlgorithm(limiter, partitionMetric, oldPartitionMetric); + algorithm["currentDegreeOfConcurrency"] = 1; + partitionMetric.add(itemsCount, timeTakenInMs, numberOfThrottles); + algorithm.run(); + assert.strictEqual(algorithm["currentDegreeOfConcurrency"], 2, "increase factor should be 1"); + }); + + it("should decrease concurrency when there is throttling", () => { + const itemsCount = 10; // 10 items processed + let timeTakenInMs = 1100; // should be greater than congestionWaitTimeInMs (1000 ms) + const numberOfThrottles = 2; // throttling + const algorithm = new BulkCongestionAlgorithm(limiter, partitionMetric, oldPartitionMetric); + algorithm["currentDegreeOfConcurrency"] = 12; + partitionMetric.add(itemsCount, timeTakenInMs, numberOfThrottles); + algorithm.run(); + assert.strictEqual(algorithm["currentDegreeOfConcurrency"], 7, "decrease factor should be 5"); + + // The decrease factor should be min(5, degreeOfConcurrency/2) + timeTakenInMs += 1000; // should be greater than congestionWaitTimeInMs, will increase after throttle (2000 ms) + partitionMetric.add(itemsCount, timeTakenInMs, numberOfThrottles); + algorithm["currentDegreeOfConcurrency"] = 5; + algorithm.run(); + assert.strictEqual(algorithm["currentDegreeOfConcurrency"], 3, "decrease factor should be 2"); + timeTakenInMs += 1000; // should be greater than congestionWaitTimeInMs, will again increase after throttle (3000 ms) + partitionMetric.add(itemsCount, timeTakenInMs, numberOfThrottles); + algorithm.run(); + assert.strictEqual(algorithm["currentDegreeOfConcurrency"], 2, "decrease factor should be 1"); + }); + + it("should not modify degree of concurrency when elapsed time is less than congestionWaitTimeInMs(1000)", () => { + // should not decrease concurrency even if there is throttling + let itemsCount = 10; + const timeTakenInMs = 100; + let numberOfThrottles = 2; + const algorithm = new BulkCongestionAlgorithm(limiter, partitionMetric, oldPartitionMetric); + algorithm["currentDegreeOfConcurrency"] = 10; + partitionMetric.add(itemsCount, timeTakenInMs, numberOfThrottles); + algorithm.run(); + assert.strictEqual(algorithm["currentDegreeOfConcurrency"], 10); + + // should not increase concurrency even if there is no throttling + itemsCount += 10; + numberOfThrottles = 0; + partitionMetric.add(itemsCount, timeTakenInMs, numberOfThrottles); + algorithm.run(); + assert.strictEqual(algorithm["currentDegreeOfConcurrency"], 10); + }); + + it("degree of concurrency should not be less than 1", () => { + const itemsCount = 10; + const timeTakenInMs = 1100; + const numberOfThrottles = 2; + const algorithm = new BulkCongestionAlgorithm(limiter, partitionMetric, 
oldPartitionMetric); + algorithm["currentDegreeOfConcurrency"] = 1; + partitionMetric.add(itemsCount, timeTakenInMs, numberOfThrottles); + algorithm.run(); + assert.strictEqual(algorithm["currentDegreeOfConcurrency"], 1, "decrease factor should be 0"); + }); +}); diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/bulkExecutionRetryPolicy.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/bulkExecutionRetryPolicy.spec.ts new file mode 100644 index 000000000000..8074499c3e6c --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/internal/unit/bulkExecutionRetryPolicy.spec.ts @@ -0,0 +1,98 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import assert from "assert"; +import type { Container } from "../../../src/client/Container/Container"; +import { BulkExecutionRetryPolicy } from "../../../src/retry/bulkExecutionRetryPolicy"; +import { ResourceThrottleRetryPolicy } from "../../../src/retry/resourceThrottleRetryPolicy"; +import type { PartitionKeyRangeCache } from "../../../src/routing"; +import type { RetryOptions } from "../../../src"; +import { ErrorResponse, StatusCodes } from "../../../src"; +import { SubStatusCodes } from "../../../src/common"; + +describe("BulkExecutionRetryPolicy", () => { + let retryPolicy: BulkExecutionRetryPolicy; + let mockPartitionKeyRangeCache: PartitionKeyRangeCache; + let mockContainer: Container; + let calledPartitionkeyRefresh: boolean; + + beforeEach(() => { + mockContainer = {} as Container; + mockPartitionKeyRangeCache = { + onCollectionRoutingMap: async () => { + calledPartitionkeyRefresh = true; + }, + } as unknown as PartitionKeyRangeCache; + retryPolicy = new BulkExecutionRetryPolicy( + mockContainer, + new ResourceThrottleRetryPolicy({}), + mockPartitionKeyRangeCache, + ); + }); + it("shouldRetry returns false if no error is provided", async () => { + const shouldRetryResult = await retryPolicy.shouldRetry(null, {} as any); + assert.strictEqual(shouldRetryResult, false); + }); + it("handles partition key range Gone error", async () => { + const err = new ErrorResponse(null, StatusCodes.Gone, SubStatusCodes.PartitionKeyRangeGone); + // MaxRetriesOn410 is 10 + for (let i = 0; i < 10; i++) { + calledPartitionkeyRefresh = false; + const shouldRetryResult = await retryPolicy.shouldRetry(err, {} as any); + assert.strictEqual(calledPartitionkeyRefresh, true); + assert.strictEqual(shouldRetryResult, true); + } + calledPartitionkeyRefresh = false; + const shouldRetryResult = await retryPolicy.shouldRetry(err, {} as any); + assert.strictEqual(calledPartitionkeyRefresh, false); + assert.strictEqual(shouldRetryResult, false); + }); + + it("handles 413 error", async () => { + const err = new ErrorResponse( + null, + StatusCodes.RequestEntityTooLarge, + SubStatusCodes.ResponseSizeExceeded, + ); + const shouldRetryResult = await retryPolicy.shouldRetry(err, {} as any); + assert.strictEqual(shouldRetryResult, true); + }); + + it("handles throttling error", async () => { + const err = new ErrorResponse(null, StatusCodes.TooManyRequests, null); + err.retryAfterInMs = 5; + const throttlingRetryPolicy = retryPolicy.nextRetryPolicy as ResourceThrottleRetryPolicy; + + // default maxTries is 9 + while (throttlingRetryPolicy.currentRetryAttemptCount < 9) { + const shouldRetryResult = await throttlingRetryPolicy.shouldRetry(err, { + addData: () => {}, + } as any); + assert.strictEqual(throttlingRetryPolicy.retryAfterInMs, 5); + assert.strictEqual(shouldRetryResult, true); + } + const shouldRetryResult = await retryPolicy.shouldRetry(err, {} as any); + 
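The branching these tests exercise can be summarized in a compact classifier. This is an illustrative sketch, not the SDK class: the numeric status and substatus values are the ones added to `statusCodes.ts` in this PR, and the real policy additionally refreshes the partition key range cache before a 410 retry and sleeps for `retryAfterInMs` before a throttled retry:

```typescript
// Sketch of the decisions bulkExecutionRetryPolicy encodes; names and return values are illustrative.
interface BulkError {
  code?: number;
  substatus?: number;
}

function classifyBulkRetry(err: BulkError | null, retriesOn410: number): string {
  if (!err) return "fail";
  if (err.code === 410 /* Gone */) {
    if (retriesOn410 >= 10) return "fail"; // MaxRetriesOn410
    if ([1002, 1007, 1008].includes(err.substatus ?? -1)) {
      return "refresh-routing-map-and-retry"; // range gone / completing split / completing migration
    }
    if (err.substatus === 1000 /* NameCacheIsStale */) return "retry";
  }
  if (err.code === 413 && err.substatus === 3402 /* ResponseSizeExceeded */) return "retry";
  if (err.code === 429 /* TooManyRequests */) return "throttle-retry"; // delegated to ResourceThrottleRetryPolicy
  return "fail";
}

// e.g. a partition split surfaces as 410/1007 and triggers a routing-map refresh before retrying
console.log(classifyBulkRetry({ code: 410, substatus: 1007 }, 0)); // "refresh-routing-map-and-retry"
```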
assert.strictEqual(shouldRetryResult, false); + }); + + it("handles throttling error with custom policy", async () => { + const err = new ErrorResponse(null, StatusCodes.TooManyRequests, null); + err.retryAfterInMs = 50; + const retryOptions: RetryOptions = { + maxRetryAttemptCount: 5, + fixedRetryIntervalInMilliseconds: 10, + }; + retryPolicy.nextRetryPolicy = new ResourceThrottleRetryPolicy(retryOptions); + const throttlingRetryPolicy = retryPolicy.nextRetryPolicy as ResourceThrottleRetryPolicy; + + while (throttlingRetryPolicy.currentRetryAttemptCount < 5) { + const shouldRetryResult = await throttlingRetryPolicy.shouldRetry(err, { + addData: () => {}, + } as any); + assert.strictEqual(throttlingRetryPolicy.retryAfterInMs, 10); + assert.strictEqual(shouldRetryResult, true); + } + const shouldRetryResult = await retryPolicy.shouldRetry(err, {} as any); + assert.strictEqual(shouldRetryResult, false); + }); +}); diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/bulkStreamerPerPartition.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/bulkStreamerPerPartition.spec.ts new file mode 100644 index 000000000000..0287e1eb06a8 --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/internal/unit/bulkStreamerPerPartition.spec.ts @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import { Limiter } from "../../../src/bulk/Limiter"; +import type { ExecuteCallback, RetryCallback } from "../../../src/utils/batch"; +import type { BulkResponse, ItemBulkOperation, ItemBulkOperationContext } from "../../../src/bulk"; +import { CosmosDbDiagnosticLevel } from "../../../src"; +import { BulkStreamerPerPartition } from "../../../src/bulk/BulkStreamerPerPartition"; +import assert from "assert"; + +describe("BulkStreamerPerPartition", () => { + const mockExecutor: ExecuteCallback = async () => { + return {} as BulkResponse; + }; + const mockRetrier: RetryCallback = async () => {}; + const limiter = new Limiter(50); + let streamerPerPartition: BulkStreamerPerPartition; + + beforeEach(() => { + streamerPerPartition = new BulkStreamerPerPartition( + mockExecutor, + mockRetrier, + limiter, + {}, + CosmosDbDiagnosticLevel.info, + false, + undefined, + ); + }); + afterEach(() => { + streamerPerPartition.disposeTimers(); + }); + it("dispose should dispose all the timers", async () => { + let dispatchCount = 0; + let congestionCount = 0; + // dispose actual timers started during initialization before setting custom timers + streamerPerPartition.disposeTimers(); + // Set custom timers + streamerPerPartition["dispatchTimer"] = setInterval(() => { + dispatchCount++; + }, 10); + streamerPerPartition["congestionControlTimer"] = setInterval(() => { + congestionCount++; + }, 10); + await new Promise((resolve) => setTimeout(resolve, 100)); + assert.ok(dispatchCount > 0, "dispatchTimer should be running"); + assert.ok(congestionCount > 0, "congestionControlTimer should be running"); + streamerPerPartition.disposeTimers(); + const updatedDispatchCount = dispatchCount; + const updatedCongestionCount = congestionCount; + await new Promise((resolve) => setTimeout(resolve, 100)); + assert.equal(dispatchCount, updatedDispatchCount, "dispatchTimer should have stopped running"); + assert.equal( + congestionCount, + updatedCongestionCount, + "congestionControlTimer should have stopped running", + ); + }); + + it("should add operations to the batch and dispatch when full", () => { + let dispatchCalled = false; + let isFirstCall = true; + // tryAdd will return false in case of full batcher + const 
batcher = { + tryAdd: () => { + if (isFirstCall) { + isFirstCall = false; + return false; + } + return true; + }, + dispatch: () => { + dispatchCalled = true; + }, + isEmpty: () => false, + }; + streamerPerPartition["currentBatcher"] = batcher as any; + const operation = { + operationContext: {} as ItemBulkOperationContext, + } as unknown as ItemBulkOperation; + streamerPerPartition.add(operation); + assert.ok(dispatchCalled, "dispatch should be called when batcher is full"); + const newBatcher = streamerPerPartition["currentBatcher"]; + assert.notEqual(newBatcher, batcher, "new batcher should be created after dispatch"); + }); +}); diff --git a/sdk/cosmosdb/cosmos/test/internal/unit/readWriteLock.spec.ts b/sdk/cosmosdb/cosmos/test/internal/unit/readWriteLock.spec.ts new file mode 100644 index 000000000000..b329389ed7da --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/internal/unit/readWriteLock.spec.ts @@ -0,0 +1,331 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import assert from "assert"; +import { ReadWriteLock } from "../../../src/bulk/Limiter"; + +describe("ReadWriteLock", () => { + let lock: ReadWriteLock; + + beforeEach(() => { + lock = new ReadWriteLock(); + }); + + /** + * Helper function to delay execution for a specified time. + * @param ms - Milliseconds to delay. + */ + const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + + it("should allow multiple readers to acquire the lock simultaneously", async () => { + const results: string[] = []; + + const reader1 = async () => { + await lock.acquireRead(); + results.push("reader1 acquired"); + await delay(100); + results.push("reader1 releasing"); + lock.releaseRead(); + }; + + const reader2 = async () => { + await lock.acquireRead(); + results.push("reader2 acquired"); + await delay(100); + results.push("reader2 releasing"); + lock.releaseRead(); + }; + + await Promise.all([reader1(), reader2()]); + + assert.deepStrictEqual(results, [ + "reader1 acquired", + "reader2 acquired", + "reader1 releasing", + "reader2 releasing", + ]); + }); + + it("should allow a writer to acquire the lock exclusively", async () => { + const results: string[] = []; + + const reader = async () => { + await lock.acquireRead(); + results.push("reader acquired"); + await delay(200); + results.push("reader releasing"); + lock.releaseRead(); + }; + + const writer = async () => { + await delay(50); // Ensure writer attempts to acquire after reader + await lock.acquireWrite(); + results.push("writer acquired"); + await delay(100); + results.push("writer releasing"); + lock.releaseWrite(); + }; + + await Promise.all([reader(), writer()]); + + assert.deepStrictEqual(results, [ + "reader acquired", + "reader releasing", + "writer acquired", + "writer releasing", + ]); + }); + + it("writers should have priority over new readers", async () => { + const results: string[] = []; + + const reader1 = async () => { + await lock.acquireRead(); + results.push("reader1 acquired"); + await delay(300); + results.push("reader1 releasing"); + lock.releaseRead(); + }; + + const writer = async () => { + await delay(50); // Writer attempts to acquire after reader1 + await lock.acquireWrite(); + results.push("writer acquired"); + await delay(100); + results.push("writer releasing"); + await lock.releaseWrite(); + }; + + const reader2 = async () => { + await delay(100); // reader2 attempts to acquire while writer is waiting + await lock.acquireRead(); + results.push("reader2 acquired"); + await delay(100); + 
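The `ReadWriteLock` under test exists to guard the `Limiter`'s `dispatchStopped` flag: many dispatchers can poll `isStopped()` under a shared read lock while `stopDispatch()` takes the exclusive write lock. A small consumer-side sketch of that gate; the check mirrors `executeRequest` earlier in this diff, while the `onPartitionGone` trigger point is an assumption for illustration:

```typescript
import { Limiter } from "../../../src/bulk/Limiter";

// Before sending, a dispatcher checks whether dispatch for this partition has been halted.
async function dispatchBatch(limiter: Limiter, send: () => Promise<void>): Promise<void> {
  if (await limiter.isStopped()) {
    // a split/merge was observed on this partition; re-batch instead of sending
    return;
  }
  await send();
}

// Assumed trigger point: a batch response carrying 410 Gone for this partition key range.
async function onPartitionGone(limiter: Limiter): Promise<void> {
  await limiter.stopDispatch(); // flips dispatchStopped under the exclusive write lock
}
```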
results.push("reader2 releasing"); + lock.releaseRead(); + }; + + await Promise.all([reader1(), writer(), reader2()]); + + assert.deepStrictEqual(results, [ + "reader1 acquired", + "reader1 releasing", + "writer acquired", + "writer releasing", + "reader2 acquired", + "reader2 releasing", + ]); + }); + + it("writer cannot acquire lock while another writer holds it", async () => { + const results: string[] = []; + + const writer1 = async () => { + await lock.acquireWrite(); + results.push("writer1 acquired"); + await delay(200); + results.push("writer1 releasing"); + lock.releaseWrite(); + }; + + const writer2 = async () => { + await delay(50); // Writer2 attempts to acquire after writer1 + await lock.acquireWrite(); + results.push("writer2 acquired"); + await delay(100); + results.push("writer2 releasing"); + lock.releaseWrite(); + }; + + await Promise.all([writer1(), writer2()]); + + assert.deepStrictEqual(results, [ + "writer1 acquired", + "writer1 releasing", + "writer2 acquired", + "writer2 releasing", + ]); + }); + + it("should release roomEmpty after all readers release the lock", async () => { + const results: string[] = []; + + const reader1 = async () => { + await lock.acquireRead(); + results.push("reader1 acquired"); + await delay(100); + results.push("reader1 releasing"); + lock.releaseRead(); + }; + + const reader2 = async () => { + await lock.acquireRead(); + results.push("reader2 acquired"); + await delay(150); + results.push("reader2 releasing"); + lock.releaseRead(); + }; + + const writer = async () => { + await delay(50); // Writer attempts to acquire after readers + await lock.acquireWrite(); + results.push("writer acquired"); + await delay(100); + results.push("writer releasing"); + lock.releaseWrite(); + }; + + await Promise.all([reader1(), reader2(), writer()]); + + assert.deepStrictEqual(results, [ + "reader1 acquired", + "reader2 acquired", + "reader1 releasing", + "reader2 releasing", + "writer acquired", + "writer releasing", + ]); + }); + + it("should handle releasing write lock properly", async () => { + const results: string[] = []; + + const writer = async () => { + await lock.acquireWrite(); + results.push("writer acquired"); + await delay(100); + results.push("writer releasing"); + lock.releaseWrite(); + }; + + const reader = async () => { + await delay(50); // Attempt to acquire after writer has begun + await lock.acquireRead(); + results.push("reader acquired"); + await delay(100); + results.push("reader releasing"); + lock.releaseRead(); + }; + + await Promise.all([writer(), reader()]); + + assert.deepStrictEqual(results, [ + "writer acquired", + "writer releasing", + "reader acquired", + "reader releasing", + ]); + }); + + it("should prevent new readers from acquiring the lock when a writer is waiting", async () => { + const results: string[] = []; + + const reader1 = async () => { + await lock.acquireRead(); + results.push("reader1 acquired"); + await delay(300); + results.push("reader1 releasing"); + lock.releaseRead(); + }; + + const writer = async () => { + await delay(50); // Writer attempts to acquire after reader1 + await lock.acquireWrite(); + results.push("writer acquired"); + await delay(100); + results.push("writer releasing"); + lock.releaseWrite(); + }; + + const reader2 = async () => { + await delay(100); // reader2 attempts to acquire while writer is waiting + await lock.acquireRead(); + results.push("reader2 acquired"); + await delay(100); + results.push("reader2 releasing"); + lock.releaseRead(); + }; + + await Promise.all([reader1(), 
writer(), reader2()]); + + assert.deepStrictEqual(results, [ + "reader1 acquired", + "reader1 releasing", + "writer acquired", + "writer releasing", + "reader2 acquired", + "reader2 releasing", + ]); + }); + + it("should allow writer to acquire lock after all readers have released", async () => { + const results: string[] = []; + + const reader1 = async () => { + await lock.acquireRead(); + results.push("reader1 acquired"); + await delay(100); + results.push("reader1 releasing"); + lock.releaseRead(); + }; + + const writer = async () => { + await delay(50); // Writer attempts to acquire after reader1 + await lock.acquireWrite(); + results.push("writer acquired"); + await delay(100); + results.push("writer releasing"); + lock.releaseWrite(); + }; + + await Promise.all([reader1(), writer()]); + + assert.deepStrictEqual(results, [ + "reader1 acquired", + "reader1 releasing", + "writer acquired", + "writer releasing", + ]); + }); + + it("should handle multiple writers correctly", async () => { + const results: string[] = []; + + const writer1 = async () => { + await lock.acquireWrite(); + results.push("writer1 acquired"); + await delay(100); + results.push("writer1 releasing"); + lock.releaseWrite(); + }; + + const writer2 = async () => { + await delay(50); // Writer2 attempts to acquire after writer1 + await lock.acquireWrite(); + results.push("writer2 acquired"); + await delay(100); + results.push("writer2 releasing"); + lock.releaseWrite(); + }; + + const writer3 = async () => { + await delay(75); // Writer3 attempts to acquire after writer2 + await lock.acquireWrite(); + results.push("writer3 acquired"); + await delay(100); + results.push("writer3 releasing"); + lock.releaseWrite(); + }; + + await Promise.all([writer1(), writer2(), writer3()]); + + assert.deepStrictEqual(results, [ + "writer1 acquired", + "writer1 releasing", + "writer2 acquired", + "writer2 releasing", + "writer3 acquired", + "writer3 releasing", + ]); + }); +}); diff --git a/sdk/cosmosdb/cosmos/test/public/functional/clientSideEncryption.spec.ts b/sdk/cosmosdb/cosmos/test/public/functional/clientSideEncryption.spec.ts index aab800920615..40d745ee3ae3 100644 --- a/sdk/cosmosdb/cosmos/test/public/functional/clientSideEncryption.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/functional/clientSideEncryption.spec.ts @@ -9,6 +9,8 @@ import type { ContainerDefinition, OperationInput, PatchOperation, + BulkStreamer, + BulkOperationResult, } from "../../../src"; import { CosmosClient, @@ -30,6 +32,7 @@ import { ChangeFeedRetentionTimeSpan, PartitionKeyKind, PermissionMode, + BulkOperations, } from "../../../src"; import { masterKey } from "../common/_fakeTestSecrets"; import { endpoint } from "../common/_testConfig"; @@ -411,6 +414,88 @@ describe("ClientSideEncryption", function (this: Suite) { clientWithBulk.dispose(); }); + it("encryption streamer bulk operation", async () => { + const docToCreate = TestDoc.create(); + + const { resource: docToReplace } = await testCreateItem(encryptionContainer); + docToReplace.nonsensitive = randomUUID(); + docToReplace.sensitive_StringFormat = randomUUID(); + + const { resource: docToUpsert } = await testCreateItem(encryptionContainer); + docToUpsert.nonsensitive = randomUUID(); + docToUpsert.sensitive_StringFormat = randomUUID(); + + // doc not created before + const docToUpsert2 = TestDoc.create(); + + const { resource: docToDelete } = await testCreateItem(encryptionContainer); + + const clientWithBulk = new CosmosClient({ + endpoint: endpoint, + key: masterKey, + encryptionPolicy: { + 
enableEncryption: true, + keyEncryptionKeyResolver: new MockKeyVaultEncryptionKeyResolver(), + encryptionKeyResolverName: testKeyVault, + }, + }); + + const databaseWithBulk = clientWithBulk.database(database.id); + const encryptionContainerWithBulk = databaseWithBulk.container(encryptionContainer.id); + + const operations = [ + BulkOperations.getCreateItemOperation( + docToCreate.PK, + JSON.parse(JSON.stringify(docToCreate)), + ), + BulkOperations.getUpsertItemOperation( + docToUpsert2.PK, + JSON.parse(JSON.stringify(docToUpsert2)), + ), + BulkOperations.getReplaceItemOperation( + docToReplace.id, + docToReplace.PK, + JSON.parse(JSON.stringify(docToReplace)), + ), + BulkOperations.getUpsertItemOperation( + docToUpsert.PK, + JSON.parse(JSON.stringify(docToUpsert)), + ), + { + operationType: BulkOperationType.Delete, + id: docToDelete.id, + partitionKey: docToDelete.PK, + }, + ]; + let bulkStreamer: BulkStreamer; + let response: BulkOperationResult[]; + try { + bulkStreamer = encryptionContainerWithBulk.items.getBulkStreamer(); + response = await Promise.all(bulkStreamer.execute(operations)); + } finally { + bulkStreamer.dispose(); + } + bulkStreamer.dispose(); + operations.forEach((originalOp, index) => { + assert.deepEqual( + response[index].operationInput, + originalOp, + `Expected operationInput at index ${index} to match the original operation`, + ); + }); + assert.equal(StatusCodes.Created, response[0].statusCode); + verifyExpectedDocResponse(docToCreate, response[0].resourceBody); + assert.equal(StatusCodes.Created, response[1].statusCode); + verifyExpectedDocResponse(docToUpsert2, response[1].resourceBody); + assert.equal(StatusCodes.Ok, response[2].statusCode); + verifyExpectedDocResponse(new TestDoc(docToReplace), response[2].resourceBody); + assert.equal(StatusCodes.Ok, response[3].statusCode); + verifyExpectedDocResponse(new TestDoc(docToUpsert), response[3].resourceBody); + assert.equal(StatusCodes.NoContent, response[4].statusCode); + assert.isNotObject(response[4].resourceBody); + clientWithBulk.dispose(); + }); + it("encryption create client encryption key", async () => { let cekId = "anotherCek"; let cmkpath5 = new EncryptionKeyWrapMetadata( @@ -1805,6 +1890,133 @@ describe("ClientSideEncryption", function (this: Suite) { otherClient.dispose(); }); + it("encryptionvalidatepolicyrefreshpostcontainerdeletewithbulkstreamer", async () => { + // create a container with 1st client + let paths = [ + "/sensitive_IntArray", + "/sensitive_NestedObjectFormatL1", + "/sensitive_DoubleFormat", + ].map( + (path) => + new ClientEncryptionIncludedPath( + path, + "key1", + EncryptionType.DETERMINISTIC, + EncryptionAlgorithm.AEAD_AES_256_CBC_HMAC_SHA256, + ), + ); + let encryptionPolicy = new ClientEncryptionPolicy(paths, 2); + let containerProperties: ContainerDefinition = { + id: randomUUID(), + partitionKey: { paths: ["/sensitive_DoubleFormat"] }, + clientEncryptionPolicy: encryptionPolicy, + }; + const encryptionContainerToDelete = (await database.containers.create(containerProperties)) + .container; + await encryptionContainerToDelete.initializeEncryption(); + // create a document with 2nd client on same database and container + const otherClient = new CosmosClient({ + endpoint: endpoint, + key: masterKey, + encryptionPolicy: { + enableEncryption: true, + keyEncryptionKeyResolver: new MockKeyVaultEncryptionKeyResolver(), + encryptionKeyResolverName: testKeyVault, + }, + }); + const otherDatabase = otherClient.database(database.id); + const otherEncryptionContainer = 
otherDatabase.container(encryptionContainerToDelete.id); + const testDoc = TestDoc.create(); + const createResponse = await otherEncryptionContainer.items.create(testDoc); + assert.equal(StatusCodes.Created, createResponse.statusCode); + verifyExpectedDocResponse(testDoc, createResponse.resource); + // Client 1 Deletes the Container referenced in Client 2 and Recreate with different policy + await database.container(encryptionContainerToDelete.id).delete(); + paths = [ + new ClientEncryptionIncludedPath( + "/sensitive_StringFormat", + "key1", + EncryptionType.DETERMINISTIC, + EncryptionAlgorithm.AEAD_AES_256_CBC_HMAC_SHA256, + ), + new ClientEncryptionIncludedPath( + "/sensitive_BoolFormat", + "key2", + EncryptionType.DETERMINISTIC, + EncryptionAlgorithm.AEAD_AES_256_CBC_HMAC_SHA256, + ), + new ClientEncryptionIncludedPath( + "/PK", + "key2", + EncryptionType.DETERMINISTIC, + EncryptionAlgorithm.AEAD_AES_256_CBC_HMAC_SHA256, + ), + ]; + encryptionPolicy = new ClientEncryptionPolicy(paths, 2); + containerProperties = { + id: encryptionContainerToDelete.id, + partitionKey: { paths: ["/PK"] }, + clientEncryptionPolicy: encryptionPolicy, + }; + await database.containers.create(containerProperties); + try { + await testCreateItem(encryptionContainerToDelete); + assert.fail("create operation should fail"); + } catch (err) { + verifyDiagnostics(err.diagnostics, true, false, 3, 3); + assert.ok( + err.message.includes( + "Operation has failed due to a possible mismatch in Client Encryption Policy configured on the container.", + ), + ); + } + const docToReplace = (await testCreateItem(encryptionContainerToDelete)).resource; + docToReplace.sensitive_StringFormat = "docToBeReplaced"; + const docToUpsert = (await testCreateItem(encryptionContainerToDelete)).resource; + docToUpsert.sensitive_StringFormat = "docToBeUpserted"; + const docToCreate = TestDoc.create(); + docToCreate.PK = "newPK"; + const operations = [ + BulkOperations.getUpsertItemOperation(docToUpsert.PK, docToUpsert), + BulkOperations.getReplaceItemOperation(docToReplace.id, docToReplace.PK, docToReplace), + BulkOperations.getCreateItemOperation( + docToCreate.PK, + JSON.parse(JSON.stringify(docToCreate)), + ), + ]; + let bulkStreamer: BulkStreamer; + try { + bulkStreamer = otherEncryptionContainer.items.getBulkStreamer(); + await Promise.all(bulkStreamer.execute(operations)); + assert.fail("bulk operation should fail"); + } catch (error) { + assert.ok( + error.message.includes( + "Operation has failed due to a possible mismatch in Client Encryption Policy configured on the container.", + ), + ); + } finally { + bulkStreamer.dispose(); + } + // retry bulk operation with 2nd client + let res: BulkOperationResult[]; + try { + bulkStreamer = otherEncryptionContainer.items.getBulkStreamer(); + res = await Promise.all(bulkStreamer.execute(operations)); + } finally { + bulkStreamer.dispose(); + } + assert.equal(StatusCodes.Ok, res[0].statusCode); + assert.equal(StatusCodes.Ok, res[1].statusCode); + assert.equal(StatusCodes.Created, res[2].statusCode); + await verifyItemByRead(encryptionContainerToDelete, docToReplace); + await testCreateItem(encryptionContainerToDelete); + await verifyItemByRead(encryptionContainerToDelete, docToUpsert); + // validate if the right policy was used, by reading them all back + await otherEncryptionContainer.items.readAll().fetchAll(); + otherClient.dispose(); + }); + it("encryption validate policy refresh post database delete", async () => { const mainCLient = new CosmosClient({ endpoint: endpoint, diff --git 
a/sdk/cosmosdb/cosmos/test/public/functional/item/bulk.item.spec.ts b/sdk/cosmosdb/cosmos/test/public/functional/item/bulk.item.spec.ts index 47421b8ef298..7972b698f540 100644 --- a/sdk/cosmosdb/cosmos/test/public/functional/item/bulk.item.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/functional/item/bulk.item.spec.ts @@ -18,7 +18,12 @@ import { StatusCodes, ErrorResponse, } from "../../../../src"; -import { addEntropy, getTestContainer, testForDiagnostics } from "../../common/TestHelpers"; +import { + addEntropy, + getTestContainer, + removeAllDatabases, + testForDiagnostics, +} from "../../common/TestHelpers"; import type { OperationInput } from "../../../../src"; import { BulkOperationType } from "../../../../src"; import { generateOperationOfSize } from "../../../internal/unit/utils/batch.spec"; @@ -33,6 +38,7 @@ describe("test bulk operations", async function () { describe("Check size based splitting of batches", function () { let container: Container; before(async function () { + await removeAllDatabases(); container = await getTestContainer("bulk container", undefined, { partitionKey: { paths: ["/key"], @@ -42,7 +48,9 @@ describe("test bulk operations", async function () { }); }); after(async () => { - await container.database.delete(); + if (container) { + await container.database.delete(); + } }); it("Check case when cumulative size of all operations is less than threshold", async function () { const operations: OperationInput[] = [...Array(10).keys()].map( @@ -98,6 +106,7 @@ describe("test bulk operations", async function () { let replaceItemId: string; let deleteItemId: string; before(async function () { + await removeAllDatabases(); container = await getTestContainer("bulk container", undefined, { partitionKey: { paths: ["/key"], @@ -125,7 +134,9 @@ describe("test bulk operations", async function () { }); }); after(async () => { - await container.database.delete(); + if (container) { + await container.database.delete(); + } }); it("multi partition container handles create, upsert, replace, delete", async function () { const operations = [ @@ -224,6 +235,7 @@ describe("test bulk operations", async function () { let readItemId: string; let replaceItemId: string; before(async function () { + await removeAllDatabases(); container = await getTestContainer("bulk container"); deleteItemId = addEntropy("item2"); readItemId = addEntropy("item2"); @@ -239,6 +251,11 @@ describe("test bulk operations", async function () { class: "2010", }); }); + after(async () => { + if (container) { + await container.database.delete(); + } + }); it("deletes operation with default partition", async function () { const operation: OperationInput = { operationType: BulkOperationType.Delete, @@ -309,7 +326,7 @@ describe("test bulk operations", async function () { await splitContainer.database.delete(); }); - it("container handles Create, Read, Upsert, Delete opertion with partition split", async function () { + it("container handles Create, Read, Upsert, Delete operation with partition split", async function () { const operations = [ { operationType: BulkOperationType.Create, @@ -364,7 +381,9 @@ describe("test bulk operations", async function () { assert.equal(response[3].statusCode, 200); // cleanup - await splitContainer.database.delete(); + if (splitContainer) { + await splitContainer.database.delete(); + } }); async function getSplitContainer(): Promise { @@ -1155,7 +1174,9 @@ describe("test bulk operations", async function () { assert.strictEqual(res.resourceBody.id, "item" + index, "Read Items id should 
match"); }); // Delete database after use - await container.database.delete(); + if (container) { + await container.database.delete(); + } }); }); }); @@ -1192,7 +1213,9 @@ describe("test bulk operations", async function () { }); }); after(async () => { - await container.database.delete(); + if (container) { + await container.database.delete(); + } }); it("test diagnostics for bulk", async function () { const operations = [ diff --git a/sdk/cosmosdb/cosmos/test/public/functional/item/bulkStreamer.item.spec.ts b/sdk/cosmosdb/cosmos/test/public/functional/item/bulkStreamer.item.spec.ts new file mode 100644 index 000000000000..d2ab38a77f30 --- /dev/null +++ b/sdk/cosmosdb/cosmos/test/public/functional/item/bulkStreamer.item.spec.ts @@ -0,0 +1,942 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import assert from "assert"; +import type { + BulkOperationResult, + Container, + ContainerRequest, + PluginConfig, +} from "../../../../src"; +import { + Constants, + CosmosClient, + PatchOperationType, + CosmosDbDiagnosticLevel, + PluginOn, + StatusCodes, + ErrorResponse, + ResourceType, + BulkOperations, +} from "../../../../src"; +import { addEntropy, getTestContainer, removeAllDatabases } from "../../common/TestHelpers"; +import { BulkOperationType } from "../../../../src"; +import type { PartitionKey } from "../../../../src/documents"; +import { PartitionKeyDefinitionVersion, PartitionKeyKind } from "../../../../src/documents"; +import { endpoint } from "../../common/_testConfig"; +import { masterKey } from "../../common/_fakeTestSecrets"; +import type { Response } from "../../../../src/request/Response"; +import type { ItemOperation } from "../../../../src/bulk/ItemOperation"; +import { calculateObjectSizeInBytes } from "../../../../src/utils/batch"; +import { randomUUID } from "@azure/core-util"; + +const operationSkeleton = { + operationType: BulkOperationType.Create, + resourceBody: { + value: "", + id: "", + partitionKey: "", + }, +}; + +const constantSize = calculateObjectSizeInBytes(operationSkeleton); + +function generateItemOperationOfSize( + sizeInBytes: number, + id: string, + partitionKey: PartitionKey, +): ItemOperation { + if (sizeInBytes < constantSize) { + throw new Error(`Not possible to generate operation of size less than ${constantSize}`); + } + let sizeToAdd = sizeInBytes - constantSize; + if (partitionKey !== undefined) { + sizeToAdd -= calculateObjectSizeInBytes({ partitionKey }) + calculateObjectSizeInBytes({}); + sizeToAdd -= calculateObjectSizeInBytes({ id }); + } + return BulkOperations.getCreateItemOperation(partitionKey, { + id: id, + value: new Array(sizeToAdd + 1).join("a"), + key: partitionKey, + }); +} +async function getSplitContainer(): Promise { + let numpkRangeRequests = 0; + const plugins: PluginConfig[] = [ + { + on: PluginOn.request, + plugin: async (context, _diagNode, next) => { + if (context.resourceType === ResourceType.pkranges) { + let response: Response; + if (numpkRangeRequests === 0) { + response = { + headers: {}, + result: { + PartitionKeyRanges: [ + { + _rid: "RRsbAKHytdECAAAAAAAAUA==", + id: "1", + _etag: '"00000000-0000-0000-683c-819a242201db"', + minInclusive: "", + maxExclusive: "FF", + }, + ], + }, + }; + response.code = 200; + numpkRangeRequests++; + return response; + } + numpkRangeRequests++; + } + const res = await next(context); + return res; + }, + }, + ]; + + const client = new CosmosClient({ + key: masterKey, + endpoint, + diagnosticLevel: CosmosDbDiagnosticLevel.debug, + plugins, + }); + const 
splitContainer = await getTestContainer("split container", client, { + partitionKey: { paths: ["/key"] }, + }); + return splitContainer; +} + +describe("BulkStreamer", async function () { + describe("v1 container", async function () { + describe("multi partition container", async function () { + let container: Container; + let readItemId: string; + let replaceItemId: string; + let deleteItemId: string; + let patchItemId: string; + before(async function () { + await removeAllDatabases(); + container = await getTestContainer("bulk container", undefined, { + partitionKey: { + paths: ["/key"], + version: undefined, + }, + throughput: 25100, + }); + }); + after(async () => { + if (container) { + await container.database.delete(); + } + }); + + it("handles bulk CRUD operations with BulkOperations utility class", async function () { + readItemId = addEntropy("item1"); + await container.items.create({ + id: readItemId, + key: "A", + class: "2010", + }); + deleteItemId = addEntropy("item2"); + await container.items.create({ + id: deleteItemId, + key: "A", + class: "2010", + }); + replaceItemId = addEntropy("item3"); + await container.items.create({ + id: replaceItemId, + key: 5, + class: "2010", + }); + patchItemId = addEntropy("item4"); + await container.items.create({ + id: patchItemId, + key: 5, + class: "2010", + }); + const createOperation = BulkOperations.getCreateItemOperation("A", { + id: addEntropy("doc1"), + name: "sample", + key: "A", + }); + const upsertOperation = BulkOperations.getUpsertItemOperation("A", { + id: addEntropy("doc2"), + name: "other", + key: "A", + }); + const readOperation = BulkOperations.getReadItemOperation(readItemId, "A"); + const deleteOperation = BulkOperations.getDeleteItemOperation(deleteItemId, "A"); + const replaceOperation = BulkOperations.getReplaceItemOperation(replaceItemId, 5, { + id: replaceItemId, + name: "nice", + key: 5, + }); + const patchOperation = BulkOperations.getPatchItemOperation(patchItemId, 5, { + operations: [{ op: PatchOperationType.add, path: "/great", value: "goodValue" }], + }); + + const operations: ItemOperation[] = [ + createOperation, + upsertOperation, + readOperation, + deleteOperation, + replaceOperation, + patchOperation, + ]; + const bulkStreamer = container.items.getBulkStreamer(); + const promises = bulkStreamer.execute(operations); + const response = await Promise.all(promises); + bulkStreamer.dispose(); + assert.equal(response.length, 6); + // Create + assert.equal(response[0].resourceBody.name, "sample"); + assert.equal(response[0].statusCode, 201); + // Upsert + assert.equal(response[1].resourceBody.name, "other"); + assert.equal(response[1].statusCode, 201); + // Read + assert.equal(response[2].resourceBody.class, "2010"); + assert.equal(response[2].statusCode, 200); + // Delete + assert.equal(response[3].statusCode, 204); + // Replace + assert.equal(response[4].resourceBody.name, "nice"); + assert.equal(response[4].statusCode, 200); + // Patch + assert.equal(response[5].resourceBody.great, "goodValue"); + assert.equal(response[5].statusCode, 200); + }); + it("handle different batch sizes", async function () { + container = await getTestContainer("bulk container", undefined, { + partitionKey: { paths: ["/key"], version: 2 }, + throughput: 25100, + }); + const cases = [ + // cumulative size of all operations is less than threshold + { count: 10, size: 200 }, + // cumulative size is 5x greater than threshold + { count: 10, size: Math.floor(Constants.DefaultMaxBulkRequestBodySizeInBytes / 2) }, + // cumulative size is 50x 
greater than threshold + { count: 100, size: Math.floor(Constants.DefaultMaxBulkRequestBodySizeInBytes / 2) }, + ]; + + for (const testCase of cases) { + const operations: ItemOperation[] = Array.from({ length: testCase.count }, () => + generateItemOperationOfSize(testCase.size, randomUUID(), "key_value"), + ); + let bulkStreamer = container.items.getBulkStreamer(); + let executionPromises = bulkStreamer.execute(operations); + let response = await Promise.all(executionPromises); + bulkStreamer.dispose(); + + assert.equal(response.length, testCase.count); + response.forEach((res) => assert.strictEqual(res.statusCode, 201)); + + // surfaces 413 error if individual operation is greater than threshold + const operation: ItemOperation = generateItemOperationOfSize( + Constants.DefaultMaxBulkRequestBodySizeInBytes * 10, + randomUUID(), + "key_value", + ); + bulkStreamer = container.items.getBulkStreamer(); + executionPromises = bulkStreamer.execute([operation]); + response = await Promise.all(executionPromises); + bulkStreamer.dispose(); + assert.equal(response.length, 1); + assert.strictEqual(response[0].statusCode, StatusCodes.RequestEntityTooLarge); + } + }); + }); + describe("single partition container", async function () { + let container: Container; + let deleteItemId: string; + let readItemId: string; + let replaceItemId: string; + before(async function () { + await removeAllDatabases(); + container = await getTestContainer("bulk container"); + deleteItemId = addEntropy("item2"); + readItemId = addEntropy("item2"); + replaceItemId = addEntropy("item2"); + await container.items.create({ + id: deleteItemId, + key: "A", + class: "2010", + }); + await container.items.create({ + id: readItemId, + key: "B", + class: "2010", + }); + }); + + after(async () => { + if (container) { + await container.database.delete(); + } + }); + + it("container handles Create, Read, Upsert, Delete operation with partition split", async function () { + const operations = [ + BulkOperations.getCreateItemOperation("A", { + id: addEntropy("doc1"), + name: "sample", + key: "A", + }), + BulkOperations.getReadItemOperation(readItemId, "A"), + BulkOperations.getDeleteItemOperation(deleteItemId, "A"), + BulkOperations.getReplaceItemOperation(replaceItemId, 5, { + id: replaceItemId, + name: "nice", + key: 5, + }), + ]; + const splitContainer = await getSplitContainer(); + await splitContainer.items.create({ + id: deleteItemId, + key: "A", + class: "2010", + }); + await splitContainer.items.create({ + id: readItemId, + key: "A", + class: "2010", + }); + await splitContainer.items.create({ + id: replaceItemId, + key: 5, + class: "2010", + }); + + const bulkStreamer = splitContainer.items.getBulkStreamer(); + const executionPromises = bulkStreamer.execute(operations); + const response = await Promise.all(executionPromises); + bulkStreamer.dispose(); + assert.equal(response.length, 4); + + // Create + assert.equal(response[0].resourceBody.name, "sample"); + assert.equal(response[0].statusCode, 201); + // Read + assert.equal(response[1].resourceBody.class, "2010"); + assert.equal(response[1].statusCode, 200); + // Delete + assert.equal(response[2].statusCode, 204); + // Replace + assert.equal(response[3].resourceBody.name, "nice"); + assert.equal(response[3].statusCode, 200); + + // cleanup + if (splitContainer) { + await splitContainer.database.delete(); + } + }); + }); + }); + describe("v2 container", function () { + describe("multi partition container", async function () { + let readItemId: string; + let replaceItemId: string;
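+        // Editorial note (illustrative sketch, not part of the original change): the tests in this
+        // describe block are data-driven. Each BulkTestDataSet defined below bundles a container
+        // request, the documents to seed, and a list of ItemOperations paired with the status code
+        // and resource-body properties expected on the matching BulkOperationResult. A minimal,
+        // hypothetical dataset (names such as "sampleBulkDb" and "doc1" are made up here) would be:
+        //
+        //   const sampleDataset: BulkTestDataSet = {
+        //     ...defaultBulkTestDataSet,
+        //     dbName: "sampleBulkDb",
+        //     documentToCreate: [{ id: "doc1", key: "A" }],
+        //     operations: [
+        //       {
+        //         description: "Read the seeded document back.",
+        //         operation: BulkOperations.getReadItemOperation("doc1", "A"),
+        //         expectedOutput: createBulkOperationExpectedOutput(200, [{ name: "id", value: "doc1" }]),
+        //       },
+        //     ],
+        //   };
+        //   await runBulkTestDataSet(sampleDataset);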
+ let patchItemId: string; + let deleteItemId: string; + type BulkTestItem = { + id: string; + key: any; + key2?: any; + key3?: any; + class?: string; + }; + type BulkTestDataSet = { + dbName: string; + containerRequest: ContainerRequest; + documentToCreate: BulkTestItem[]; + operations: { + description?: string; + operation: ItemOperation; + expectedOutput?: { + description?: string; + statusCode: number; + propertysToMatch: { + name: string; + value: any; + }[]; + }; + }[]; + }; + const defaultBulkTestDataSet: BulkTestDataSet = { + dbName: "bulkTestDB", + containerRequest: { + id: "bulkContainer", + partitionKey: { + paths: ["/key"], + version: 2, + }, + throughput: 25100, + }, + documentToCreate: [], + operations: [], + }; + async function runBulkTestDataSet(dataset: BulkTestDataSet) { + const client = new CosmosClient({ + key: masterKey, + endpoint, + diagnosticLevel: CosmosDbDiagnosticLevel.debug, + }); + const db = await client.databases.createIfNotExists({ id: dataset.dbName }); + const database = db.database; + const { container } = await database.containers.createIfNotExists(dataset.containerRequest); + try { + for (const doc of dataset.documentToCreate) { + await container.items.create(doc); + } + const bulkStreamer = container.items.getBulkStreamer({}); + const promises: Promise[] = []; + dataset.operations.forEach((operation) => + promises.push(...bulkStreamer.execute([operation.operation])), + ); + const response = await Promise.all(promises); + bulkStreamer.dispose(); + dataset.operations.forEach(({ description, expectedOutput }, index) => { + if (expectedOutput) { + assert.strictEqual( + response[index].statusCode, + expectedOutput.statusCode, + `Failed during - ${description}`, + ); + expectedOutput.propertysToMatch.forEach(({ name, value }) => { + assert.strictEqual( + response[index].resourceBody[name], + value, + `Failed during - ${description}`, + ); + }); + } + }); + } finally { + await database.delete(); + } + } + function createBulkOperationExpectedOutput( + statusCode: number, + propertysToMatch: { name: string; value: any }[], + ): { + statusCode: number; + propertysToMatch: { + name: string; + value: any; + }[]; + } { + return { + statusCode, + propertysToMatch, + }; + } + describe("handles create, upsert, patch, replace, delete", async function () { + it("Hierarchical Partitions with two keys", async function () { + readItemId = addEntropy("item1"); + const createItemWithBooleanPartitionKeyId = addEntropy( + "createItemWithBooleanPartitionKeyId", + ); + const createItemWithStringPartitionKeyId = addEntropy( + "createItemWithStringPartitionKeyId", + ); + const createItemWithNumberPartitionKeyId = addEntropy( + "createItemWithNumberPartitionKeyId", + ); + replaceItemId = addEntropy("item3"); + patchItemId = addEntropy("item4"); + deleteItemId = addEntropy("item2"); + const dataset: BulkTestDataSet = { + dbName: "hierarchical partition bulk 2 keys", + containerRequest: { + id: "patchContainer", + partitionKey: { + paths: ["/key", "/key2"], + version: PartitionKeyDefinitionVersion.V2, + kind: PartitionKeyKind.MultiHash, + }, + throughput: 25100, + }, + documentToCreate: [ + { id: readItemId, key: true, key2: true, class: "2010" }, + { id: createItemWithBooleanPartitionKeyId, key: true, key2: false, class: "2010" }, + { id: createItemWithNumberPartitionKeyId, key: 0, key2: 3, class: "2010" }, + { id: createItemWithStringPartitionKeyId, key: 5, key2: {}, class: "2010" }, + { id: deleteItemId, key: {}, key2: {}, class: "2011" }, + { id: replaceItemId, key: 5, key2: 5, 
class: "2012" }, + { id: patchItemId, key: 5, key2: 5, class: "2019" }, + ], + operations: [ + { + description: "Read document with partitionKey containing booleans values.", + operation: BulkOperations.getReadItemOperation( + createItemWithBooleanPartitionKeyId, + [true, false], + ), + expectedOutput: createBulkOperationExpectedOutput(200, [ + { name: "class", value: "2010" }, + ]), + }, + { + description: "Read document with partitionKey containing Number values.", + operation: BulkOperations.getReadItemOperation( + createItemWithNumberPartitionKeyId, + [0, 3], + ), + expectedOutput: createBulkOperationExpectedOutput(200, [ + { name: "class", value: "2010" }, + ]), + }, + { + description: "Creating document with partitionKey containing 2 strings.", + operation: BulkOperations.getCreateItemOperation(["A", "B"], { + id: addEntropy("doc1"), + name: "sample", + key: "A", + key2: "B", + }), + expectedOutput: createBulkOperationExpectedOutput(201, [ + { name: "name", value: "sample" }, + ]), + }, + { + description: "Creating document with mismatching partition key.", + operation: BulkOperations.getCreateItemOperation(["A", "V"], { + id: addEntropy("doc1"), + name: "sample", + key: "A", + key2: "B", + }), + expectedOutput: createBulkOperationExpectedOutput(400, []), + }, + { + description: "Upsert document with partitionKey containing 2 strings.", + operation: BulkOperations.getUpsertItemOperation(["U", "V"], { + id: addEntropy("doc1"), + name: "other", + key: "U", + key2: "V", + }), + expectedOutput: createBulkOperationExpectedOutput(201, [ + { name: "name", value: "other" }, + ]), + }, + { + description: "Read document with partitionKey containing 2 booleans.", + operation: BulkOperations.getReadItemOperation(readItemId, [true, true]), + expectedOutput: createBulkOperationExpectedOutput(200, [ + { name: "class", value: "2010" }, + ]), + }, + { + description: "Replace document with partition key.", + operation: BulkOperations.getReplaceItemOperation(replaceItemId, [5, 5], { + id: replaceItemId, + name: "nice", + key: 5, + key2: 5, + }), + expectedOutput: createBulkOperationExpectedOutput(200, [ + { name: "name", value: "nice" }, + ]), + }, + { + description: "Patch document with partitionKey containing 2 Numbers.", + operation: BulkOperations.getPatchItemOperation(patchItemId, [5, 5], { + operations: [{ op: PatchOperationType.add, path: "/great", value: "goodValue" }], + }), + expectedOutput: createBulkOperationExpectedOutput(200, [ + { name: "great", value: "goodValue" }, + ]), + }, + { + description: "Conditional Patch document with partitionKey containing 2 Numbers.", + operation: BulkOperations.getPatchItemOperation(patchItemId, [5, 5], { + operations: [{ op: PatchOperationType.add, path: "/good", value: "greatValue" }], + condition: "from c where NOT IS_DEFINED(c.newImproved)", + }), + expectedOutput: createBulkOperationExpectedOutput(200, []), + }, + ], + }; + await runBulkTestDataSet(dataset); + }); + it("Hierarchical Partitions with three keys", async function () { + readItemId = addEntropy("item1"); + const createItemWithBooleanPartitionKeyId = addEntropy( + "createItemWithBooleanPartitionKeyId", + ); + const createItemWithStringPartitionKeyId = addEntropy( + "createItemWithStringPartitionKeyId", + ); + const createItemWithUnknownPartitionKeyId = addEntropy( + "createItemWithUnknownPartitionKeyId", + ); + const createItemWithNumberPartitionKeyId = addEntropy( + "createItemWithNumberPartitionKeyId", + ); + replaceItemId = addEntropy("item3"); + patchItemId = addEntropy("item4"); + 
deleteItemId = addEntropy("item2"); + const dataset: BulkTestDataSet = { + dbName: "hierarchical partition bulk 3 keys", + containerRequest: { + id: "patchContainer", + partitionKey: { + paths: ["/key", "/key2", "/key3"], + version: PartitionKeyDefinitionVersion.V2, + kind: PartitionKeyKind.MultiHash, + }, + throughput: 25100, + }, + documentToCreate: [ + { id: readItemId, key: true, key2: true, key3: true, class: "2010" }, + { + id: createItemWithBooleanPartitionKeyId, + key: true, + key2: false, + key3: true, + class: "2010", + }, + { + id: createItemWithUnknownPartitionKeyId, + key: {}, + key2: {}, + key3: {}, + class: "2010", + }, + { id: createItemWithNumberPartitionKeyId, key: 0, key2: 3, key3: 5, class: "2010" }, + { + id: createItemWithStringPartitionKeyId, + key: 5, + key2: {}, + key3: "adsf", + class: "2010", + }, + { id: deleteItemId, key: {}, key2: {}, key3: {}, class: "2011" }, + { id: replaceItemId, key: 5, key2: 5, key3: "T", class: "2012" }, + { id: patchItemId, key: 5, key2: 5, key3: true, class: "2019" }, + ], + operations: [ + { + description: "Read document with partitionKey containing booleans values.", + operation: BulkOperations.getReadItemOperation( + createItemWithBooleanPartitionKeyId, + [true, false, true], + ), + expectedOutput: createBulkOperationExpectedOutput(200, [ + { name: "class", value: "2010" }, + ]), + }, + { + description: "Read document with partitionKey containing Number values.", + operation: BulkOperations.getReadItemOperation( + createItemWithNumberPartitionKeyId, + [0, 3, 5], + ), + expectedOutput: createBulkOperationExpectedOutput(200, [ + { name: "class", value: "2010" }, + ]), + }, + { + description: "Creating document with partitionKey containing 2 strings.", + operation: BulkOperations.getCreateItemOperation(["A", "B", "C"], { + id: addEntropy("doc1"), + name: "sample", + key: "A", + key2: "B", + key3: "C", + }), + expectedOutput: createBulkOperationExpectedOutput(201, [ + { name: "name", value: "sample" }, + ]), + }, + { + description: "Creating document with mismatching partition key.", + operation: BulkOperations.getCreateItemOperation(["A", "V", true], { + id: addEntropy("doc1"), + name: "sample", + key: "A", + key2: "B", + key3: true, + }), + expectedOutput: createBulkOperationExpectedOutput(400, []), + }, + { + description: "Upsert document with partitionKey containing 3 strings.", + operation: BulkOperations.getUpsertItemOperation(["U", "V", 5], { + id: addEntropy("doc1"), + name: "other", + key: "U", + key2: "V", + key3: 5, + }), + expectedOutput: createBulkOperationExpectedOutput(201, [ + { name: "name", value: "other" }, + ]), + }, + { + description: "Read document with partitionKey containing 3 booleans.", + operation: BulkOperations.getReadItemOperation(readItemId, [true, true, true]), + expectedOutput: createBulkOperationExpectedOutput(200, [ + { name: "class", value: "2010" }, + ]), + }, + { + description: "Replace document with partition key.", + operation: BulkOperations.getReplaceItemOperation(replaceItemId, [5, 5, "T"], { + id: replaceItemId, + name: "nice", + key: 5, + key2: 5, + key3: "T", + }), + expectedOutput: createBulkOperationExpectedOutput(200, [ + { name: "name", value: "nice" }, + ]), + }, + { + description: "Patch document with partitionKey containing 2 Numbers.", + operation: BulkOperations.getPatchItemOperation(patchItemId, [5, 5, true], { + operations: [{ op: PatchOperationType.add, path: "/great", value: "goodValue" }], + }), + expectedOutput: createBulkOperationExpectedOutput(200, [ + { name: "great", 
value: "goodValue" }, + ]), + }, + { + description: "Conditional Patch document with partitionKey containing 2 Numbers.", + operation: BulkOperations.getPatchItemOperation(patchItemId, [5, 5, true], { + operations: [{ op: PatchOperationType.add, path: "/good", value: "greatValue" }], + condition: "from c where NOT IS_DEFINED(c.newImproved)", + }), + expectedOutput: createBulkOperationExpectedOutput(200, []), + }, + ], + }; + await runBulkTestDataSet(dataset); + }); + }); + it("Continues after errors with default value of continueOnError true", async function () { + const dataset: BulkTestDataSet = { + ...defaultBulkTestDataSet, + dbName: addEntropy("continueOnError"), + documentToCreate: [], + operations: [ + { + description: "Operation should fail with invalid ttl.", + operation: BulkOperations.getCreateItemOperation("A", { + id: addEntropy("doc1"), + key: "A", + ttl: -10, + }), + expectedOutput: createBulkOperationExpectedOutput(400, []), + }, + { + description: + "Operation should suceed and should not be abondoned because of previous failure, since continueOnError is true.", + operation: BulkOperations.getCreateItemOperation("A", { + id: addEntropy("doc2"), + key: "A", + }), + expectedOutput: createBulkOperationExpectedOutput(201, []), + }, + ], + }; + await runBulkTestDataSet(dataset); + }); + it("handles throttling", async function () { + let responseIndex = 0; + const plugins: PluginConfig[] = [ + { + on: PluginOn.request, + plugin: async (context, _diagNode, next) => { + if (context.operationType === "batch" && responseIndex < 1) { + const error = new ErrorResponse(); + error.code = StatusCodes.TooManyRequests; + error.headers = { + "x-ms-retry-after-ms": 100, + }; + responseIndex++; + throw error; + } + const res = await next(context); + return res; + }, + }, + ]; + const client = new CosmosClient({ + key: masterKey, + endpoint, + plugins, + }); + const testcontainer = await getTestContainer("throttling container", client, { + partitionKey: { + paths: ["/key"], + version: 2, + }, + throughput: 400, + }); + const operations: ItemOperation[] = Array.from({ length: 10 }, (_, i) => { + return BulkOperations.getCreateItemOperation(i, { + id: addEntropy("doc" + i), + key: i, + class: "2010", + }); + }); + const bulkStreamer = testcontainer.items.getBulkStreamer(); + const executionPromises = bulkStreamer.execute(operations); + const response = await Promise.all(executionPromises); + bulkStreamer.dispose(); + assert.strictEqual(response.length, 10); + response.forEach((res, index) => { + assert.strictEqual(res.statusCode, 201, `Status should be 201 for operation ${index}`); + }); + await testcontainer.database.delete(); + }); + it("returns final response in order", async function () { + const testcontainer = await getTestContainer("final response order container", undefined, { + partitionKey: { + paths: ["/key"], + version: 2, + }, + throughput: 25100, + }); + const operations: ItemOperation[] = Array.from({ length: 10 }, (_, i) => { + return BulkOperations.getCreateItemOperation(i, { + id: addEntropy("doc" + i), + key: i, + class: "2010", + }); + }); + + const bulkStreamer = testcontainer.items.getBulkStreamer(); + const executionPromises = bulkStreamer.execute(operations); + const response = await Promise.all(executionPromises); + bulkStreamer.dispose(); + const expectedOrder = operations + .filter((op) => "id" in op.resourceBody) + .map((op) => (op.resourceBody as any).id); + const actualOrder = response.map((res) => res.resourceBody.id); + assert.deepStrictEqual(actualOrder, 
expectedOrder); + await testcontainer.database.delete(); + }); + }); + describe("multi partition container - nested partition key", async function () { + let container: Container; + let createItemId: string; + let upsertItemId: string; + before(async function () { + await removeAllDatabases(); + container = await getTestContainer("bulk container", undefined, { + partitionKey: { + paths: ["/nested/key"], + version: 2, + }, + throughput: 25100, + }); + createItemId = addEntropy("createItem"); + upsertItemId = addEntropy("upsertItem"); + }); + after(async () => { + if (container) { + await container.database.delete(); + } + }); + it("creates an item with nested object partition key", async function () { + const operations = [ + BulkOperations.getCreateItemOperation("A", { id: createItemId, nested: { key: "A" } }), + BulkOperations.getUpsertItemOperation("A", { id: upsertItemId, nested: { key: false } }), + ]; + const bulkStreamer = container.items.getBulkStreamer(); + const executionPromises = bulkStreamer.execute(operations); + const createResponse = await Promise.all(executionPromises); + bulkStreamer.dispose(); + assert.equal(createResponse[0].statusCode, 201); + }); + }); + describe("multi partitioned container with many items handle partition split", async function () { + let container: Container; + before(async function () { + let numpkRangeRequests = 0; + const plugins: PluginConfig[] = [ + { + on: PluginOn.request, + plugin: async (context, _diagNode, next) => { + if (context.resourceType === ResourceType.pkranges) { + let response: Response; + if (numpkRangeRequests === 0) { + response = { + headers: {}, + result: { + PartitionKeyRanges: [ + { + _rid: "RRsbAKHytdECAAAAAAAAUA==", + id: "7", + _etag: '"00000000-0000-0000-683c-819a242201db"', + minInclusive: "", + maxExclusive: "FF", + }, + ], + }, + }; + response.code = 200; + numpkRangeRequests++; + return response; + } + numpkRangeRequests++; + } + const res = await next(context); + return res; + }, + }, + ]; + const client = new CosmosClient({ + key: masterKey, + endpoint, + diagnosticLevel: CosmosDbDiagnosticLevel.debug, + plugins, + }); + await removeAllDatabases(); + container = await getTestContainer("bulk split container", client, { + partitionKey: { + paths: ["/key"], + version: 2, + }, + throughput: 25100, + }); + for (let i = 0; i < 300; i++) { + await container.items.create({ + id: "item" + i, + key: i, + class: "2010", + }); + } + }); + it("check partition splits during bulk", async function () { + const operations: ItemOperation[] = []; + for (let i = 0; i < 300; i++) { + operations.push(BulkOperations.getReadItemOperation("item" + i, i)); + } + + const bulkStreamer = container.items.getBulkStreamer(); + const executionResults = bulkStreamer.execute(operations); + const executionPromises = Array.isArray(executionResults) + ? 
executionResults + : [executionResults]; + const response = await Promise.all(executionPromises); + bulkStreamer.dispose(); + response.forEach((res, index) => { + assert.strictEqual(res.statusCode, 200, `Status should be 200 for operation ${index}`); + assert.strictEqual(res.resourceBody.id, "item" + index, "Read Items id should match"); + }); + // Delete database after use + if (container) { + await container.database.delete(); + } + }); + }); + }); +}); diff --git a/sdk/cosmosdb/cosmos/test/public/functional/npcontainer.spec.ts b/sdk/cosmosdb/cosmos/test/public/functional/npcontainer.spec.ts index 87b3b155c8c5..23084b8a309a 100644 --- a/sdk/cosmosdb/cosmos/test/public/functional/npcontainer.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/functional/npcontainer.spec.ts @@ -46,6 +46,7 @@ describe("Non Partitioned Container", function () { }); after(async () => { + await container.database.delete(); client.dispose(); legacyClient.dispose(); }); diff --git a/sdk/cosmosdb/cosmos/test/public/functional/queryIterator.spec.ts b/sdk/cosmosdb/cosmos/test/public/functional/queryIterator.spec.ts index 764cd6fcb3f1..641e44a81833 100644 --- a/sdk/cosmosdb/cosmos/test/public/functional/queryIterator.spec.ts +++ b/sdk/cosmosdb/cosmos/test/public/functional/queryIterator.spec.ts @@ -40,6 +40,7 @@ describe("Correlated Activity Id", function () { }); before(async () => { + await removeAllDatabases(); container = await getTestContainer("Test", client, { partitionKey: "/name", throughput: 10000, @@ -250,6 +251,8 @@ describe("Correlated Activity Id", function () { } }); after(async function () { - await removeAllDatabases(); + if (container) { + await container.database.delete(); + } }); }); diff --git a/sdk/cosmosdb/cosmos/tsconfig.strict.json b/sdk/cosmosdb/cosmos/tsconfig.strict.json index c040b528f282..739bdf7ed6d9 100644 --- a/sdk/cosmosdb/cosmos/tsconfig.strict.json +++ b/sdk/cosmosdb/cosmos/tsconfig.strict.json @@ -9,6 +9,7 @@ "include": ["src/**/*.ts", "test/**/*.ts", "samples-dev/**/*.(ts|json)"], // ADDITION TO THIS LIST IS NOT ALLOWED "exclude": [ + "src/bulk/*.ts", "src/documents/DatabaseAccount.ts", "src/documents/IndexingPolicy.ts", "src/plugins/Plugin.ts", @@ -90,6 +91,7 @@ "src/request/SharedOptions.ts", "src/retry/defaultRetryPolicy.ts", "src/retry/endpointDiscoveryRetryPolicy.ts", + "src/retry/bulkExecutionRetryPolicy.ts", "src/retry/resourceThrottleRetryPolicy.ts", "src/retry/RetryPolicy.ts", "src/retry/retryUtility.ts", @@ -170,6 +172,9 @@ "test/internal/unit/hybridExecutionContext.spec.ts", "test/internal/unit/sessionContainer.spec.ts", "test/internal/unit/changeFeed/*.spec.ts", + "test/internal/unit/bulkCongestionAlgorithm.spec.ts", + "test/internal/unit/bulkStreamerPerPartition.spec.ts", + "test/internal/unit/bulkExecutionRetryPolicy.spec.ts", "test/public/common/MockQueryIterator.ts", "test/public/common/MockClientContext.ts", "test/internal/unit/smartRoutingMapProvider.spec.ts", @@ -198,6 +203,7 @@ "test/public/functional/globalEndpointManager.spec.ts", "test/public/functional/item/item.spec.ts", "test/public/functional/item/bulk.item.spec.ts", + "test/public/functional/item/bulkStreamer.item.spec.ts", "test/public/functional/item/batch.item.spec.ts", "test/public/functional/item/itemIdEncoding.spec.ts", "test/public/functional/endpointComponent/NonStreamingOrderByEndpointComponent.spec.ts",