Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Separate basePath & metaPath to show config-etcd #337

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *embedEtcdMockState) SetInstance(instanceName string) {
s.CmdState.LabelStr = fmt.Sprintf("Backup(%s)", instanceName)
s.instanceName = instanceName
rootPath := path.Join(instanceName, metaPath)
s.ComponentShow = show.NewComponent(s.client, s.config, rootPath)
s.ComponentShow = show.NewComponent(s.client, s.config, instanceName, metaPath)
s.ComponentRemove = remove.NewComponent(s.client, s.config, rootPath)
s.ComponentRepair = repair.NewComponent(s.client, s.config, rootPath)
s.SetupCommands()
Expand Down Expand Up @@ -178,7 +178,7 @@ func getEmbedEtcdInstance(server *embed.Etcd, cli *clientv3.Client, instanceName
CmdState: common.CmdState{
LabelStr: fmt.Sprintf("Backup(%s)", instanceName),
},
ComponentShow: show.NewComponent(cli, config, basePath),
ComponentShow: show.NewComponent(cli, config, instanceName, metaPath),
ComponentRemove: remove.NewComponent(cli, config, basePath),
instanceName: instanceName,
server: server,
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type AliasParam struct {

// AliasCommand implements `show alias` command.
func (c *ComponentShow) AliasCommand(ctx context.Context, p *AliasParam) (*Aliases, error) {
aliases, err := common.ListAliasVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(a *models.Alias) bool {
aliases, err := common.ListAliasVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(a *models.Alias) bool {
return p.DBID == -1 || p.DBID == a.DBID
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/bulkinsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type ImportJobParam struct {

// BulkInsertCommand returns show bulkinsert command.
func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam) error {
jobs, _, err := common.ListImportJobs(ctx, c.client, c.basePath, func(job *datapb.ImportJob) bool {
jobs, _, err := common.ListImportJobs(ctx, c.client, c.metaPath, func(job *datapb.ImportJob) bool {
return (p.JobID == 0 || job.GetJobID() == p.JobID) &&
(p.CollectionID == 0 || job.GetCollectionID() == p.CollectionID) &&
(p.State == "" || strings.EqualFold(job.GetState().String(), p.State))
Expand All @@ -51,7 +51,7 @@ func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam
fmt.Println("Please specify the job ID (-job={JobID}) to show detailed info.")
return nil
}
PrintDetailedImportJob(ctx, c.client, c.basePath, job, p.ShowAllFiles)
PrintDetailedImportJob(ctx, c.client, c.metaPath, job, p.ShowAllFiles)
} else {
PrintSimpleImportJob(job)
}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/channel_watched.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ChannelWatchedParam struct {

// ChannelWatchedCommand return show channel-watched commands.
func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*framework.PresetResultSet, error) {
infos, err := common.ListChannelWatch(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool {
infos, err := common.ListChannelWatch(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool {
return (p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID) && (!p.WithoutSchema || channel.Schema == nil)
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions states/etcd/show/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type CheckpointParam struct {

// CheckpointCommand returns show checkpoint command.
func (c *ComponentShow) CheckpointCommand(ctx context.Context, p *CheckpointParam) (*Checkpoints, error) {
coll, err := common.GetCollectionByIDVersion(context.Background(), c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID)
coll, err := common.GetCollectionByIDVersion(context.Background(), c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID)
if err != nil {
return nil, errors.Wrap(err, "failed to get collection")
}
Expand Down Expand Up @@ -93,7 +93,7 @@ func (rs *Checkpoints) PrintAs(format framework.Format) string {
}

func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName string) (*models.MsgPosition, error) {
prefix := path.Join(c.basePath, "datacoord-meta", "channel-cp", channelName)
prefix := path.Join(c.metaPath, "datacoord-meta", "channel-cp", channelName)
results, _, err := common.ListProtoObjects[internalpb.MsgPosition](ctx, c.client, prefix)
if err != nil {
return nil, err
Expand All @@ -108,7 +108,7 @@ func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName st
}

func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID int64, vchannel string) (*models.MsgPosition, int64, error) {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.Segment) bool {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(info *models.Segment) bool {
return info.CollectionID == collID && info.InsertChannel == vchannel
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func (c *ComponentShow) CollectionCommand(ctx context.Context, p *CollectionPara
// perform get by id to accelerate
if p.CollectionID > 0 {
var collection *models.Collection
collection, err = common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID)
collection, err = common.GetCollectionByIDVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID)
if err == nil {
collections = append(collections, collection)
}
} else {
collections, err = common.ListCollectionsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(coll *models.Collection) bool {
collections, err = common.ListCollectionsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(coll *models.Collection) bool {
if p.CollectionName != "" && coll.Schema.Name != p.CollectionName {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/collection_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *Collect
}

// fetch current for now
collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID)
collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID)
if err != nil {
switch {
case errors.Is(err, common.ErrCollectionDropped):
Expand All @@ -43,7 +43,7 @@ func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *Collect
Collection: collection,
}
// fetch history
items, err := common.ListCollectionHistory(ctx, c.client, c.basePath, etcdversion.GetVersion(), collection.DBID, p.CollectionID)
items, err := common.ListCollectionHistory(ctx, c.client, c.metaPath, etcdversion.GetVersion(), collection.DBID, p.CollectionID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/collection_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type CollectionLoadedParam struct {
// CollectionLoadedCommand return show collection-loaded command.
func (c *ComponentShow) CollectionLoadedCommand(ctx context.Context, p *CollectionLoadedParam) (*CollectionsLoaded, error) {
var total int
infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.CollectionLoaded) bool {
infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(info *models.CollectionLoaded) bool {
total++
return p.CollectionID == 0 || p.CollectionID == info.CollectionID
})
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *ComponentShow) CompactionTaskCommand(ctx context.Context, p *Compaction

// perform get by id to accelerate

compactionTasks, err = common.ListCompactionTask(ctx, c.client, c.basePath, func(task *models.CompactionTask) bool {
compactionTasks, err = common.ListCompactionTask(ctx, c.client, c.metaPath, func(task *models.CompactionTask) bool {
total++
if p.CollectionName != "" && task.GetSchema().GetName() != p.CollectionName {
return false
Expand Down
14 changes: 11 additions & 3 deletions states/etcd/show/component.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
package show

import (
"path"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/birdwatcher/configs"
)

type ComponentShow struct {
client clientv3.KV
config *configs.Config
client clientv3.KV
config *configs.Config
// basePath is the root path of etcd key-value pairs.
// by default is by-dev
basePath string
// metaPath is the concatenated path of basePath & metaPath
// by default is by-dev/meta
metaPath string
}

func NewComponent(cli clientv3.KV, config *configs.Config, basePath string) *ComponentShow {
func NewComponent(cli clientv3.KV, config *configs.Config, basePath string, metaPath string) *ComponentShow {
return &ComponentShow{
client: cli,
config: config,
basePath: basePath,
metaPath: path.Join(basePath, metaPath),
}
}
6 changes: 4 additions & 2 deletions states/etcd/show/config_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ type ConfigEtcdParam struct {
}

// ConfigEtcdCommand return show config-etcd command.
func (c *ComponentShow) ConfigEtcdCommand(ctx context.Context, p *ConfigEtcdParam) {
func (c *ComponentShow) ConfigEtcdCommand(ctx context.Context, p *ConfigEtcdParam) error {
keys, values, err := common.ListEtcdConfigs(ctx, c.client, c.basePath)
if err != nil {
fmt.Println("failed to list configurations from etcd", err.Error())
return
return err
}

for i, key := range keys {
fmt.Printf("Key: %s, Value: %s\n", key, values[i])
}

return nil
}
2 changes: 1 addition & 1 deletion states/etcd/show/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type DatabaseParam struct {

// DatabaseCommand returns show database comand.
func (c *ComponentShow) DatabaseCommand(ctx context.Context, p *DatabaseParam) (*Databases, error) {
dbs, err := common.ListDatabase(ctx, c.client, c.basePath, func(db *models.Database) bool {
dbs, err := common.ListDatabase(ctx, c.client, c.metaPath, func(db *models.Database) bool {
return (p.DatabaseName == "" || db.Name == p.DatabaseName) && (p.DatabaseID == 0 || db.ID == p.DatabaseID)
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type IndexInfoV1 struct {
}

func (c *ComponentShow) listIndexMeta(ctx context.Context) ([]IndexInfoV1, error) {
prefix := path.Join(c.basePath, "root-coord/index")
prefix := path.Join(c.metaPath, "root-coord/index")
indexes, keys, err := common.ListProtoObjects[etcdpb.IndexInfo](ctx, c.client, prefix)
result := make([]IndexInfoV1, 0, len(indexes))
for idx, info := range indexes {
Expand All @@ -70,7 +70,7 @@ func (c *ComponentShow) listIndexMeta(ctx context.Context) ([]IndexInfoV1, error
}

func (c *ComponentShow) listIndexMetaV2(ctx context.Context) ([]indexpbv2.FieldIndex, error) {
indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, c.client, path.Join(c.basePath, "field-index"))
indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, c.client, path.Join(c.metaPath, "field-index"))
return indexes, err
}

Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (c *ComponentShow) PartitionCommand(ctx context.Context, p *PartitionParam)
return nil, errors.New("collection id not provided")
}

partitions, err := common.ListCollectionPartitions(ctx, c.client, c.basePath, p.CollectionID)
partitions, err := common.ListCollectionPartitions(ctx, c.client, c.metaPath, p.CollectionID)
if err != nil {
return nil, errors.Wrap(err, "failed to list partition info")
}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/partition_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type PartitionLoadedParam struct {
}

func (c *ComponentShow) PartitionLoadedCommand(ctx context.Context, p *PartitionLoadedParam) (*PartitionsLoaded, error) {
partitions, err := common.ListPartitionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(pl *models.PartitionLoaded) bool {
partitions, err := common.ListPartitionLoadedInfo(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(pl *models.PartitionLoaded) bool {
return (p.CollectionID == 0 || p.CollectionID == pl.CollectionID) &&
(p.PartitionID == 0 || p.PartitionID == pl.PartitionID)
})
Expand Down
6 changes: 3 additions & 3 deletions states/etcd/show/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ func (c *ComponentShow) ReplicaCommand(ctx context.Context, p *ReplicaParam) (*R
var collections []*models.Collection
var err error
if p.CollectionID > 0 {
collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID)
collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID)
if err != nil {
return nil, err
}
collections = []*models.Collection{collection}
} else {
collections, err = common.ListCollectionsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(c *models.Collection) bool {
collections, err = common.ListCollectionsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(c *models.Collection) bool {
return p.CollectionID == 0 || p.CollectionID == c.ID
})
if err != nil {
return nil, err
}
}

replicas, err := common.ListReplica(ctx, c.client, c.basePath, p.CollectionID)
replicas, err := common.ListReplica(ctx, c.client, c.metaPath, p.CollectionID)
if err != nil {
return nil, errors.Wrap(err, "failed to list replica info")
}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ResourceGroupParam struct {
}

func (c *ComponentShow) ResourceGroupCommand(ctx context.Context, p *ResourceGroupParam) (*ResourceGroups, error) {
rgs, err := common.ListResourceGroups(ctx, c.client, c.basePath, func(rg *models.ResourceGroup) bool {
rgs, err := common.ListResourceGroups(ctx, c.client, c.metaPath, func(rg *models.ResourceGroup) bool {
return p.Name == "" || p.Name == rg.GetName()
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type segStats struct {

// SegmentCommand returns show segments command.
func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) error {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(segment *models.Segment) bool {
return (p.CollectionID == 0 || segment.CollectionID == p.CollectionID) &&
(p.PartitionID == 0 || segment.PartitionID == p.PartitionID) &&
(p.SegmentID == 0 || segment.ID == p.SegmentID) &&
Expand Down
10 changes: 5 additions & 5 deletions states/etcd/show/segment_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ type SegmentIndexParam struct {

// SegmentIndexCommand returns show segment-index command.
func (c *ComponentShow) SegmentIndexCommand(ctx context.Context, p *SegmentIndexParam) error {
segments, err := common.ListSegments(c.client, c.basePath, func(info *datapb.SegmentInfo) bool {
segments, err := common.ListSegments(c.client, c.metaPath, func(info *datapb.SegmentInfo) bool {
return (p.CollectionID == 0 || info.CollectionID == p.CollectionID) &&
(p.SegmentID == 0 || info.ID == p.SegmentID)
})
if err != nil {
return err
}

segmentIndexes, err := common.ListSegmentIndex(c.client, c.basePath)
segmentIndexes, err := common.ListSegmentIndex(c.client, c.metaPath)
if err != nil {
return err
}
Expand All @@ -40,12 +40,12 @@ func (c *ComponentShow) SegmentIndexCommand(ctx context.Context, p *SegmentIndex
return err
}

indexBuildInfo, err := common.ListIndex(ctx, c.client, c.basePath)
indexBuildInfo, err := common.ListIndex(ctx, c.client, c.metaPath)
if err != nil {
return err
}

indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, c.client, path.Join(c.basePath, "field-index"))
indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, c.client, path.Join(c.metaPath, "field-index"))
if err != nil {
return err
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (c *ComponentShow) SegmentIndexCommand(ctx context.Context, p *SegmentIndex
}

func (c *ComponentShow) listSegmentIndexV2(ctx context.Context) ([]indexpbv2.SegmentIndex, error) {
prefix := path.Join(c.basePath, "segment-index") + "/"
prefix := path.Join(c.metaPath, "segment-index") + "/"
result, _, err := common.ListProtoObjects[indexpbv2.SegmentIndex](ctx, c.client, prefix)
return result, err
}
2 changes: 1 addition & 1 deletion states/etcd/show/segment_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (c *ComponentShow) SegmentLoadedCommand(ctx context.Context, p *SegmentLoad
if etcdversion.GetVersion() != models.LTEVersion2_1 {
return nil, errors.New("list segment-loaded from meta only support before 2.1.x, try use `show segment-loaded-grc` instead")
}
segments, err := common.ListLoadedSegments(c.client, c.basePath, func(info *querypb.SegmentInfo) bool {
segments, err := common.ListLoadedSegments(c.client, c.metaPath, func(info *querypb.SegmentInfo) bool {
return (p.CollectionID == 0 || info.CollectionID == p.CollectionID) &&
(p.SegmentID == 0 || info.SegmentID == p.SegmentID)
})
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type SessionParam struct {
// SessionCommand returns show session command.
// usage: show session
func (c *ComponentShow) SessionCommand(ctx context.Context, p *SessionParam) (*Sessions, error) {
sessions, err := common.ListSessions(ctx, c.client, c.basePath)
sessions, err := common.ListSessions(ctx, c.client, c.metaPath)
if err != nil {
return nil, errors.Wrap(err, "failed to list sessions")
}
Expand Down
2 changes: 1 addition & 1 deletion states/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func getInstanceState(cli clientv3.KV, instanceName, metaPath string, etcdState
CmdState: common.CmdState{
LabelStr: fmt.Sprintf("Milvus(%s)", instanceName),
},
ComponentShow: show.NewComponent(cli, config, basePath),
ComponentShow: show.NewComponent(cli, config, instanceName, metaPath),
ComponentRemove: remove.NewComponent(cli, config, basePath),
ComponentRepair: repair.NewComponent(cli, config, basePath),
ComponentSet: set.NewComponent(cli, config, basePath),
Expand Down
Loading