Skip to content

Commit

Permalink
enhance:add support for partition stats for major compaction test(mil…
Browse files Browse the repository at this point in the history
…vus-io#276)

Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han committed Jun 21, 2024
1 parent 3ffd5ff commit a8c1615
Show file tree
Hide file tree
Showing 8 changed files with 1,838 additions and 898 deletions.
4 changes: 4 additions & 0 deletions models/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type Segment struct {
// Semantic version
Version string

//PartitionStats version
PartitionStatsVersion int64

// etcd segment key
key string

Expand Down Expand Up @@ -125,6 +128,7 @@ func NewSegmentFromV2_2(info *datapbv2.SegmentInfo, key string,
}

s.Version = ">=2.2.0"
s.PartitionStatsVersion = info.GetPartitionStatsVersion()
return s
}

Expand Down
3 changes: 3 additions & 0 deletions models/segment_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,21 @@ const (
SegmentLevelLegacy SegmentLevel = 0
SegmentLevelL0 SegmentLevel = 1
SegmentLevelL1 SegmentLevel = 2
SegmentLevelL2 SegmentLevel = 3
)

var SegmentLevelName = map[int32]string{
0: "Legacy",
1: "L0",
2: "L1",
3: "L2",
}

var SegmentLevelValue = map[string]int32{
"Legacy": 0,
"L0": 1,
"L1": 2,
"L2": 3,
}

func (x SegmentLevel) String() string {
Expand Down
103 changes: 99 additions & 4 deletions proto/v2.2/data_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum SegmentLevel {
Legacy = 0; // zero value for legacy logic
L0 = 1; // L0 segment, contains delta data for current channel
L1 = 2; // L1 segment, normal segment, with no extra compaction attribute
L2 = 3; // L2 segemnt, segment with extra data distribution info
L2 = 3; // L2 segment, segment with extra data distribution info
}

service DataCoord {
Expand Down Expand Up @@ -110,7 +110,7 @@ service DataNode {
// https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy
rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {}

rpc Compaction(CompactionPlan) returns (common.Status) {}
rpc CompactionV2(CompactionPlan) returns (common.Status) {}
rpc GetCompactionState(CompactionStateRequest) returns (CompactionStateResponse) {}
rpc SyncSegments(SyncSegmentsRequest) returns (common.Status) {}

Expand All @@ -127,6 +127,10 @@ service DataNode {
rpc QueryPreImport(QueryPreImportRequest) returns(QueryPreImportResponse) {}
rpc QueryImport(QueryImportRequest) returns(QueryImportResponse) {}
rpc DropImport(DropImportRequest) returns(common.Status) {}

rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {}

rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {}
}

message FlushRequest {
Expand Down Expand Up @@ -265,6 +269,7 @@ message VchannelInfo {
repeated int64 indexed_segmentIds = 10;
repeated SegmentInfo indexed_segments = 11;
repeated int64 level_zero_segment_ids = 12;
map<int64, int64> partition_stats_versions = 13;
}

message WatchDmChannelsRequest {
Expand Down Expand Up @@ -321,6 +326,12 @@ message SegmentInfo {
// so segments with Legacy level shall be treated as L1 segment
SegmentLevel level = 20;
int64 storage_version = 21;

int64 partition_stats_version = 22;
// use in major compaction, if compaction fail, should revert segment level to last value
SegmentLevel last_level = 23;
// use in major compaction, if compaction fail, should revert partition stats version to last value
int64 last_partition_stats_version = 24;
}

message SegmentStartPosition {
Expand Down Expand Up @@ -488,21 +499,37 @@ enum CompactionType {
MinorCompaction = 5;
MajorCompaction = 6;
Level0DeleteCompaction = 7;
ClusteringCompaction = 8;
}

message CompactionStateRequest {
common.MsgBase base = 1;
int64 planID = 2;
}

message SyncSegmentInfo {
int64 segment_id = 1;
FieldBinlog pk_stats_log = 2;
common.SegmentState state = 3;
SegmentLevel level = 4;
int64 num_of_rows = 5;
}

message SyncSegmentsRequest {
// Deprecated, after v2.4.3
int64 planID = 1;
// Deprecated, after v2.4.3
int64 compacted_to = 2;
// Deprecated, after v2.4.3
int64 num_of_rows = 3;
// Deprecated, after v2.4.3
repeated int64 compacted_from = 4;
// Deprecated, after v2.4.3
repeated FieldBinlog stats_logs = 5;
string channel_name = 6;
int64 partition_id = 7;
int64 collection_id = 8;
map<int64, SyncSegmentInfo> segment_infos = 9;
}

message CompactionSegmentBinlogs {
Expand All @@ -519,13 +546,20 @@ message CompactionSegmentBinlogs {
message CompactionPlan {
int64 planID = 1;
repeated CompactionSegmentBinlogs segmentBinlogs = 2;
uint64 start_time = 3;
int64 start_time = 3;
int32 timeout_in_seconds = 4;
CompactionType type = 5;
uint64 timetravel = 6;
string channel = 7;
int64 collection_ttl = 8;
int64 total_rows = 9;
schema.CollectionSchema schema = 10;
int64 clustering_key_field = 11;
int64 max_segment_rows = 12;
int64 prefer_segment_rows = 13;
string analyze_result_path = 14;
repeated int64 analyze_segment_ids = 15;
int32 state = 16;
}

message CompactionSegment {
Expand All @@ -540,7 +574,7 @@ message CompactionSegment {

message CompactionPlanResult {
int64 planID = 1;
common.CompactionState state = 2;
CompactionTaskState state = 2;
repeated CompactionSegment segments = 3;
string channel = 4;
CompactionType type = 5;
Expand Down Expand Up @@ -755,6 +789,7 @@ message ImportSegmentInfo {
int64 imported_rows = 2;
repeated FieldBinlog binlogs = 3;
repeated FieldBinlog statslogs = 4;
repeated FieldBinlog deltalogs = 5;
}

message QueryImportResponse {
Expand Down Expand Up @@ -832,3 +867,63 @@ message GcControlRequest {
GcCommand command = 2;
repeated common.KeyValuePair params = 3;
}

message QuerySlotRequest {}

message QuerySlotResponse {
common.Status status = 1;
int64 num_slots = 2;
}

enum CompactionTaskState {
unknown = 0;
executing = 1;
pipelining = 2;
completed = 3;
failed = 4;
timeout = 5;
analyzing = 6;
indexing = 7;
cleaned = 8;
meta_saved = 9;
}

message CompactionTask{
int64 planID = 1;
int64 triggerID = 2;
int64 collectionID = 3;
int64 partitionID = 4;
string channel = 5;
CompactionType type = 6;
CompactionTaskState state = 7;
string fail_reason = 8;
int64 start_time = 9;
int64 end_time = 10;
int32 timeout_in_seconds = 11;
int32 retry_times = 12;
int64 collection_ttl = 13;
int64 total_rows = 14;
repeated int64 inputSegments = 15;
repeated int64 resultSegments = 16;
msg.MsgPosition pos = 17;
int64 nodeID = 18;
schema.CollectionSchema schema = 19;
schema.FieldSchema clustering_key_field = 20;
int64 max_segment_rows = 21;
int64 prefer_segment_rows = 22;
int64 analyzeTaskID = 23;
int64 analyzeVersion = 24;
}

message PartitionStatsInfo {
int64 collectionID = 1;
int64 partitionID = 2;
string vChannel = 3;
int64 version = 4;
repeated int64 segmentIDs = 5;
int64 analyzeTaskID = 6;
}

message DropCompactionPlanRequest {
int64 planID = 1;
}
Loading

0 comments on commit a8c1615

Please sign in to comment.