Skip to content

Commit

Permalink
Support connection pool (#261)
Browse files Browse the repository at this point in the history
* part1

Signed-off-by: ryjiang <[email protected]>

* update poole

Signed-off-by: ruiyi.jiang <[email protected]>

* finish connection pool

Signed-off-by: ryjiang <[email protected]>

* adjust

Signed-off-by: ryjiang <[email protected]>

* fix test

Signed-off-by: ryjiang <[email protected]>

* add more test

Signed-off-by: ryjiang <[email protected]>

* adjust jest config

Signed-off-by: ryjiang <[email protected]>

* more coverage

Signed-off-by: ryjiang <[email protected]>

* using generic-pool's options

Signed-off-by: ryjiang <[email protected]>

* fix clientConfig

Signed-off-by: ryjiang <[email protected]>

---------

Signed-off-by: ryjiang <[email protected]>
Signed-off-by: ruiyi.jiang <[email protected]>
  • Loading branch information
shanghaikid authored Dec 14, 2023
1 parent 54f10eb commit a6457e3
Show file tree
Hide file tree
Showing 20 changed files with 413 additions and 290 deletions.
2 changes: 1 addition & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module.exports = {
moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'json', 'node'],
testTimeout: 60000,
// because user will cause other test fail, but we still have user spec
coveragePathIgnorePatterns: ['/milvus/User.ts'],
coveragePathIgnorePatterns: ['dist'],
testPathIgnorePatterns: ['cloud.spec.ts', 'serverless.spec.ts'], // add this line
testEnvironmentOptions: {
NODE_ENV: 'production',
Expand Down
10 changes: 8 additions & 2 deletions milvus/MilvusClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,14 @@ export class MilvusClient extends GRPCClient {
logger.debug(
`new client initialized, version: ${MilvusClient.sdkInfo.version} `
);
// connect();
this.connect(MilvusClient.sdkInfo.version);

// If the configOrAddress is a string (i.e., the server's address), or if the configOrAddress object does not have the __SKIP_CONNECT__ property set to true, then establish a connection to the Milvus server using the current SDK version.
if (
typeof configOrAddress === 'string' ||
!(configOrAddress as ClientConfig).__SKIP_CONNECT__
) {
this.connect(MilvusClient.sdkInfo.version);
}
}

// High level API: align with pymilvus
Expand Down
5 changes: 3 additions & 2 deletions milvus/const/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ export enum METADATA {

export enum CONNECT_STATUS {
NOT_CONNECTED,
CONNECTING,
CONNECTED,
CONNECTING = 0, // GRPC channel state connecting
CONNECTED = 1, // GRPC channel state ready
UNIMPLEMENTED,
SHUTDOWN = 5, // GRPC channel state shutdown
}

export enum TLS_MODE {
Expand Down
3 changes: 3 additions & 0 deletions milvus/const/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ export const DEFAULT_DYNAMIC_FIELD = '$meta';
export const DEFAULT_COUNT_QUERY_STRING = 'count(*)';
export const DEFAULT_HTTP_TIMEOUT = 60000; // 60s
export const DEFAULT_HTTP_ENDPOINT_VERSION = 'v1'; // api version, default v1

export const DEFAULT_POOL_MAX = 10; // default max pool client number
export const DEFAULT_POOL_MIN = 2; // default min pool client number
100 changes: 78 additions & 22 deletions milvus/grpc/BaseClient.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import path from 'path';
import crypto from 'crypto';
import protobuf, { Root, Type } from 'protobufjs';
import { Client, ChannelOptions } from '@grpc/grpc-js';
import { readFileSync } from 'fs';
import {
Client,
ChannelOptions,
credentials,
ChannelCredentials,
} from '@grpc/grpc-js';
import { Pool } from 'generic-pool';
import {
ERROR_REASONS,
ClientConfig,
Expand All @@ -26,18 +33,36 @@ const schemaProtoPath = path.resolve(
* Base gRPC client, setup all configuration here
*/
export class BaseClient {
// channel pool
public channelPool!: Pool<Client>;
// Client ID
clientId: string = `${crypto.randomUUID()}`;
public clientId: string = `${crypto.randomUUID()}`;
// flags to indicate that if the connection is established and its state
connectStatus = CONNECT_STATUS.NOT_CONNECTED;
connectPromise = Promise.resolve();
// metadata
protected metadata: Map<string, string> = new Map<string, string>();
// The path to the Milvus protobuf file.
protected protoFilePath = {
public connectStatus = CONNECT_STATUS.NOT_CONNECTED;
// connection promise
public connectPromise = Promise.resolve();
// TLS mode, by default it is disabled
public readonly tlsMode: TLS_MODE = TLS_MODE.DISABLED;
// The client configuration.
public readonly config: ClientConfig;
// grpc options
public readonly channelOptions: ChannelOptions;
// server info
public serverInfo: ServerInfo = {};
// // The gRPC client instance.
// public client!: Promise<Client>;
// The timeout for connecting to the Milvus service.
public timeout: number = DEFAULT_CONNECT_TIMEOUT;
// The path to the Milvus protobuf file, user can define it from clientConfig
public protoFilePath = {
milvus: milvusProtoPath,
schema: schemaProtoPath,
};

// ChannelCredentials object used for authenticating the client on the gRPC channel.
protected creds!: ChannelCredentials;
// global metadata, send each grpc request with it
protected metadata: Map<string, string> = new Map<string, string>();
// The protobuf schema.
protected schemaProto: Root;
// The Milvus protobuf.
Expand All @@ -46,27 +71,13 @@ export class BaseClient {
protected collectionSchemaType: Type;
// The milvus field schema Type
protected fieldSchemaType: Type;

// milvus proto
protected readonly protoInternalPath = {
serviceName: 'milvus.proto.milvus.MilvusService',
collectionSchema: 'milvus.proto.schema.CollectionSchema',
fieldSchema: 'milvus.proto.schema.FieldSchema',
};

// TLS mode, by default it is disabled
public readonly tlsMode: TLS_MODE = TLS_MODE.DISABLED;
// The client configuration.
public readonly config: ClientConfig;
// grpc options
public readonly channelOptions: ChannelOptions;
// server info
public serverInfo: ServerInfo = {};
// The gRPC client instance.
public client: Client | undefined;
// The timeout for connecting to the Milvus service.
public timeout: number = DEFAULT_CONNECT_TIMEOUT;

/**
* Sets up the configuration object for the gRPC client.
*
Expand Down Expand Up @@ -163,6 +174,51 @@ export class BaseClient {
this.config.tls.serverName;
}

// Switch based on the TLS mode
switch (this.tlsMode) {
case TLS_MODE.ONE_WAY:
// Create SSL credentials with empty parameters for one-way authentication
this.creds = credentials.createSsl();
break;
case TLS_MODE.TWO_WAY:
// Extract paths for root certificate, private key, certificate chain, and verify options from the client configuration
const { rootCertPath, privateKeyPath, certChainPath, verifyOptions } =
this.config.tls!;

// Initialize buffers for root certificate, private key, and certificate chain
let rootCertBuff: Buffer | null = null;
let privateKeyBuff: Buffer | null = null;
let certChainBuff: Buffer | null = null;

// Read root certificate file if path is provided
if (rootCertPath) {
rootCertBuff = readFileSync(rootCertPath);
}

// Read private key file if path is provided
if (privateKeyPath) {
privateKeyBuff = readFileSync(privateKeyPath);
}

// Read certificate chain file if path is provided
if (certChainPath) {
certChainBuff = readFileSync(certChainPath);
}

// Create SSL credentials with the read files and verify options for two-way authentication
this.creds = credentials.createSsl(
rootCertBuff,
privateKeyBuff,
certChainBuff,
verifyOptions
);
break;
default:
// Create insecure credentials if no TLS mode is specified
this.creds = credentials.createInsecure();
break;
}

// Set up the timeout for connecting to the Milvus service.
this.timeout =
typeof config.timeout === 'string'
Expand Down
40 changes: 20 additions & 20 deletions milvus/grpc/Collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export class Collection extends Database {

// Call the promisify function to create the collection.
const createPromise = await promisify(
this.client,
this.channelPool,
'CreateCollection',
{
...data,
Expand Down Expand Up @@ -201,7 +201,7 @@ export class Collection extends Database {

// avoid to call describe collection, because it has cache
const res = await promisify(
this.client,
this.channelPool,
'DescribeCollection',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -242,7 +242,7 @@ export class Collection extends Database {
data?: ShowCollectionsReq
): Promise<ShowCollectionsResponse> {
const promise = await promisify(
this.client,
this.channelPool,
'ShowCollections',
{
type: data ? data.type : ShowCollectionsType.All,
Expand Down Expand Up @@ -290,7 +290,7 @@ export class Collection extends Database {
async alterCollection(data: AlterCollectionReq): Promise<ResStatus> {
checkCollectionName(data);
const promise = await promisify(
this.client,
this.channelPool,
'AlterCollection',
{
collection_name: data.collection_name,
Expand Down Expand Up @@ -346,7 +346,7 @@ export class Collection extends Database {

// get new data
const promise = await promisify(
this.client,
this.channelPool,
'DescribeCollection',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -391,7 +391,7 @@ export class Collection extends Database {
checkCollectionName(data);

const promise = await promisify(
this.client,
this.channelPool,
'GetCollectionStatistics',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -432,7 +432,7 @@ export class Collection extends Database {
checkCollectionName(data);

const promise = await promisify(
this.client,
this.channelPool,
'LoadCollection',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -470,7 +470,7 @@ export class Collection extends Database {
checkCollectionName(data);

const promise = await promisify(
this.client,
this.channelPool,
'LoadCollection',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -529,7 +529,7 @@ export class Collection extends Database {
checkCollectionName(data);

const promise = await promisify(
this.client,
this.channelPool,
'ReleaseCollection',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -564,7 +564,7 @@ export class Collection extends Database {
*/
async renameCollection(data: RenameCollectionReq): Promise<ResStatus> {
const promise = await promisify(
this.client,
this.channelPool,
'RenameCollection',
{
oldName: data.collection_name,
Expand Down Expand Up @@ -602,7 +602,7 @@ export class Collection extends Database {
checkCollectionName(data);

const promise = await promisify(
this.client,
this.channelPool,
'DropCollection',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -649,7 +649,7 @@ export class Collection extends Database {
throw new Error(ERROR_REASONS.ALIAS_NAME_IS_REQUIRED);
}
const promise = await promisify(
this.client,
this.channelPool,
'CreateAlias',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -688,7 +688,7 @@ export class Collection extends Database {
throw new Error(ERROR_REASONS.ALIAS_NAME_IS_REQUIRED);
}
const promise = await promisify(
this.client,
this.channelPool,
'DropAlias',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -728,7 +728,7 @@ export class Collection extends Database {
throw new Error(ERROR_REASONS.ALIAS_NAME_IS_REQUIRED);
}
const promise = await promisify(
this.client,
this.channelPool,
'AlterAlias',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -763,7 +763,7 @@ export class Collection extends Database {
checkCollectionName(data);
const collectionInfo = await this.describeCollection(data);
const res = await promisify(
this.client,
this.channelPool,
'ManualCompaction',
{
collectionID: collectionInfo.collectionID,
Expand Down Expand Up @@ -803,7 +803,7 @@ export class Collection extends Database {
throw new Error(ERROR_REASONS.COMPACTION_ID_IS_REQUIRED);
}
const res = await promisify(
this.client,
this.channelPool,
'GetCompactionState',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -841,7 +841,7 @@ export class Collection extends Database {
throw new Error(ERROR_REASONS.COMPACTION_ID_IS_REQUIRED);
}
const res = await promisify(
this.client,
this.channelPool,
'GetCompactionStateWithPlans',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -894,7 +894,7 @@ export class Collection extends Database {
throw new Error(ERROR_REASONS.COLLECTION_ID_IS_REQUIRED);
}
const res = await promisify(
this.client,
this.channelPool,
'GetReplicas',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -935,7 +935,7 @@ export class Collection extends Database {
throw new Error(ERROR_REASONS.COLLECTION_NAME_IS_REQUIRED);
}
const res = await promisify(
this.client,
this.channelPool,
'GetLoadingProgress',
data,
data.timeout || this.timeout
Expand Down Expand Up @@ -973,7 +973,7 @@ export class Collection extends Database {
throw new Error(ERROR_REASONS.COLLECTION_NAME_IS_REQUIRED);
}
const res = await promisify(
this.client,
this.channelPool,
'GetLoadState',
data,
data.timeout || this.timeout
Expand Down
Loading

0 comments on commit a6457e3

Please sign in to comment.