Skip to content

Commit

Permalink
enhance: Add healthz-check command (milvus-io#244)
Browse files Browse the repository at this point in the history
Add healthz-check command to check segment target block problem

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Feb 11, 2025
1 parent e9c72ee commit f1d53bc
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 0 deletions.
7 changes: 7 additions & 0 deletions framework/resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func (rs *PresetResultSet) String() string {
return rs.PrintAs(rs.format)
}

func NewPresetResultSet(rs ResultSet, format Format) *PresetResultSet {
return &PresetResultSet{
ResultSet: rs,
format: format,
}
}

// NameFormat name to format mapping tool function.
func NameFormat(name string) Format {
f, ok := name2Format[name]
Expand Down
137 changes: 137 additions & 0 deletions states/healthz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package states

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb"
querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/samber/lo"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type HealthzCheckParam struct {
framework.ParamBase `use:"healthz-check" desc:"perform healthz check for connect instance"`
}

type HealthzCheckReports struct {
framework.ListResultSet[*HealthzCheckReport]
}

func (rs *HealthzCheckReports) PrintAs(format framework.Format) string {
switch format {
case framework.FormatDefault, framework.FormatPlain:
sb := &strings.Builder{}
for _, report := range rs.Data {
fmt.Fprintln(sb, report.Msg)
}
return sb.String()
case framework.FormatJSON:
sb := &strings.Builder{}
for _, report := range rs.Data {
output := report.Extra
bs, err := json.Marshal(output)
if err != nil {
fmt.Println(err.Error())
continue
}
sb.Write(bs)
sb.WriteString("\n")
}
return sb.String()
default:
}
return ""
}

type HealthzCheckReport struct {
Msg string
Extra map[string]any
}

func (c *InstanceState) HealthzCheckCommand(ctx context.Context, p *HealthzCheckParam) (*framework.PresetResultSet, error) {

results, err := c.checkSegmentTarget(ctx)
if err != nil {
return nil, err
}

return framework.NewPresetResultSet(framework.NewListResult[HealthzCheckReports](results), framework.FormatJSON), nil
}

func (c *InstanceState) checkSegmentTarget(ctx context.Context) ([]*HealthzCheckReport, error) {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion())
if err != nil {
return nil, err
}
validIDs := lo.SliceToMap(segments, func(segment *models.Segment) (int64, struct{}) { return segment.ID, struct{}{} })

sessions, err := common.ListSessions(c.client, c.basePath)
if err != nil {
return nil, err
}

var results []*HealthzCheckReport

for _, session := range sessions {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}

conn, err := grpc.DialContext(ctx, session.Address, opts...)
if err != nil {
fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error())
continue
}

if session.ServerName == "querynode" {
clientv2 := querypbv2.NewQueryNodeClient(conn)
resp, err := clientv2.GetDataDistribution(ctx, &querypbv2.GetDataDistributionRequest{
Base: &commonpbv2.MsgBase{
SourceID: -1,
TargetID: session.ServerID,
},
})
if err != nil {
fmt.Println(err.Error())
continue
}

for _, segment := range resp.GetSegments() {
if _, ok := validIDs[segment.GetID()]; !ok {
results = append(results, &HealthzCheckReport{
Msg: fmt.Sprintf("Sealed segment %d still loaded while meta gc-ed", segment.GetID()),
Extra: map[string]any{
"segment_id": segment.GetID(),
"segment_state": "sealed",
},
})
}
}

for _, lv := range resp.GetLeaderViews() {
growings := lo.Uniq(lo.Union(lv.GetGrowingSegmentIDs(), lo.Keys(lv.GetGrowingSegments())))
for _, segmentID := range growings {
if _, ok := validIDs[segmentID]; !ok {
results = append(results, &HealthzCheckReport{
Msg: fmt.Sprintf("Sealed segment %d still loaded while meta gc-ed", segmentID),
Extra: map[string]any{
"segment_id": segmentID,
"segment_state": "growing",
},
})
}
}
}
}
}
return results, nil
}

0 comments on commit f1d53bc

Please sign in to comment.