Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RefreshLoad/GetPartitionStats interface #1243

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions examples/main/java/io/milvus/v2/BulkWriterExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ private void createCollection(String collectionName, CreateCollectionReq.Collect
} else {
milvusClient.createCollection(requestCreate);
}
// milvusClient.loadCollection(LoadCollectionReq.builder().collectionName(collectionName).build());

System.out.printf("Collection %s created%n", collectionName);
}

Expand Down Expand Up @@ -694,12 +694,19 @@ private void createIndex() {
.collectionName(ALL_TYPES_COLLECTION_NAME)
.indexParams(indexes)
.build());

milvusClient.loadCollection(LoadCollectionReq.builder()
.collectionName(ALL_TYPES_COLLECTION_NAME)
.build());
}

private void loadCollection() {
System.out.println("Loading Collection...");
System.out.println("Refresh load collection...");
checkMilvusClientIfExist();
milvusClient.loadCollection(LoadCollectionReq.builder()
// RefreshLoad is a new interface from v2.5.3,
// mainly used when there are new segments generated by bulkinsert request,
// force the new segments to be loaded into memory.
milvusClient.refreshLoad(RefreshLoadReq.builder()
.collectionName(ALL_TYPES_COLLECTION_NAME)
.build());
System.out.println("Collection row number: " + getCollectionRowCount());
Expand Down
39 changes: 30 additions & 9 deletions sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.milvus.v2.service.index.response.*;
import io.milvus.v2.service.partition.PartitionService;
import io.milvus.v2.service.partition.request.*;
import io.milvus.v2.service.partition.response.*;
import io.milvus.v2.service.rbac.RBACService;
import io.milvus.v2.service.rbac.request.*;
import io.milvus.v2.service.rbac.response.*;
Expand Down Expand Up @@ -306,7 +307,7 @@ public ListDatabasesResp listDatabases() {
}
/**
* Alter database with key value pair. (Available from Milvus v2.4.4)
* Deprecated, replaced by alterDatabaseProperties from v2.5.2, to keep consistence with other SDKs
* Deprecated, replaced by alterDatabaseProperties from v2.5.3, to keep consistence with other SDKs
* @param request alter database request
*/
@Deprecated
Expand All @@ -317,14 +318,14 @@ public void alterDatabase(AlterDatabaseReq request) {
.build());
}
/**
* Alter a database's properties (Available from Milvus v2.5.2)
* Alter a database's properties (Available from Milvus v2.5.3)
* @param request alter database properties request
*/
public void alterDatabaseProperties(AlterDatabasePropertiesReq request) {
retry(()-> databaseService.alterDatabaseProperties(this.getRpcStub(), request));
}
/**
* drop a database's properties (Available from Milvus v2.5.2)
* drop a database's properties (Available from Milvus v2.5.3)
* @param request alter database properties request
*/
public void dropDatabaseProperties(DropDatabasePropertiesReq request) {
Expand Down Expand Up @@ -374,7 +375,7 @@ public void dropCollection(DropCollectionReq request) {
}
/**
* Alter a collection in Milvus.
* Deprecated, replaced by alterCollectionProperties from v2.5.2, to keep consistence with other SDKs
* Deprecated, replaced by alterCollectionProperties from v2.5.3, to keep consistence with other SDKs
*
* @param request alter collection request
*/
Expand All @@ -387,15 +388,15 @@ public void alterCollection(AlterCollectionReq request) {
.build());
}
/**
* Alter a collection's properties (Available from Milvus v2.5.2).
* Alter a collection's properties (Available from Milvus v2.5.3).
*
* @param request alter collection properties request
*/
public void alterCollectionProperties(AlterCollectionPropertiesReq request) {
retry(()-> collectionService.alterCollectionProperties(this.getRpcStub(), request));
}
/**
* drop a collection's properties (Available from Milvus v2.5.2)
* drop a collection's properties (Available from Milvus v2.5.3)
* @param request drop collection properties request
*/
public void dropCollectionProperties(DropCollectionPropertiesReq request) {
Expand Down Expand Up @@ -444,6 +445,16 @@ public void renameCollection(RenameCollectionReq request) {
public void loadCollection(LoadCollectionReq request) {
retry(()-> collectionService.loadCollection(this.getRpcStub(), request));
}
/**
* Refresh loads a collection. Mainly used when there are new segments generated by bulkinsert request.
* Force the new segments to be loaded into memory.
* Note: this interface will ignore the LoadCollectionReq.refresh flag
*
* @param request refresh load collection request
*/
public void refreshLoad(RefreshLoadReq request) {
retry(()-> collectionService.refreshLoad(this.getRpcStub(), request));
}
/**
* Releases a collection from memory in Milvus.
*
Expand Down Expand Up @@ -481,7 +492,7 @@ public void dropIndex(DropIndexReq request) {
}
/**
* Alter an index in Milvus.
* Deprecated, replaced by alterIndexProperties from v2.5.2, to keep consistence with other SDKs
* Deprecated, replaced by alterIndexProperties from v2.5.3, to keep consistence with other SDKs
*
* @param request alter index request
*/
Expand All @@ -495,15 +506,15 @@ public void alterIndex(AlterIndexReq request) {
.build());
}
/**
* Alter an index's properties (Available from Milvus v2.5.2)
* Alter an index's properties (Available from Milvus v2.5.3)
*
* @param request alter index request
*/
public void alterIndexProperties(AlterIndexPropertiesReq request) {
retry(()->indexService.alterIndexProperties(this.getRpcStub(), request));
}
/**
* drop an index's properties (Available from Milvus v2.5.2)
* drop an index's properties (Available from Milvus v2.5.3)
* @param request drop index properties request
*/
public void dropIndexProperties(DropIndexPropertiesReq request) {
Expand Down Expand Up @@ -654,6 +665,16 @@ public List<String> listPartitions(ListPartitionsReq request) {
return retry(()->partitionService.listPartitions(this.getRpcStub(), request));
}

/**
* get a partition stats in Milvus.
*
* @param request get partition stats request
* @return GetPartitionStatsResp
*/
public GetPartitionStatsResp getPartitionStats(GetPartitionStatsReq request) {
return retry(()-> partitionService.getPartitionStats(this.getRpcStub(), request));
}

/**
* Loads partitions in a collection in Milvus.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,22 @@ public Void loadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingS
Status status = blockingStub.loadCollection(loadCollectionRequest);
rpcUtils.handleResponse(title, status);
if (request.getAsync()) {
WaitForLoadCollection(blockingStub, request);
WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
}

return null;
}

public Void refreshLoad(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, RefreshLoadReq request) {
String title = String.format("RefreshLoadRequest collectionName:%s", request.getCollectionName());
LoadCollectionRequest loadCollectionRequest = LoadCollectionRequest.newBuilder()
.setCollectionName(request.getCollectionName())
.setRefresh(true)
.build();
Status status = blockingStub.loadCollection(loadCollectionRequest);
rpcUtils.handleResponse(title, status);
if (request.getAsync()) {
WaitForLoadCollection(blockingStub, request.getCollectionName(), request.getTimeout());
}

return null;
Expand Down Expand Up @@ -339,16 +354,17 @@ public void waitForCollectionRelease(MilvusServiceGrpc.MilvusServiceBlockingStub
}
}

private void WaitForLoadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, LoadCollectionReq request) {
private void WaitForLoadCollection(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
String collectionName, long timeoutMs) {
boolean isLoaded = false;
long startTime = System.currentTimeMillis(); // Capture start time/ Timeout in milliseconds (60 seconds)

while (!isLoaded) {
// Call the getLoadState method
isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(request.getCollectionName()).build());
isLoaded = getLoadState(blockingStub, GetLoadStateReq.builder().collectionName(collectionName).build());
if (!isLoaded) {
// Check if timeout is exceeded
if (System.currentTimeMillis() - startTime > request.getTimeout()) {
if (System.currentTimeMillis() - startTime > timeoutMs) {
throw new MilvusClientException(ErrorCode.SERVER_ERROR, "Load collection timeout");
}
// Wait for a certain period before checking again
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.service.collection.request;

import lombok.Builder;
import lombok.Data;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
public class RefreshLoadReq {
private String collectionName;
@Builder.Default
private Boolean async = Boolean.TRUE;
@Builder.Default
private Long timeout = 60000L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

package io.milvus.v2.service.partition;

import io.milvus.grpc.CreatePartitionRequest;
import io.milvus.grpc.MilvusServiceGrpc;
import io.milvus.grpc.Status;
import io.milvus.grpc.*;
import io.milvus.v2.service.BaseService;
import io.milvus.v2.service.partition.request.*;
import io.milvus.v2.service.partition.response.*;

import java.util.List;

Expand Down Expand Up @@ -79,6 +78,21 @@ public List<String> listPartitions(MilvusServiceGrpc.MilvusServiceBlockingStub b
return showPartitionsResponse.getPartitionNamesList();
}

public GetPartitionStatsResp getPartitionStats(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, GetPartitionStatsReq request) {
String title = String.format("GetCollectionStatisticsRequest collectionName:%s", request.getCollectionName());
GetPartitionStatisticsRequest getPartitionStatisticsRequest = GetPartitionStatisticsRequest.newBuilder()
.setCollectionName(request.getCollectionName())
.setPartitionName(request.getPartitionName())
.build();
GetPartitionStatisticsResponse response = blockingStub.getPartitionStatistics(getPartitionStatisticsRequest);

rpcUtils.handleResponse(title, response.getStatus());
GetPartitionStatsResp getPartitionStatsResp = GetPartitionStatsResp.builder()
.numOfEntities(response.getStatsList().stream().filter(stat -> stat.getKey().equals("row_count")).map(stat -> Long.parseLong(stat.getValue())).findFirst().get())
.build();
return getPartitionStatsResp;
}

public Void loadPartitions(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, LoadPartitionsReq request) {
String title = String.format("Load partitions %s in collection %s", request.getPartitionNames(), request.getCollectionName());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.service.partition.request;

import lombok.Data;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
public class GetPartitionStatsReq {
private String collectionName;
private String partitionName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package io.milvus.v2.service.partition.response;

import lombok.Data;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
public class GetPartitionStatsResp {
private Long numOfEntities;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import io.milvus.grpc.*;
import io.milvus.v2.common.CompactionState;
import io.milvus.v2.exception.ErrorCode;
import io.milvus.v2.exception.MilvusClientException;
import io.milvus.v2.service.BaseService;
import io.milvus.v2.service.utility.request.*;
import io.milvus.v2.service.utility.response.*;
Expand All @@ -32,7 +34,8 @@ public FlushResp flush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
List<String> collectionNames = request.getCollectionNames();
String title = String.format("Flush collections %s", collectionNames);
if (collectionNames.isEmpty()) {
return null; // maybe do flushAll in future
// consistent with python sdk behavior, throw an error if collection names list is null or empty
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "Collection name list can not be null or empty");
}

FlushRequest flushRequest = io.milvus.grpc.FlushRequest.newBuilder()
Expand Down
Loading