From 55bf5c6da5e4976a75d5581e6c47801adf664af1 Mon Sep 17 00:00:00 2001 From: Andrii Dema Date: Fri, 2 Aug 2024 10:10:51 +0300 Subject: [PATCH 1/5] SE-100: don't dump when instance is not found https://perconadev.atlassian.net/browse/SE-100 --- Makefile | 3 +- cmd/pmm-dump/main.go | 75 ++++++++++++++++++++--------------- cmd/pmm-dump/util.go | 18 +++------ pkg/victoriametrics/metric.go | 3 ++ pkg/victoriametrics/source.go | 53 +++++++++++++++++++++++++ 5 files changed, 106 insertions(+), 46 deletions(-) diff --git a/Makefile b/Makefile index 6645b22..cc8484c 100644 --- a/Makefile +++ b/Makefile @@ -76,10 +76,9 @@ export-all: export-vm: ./$(PMMD_BIN_NAME) export -v --dump-path $(DUMP_FILENAME) \ --pmm-url=$(PMM_URL) --dump-core - export-ch: ./$(PMMD_BIN_NAME) export -v --dump-path $(DUMP_FILENAME) \ - --pmm-url=$(PMM_URL) --dump-qan + --pmm-url=$(PMM_URL) --dump-qan --no-dump-core import-all: ./$(PMMD_BIN_NAME) import -v --dump-path $(DUMP_FILENAME) \ diff --git a/cmd/pmm-dump/main.go b/cmd/pmm-dump/main.go index ff67bb0..f00d4a4 100644 --- a/cmd/pmm-dump/main.go +++ b/cmd/pmm-dump/main.go @@ -219,23 +219,50 @@ func main() { //nolint:gocyclo,maintidx selectors = append(selectors, fmt.Sprintf(`{service_name="%s"}`, serviceName)) } } - vmSource, ok := prepareVictoriaMetricsSource(grafanaC, *dumpCore, pmmConfig.VictoriaMetricsURL, selectors, *vmNativeData, *vmContentLimit) - if ok { + + var chunks []dump.ChunkMeta + if *dumpCore { + vmSource := prepareVictoriaMetricsSource(grafanaC, pmmConfig.VictoriaMetricsURL, selectors, *vmNativeData, *vmContentLimit) sources = append(sources, vmSource) + hasMetrics, err := vmSource.HasMetrics(startTime, endTime) + if err != nil { + log.Fatal().Msgf("Failed to check metrics in VictoriaMetrics: %v", err) + } + if hasMetrics { + chunks = append(chunks, victoriametrics.SplitTimeRangeIntoChunks(startTime, endTime, *chunkTimeRange)...) + } } - if *where == "" && len(*instances) > 0 { - for i, serviceName := range *instances { - if i != 0 { - *where += " OR " + if *dumpQAN { + if *where == "" && len(*instances) > 0 { + for i, serviceName := range *instances { + if i != 0 { + *where += " OR " + } + *where += fmt.Sprintf("service_name='%s'", serviceName) } - *where += fmt.Sprintf("service_name='%s'", serviceName) } - } - chSource, ok := prepareClickHouseSource(ctx, *dumpQAN, pmmConfig.ClickHouseURL, *where) - if ok { + chSource, err := prepareClickHouseSource(ctx, pmmConfig.ClickHouseURL, *where) + if err != nil { + log.Fatal().Msgf("Failed to connect to ClickHouse: %v", err) + } sources = append(sources, chSource) + + chChunks, err := chSource.SplitIntoChunks(startTime, endTime, *chunkRows) + if err != nil { + log.Fatal().Msgf("Failed to create clickhouse chunks: %s", err.Error()) + } + if len(chChunks) > 0 { + chunks = append(chunks, chChunks...) + } + } + + if len(chunks) == 0 { + if len(*instances) > 0 { + log.Warn().Msgf("It seems that data about instances specified in the `--instance' option does not exist in the PMM server.") + } + log.Fatal().Msgf("Failed to create a dump. No data was found") } file, err := createFile(*dumpPath, *stdout) @@ -249,23 +276,6 @@ func main() { //nolint:gocyclo,maintidx log.Fatal().Msgf("Failed to setup export: %v", err) //nolint:gocritic //TODO: potential problem here, see muted linter warning } - var chunks []dump.ChunkMeta - - if *dumpCore { - chunks = append(chunks, victoriametrics.SplitTimeRangeIntoChunks(startTime, endTime, *chunkTimeRange)...) - } - - if *dumpQAN { - chChunks, err := chSource.SplitIntoChunks(startTime, endTime, *chunkRows) - if err != nil { - log.Fatal().Msgf("Failed to create clickhouse chunks: %s", err.Error()) - } - if len(chChunks) == 0 && !*dumpCore { - log.Fatal().Msg("QAN doesn't have any data") - } - chunks = append(chunks, chChunks...) - } - meta, err := composeMeta(*pmmURL, grafanaC, *exportServicesInfo, cli, *vmNativeData) if err != nil { log.Fatal().Err(err).Msg("Failed to compose meta") @@ -352,13 +362,16 @@ func main() { //nolint:gocyclo,maintidx log.Fatal().Msgf("`--vm-content-limit` is not supported with native data format") } - vmSource, ok := prepareVictoriaMetricsSource(grafanaC, *dumpCore, pmmConfig.VictoriaMetricsURL, nil, *vmNativeData, *vmContentLimit) - if ok { + if *dumpCore { + vmSource := prepareVictoriaMetricsSource(grafanaC, pmmConfig.VictoriaMetricsURL, nil, *vmNativeData, *vmContentLimit) sources = append(sources, vmSource) } - chSource, ok := prepareClickHouseSource(ctx, *dumpQAN, pmmConfig.ClickHouseURL, *where) - if ok { + if *dumpQAN { + chSource, err := prepareClickHouseSource(ctx, pmmConfig.ClickHouseURL, *where) + if err != nil { + log.Fatal().Msgf("Failed to connect to ClickHouse: %v", err) + } sources = append(sources, chSource) } diff --git a/cmd/pmm-dump/util.go b/cmd/pmm-dump/util.go index 10f31d0..fd4cdb5 100644 --- a/cmd/pmm-dump/util.go +++ b/cmd/pmm-dump/util.go @@ -361,11 +361,7 @@ func checkVersionSupport(c *client.Client, pmmURL, victoriaMetricsURL string) { } } -func prepareVictoriaMetricsSource(grafanaC *client.Client, dumpCore bool, url string, selectors []string, nativeData bool, contentLimit uint64) (*victoriametrics.Source, bool) { - if !dumpCore { - return nil, false - } - +func prepareVictoriaMetricsSource(grafanaC *client.Client, url string, selectors []string, nativeData bool, contentLimit uint64) *victoriametrics.Source { c := &victoriametrics.Config{ ConnectionURL: url, TimeSeriesSelectors: selectors, @@ -375,14 +371,10 @@ func prepareVictoriaMetricsSource(grafanaC *client.Client, dumpCore bool, url st log.Debug().Msgf("Got Victoria Metrics URL: %s", c.ConnectionURL) - return victoriametrics.NewSource(grafanaC, *c), true + return victoriametrics.NewSource(grafanaC, *c) } -func prepareClickHouseSource(ctx context.Context, dumpQAN bool, url, where string) (*clickhouse.Source, bool) { - if !dumpQAN { - return nil, false - } - +func prepareClickHouseSource(ctx context.Context, url, where string) (*clickhouse.Source, error) { c := &clickhouse.Config{ ConnectionURL: url, Where: where, @@ -390,12 +382,12 @@ func prepareClickHouseSource(ctx context.Context, dumpQAN bool, url, where strin clickhouseSource, err := clickhouse.NewSource(ctx, *c) if err != nil { - log.Fatal().Msgf("Failed to create ClickHouse source: %s", err.Error()) + return nil, errors.Wrap(err, "failed to create ClickHouse source") } log.Debug().Msgf("Got ClickHouse URL: %s", c.ConnectionURL) - return clickhouseSource, true + return clickhouseSource, nil } func parseURL(pmmURL, pmmHost, pmmPort, pmmUser, pmmPassword *string) { diff --git a/pkg/victoriametrics/metric.go b/pkg/victoriametrics/metric.go index 811d229..540cf5a 100644 --- a/pkg/victoriametrics/metric.go +++ b/pkg/victoriametrics/metric.go @@ -55,6 +55,9 @@ type MetricResponse struct { Value []interface{} `json:"value"` } `json:"result"` } `json:"data"` + Stats struct { + SeriesFetched string `json:"seriesFetched"` + } `json:"stats"` } func (r *MetricResponse) GetValidValue() (string, error) { diff --git a/pkg/victoriametrics/source.go b/pkg/victoriametrics/source.go index 06f4996..aa75b1c 100644 --- a/pkg/victoriametrics/source.go +++ b/pkg/victoriametrics/source.go @@ -308,6 +308,59 @@ func (s Source) FinalizeWrites() error { return nil } +func (s Source) HasMetrics(start, end time.Time) (bool, error) { + q := fasthttp.AcquireArgs() + defer fasthttp.ReleaseArgs(q) + + query := "" + for i, v := range s.cfg.TimeSeriesSelectors { + if i != 0 { + query += " and " + } + query += "absent(" + v + ")" + } + q.Add("start", strconv.FormatInt(start.Unix(), 10)) + q.Add("end", strconv.FormatInt(end.Unix(), 10)) + q.Add("step", "5s") + q.Add("query", query) + + url := fmt.Sprintf("%s/api/v1/query_range?%s", s.cfg.ConnectionURL, q.String()) + + log.Debug(). + Stringer("timeout", requestTimeout). + Str("url", url). + Msg("Sending GET query_range request to Victoria Metrics endpoint") + + req := fasthttp.AcquireRequest() + defer fasthttp.ReleaseRequest(req) + + req.Header.SetMethod(fasthttp.MethodGet) + req.SetRequestURI(url) + req.Header.Set(fasthttp.HeaderAcceptEncoding, "gzip") + + resp, err := s.c.DoWithTimeout(req, requestTimeout) + defer fasthttp.ReleaseResponse(resp) + if err != nil { + return false, errors.Wrap(err, "failed to send HTTP request to victoria metrics") + } + + body := gzipDecode(copyBytesArr(resp.Body())) + if status := resp.StatusCode(); status != fasthttp.StatusOK { + return false, errors.Errorf("non-OK response from victoria metrics: %d: %s", status, body) + } + log.Debug().Msg("Got successful response from Victoria Metrics") + + metricsResp := new(MetricResponse) + if err := json.Unmarshal([]byte(body), metricsResp); err != nil { + return false, errors.Wrap(err, "failed to unmarshal metrics response") + } + + if metricsResp.Stats.SeriesFetched == "0" { + return false, nil + } + return true, nil +} + func SplitTimeRangeIntoChunks(start, end time.Time, delta time.Duration) []dump.ChunkMeta { var chunks []dump.ChunkMeta chunkStart := start From f5e093117b17c53708c0cf68c7b0683f87637b70 Mon Sep 17 00:00:00 2001 From: Andrii Dema Date: Tue, 6 Aug 2024 01:39:48 +0300 Subject: [PATCH 2/5] fix check --- pkg/victoriametrics/source.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/victoriametrics/source.go b/pkg/victoriametrics/source.go index aa75b1c..deee363 100644 --- a/pkg/victoriametrics/source.go +++ b/pkg/victoriametrics/source.go @@ -319,17 +319,16 @@ func (s Source) HasMetrics(start, end time.Time) (bool, error) { } query += "absent(" + v + ")" } - q.Add("start", strconv.FormatInt(start.Unix(), 10)) - q.Add("end", strconv.FormatInt(end.Unix(), 10)) - q.Add("step", "5s") + q.Add("time", strconv.FormatInt(end.Unix(), 10)) + q.Add("step", strconv.Itoa(int(end.Sub(start).Seconds())+1)+"s") q.Add("query", query) - url := fmt.Sprintf("%s/api/v1/query_range?%s", s.cfg.ConnectionURL, q.String()) + url := fmt.Sprintf("%s/api/v1/query?%s", s.cfg.ConnectionURL, q.String()) log.Debug(). Stringer("timeout", requestTimeout). Str("url", url). - Msg("Sending GET query_range request to Victoria Metrics endpoint") + Msg("Sending GET query request to Victoria Metrics endpoint") req := fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) From 8d32face1e6f99122d6ef32d9621b95ea526955e Mon Sep 17 00:00:00 2001 From: Andrii Dema Date: Wed, 7 Aug 2024 01:09:04 +0300 Subject: [PATCH 3/5] fix TestDashboard --- cmd/pmm-dump/main.go | 2 +- internal/test/e2e/dashboard_test.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/pmm-dump/main.go b/cmd/pmm-dump/main.go index f00d4a4..f7b70c0 100644 --- a/cmd/pmm-dump/main.go +++ b/cmd/pmm-dump/main.go @@ -233,7 +233,7 @@ func main() { //nolint:gocyclo,maintidx } } - if *dumpQAN { + if *dumpQAN { //nolint:nestif if *where == "" && len(*instances) > 0 { for i, serviceName := range *instances { if i != 0 { diff --git a/internal/test/e2e/dashboard_test.go b/internal/test/e2e/dashboard_test.go index ef4e671..06c0459 100644 --- a/internal/test/e2e/dashboard_test.go +++ b/internal/test/e2e/dashboard_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "os" "path/filepath" + "strings" "testing" "github.com/valyala/fasthttp" @@ -53,6 +54,10 @@ func TestDashboard(t *testing.T) { pmm.Log("Exporting data with `--dashboard` flag to", dashboardDumpPath) stdout, stderr, err := b.Run(append([]string{"export", "--ignore-load"}, args...)...) if err != nil { + if strings.Contains(stderr, "Failed to create a dump. No data was found") { + // If pmm-dump returns this error, it also means that the dashboard selector parsing was successful + return + } t.Fatal("failed to export", err, stdout, stderr) } }) From 629429c4f9682f14cb575265c4f2c8eb97e35978 Mon Sep 17 00:00:00 2001 From: Andrii Dema Date: Thu, 22 Aug 2024 20:30:02 +0300 Subject: [PATCH 4/5] improve `amount of columns mismatch` error --- pkg/clickhouse/tsv/tsv.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/clickhouse/tsv/tsv.go b/pkg/clickhouse/tsv/tsv.go index 3d6553d..96d8a14 100644 --- a/pkg/clickhouse/tsv/tsv.go +++ b/pkg/clickhouse/tsv/tsv.go @@ -55,7 +55,7 @@ func (r *Reader) Read() ([]interface{}, error) { return nil, err } if len(r.columnTypes) != len(records) { - return nil, errors.New("amount of columns mismatch") + return nil, errors.Errorf("amount of columns mismatch: expected %d, got %d", len(r.columnTypes), len(records)) } values := make([]interface{}, 0, len(records)) From 77cf3d33b06d74469da7ecd4acdff23ae17e6dcf Mon Sep 17 00:00:00 2001 From: Andrii Dema Date: Tue, 24 Sep 2024 03:26:49 +0300 Subject: [PATCH 5/5] fix tests --- internal/test/e2e/basic_test.go | 6 ++++-- internal/test/e2e/dashboard_test.go | 10 +++++++--- internal/test/e2e/validate_test.go | 23 +++++++++++++++++++---- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/internal/test/e2e/basic_test.go b/internal/test/e2e/basic_test.go index ec31a35..e4909c0 100644 --- a/internal/test/e2e/basic_test.go +++ b/internal/test/e2e/basic_test.go @@ -20,6 +20,7 @@ import ( "context" "path/filepath" "testing" + "time" "golang.org/x/sync/errgroup" @@ -44,17 +45,18 @@ func TestExportImport(t *testing.T) { if err := g.Wait(); err != nil { t.Fatal(err) } + time.Sleep(30 * time.Second) var b util.Binary testDir := t.TempDir() pmm.Log("Checking filtering with `--instance` flag") - args := []string{"-d", filepath.Join(testDir, "filter-dump.tar.gz"), "--pmm-url", pmm.PMMURL(), "--dump-qan", "--click-house-url", pmm.ClickhouseURL(), "--instance", "pmm-client"} + args := []string{"-d", filepath.Join(testDir, "filter-dump.tar.gz"), "--pmm-url", pmm.PMMURL(), "--dump-qan", "--click-house-url", pmm.ClickhouseURL(), "--instance", "pmm-server"} stdout, stderr, err := b.Run(append([]string{"export", "--ignore-load"}, args...)...) if err != nil { t.Fatal("failed to export", err, stdout, stderr) } - checkDumpFiltering(t, filepath.Join(testDir, "filter-dump.tar.gz"), "pmm-client") + checkDumpFiltering(t, filepath.Join(testDir, "filter-dump.tar.gz"), "pmm-server") args = []string{"-d", filepath.Join(testDir, "dump.tar.gz"), "--pmm-url", pmm.PMMURL(), "--dump-qan", "--click-house-url", pmm.ClickhouseURL()} diff --git a/internal/test/e2e/dashboard_test.go b/internal/test/e2e/dashboard_test.go index 0547c44..76d81d8 100644 --- a/internal/test/e2e/dashboard_test.go +++ b/internal/test/e2e/dashboard_test.go @@ -64,13 +64,17 @@ func TestDashboard(t *testing.T) { } dashboardDumpPath = filepath.Join(testDir, "dump2.tar.gz") - args = []string{"-d", dashboardDumpPath, "--pmm-url", pmm.PMMURL(), "--pmm-user", "admin", "--pmm-pass", "admin", "--dashboard", name, "--instance", "pmm-client"} + args = []string{"-d", dashboardDumpPath, "--pmm-url", pmm.PMMURL(), "--pmm-user", "admin", "--pmm-pass", "admin", "--dashboard", name, "--instance", "pmm-server"} pmm.Log("Exporting data with `--dashboard` flag and `--instance` to", dashboardDumpPath) stdout, stderr, err = b.Run(append([]string{"export", "--ignore-load"}, args...)...) if err != nil { + if strings.Contains(stderr, "Failed to create a dump. No data was found") { + // If pmm-dump returns this error, it also means that the dashboard selector parsing was successful + return + } t.Fatal("failed to export", err, stdout, stderr) } - checkDumpFiltering(t, dashboardDumpPath, "pmm-client") + checkDumpFiltering(t, dashboardDumpPath, "pmm-server") }) } } @@ -90,7 +94,7 @@ func checkDumpFiltering(t *testing.T, dumpPath, instanceFilter string) { case dump.VictoriaMetrics: chunk, err := vmParseChunk(data) if err != nil { - t.Fatal("failed to parse chunk", filename) + t.Fatal("failed to parse chunk", filename, "error", err.Error()) } for _, metric := range chunk { diff --git a/internal/test/e2e/validate_test.go b/internal/test/e2e/validate_test.go index b57ea88..93a3f09 100644 --- a/internal/test/e2e/validate_test.go +++ b/internal/test/e2e/validate_test.go @@ -367,12 +367,27 @@ func readChunks(filename string) (chunkMap, error) { return chunkMap, nil } +func isGzip(data []byte) bool { + reader := bytes.NewReader(data) + r, err := gzip.NewReader(reader) + if r != nil { + r.Close() + } + return err == nil +} + func vmParseChunk(data []byte) ([]vmMetric, error) { - r, err := gzip.NewReader(bytes.NewBuffer(data)) - if err != nil { - return nil, errors.Wrap(err, "failed to create reader") + var r io.Reader + var err error + r = bytes.NewBuffer(data) + if isGzip(data) { + gr, err := gzip.NewReader(bytes.NewBuffer(data)) + if err != nil { + return nil, errors.Wrap(err, "failed to create reader") + } + defer gr.Close() //nolint:errcheck + r = gr } - defer r.Close() //nolint:errcheck metrics, err := victoriametrics.ParseMetrics(r) if err != nil { return nil, errors.Wrap(err, "failed to parse metrics")