Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SE-100: don't dump when instance is not found #173

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,9 @@ export-all:
export-vm:
./$(PMMD_BIN_NAME) export -v --dump-path $(DUMP_FILENAME) \
--pmm-url=$(PMM_URL) --dump-core

export-ch:
./$(PMMD_BIN_NAME) export -v --dump-path $(DUMP_FILENAME) \
--pmm-url=$(PMM_URL) --dump-qan
--pmm-url=$(PMM_URL) --dump-qan --no-dump-core

import-all:
./$(PMMD_BIN_NAME) import -v --dump-path $(DUMP_FILENAME) \
Expand Down
75 changes: 44 additions & 31 deletions cmd/pmm-dump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,23 +219,50 @@ func main() { //nolint:gocyclo,maintidx
selectors = append(selectors, fmt.Sprintf(`{service_name="%s" or node_name="%s" or instance="%s"}`, serviceName, serviceName, serviceName))
}
}
vmSource, ok := prepareVictoriaMetricsSource(grafanaC, *dumpCore, pmmConfig.VictoriaMetricsURL, selectors, *vmNativeData, *vmContentLimit)
if ok {

var chunks []dump.ChunkMeta
if *dumpCore {
vmSource := prepareVictoriaMetricsSource(grafanaC, pmmConfig.VictoriaMetricsURL, selectors, *vmNativeData, *vmContentLimit)
sources = append(sources, vmSource)
hasMetrics, err := vmSource.HasMetrics(startTime, endTime)
if err != nil {
log.Fatal().Msgf("Failed to check metrics in VictoriaMetrics: %v", err)
}
if hasMetrics {
chunks = append(chunks, victoriametrics.SplitTimeRangeIntoChunks(startTime, endTime, *chunkTimeRange)...)
}
}

if *where == "" && len(*instances) > 0 {
for i, serviceName := range *instances {
if i != 0 {
*where += " OR "
if *dumpQAN { //nolint:nestif
if *where == "" && len(*instances) > 0 {
for i, serviceName := range *instances {
if i != 0 {
*where += " OR "
}
*where += fmt.Sprintf("service_name='%s'", serviceName)
}
*where += fmt.Sprintf("service_name='%s'", serviceName)
}
}

chSource, ok := prepareClickHouseSource(ctx, *dumpQAN, pmmConfig.ClickHouseURL, *where)
if ok {
chSource, err := prepareClickHouseSource(ctx, pmmConfig.ClickHouseURL, *where)
if err != nil {
log.Fatal().Msgf("Failed to connect to ClickHouse: %v", err)
}
sources = append(sources, chSource)

chChunks, err := chSource.SplitIntoChunks(startTime, endTime, *chunkRows)
if err != nil {
log.Fatal().Msgf("Failed to create clickhouse chunks: %s", err.Error())
}
if len(chChunks) > 0 {
chunks = append(chunks, chChunks...)
}
}

if len(chunks) == 0 {
if len(*instances) > 0 {
log.Warn().Msgf("It seems that data about instances specified in the `--instance' option does not exist in the PMM server.")
}
log.Fatal().Msgf("Failed to create a dump. No data was found")
}

file, err := createFile(*dumpPath, *stdout)
Expand All @@ -249,23 +276,6 @@ func main() { //nolint:gocyclo,maintidx
log.Fatal().Msgf("Failed to setup export: %v", err) //nolint:gocritic //TODO: potential problem here, see muted linter warning
}

var chunks []dump.ChunkMeta

if *dumpCore {
chunks = append(chunks, victoriametrics.SplitTimeRangeIntoChunks(startTime, endTime, *chunkTimeRange)...)
}

if *dumpQAN {
chChunks, err := chSource.SplitIntoChunks(startTime, endTime, *chunkRows)
if err != nil {
log.Fatal().Msgf("Failed to create clickhouse chunks: %s", err.Error())
}
if len(chChunks) == 0 && !*dumpCore {
log.Fatal().Msg("QAN doesn't have any data")
}
chunks = append(chunks, chChunks...)
}

meta, err := composeMeta(*pmmURL, grafanaC, *exportServicesInfo, cli, *vmNativeData)
if err != nil {
log.Fatal().Err(err).Msg("Failed to compose meta")
Expand Down Expand Up @@ -352,13 +362,16 @@ func main() { //nolint:gocyclo,maintidx
log.Fatal().Msgf("`--vm-content-limit` is not supported with native data format")
}

vmSource, ok := prepareVictoriaMetricsSource(grafanaC, *dumpCore, pmmConfig.VictoriaMetricsURL, nil, *vmNativeData, *vmContentLimit)
if ok {
if *dumpCore {
vmSource := prepareVictoriaMetricsSource(grafanaC, pmmConfig.VictoriaMetricsURL, nil, *vmNativeData, *vmContentLimit)
sources = append(sources, vmSource)
}

chSource, ok := prepareClickHouseSource(ctx, *dumpQAN, pmmConfig.ClickHouseURL, *where)
if ok {
if *dumpQAN {
chSource, err := prepareClickHouseSource(ctx, pmmConfig.ClickHouseURL, *where)
if err != nil {
log.Fatal().Msgf("Failed to connect to ClickHouse: %v", err)
}
sources = append(sources, chSource)
}

Expand Down
18 changes: 5 additions & 13 deletions cmd/pmm-dump/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,7 @@ func checkVersionSupport(c *client.Client, pmmURL, victoriaMetricsURL string) {
}
}

func prepareVictoriaMetricsSource(grafanaC *client.Client, dumpCore bool, url string, selectors []string, nativeData bool, contentLimit uint64) (*victoriametrics.Source, bool) {
if !dumpCore {
return nil, false
}

func prepareVictoriaMetricsSource(grafanaC *client.Client, url string, selectors []string, nativeData bool, contentLimit uint64) *victoriametrics.Source {
if contentLimit > math.MaxInt {
log.Fatal().Msgf("`--vm-content-limit` can't have a value greater than %d", math.MaxInt)
}
Expand All @@ -380,27 +376,23 @@ func prepareVictoriaMetricsSource(grafanaC *client.Client, dumpCore bool, url st

log.Debug().Msgf("Got Victoria Metrics URL: %s", c.ConnectionURL)

return victoriametrics.NewSource(grafanaC, *c), true
return victoriametrics.NewSource(grafanaC, *c)
}

func prepareClickHouseSource(ctx context.Context, dumpQAN bool, url, where string) (*clickhouse.Source, bool) {
if !dumpQAN {
return nil, false
}

func prepareClickHouseSource(ctx context.Context, url, where string) (*clickhouse.Source, error) {
c := &clickhouse.Config{
ConnectionURL: url,
Where: where,
}

clickhouseSource, err := clickhouse.NewSource(ctx, *c)
if err != nil {
log.Fatal().Msgf("Failed to create ClickHouse source: %s", err.Error())
return nil, errors.Wrap(err, "failed to create ClickHouse source")
}

log.Debug().Msgf("Got ClickHouse URL: %s", c.ConnectionURL)

return clickhouseSource, true
return clickhouseSource, nil
}

func parseURL(pmmURL, pmmHost, pmmPort, pmmUser, pmmPassword *string) {
Expand Down
6 changes: 4 additions & 2 deletions internal/test/e2e/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"path/filepath"
"testing"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -44,17 +45,18 @@ func TestExportImport(t *testing.T) {
if err := g.Wait(); err != nil {
t.Fatal(err)
}
time.Sleep(30 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to have comment there what we are waiting here.


var b util.Binary
testDir := t.TempDir()

pmm.Log("Checking filtering with `--instance` flag")
args := []string{"-d", filepath.Join(testDir, "filter-dump.tar.gz"), "--pmm-url", pmm.PMMURL(), "--dump-qan", "--click-house-url", pmm.ClickhouseURL(), "--instance", "pmm-client"}
args := []string{"-d", filepath.Join(testDir, "filter-dump.tar.gz"), "--pmm-url", pmm.PMMURL(), "--dump-qan", "--click-house-url", pmm.ClickhouseURL(), "--instance", "pmm-server"}
stdout, stderr, err := b.Run(append([]string{"export", "--ignore-load"}, args...)...)
if err != nil {
t.Fatal("failed to export", err, stdout, stderr)
}
checkDumpFiltering(t, filepath.Join(testDir, "filter-dump.tar.gz"), "pmm-client")
checkDumpFiltering(t, filepath.Join(testDir, "filter-dump.tar.gz"), "pmm-server")

args = []string{"-d", filepath.Join(testDir, "dump.tar.gz"), "--pmm-url", pmm.PMMURL(), "--dump-qan", "--click-house-url", pmm.ClickhouseURL()}

Expand Down
15 changes: 12 additions & 3 deletions internal/test/e2e/dashboard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path"
"path/filepath"
"strings"
"testing"

"github.com/valyala/fasthttp"
Expand Down Expand Up @@ -55,17 +56,25 @@ func TestDashboard(t *testing.T) {
pmm.Log("Exporting data with `--dashboard` flag to", dashboardDumpPath)
stdout, stderr, err := b.Run(append([]string{"export", "--ignore-load"}, args...)...)
if err != nil {
if strings.Contains(stderr, "Failed to create a dump. No data was found") {
// If pmm-dump returns this error, it also means that the dashboard selector parsing was successful
return
}
Comment on lines +59 to +62
Copy link
Contributor

@artemgavrilov artemgavrilov Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like relying on exit code may be a better option rather than error text.

t.Fatal("failed to export", err, stdout, stderr)
}

dashboardDumpPath = filepath.Join(testDir, "dump2.tar.gz")
args = []string{"-d", dashboardDumpPath, "--pmm-url", pmm.PMMURL(), "--pmm-user", "admin", "--pmm-pass", "admin", "--dashboard", name, "--instance", "pmm-client"}
args = []string{"-d", dashboardDumpPath, "--pmm-url", pmm.PMMURL(), "--pmm-user", "admin", "--pmm-pass", "admin", "--dashboard", name, "--instance", "pmm-server"}
pmm.Log("Exporting data with `--dashboard` flag and `--instance` to", dashboardDumpPath)
stdout, stderr, err = b.Run(append([]string{"export", "--ignore-load"}, args...)...)
if err != nil {
if strings.Contains(stderr, "Failed to create a dump. No data was found") {
// If pmm-dump returns this error, it also means that the dashboard selector parsing was successful
return
}
t.Fatal("failed to export", err, stdout, stderr)
}
checkDumpFiltering(t, dashboardDumpPath, "pmm-client")
checkDumpFiltering(t, dashboardDumpPath, "pmm-server")
})
}
}
Expand All @@ -85,7 +94,7 @@ func checkDumpFiltering(t *testing.T, dumpPath, instanceFilter string) {
case dump.VictoriaMetrics:
chunk, err := vmParseChunk(data)
if err != nil {
t.Fatal("failed to parse chunk", filename)
t.Fatal("failed to parse chunk", filename, "error", err.Error())
}

for _, metric := range chunk {
Expand Down
23 changes: 19 additions & 4 deletions internal/test/e2e/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,27 @@ func readChunks(filename string) (chunkMap, error) {
return chunkMap, nil
}

func isGzip(data []byte) bool {
reader := bytes.NewReader(data)
r, err := gzip.NewReader(reader)
if r != nil {
r.Close()
}
return err == nil
}

func vmParseChunk(data []byte) ([]vmMetric, error) {
r, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, errors.Wrap(err, "failed to create reader")
var r io.Reader
var err error
r = bytes.NewBuffer(data)
if isGzip(data) {
gr, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, errors.Wrap(err, "failed to create reader")
}
defer gr.Close() //nolint:errcheck
r = gr
}
defer r.Close() //nolint:errcheck
metrics, err := victoriametrics.ParseMetrics(r)
if err != nil {
return nil, errors.Wrap(err, "failed to parse metrics")
Expand Down
2 changes: 1 addition & 1 deletion pkg/clickhouse/tsv/tsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (r *Reader) Read() ([]interface{}, error) {
return nil, err
}
if len(r.columnTypes) != len(records) {
return nil, errors.New("amount of columns mismatch")
return nil, errors.Errorf("amount of columns mismatch: expected %d, got %d", len(r.columnTypes), len(records))
}

values := make([]interface{}, 0, len(records))
Expand Down
3 changes: 3 additions & 0 deletions pkg/victoriametrics/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type MetricResponse struct {
Value []interface{} `json:"value"`
} `json:"result"`
} `json:"data"`
Stats struct {
SeriesFetched string `json:"seriesFetched"`
} `json:"stats"`
}

func (r *MetricResponse) GetValidValue() (string, error) {
Expand Down
52 changes: 52 additions & 0 deletions pkg/victoriametrics/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,58 @@ func (s Source) FinalizeWrites() error {
return nil
}

func (s Source) HasMetrics(start, end time.Time) (bool, error) {
q := fasthttp.AcquireArgs()
defer fasthttp.ReleaseArgs(q)

query := ""
for i, v := range s.cfg.TimeSeriesSelectors {
if i != 0 {
query += " and "
}
query += "absent(" + v + ")"
}
q.Add("time", strconv.FormatInt(end.Unix(), 10))
q.Add("step", strconv.Itoa(int(end.Sub(start).Seconds())+1)+"s")
q.Add("query", query)

url := fmt.Sprintf("%s/api/v1/query?%s", s.cfg.ConnectionURL, q.String())

log.Debug().
Stringer("timeout", requestTimeout).
Str("url", url).
Msg("Sending GET query request to Victoria Metrics endpoint")

req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)

req.Header.SetMethod(fasthttp.MethodGet)
req.SetRequestURI(url)
req.Header.Set(fasthttp.HeaderAcceptEncoding, "gzip")

resp, err := s.c.DoWithTimeout(req, requestTimeout)
defer fasthttp.ReleaseResponse(resp)
if err != nil {
return false, errors.Wrap(err, "failed to send HTTP request to victoria metrics")
}

body := gzipDecode(copyBytesArr(resp.Body()))
if status := resp.StatusCode(); status != fasthttp.StatusOK {
return false, errors.Errorf("non-OK response from victoria metrics: %d: %s", status, body)
}
log.Debug().Msg("Got successful response from Victoria Metrics")

metricsResp := new(MetricResponse)
if err := json.Unmarshal([]byte(body), metricsResp); err != nil {
return false, errors.Wrap(err, "failed to unmarshal metrics response")
}

if metricsResp.Stats.SeriesFetched == "0" {
return false, nil
}
return true, nil
}

func SplitTimeRangeIntoChunks(start, end time.Time, delta time.Duration) []dump.ChunkMeta {
var chunks []dump.ChunkMeta
chunkStart := start
Expand Down