Skip to content

Commit

Permalink
enhance: Add partition-loaded and make force-release selectable
Browse files Browse the repository at this point in the history
See also milvus-io#255

This PR:
- Add `show partition-loaded` command to list partitions loaded
- Make `force-release` able to specify collection id
- Rewrite `force-release` command with new framework
- Some minor format/parameter usage fix

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed May 6, 2024
1 parent c0eb6b3 commit 320f53a
Show file tree
Hide file tree
Showing 14 changed files with 215 additions and 73 deletions.
8 changes: 4 additions & 4 deletions models/collection_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type CollectionLoaded struct {
Status LoadStatus
FieldIndexID map[int64]int64

// orignial etcd key
key string
// orignial etcd Key
Key string
Version string
}

Expand All @@ -46,7 +46,7 @@ func NewCollectionLoadedV2_1(info *querypb.CollectionInfo, key string) *Collecti
c.InMemoryPercentage = info.GetInMemoryPercentage()
c.ReplicaIDs = info.GetReplicaIds()
c.Version = LTEVersion2_1
c.key = key
c.Key = key

return c
}
Expand All @@ -57,7 +57,7 @@ func NewCollectionLoadedV2_2(info *querypbv2.CollectionLoadInfo, key string) *Co
c.Status = LoadStatus(info.GetStatus())
c.FieldIndexID = info.GetFieldIndexID()
c.Version = GTEVersion2_2
c.key = key
c.Key = key
return c
}

Expand Down
27 changes: 27 additions & 0 deletions models/partition_loaded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package models

import (
querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
)

type PartitionLoaded struct {
CollectionID int64
PartitionID int64
ReplicaNumber int32

Status LoadStatus

Key string
Version string
}

