Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support search iterator #409

Merged
merged 8 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 105 additions & 178 deletions milvus/grpc/Data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import {
SearchRes,
SearchSimpleReq,
SearchIteratorReq,
DEFAULT_TOPK,
HybridSearchReq,
promisify,
sleep,
Expand All @@ -56,9 +55,6 @@ import {
DEFAULT_COUNT_QUERY_STRING,
getQueryIteratorExpr,
QueryIteratorReq,
getRangeFromSearchResult,
SearchResultData,
getPKFieldExpr,
DEFAULT_MAX_SEARCH_SIZE,
SparseFloatVector,
sparseRowsToBytes,
Expand Down Expand Up @@ -473,6 +469,10 @@ export class Data extends Collection {
* @returns {string} status.error_code - The error code of the operation.
* @returns {string} status.reason - The reason for the error, if any.
* @returns {{score:number,id:string, [outputfield]: value}[]} results - Array of search results.
* @returns {number} session_ts - The timestamp of the search session.
* @returns {string} collection_name - The name of the collection.
* @returns {number} all_search_count - The total number of search operations.
* @returns {string[]} recalls - The recalls of the search operation.
*
* @example
* ```
Expand Down Expand Up @@ -533,6 +533,14 @@ export class Data extends Collection {
status: originSearchResult.status,
results: [],
recalls: [],
session_ts: -1,
collection_name: data.collection_name,
search_iterator_v2_results:
originSearchResult.results &&
originSearchResult.results.search_iterator_v2_results,
_search_iterator_v2_results:
originSearchResult.results &&
originSearchResult.results._search_iterator_v2_results,
};
}

Expand All @@ -547,183 +555,102 @@ export class Data extends Collection {
// nq === 1, return the first object of results array
results: nq === 1 ? results[0] || [] : results,
recalls: originSearchResult.results.recalls,
session_ts: originSearchResult.session_ts,
collection_name: data.collection_name,
all_search_count: originSearchResult.results.all_search_count,
search_iterator_v2_results:
originSearchResult.results.search_iterator_v2_results,
_search_iterator_v2_results:
originSearchResult.results._search_iterator_v2_results,
};
}

// async searchIterator(data: SearchIteratorReq): Promise<any> {
// // store client
// const client = this;
// // get collection info
// const pkField = await this.getPkField(data);
// // get available count
// const count = await client.count({
// collection_name: data.collection_name,
// expr: data.expr || data.filter || '',
// });
// // make sure limit does not exceed the total count
// const total = data.limit > count.data ? count.data : data.limit;
// // make sure batch size does not exceed the total count
// let batchSize = data.batchSize > total ? total : data.batchSize;
// // make sure batch size does not exceed the max search size
// batchSize =
// batchSize > DEFAULT_MAX_SEARCH_SIZE ? DEFAULT_MAX_SEARCH_SIZE : batchSize;

// // init expr
// const initExpr = data.expr || data.filter || '';
// // init search params object
// data.params = data.params || {};
// data.limit = batchSize;

// // user range filter set
// const initRadius = Number(data.params.radius) || 0;
// const initRangeFilter = Number(data.params.range_filter) || 0;
// // range params object
// const rangeFilterParams = {
// radius: initRadius,
// rangeFilter: initRangeFilter,
// expr: initExpr,
// };

// // force quit if true; initially, if total is 0, return done
// let done = total === 0;
// // batch result store
// let lastBatchRes: SearchResultData[] = [];

// // build cache
// const cache = await client.search({
// ...data,
// limit: total > DEFAULT_MAX_SEARCH_SIZE ? DEFAULT_MAX_SEARCH_SIZE : total,
// });

// return {
// currentTotal: 0,
// [Symbol.asyncIterator]() {
// return {
// currentTotal: this.currentTotal,
// async next() {
// // check if reach the limit
// if (
// (this.currentTotal >= total && this.currentTotal !== 0) ||
// done
// ) {
// return { done: true, value: lastBatchRes };
// }

// // batch result container
// const batchRes: SearchResultData[] = [];
// const bs =
// this.currentTotal + batchSize > total
// ? total - this.currentTotal
// : batchSize;

// // keep getting search data if not reach the batch size
// while (batchRes.length < bs) {
// // search results container
// let searchResults: SearchResults = {
// status: { error_code: 'SUCCESS', reason: '' },
// results: [],
// };

// // Iterate through the cached data, adding it to the search results container until the batch size is reached.
// if (cache.results.length > 0) {
// while (
// cache.results.length > 0 &&
// searchResults.results.length < bs
// ) {
// searchResults.results.push(cache.results.shift()!);
// }
// } else if (searchResults.results.length < bs) {
// // build search params, overwrite range filter
// if (rangeFilterParams.radius && rangeFilterParams.rangeFilter) {
// data.params = {
// ...data.params,
// radius: rangeFilterParams.radius,
// range_filter:
// rangeFilterParams.rangeFilter,
// };
// }
// // set search expr
// data.expr = rangeFilterParams.expr;

// console.log('search param', data.params, data.expr);

// // iterate search, if no result, double the radius, until we doubled for 5 times
// let newSearchRes = await client.search(data);
// let retry = 0;
// while (newSearchRes.results.length === 0 && retry < 5) {
// newSearchRes = await client.search(data);
// if (searchResults.results.length === 0) {
// const newRadius = rangeFilterParams.radius * 2;

// data.params = {
// ...data.params,
// radius: newRadius,
// };
// }

// retry++;
// }

// // combine search results
// searchResults.results = [
// ...searchResults.results,
// ...newSearchRes.results,
// ];
// }

// console.log('return', searchResults.results);

// // filter result, batchRes should be unique
// const filterResult = searchResults.results.filter(
// r =>
// !lastBatchRes.some(l => l.id === r.id) &&
// !batchRes.some(c => c.id === r.id)
// );

// // fill filter result to batch result, it should not exceed the batch size
// for (let i = 0; i < filterResult.length; i++) {
// if (batchRes.length < bs) {
// batchRes.push(filterResult[i]);
// }
// }

// // get data range about last batch result
// const resultRange = getRangeFromSearchResult(filterResult);

// console.log('result range', resultRange);

// // if there are no more results, force quit
// if (resultRange.lastDistance === 0) {
// done = true;
// return { done: false, value: batchRes };
// }

// // update next range and expr
// rangeFilterParams.rangeFilter = resultRange.lastDistance;
// rangeFilterParams.radius =
// rangeFilterParams.radius + resultRange.radius;
// rangeFilterParams.expr = getPKFieldExpr({
// pkField,
// value: resultRange.id as string,
// expr: initExpr,
// });

// console.log('last', rangeFilterParams);
// }

// // store last result
// lastBatchRes = batchRes;

// // update current total
// this.currentTotal += batchRes.length;

// // return batch result
// return { done: false, value: batchRes };
// },
// };
// },
// };
// }
/**
 * Builds an async iterable that pages through search results in batches,
 * using the server-side search iterator (v2) parameters.
 *
 * Before returning, this method counts the entities matching
 * `data.expr` / `data.filter` and fetches the collection description.
 * The number of rows to deliver is the smaller of `data.limit` and that
 * count; when `data.limit` is falsy or equals `NO_LIMIT`, the count is used.
 * The batch size is capped by `data.batchSize`, the total, and
 * `DEFAULT_MAX_SEARCH_SIZE`.
 *
 * Each `next()` call issues one `search` request and feeds the token,
 * last bound and session timestamp of the response into the next request.
 *
 * Notes:
 * - The caller's `data` object is not modified.
 * - Iteration state (running total, batch size, iterator params) lives in
 *   the closure of this call, so every iterator obtained from the returned
 *   object shares it.
 * - Errors thrown by `search` are logged with `console.error` and end the
 *   iteration instead of being rethrown.
 *
 * @param data - Search iterator request; `collection_name`, `expr`/`filter`,
 *   `limit`, `batchSize` and `params` are read here, the rest is forwarded
 *   to `search` unchanged.
 * @returns An object exposing `[Symbol.asyncIterator]()`; each step yields
 *   the `results` array of one `search` call.
 */
async searchIterator(data: SearchIteratorReq): Promise<any> {
  const client = this;

  // The count and the collection description are independent requests,
  // so issue them concurrently.
  const [count, collectionInfo] = await Promise.all([
    client.count({
      collection_name: data.collection_name,
      expr: data.expr || data.filter || '',
    }),
    client.describeCollection({
      collection_name: data.collection_name,
    }),
  ]);

  // If the limit is not set, fall back to the count. Kept in a local so
  // the caller's request object is left untouched.
  const requestedLimit =
    !data.limit || data.limit === NO_LIMIT ? count.data : data.limit;

  // Ensure the limit does not exceed the total count
  const total = Math.min(requestedLimit, count.data);

  // Ensure batch size does not exceed the total count or max search size
  let batchSize = Math.min(data.batchSize, total, DEFAULT_MAX_SEARCH_SIZE);

  // Iterator fields
  const ITERATOR_FIELD = 'iterator';
  const ITER_SEARCH_V2_KEY = 'search_iter_v2';
  const ITER_SEARCH_ID_KEY = 'search_iter_id';
  const ITER_SEARCH_BATCH_SIZE_KEY = 'search_iter_batch_size';
  const ITER_SEARCH_LAST_BOUND_KEY = 'search_iter_last_bound';
  const GUARANTEE_TIMESTAMP_KEY = 'guarantee_timestamp';
  const COLLECTION_ID = 'collection_id';

  let currentTotal = 0;
  // Set once a response carries no iterator state: without a token the
  // next request could not resume where this one stopped.
  let exhausted = false;

  // search iterator special params
  const params: any = {
    ...data.params,
    [ITERATOR_FIELD]: true,
    [ITER_SEARCH_V2_KEY]: true,
    [ITER_SEARCH_BATCH_SIZE_KEY]: batchSize,
    [GUARANTEE_TIMESTAMP_KEY]: 0,
    [COLLECTION_ID]: collectionInfo.collectionID,
  };

  return {
    [Symbol.asyncIterator]() {
      return {
        async next() {
          if (exhausted || currentTotal >= total) {
            return { done: true, value: null };
          }

          try {
            const batchRes = await client.search({
              ...data,
              params,
              limit: batchSize,
            });
            const batch = batchRes.results;
            const iteratorState = batchRes.search_iterator_v2_results;

            // update current total and batch size
            currentTotal += batch.length;
            batchSize = Math.min(batchSize, total - currentTotal);

            // update search params for the next request
            if (iteratorState) {
              params[ITER_SEARCH_ID_KEY] = iteratorState.token;
              params[ITER_SEARCH_LAST_BOUND_KEY] = iteratorState.last_bound;
            } else {
              // Still hand over the batch that was fetched, but stop
              // afterwards instead of throwing on the missing token.
              exhausted = true;
            }
            params[GUARANTEE_TIMESTAMP_KEY] = batchRes.session_ts;
            params[ITER_SEARCH_BATCH_SIZE_KEY] = batchSize;

            return {
              done: currentTotal > total || !batch.length,
              value: batch,
            };
          } catch (error) {
            console.error('Error during search iteration:', error);
            return { done: true, value: null };
          }
        },
      };
    },
  };
}

/**
* Executes a query and returns an async iterator that allows iterating over the results in batches.
Expand Down
Loading
Loading