Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support search iterator v2 #405

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 92 additions & 178 deletions milvus/grpc/Data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import {
SearchRes,
SearchSimpleReq,
SearchIteratorReq,
DEFAULT_TOPK,
HybridSearchReq,
promisify,
sleep,
Expand All @@ -56,9 +55,6 @@ import {
DEFAULT_COUNT_QUERY_STRING,
getQueryIteratorExpr,
QueryIteratorReq,
getRangeFromSearchResult,
SearchResultData,
getPKFieldExpr,
DEFAULT_MAX_SEARCH_SIZE,
SparseFloatVector,
sparseRowsToBytes,
Expand Down Expand Up @@ -473,6 +469,10 @@ export class Data extends Collection {
* @returns {string} status.error_code - The error code of the operation.
* @returns {string} status.reason - The reason for the error, if any.
* @returns {{score:number,id:string, [outputfield]: value}[]} results - Array of search results.
* @returns {number} session_ts - The timestamp of the search session.
* @returns {string} collection_name - The name of the collection.
* @returns {number} all_search_count - The total number of search operations.
* @returns {string[]} recalls - The recalls of the search operation.
*
* @example
* ```
Expand Down Expand Up @@ -533,6 +533,8 @@ export class Data extends Collection {
status: originSearchResult.status,
results: [],
recalls: [],
session_ts: -1,
collection_name: data.collection_name,
};
}

Expand All @@ -547,183 +549,95 @@ export class Data extends Collection {
// nq === 1, return the first object of results array
results: nq === 1 ? results[0] || [] : results,
recalls: originSearchResult.results.recalls,
session_ts: originSearchResult.session_ts,
collection_name: data.collection_name,
all_search_count: originSearchResult.results.all_search_count,
search_iterator_v2_results:
originSearchResult.results.search_iterator_v2_results,
_search_iterator_v2_results:
originSearchResult.results._search_iterator_v2_results,
};
}

// async searchIterator(data: SearchIteratorReq): Promise<any> {
// // store client
// const client = this;
// // get collection info
// const pkField = await this.getPkField(data);
// // get available count
// const count = await client.count({
// collection_name: data.collection_name,
// expr: data.expr || data.filter || '',
// });
// // make sure limit is not exceed the total count
// const total = data.limit > count.data ? count.data : data.limit;
// // make sure batch size is exceed the total count
// let batchSize = data.batchSize > total ? total : data.batchSize;
// // make sure batch size is not exceed max search size
// batchSize =
// batchSize > DEFAULT_MAX_SEARCH_SIZE ? DEFAULT_MAX_SEARCH_SIZE : batchSize;

// // init expr
// const initExpr = data.expr || data.filter || '';
// // init search params object
// data.params = data.params || {};
// data.limit = batchSize;

// // user range filter set
// const initRadius = Number(data.params.radius) || 0;
// const initRangeFilter = Number(data.params.range_filter) || 0;
// // range params object
// const rangeFilterParams = {
// radius: initRadius,
// rangeFilter: initRangeFilter,
// expr: initExpr,
// };

// // force quite if true, at first, if total is 0, return done
// let done = total === 0;
// // batch result store
// let lastBatchRes: SearchResultData[] = [];

// // build cache
// const cache = await client.search({
// ...data,
// limit: total > DEFAULT_MAX_SEARCH_SIZE ? DEFAULT_MAX_SEARCH_SIZE : total,
// });

// return {
// currentTotal: 0,
// [Symbol.asyncIterator]() {
// return {
// currentTotal: this.currentTotal,
// async next() {
// // check if reach the limit
// if (
// (this.currentTotal >= total && this.currentTotal !== 0) ||
// done
// ) {
// return { done: true, value: lastBatchRes };
// }

// // batch result container
// const batchRes: SearchResultData[] = [];
// const bs =
// this.currentTotal + batchSize > total
// ? total - this.currentTotal
// : batchSize;

// // keep getting search data if not reach the batch size
// while (batchRes.length < bs) {
// // search results container
// let searchResults: SearchResults = {
// status: { error_code: 'SUCCESS', reason: '' },
// results: [],
// };

// // Iterate through the cached data, adding it to the search results container until the batch size is reached.
// if (cache.results.length > 0) {
// while (
// cache.results.length > 0 &&
// searchResults.results.length < bs
// ) {
// searchResults.results.push(cache.results.shift()!);
// }
// } else if (searchResults.results.length < bs) {
// // build search params, overwrite range filter
// if (rangeFilterParams.radius && rangeFilterParams.rangeFilter) {
// data.params = {
// ...data.params,
// radius: rangeFilterParams.radius,
// range_filter:
// rangeFilterParams.rangeFilter,
// };
// }
// // set search expr
// data.expr = rangeFilterParams.expr;

// console.log('search param', data.params, data.expr);

// // iterate search, if no result, double the radius, until we doubled for 5 times
// let newSearchRes = await client.search(data);
// let retry = 0;
// while (newSearchRes.results.length === 0 && retry < 5) {
// newSearchRes = await client.search(data);
// if (searchResults.results.length === 0) {
// const newRadius = rangeFilterParams.radius * 2;

// data.params = {
// ...data.params,
// radius: newRadius,
// };
// }

// retry++;
// }

// // combine search results
// searchResults.results = [
// ...searchResults.results,
// ...newSearchRes.results,
// ];
// }

// console.log('return', searchResults.results);

// // filter result, batchRes should be unique
// const filterResult = searchResults.results.filter(
// r =>
// !lastBatchRes.some(l => l.id === r.id) &&
// !batchRes.some(c => c.id === r.id)
// );

// // fill filter result to batch result, it should not exceed the batch size
// for (let i = 0; i < filterResult.length; i++) {
// if (batchRes.length < bs) {
// batchRes.push(filterResult[i]);
// }
// }

// // get data range about last batch result
// const resultRange = getRangeFromSearchResult(filterResult);

// console.log('result range', resultRange);

// // if no more result, force quite
// if (resultRange.lastDistance === 0) {
// done = true;
// return { done: false, value: batchRes };
// }

// // update next range and expr
// rangeFilterParams.rangeFilter = resultRange.lastDistance;
// rangeFilterParams.radius =
// rangeFilterParams.radius + resultRange.radius;
// rangeFilterParams.expr = getPKFieldExpr({
// pkField,
// value: resultRange.id as string,
// expr: initExpr,
// });

// console.log('last', rangeFilterParams);
// }

// // store last result
// lastBatchRes = batchRes;

// // update current total
// this.currentTotal += batchRes.length;

// // return batch result
// return { done: false, value: batchRes };
// },
// };
// },
// };
// }
async searchIterator(data: SearchIteratorReq): Promise<any> {
const client = this;

// Get available count
const count = await client.count({
collection_name: data.collection_name,
expr: data.expr || data.filter || '',
});

// if limit not set, set it to count
if (!data.limit || data.limit === NO_LIMIT) {
data.limit = count.data;
}

// Ensure limit does not exceed the total count
const total = Math.min(data.limit, count.data);

// Ensure batch size does not exceed the total count or max search size
let batchSize = Math.min(data.batchSize, total, DEFAULT_MAX_SEARCH_SIZE);

// Iterator fields
const ITERATOR_FIELD = 'iterator';
const ITER_SEARCH_V2_KEY = 'search_iter_v2';
const ITER_SEARCH_ID_KEY = 'search_iter_id';
const ITER_SEARCH_BATCH_SIZE_KEY = 'search_iter_batch_size';
const ITER_SEARCH_LAST_BOUND_KEY = 'search_iter_last_bound';
const GUARANTEE_TIMESTAMP_KEY = 'guarantee_timestamp';

let currentTotal = 0;

// search iterator special params
const params: any = {
...data.params,
[ITERATOR_FIELD]: true,
[ITER_SEARCH_V2_KEY]: true,
[ITER_SEARCH_BATCH_SIZE_KEY]: batchSize,
[GUARANTEE_TIMESTAMP_KEY]: 0,
};

return {
[Symbol.asyncIterator]() {
return {
async next() {
if (currentTotal >= total) {
return { done: true, value: null };
}

try {
const batchRes = await client.search({
...data,
params,
limit: batchSize,
});

// update current total and batch size
currentTotal += batchRes.results.length;
batchSize = Math.min(batchSize, total - currentTotal);

// update search params
params[ITER_SEARCH_ID_KEY] =
batchRes.search_iterator_v2_results!.token;
params[ITER_SEARCH_LAST_BOUND_KEY] =
batchRes.search_iterator_v2_results?.last_bound;
params[GUARANTEE_TIMESTAMP_KEY] = batchRes.session_ts;
params[ITER_SEARCH_BATCH_SIZE_KEY] = batchSize;

return {
done: currentTotal > total,
value: batchRes.results,
};
} catch (error) {
console.error('Error during search iteration:', error);
return { done: true, value: null };
}
},
};
},
};
}

/**
* Executes a query and returns an async iterator that allows iterating over the results in batches.
Expand Down
34 changes: 20 additions & 14 deletions milvus/types/Data.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { Key } from 'readline';
import {
GrpcTimeOut,
KeyValuePair,
Expand Down Expand Up @@ -263,6 +262,11 @@ export interface SearchResultData {
export interface SearchResults extends resStatusResponse {
results: SearchResultData[];
recalls: number[];
session_ts: number;
collection_name: string;
all_search_count?: number;
search_iterator_v2_results?: Record<string, any>;
_search_iterator_v2_results?: string;
}

export interface ImportResponse extends resStatusResponse {
Expand Down Expand Up @@ -302,6 +306,7 @@ export interface SearchParam {
group_size?: number; // group size
strict_group_size?: boolean; // if strict group size
hints?: string; // hints to improve milvus search performance
[key: string]: any; // extra search parameters
}

// old search api parameter type, deprecated
Expand All @@ -320,16 +325,6 @@ export interface SearchReq extends collectionNameReq {
transformers?: OutputTransformers; // provide custom data transformer for specific data type like bf16 or f16 vectors
}

export interface SearchIteratorReq
extends Omit<
SearchSimpleReq,
'data' | 'vectors' | 'offset' | 'limit' | 'topk'
> {
data: number[]; // data to search
batchSize: number;
limit: number;
}

export type SearchTextType = string | string[];
export type SearchVectorType = VectorTypes | VectorTypes[];
export type SearchDataType = SearchVectorType | SearchTextType;
Expand All @@ -338,7 +333,7 @@ export type SearchMultipleDataType = VectorTypes[] | SearchTextType[];
// simplified search api parameter type
export interface SearchSimpleReq extends collectionNameReq {
partition_names?: string[]; // partition names
anns_field?: string; // your vector field name,rquired if you are searching on multiple vector fields collection
anns_field?: string; // your vector field name,required if you are searching on multiple vector fields collection
data?: SearchDataType; // vector or text to search
vector?: VectorTypes; // alias for data, deprecated
vectors?: VectorTypes[]; // alias for data, deprecated
Expand Down Expand Up @@ -372,6 +367,12 @@ export type HybridSearchSingleReq = Pick<
transformers?: OutputTransformers; // provide custom data transformer for specific data type like bf16 or f16 vectors
};

export interface SearchIteratorReq
extends Omit<SearchSimpleReq, 'vectors' | 'offset' | 'limit' | 'topk'> {
limit?: number; // Optional. Specifies the maximum number of items. Default is no limit (-1 or if not set).
batchSize: number; // Specifies the number of items to return in each batch. if it exceeds 16384, it will be set to 16384
}

// rerank strategy and parameters
export type RerankerObj = {
strategy: RANKER_TYPE | string; // rerank strategy
Expand Down Expand Up @@ -433,7 +434,12 @@ export interface SearchRes extends resStatusResponse {
output_fields: string[];
group_by_field_value: string;
recalls: number[];
search_iterator_v2_results?: Record<string, any>;
_search_iterator_v2_results?: string;
all_search_count?: number;
};
collection_name: string;
session_ts: number;
}

// because in javascript, there is no float16 and bfloat16 type
Expand Down Expand Up @@ -462,8 +468,8 @@ export type QueryReq = BaseQueryReq &

export interface QueryIteratorReq
extends Omit<QueryReq, 'ids' | 'offset' | 'limit'> {
limit?: number;
batchSize: number;
limit?: number; // Optional. Specifies the maximum number of items. Default is no limit (-1 or if not set).
batchSize: number; // Specifies the number of items to return in each batch. if it exceeds 16384, it will be set to 16384
}

export interface GetReq extends collectionNameReq {
Expand Down
Loading
Loading