func NewPartitionLoaded(info *querypbv2.PartitionLoadInfo, key string) *PartitionLoaded {
return &PartitionLoaded{
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
ReplicaNumber: info.GetReplicaNumber(),
Status: LoadStatus(info.GetStatus()),
Version: GTEVersion2_2,
Key: key,
}
}
3 changes: 0 additions & 3 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ func (s *embedEtcdMockState) SetupCommands() {
// inspect-pk
getInspectPKCmd(s.client, rootPath),

// force-release
getForceReleaseCmd(s.client, rootPath),

// for testing
etcd.RepairCommand(s.client, rootPath),

Expand Down
5 changes: 4 additions & 1 deletion states/etcd/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ func (c *FileAuditKV) Get(ctx context.Context, key string, opts ...clientv3.OpOp

// Delete deletes a key, or optionally using WithRange(end), [key, end).
func (c *FileAuditKV) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
fmt.Println("audio delete", key)
fmt.Println("audit delete", key)
opts = append(opts, clientv3.WithPrevKV())
resp, err := c.cli.Delete(ctx, key, opts...)
if err != nil {
return resp, err
}
c.writeHeader(models.OpDel, int32(len(resp.PrevKvs)))
for _, kv := range resp.PrevKvs {
c.writeLogKV(kv)
Expand Down
4 changes: 3 additions & 1 deletion states/etcd/common/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ const (
// CollectionLoadPrefix is prefix for querycoord collection loaded in milvus v2.1.x
CollectionLoadPrefix = "queryCoord-collectionMeta"
// CollectionLoadPrefixV2 is prefix for querycoord collection loaded in milvus v2.2.x
CollectionLoadPrefixV2 = "querycoord-collection-loadinfo"
CollectionLoadPrefixV2 = "querycoord-collection-loadinfo"
PartitionLoadedPrefixLegacy = "queryCoord-partitionMeta"
PartitionLoadedPrefix = "querycoord-partition-loadinfo"
)

var (
Expand Down
8 changes: 5 additions & 3 deletions states/etcd/common/collection_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
)

// ListCollectionLoadedInfo returns collection loaded info with provided version.
func ListCollectionLoadedInfo(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(any) bool) ([]*models.CollectionLoaded, error) {
func ListCollectionLoadedInfo(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(cl *models.CollectionLoaded) bool) ([]*models.CollectionLoaded, error) {
switch version {
case models.LTEVersion2_1:
prefix := path.Join(basePath, CollectionLoadPrefix)
infos, paths, err := ListProtoObjects(ctx, cli, prefix, func(info *querypb.CollectionInfo) bool {
cl := models.NewCollectionLoadedV2_1(info, "")
for _, filter := range filters {
if !filter(info) {
if !filter(cl) {
return false
}
}
Expand All @@ -34,8 +35,9 @@ func ListCollectionLoadedInfo(ctx context.Context, cli clientv3.KV, basePath str
case models.GTEVersion2_2:
prefix := path.Join(basePath, CollectionLoadPrefixV2)
infos, paths, err := ListProtoObjects(ctx, cli, prefix, func(info *querypbv2.CollectionLoadInfo) bool {
cl := models.NewCollectionLoadedV2_2(info, "")
for _, filter := range filters {
if !filter(info) {
if !filter(cl) {
return false
}
}
Expand Down
37 changes: 37 additions & 0 deletions states/etcd/common/partition_loaded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package common

import (
"context"
"errors"
"path"

"github.com/milvus-io/birdwatcher/models"
querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3"
)

func ListPartitionLoadedInfo(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.PartitionLoaded) bool) ([]*models.PartitionLoaded, error) {
switch version {
case models.GTEVersion2_2:
prefix := path.Join(basePath, PartitionLoadedPrefix)
infos, paths, err := ListProtoObjects(ctx, cli, prefix, func(info *querypbv2.PartitionLoadInfo) bool {
pl := models.NewPartitionLoaded(info, "")
for _, filter := range filters {
if !filter(pl) {
return false
}
}
return true
})
if err != nil {
return nil, err
}

return lo.Map(infos, func(info querypbv2.PartitionLoadInfo, idx int) *models.PartitionLoaded {
return models.NewPartitionLoaded(&info, paths[idx])
}), nil
default:
return nil, errors.New("version not supported")
}
}
31 changes: 15 additions & 16 deletions states/etcd/show/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/proto/v2.0/commonpb"
"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
"github.com/milvus-io/birdwatcher/proto/v2.0/internalpb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
Expand All @@ -37,22 +35,22 @@ func (c *ComponentShow) CheckpointCommand(ctx context.Context, p *CheckpointPara
VirtualName: channel.VirtualName,
},
}
var cp *internalpb.MsgPosition
var cp *models.MsgPosition
var segmentID int64
var err error

cp, err = c.getChannelCheckpoint(ctx, channel.VirtualName)
if err == nil {
checkpoint.Source = "Channel Checkpoint"
checkpoint.Checkpoint = models.NewMsgPosition(cp)
checkpoint.Checkpoint = cp
checkpoints = append(checkpoints, checkpoint)
continue
}

cp, segmentID, err = c.getCheckpointFromSegments(ctx, p.CollectionID, channel.VirtualName)
if err == nil {
checkpoint.Source = fmt.Sprintf("from segment id %d", segmentID)
checkpoint.Checkpoint = models.NewMsgPosition(cp)
checkpoint.Checkpoint = cp
checkpoints = append(checkpoints, checkpoint)
continue
}
Expand Down Expand Up @@ -93,9 +91,9 @@ func (rs *Checkpoints) PrintAs(format framework.Format) string {
return ""
}

func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName string) (*internalpb.MsgPosition, error) {
func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName string) (*models.MsgPosition, error) {
prefix := path.Join(c.basePath, "datacoord-meta", "channel-cp", channelName)
results, _, err := common.ListProtoObjects[internalpb.MsgPosition](context.Background(), c.client, prefix)
results, _, err := common.ListProtoObjects[internalpb.MsgPosition](ctx, c.client, prefix)
if err != nil {
return nil, err
}
Expand All @@ -104,11 +102,12 @@ func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName st
return nil, fmt.Errorf("expected 1 position but got %d", len(results))
}

return &results[0], nil
pos := models.NewMsgPosition(&results[0])
return pos, nil
}

func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID int64, vchannel string) (*internalpb.MsgPosition, int64, error) {
segments, err := common.ListSegments(c.client, c.basePath, func(info *datapb.SegmentInfo) bool {
func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID int64, vchannel string) (*models.MsgPosition, int64, error) {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.Segment) bool {
return info.CollectionID == collID && info.InsertChannel == vchannel
})
if err != nil {
Expand All @@ -117,18 +116,18 @@ func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID in
}
fmt.Printf("find segments to list checkpoint for %s, segment found %d\n", vchannel, len(segments))
var segmentID int64
var pos *internalpb.MsgPosition
var pos *models.MsgPosition
for _, segment := range segments {
if segment.State != commonpb.SegmentState_Flushed &&
segment.State != commonpb.SegmentState_Growing &&
segment.State != commonpb.SegmentState_Flushing {
if segment.State != models.SegmentStateFlushed &&
segment.State != models.SegmentStateGrowing &&
segment.State != models.SegmentStateFlushing {
continue
}
// skip all empty segment
if segment.GetDmlPosition() == nil && segment.GetStartPosition() == nil {
continue
}
var segPos *internalpb.MsgPosition
var segPos *models.MsgPosition

if segment.GetDmlPosition() != nil {
segPos = segment.GetDmlPosition()
Expand All @@ -138,7 +137,7 @@ func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID in

if pos == nil || segPos.GetTimestamp() < pos.GetTimestamp() {
pos = segPos
segmentID = segment.GetID()
segmentID = segment.ID
}
}

Expand Down
3 changes: 2 additions & 1 deletion states/etcd/show/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func (c *ComponentShow) CollectionCommand(ctx context.Context, p *CollectionPara
if p.DatabaseID > -1 && coll.DBID != p.DatabaseID {
return false
}
if p.State != "" && strings.ToLower(p.State) != strings.ToLower(coll.State.String()) {
if p.State != "" && !strings.EqualFold(p.State, coll.State.String()) {
return false
}

total++
return true
})
Expand Down
11 changes: 2 additions & 9 deletions states/etcd/show/collection_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/samber/lo"
)

const (
Expand All @@ -25,20 +24,14 @@ type CollectionLoadedParam struct {
// CollectionLoadedCommand return show collection-loaded command.
func (c *ComponentShow) CollectionLoadedCommand(ctx context.Context, p *CollectionLoadedParam) (*CollectionsLoaded, error) {
var total int
infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(_ any) bool {
infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.CollectionLoaded) bool {
total++
return true
return p.CollectionID == 0 || p.CollectionID == info.CollectionID
})
if err != nil {
return nil, errors.Wrap(err, "failed to list collection load info")
}

if p.CollectionID > 0 {
infos = lo.Filter(infos, func(info *models.CollectionLoaded, _ int) bool {
return info.CollectionID == p.CollectionID
})
}

return framework.NewListResult[CollectionsLoaded](infos), nil
}

Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func printIndex(index IndexInfoV1) {
)
fmt.Printf("Index Params: %s\n", common.GetKVPair(index.info.GetIndexParams(), "params"))
for _, v := range indexParams {
fmt.Printf("%s:%s", v.GetKey(), v.GetValue())
fmt.Printf("%s:%s\n", v.GetKey(), v.GetValue())
}
fmt.Println("==================================================================")
}
Expand All @@ -101,7 +101,7 @@ func printIndexV2(index indexpbv2.FieldIndex) {
)
fmt.Printf("Index Params: %s\n", common.GetKVPair(index.GetIndexInfo().GetUserIndexParams(), "params"))
for _, v := range indexParams {
fmt.Printf("%s:%s", v.GetKey(), v.GetValue())
fmt.Printf("%s:%s\n", v.GetKey(), v.GetValue())
}
fmt.Println("==================================================================")
}
57 changes: 57 additions & 0 deletions states/etcd/show/partition_loaded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package show

import (
"context"
"fmt"
"strings"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
)

type PartitionLoadedParam struct {
framework.ParamBase `use:"show partition-loaded" desc:"display the information of loaded partition(s) from querycoord meta"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
PartitionID int64 `name:"partition" default:"0" desc:"partition id to filter with"`
}

func (c *ComponentShow) PartitionLoadedCommand(ctx context.Context, p *PartitionLoadedParam) (*PartitionsLoaded, error) {
partitions, err := common.ListPartitionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(pl *models.PartitionLoaded) bool {
return (p.CollectionID == 0 || p.CollectionID == pl.CollectionID) &&
(p.PartitionID == 0 || p.PartitionID == pl.PartitionID)
})
if err != nil {
return nil, err
}
return framework.NewListResult[PartitionsLoaded](partitions), nil
}

type PartitionsLoaded struct {
framework.ListResultSet[*models.PartitionLoaded]
}

func (rs *PartitionsLoaded) PrintAs(format framework.Format) string {
switch format {
case framework.FormatDefault, framework.FormatPlain:
sb := &strings.Builder{}
for _, info := range rs.Data {
rs.printPartitionLoaded(sb, info)
}
fmt.Fprintf(sb, "--- Partitions Loaded: %d\n", len(rs.Data))
return sb.String()
default:
}
return ""
}

func (rs *PartitionsLoaded) printPartitionLoaded(sb *strings.Builder, info *models.PartitionLoaded) {
fmt.Fprintf(sb, "CollectionID: %d\tPartitionID: %d\n", info.CollectionID, info.PartitionID)
fmt.Fprintf(sb, "ReplicaNumber: %d", info.ReplicaNumber)
switch info.Version {
case models.LTEVersion2_1:
case models.GTEVersion2_2:
fmt.Fprintf(sb, "\tLoadStatus: %s\n", info.Status.String())
}
}
Loading

0 comments on commit 320f53a

Please sign in to comment.