Skip to content

Commit

Permalink
Support search iterator (#409)
Browse files Browse the repository at this point in the history
* support search iterator

Signed-off-by: shanghaikid <[email protected]>

* upgrade milvus test version

Signed-off-by: shanghaikid <[email protected]>

* fix test

Signed-off-by: shanghaikid <[email protected]>

* change log level

Signed-off-by: shanghaikid <[email protected]>

* fix iterator

Signed-off-by: ryjiang <[email protected]>

* add collection id

Signed-off-by: ryjiang <[email protected]>

* fix test

Signed-off-by: ryjiang <[email protected]>

---------

Signed-off-by: shanghaikid <[email protected]>
Signed-off-by: ryjiang <[email protected]>
  • Loading branch information
shanghaikid authored Jan 16, 2025
1 parent 66ae49a commit 07af1bd
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 557 deletions.
283 changes: 105 additions & 178 deletions milvus/grpc/Data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import {
SearchRes,
SearchSimpleReq,
SearchIteratorReq,
DEFAULT_TOPK,
HybridSearchReq,
promisify,
sleep,
Expand All @@ -56,9 +55,6 @@ import {
DEFAULT_COUNT_QUERY_STRING,
getQueryIteratorExpr,
QueryIteratorReq,
getRangeFromSearchResult,
SearchResultData,
getPKFieldExpr,
DEFAULT_MAX_SEARCH_SIZE,
SparseFloatVector,
sparseRowsToBytes,
Expand Down Expand Up @@ -473,6 +469,10 @@ export class Data extends Collection {
* @returns {string} status.error_code - The error code of the operation.
* @returns {string} status.reason - The reason for the error, if any.
* @returns {{score:number,id:string, [outputfield]: value}[]} results - Array of search results.
* @returns {number} session_ts - The timestamp of the search session.
* @returns {string} collection_name - The name of the collection.
* @returns {number} all_search_count - The total number of search operations.
* @returns {string[]} recalls - The recalls of the search operation.
*
* @example
* ```
Expand Down Expand Up @@ -533,6 +533,14 @@ export class Data extends Collection {
status: originSearchResult.status,
results: [],
recalls: [],
session_ts: -1,
collection_name: data.collection_name,
search_iterator_v2_results:
originSearchResult.results &&
originSearchResult.results.search_iterator_v2_results,
_search_iterator_v2_results:
originSearchResult.results &&
originSearchResult.results._search_iterator_v2_results,
};
}

Expand All @@ -547,183 +555,102 @@ export class Data extends Collection {
// nq === 1, return the first object of results array
results: nq === 1 ? results[0] || [] : results,
recalls: originSearchResult.results.recalls,
session_ts: originSearchResult.session_ts,
collection_name: data.collection_name,
all_search_count: originSearchResult.results.all_search_count,
search_iterator_v2_results:
originSearchResult.results.search_iterator_v2_results,
_search_iterator_v2_results:
originSearchResult.results._search_iterator_v2_results,
};
}

// async searchIterator(data: SearchIteratorReq): Promise<any> {
// // store client
// const client = this;
// // get collection info
// const pkField = await this.getPkField(data);
// // get available count
// const count = await client.count({
// collection_name: data.collection_name,
// expr: data.expr || data.filter || '',
// });
// // make sure limit is not exceed the total count
// const total = data.limit > count.data ? count.data : data.limit;
// // make sure batch size is exceed the total count
// let batchSize = data.batchSize > total ? total : data.batchSize;
// // make sure batch size is not exceed max search size
// batchSize =
// batchSize > DEFAULT_MAX_SEARCH_SIZE ? DEFAULT_MAX_SEARCH_SIZE : batchSize;

// // init expr
// const initExpr = data.expr || data.filter || '';
// // init search params object
// data.params = data.params || {};
// data.limit = batchSize;

// // user range filter set
// const initRadius = Number(data.params.radius) || 0;
// const initRangeFilter = Number(data.params.range_filter) || 0;
// // range params object
// const rangeFilterParams = {
// radius: initRadius,
// rangeFilter: initRangeFilter,
// expr: initExpr,
// };

// // force quite if true, at first, if total is 0, return done
// let done = total === 0;
// // batch result store
// let lastBatchRes: SearchResultData[] = [];

// // build cache
// const cache = await client.search({
// ...data,
// limit: total > DEFAULT_MAX_SEARCH_SIZE ? DEFAULT_MAX_SEARCH_SIZE : total,
// });

// return {
// currentTotal: 0,
// [Symbol.asyncIterator]() {
// return {
// currentTotal: this.currentTotal,
// async next() {
// // check if reach the limit
// if (
// (this.currentTotal >= total && this.currentTotal !== 0) ||
// done
// ) {
// return { done: true, value: lastBatchRes };
// }

// // batch result container
// const batchRes: SearchResultData[] = [];
// const bs =
// this.currentTotal + batchSize > total
// ? total - this.currentTotal
// : batchSize;

// // keep getting search data if not reach the batch size
// while (batchRes.length < bs) {
// // search results container
// let searchResults: SearchResults = {
// status: { error_code: 'SUCCESS', reason: '' },
// results: [],
// };

// // Iterate through the cached data, adding it to the search results container until the batch size is reached.
// if (cache.results.length > 0) {
// while (
// cache.results.length > 0 &&
// searchResults.results.length < bs
// ) {
// searchResults.results.push(cache.results.shift()!);
// }
// } else if (searchResults.results.length < bs) {
// // build search params, overwrite range filter
// if (rangeFilterParams.radius && rangeFilterParams.rangeFilter) {
// data.params = {
// ...data.params,
// radius: rangeFilterParams.radius,
// range_filter:
// rangeFilterParams.rangeFilter,
// };
// }
// // set search expr
// data.expr = rangeFilterParams.expr;

// console.log('search param', data.params, data.expr);

// // iterate search, if no result, double the radius, until we doubled for 5 times
// let newSearchRes = await client.search(data);
// let retry = 0;
// while (newSearchRes.results.length === 0 && retry < 5) {
// newSearchRes = await client.search(data);
// if (searchResults.results.length === 0) {
// const newRadius = rangeFilterParams.radius * 2;

// data.params = {
// ...data.params,
// radius: newRadius,
// };
// }

// retry++;
// }

// // combine search results
// searchResults.results = [
// ...searchResults.results,
// ...newSearchRes.results,
// ];
// }

// console.log('return', searchResults.results);

// // filter result, batchRes should be unique
// const filterResult = searchResults.results.filter(
// r =>
// !lastBatchRes.some(l => l.id === r.id) &&
// !batchRes.some(c => c.id === r.id)
// );

// // fill filter result to batch result, it should not exceed the batch size
// for (let i = 0; i < filterResult.length; i++) {
// if (batchRes.length < bs) {
// batchRes.push(filterResult[i]);
// }
// }

// // get data range about last batch result
// const resultRange = getRangeFromSearchResult(filterResult);

// console.log('result range', resultRange);

// // if no more result, force quite
// if (resultRange.lastDistance === 0) {
// done = true;
// return { done: false, value: batchRes };
// }

// // update next range and expr
// rangeFilterParams.rangeFilter = resultRange.lastDistance;
// rangeFilterParams.radius =
// rangeFilterParams.radius + resultRange.radius;
// rangeFilterParams.expr = getPKFieldExpr({
// pkField,
// value: resultRange.id as string,
// expr: initExpr,
// });

// console.log('last', rangeFilterParams);
// }

// // store last result
// lastBatchRes = batchRes;

// // update current total
// this.currentTotal += batchRes.length;

// // return batch result
// return { done: false, value: batchRes };
// },
// };
// },
// };
// }
async searchIterator(data: SearchIteratorReq): Promise<any> {
const client = this;

// Get available count
const count = await client.count({
collection_name: data.collection_name,
expr: data.expr || data.filter || '',
});

// get collection Info
const collectionInfo = await this.describeCollection({
collection_name: data.collection_name,
});

// if limit not set, set it to count
if (!data.limit || data.limit === NO_LIMIT) {
data.limit = count.data;
}

// Ensure limit does not exceed the total count
const total = Math.min(data.limit, count.data);

// Ensure batch size does not exceed the total count or max search size
let batchSize = Math.min(data.batchSize, total, DEFAULT_MAX_SEARCH_SIZE);

// Iterator fields
const ITERATOR_FIELD = 'iterator';
const ITER_SEARCH_V2_KEY = 'search_iter_v2';
const ITER_SEARCH_ID_KEY = 'search_iter_id';
const ITER_SEARCH_BATCH_SIZE_KEY = 'search_iter_batch_size';
const ITER_SEARCH_LAST_BOUND_KEY = 'search_iter_last_bound';
const GUARANTEE_TIMESTAMP_KEY = 'guarantee_timestamp';
const COLLECTION_ID = 'collection_id';

let currentTotal = 0;

// search iterator special params
const params: any = {
...data.params,
[ITERATOR_FIELD]: true,
[ITER_SEARCH_V2_KEY]: true,
[ITER_SEARCH_BATCH_SIZE_KEY]: batchSize,
[GUARANTEE_TIMESTAMP_KEY]: 0,
[COLLECTION_ID]: collectionInfo.collectionID,
};

return {
[Symbol.asyncIterator]() {
return {
async next() {
if (currentTotal >= total) {
return { done: true, value: null };
}

try {
const batchRes = await client.search({
...data,
params,
limit: batchSize,
});

// update current total and batch size
currentTotal += batchRes.results.length;
batchSize = Math.min(batchSize, total - currentTotal);

// update search params
params[ITER_SEARCH_ID_KEY] =
batchRes.search_iterator_v2_results!.token;
params[ITER_SEARCH_LAST_BOUND_KEY] =
batchRes.search_iterator_v2_results?.last_bound;
params[GUARANTEE_TIMESTAMP_KEY] = batchRes.session_ts;
params[ITER_SEARCH_BATCH_SIZE_KEY] = batchSize;

return {
done: currentTotal > total || !batchRes.results.length,
value: batchRes.results,
};
} catch (error) {
console.error('Error during search iteration:', error);
return { done: true, value: null };
}
},
};
},
};
}

/**
* Executes a query and returns an async iterator that allows iterating over the results in batches.
Expand Down
Loading

0 comments on commit 07af1bd

Please sign in to comment.