Merge #115201
115201: obsservice: create batches for insert insights r=maryliag a=maryliag

Note to reviewers: the thresholds of 100 insights and at least 1 minute are arbitrary for now. Later we can check whether other values make more sense.

---
Previously, all insights were inserted as soon as they were exported to obsservice.
This commit introduces a cache to batch insights and perform fewer inserts.
When the cache reaches 100 entries, or at least a minute has passed since the last insert, the batch is inserted.
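
For context, a minimal, self-contained sketch of the size-or-age flush pattern this commit introduces (the `batcher` type, its field names, and the thresholds passed to it are illustrative, not the identifiers used in the diff below):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher sketches the flush-on-size-or-age pattern: entries accumulate in
// memory and are written out once the batch reaches maxBatch items or maxAge
// has passed since the last flush.
type batcher struct {
	mu        sync.Mutex
	pending   []string
	lastFlush time.Time
	maxBatch  int
	maxAge    time.Duration
}

func newBatcher(maxBatch int, maxAge time.Duration) *batcher {
	return &batcher{lastFlush: time.Now(), maxBatch: maxBatch, maxAge: maxAge}
}

// add appends an entry and flushes via the supplied callback if either
// threshold has been crossed.
func (b *batcher) add(entry string, flush func([]string) error) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, entry)
	if len(b.pending) < b.maxBatch && time.Since(b.lastFlush) < b.maxAge {
		return nil // neither threshold reached yet
	}
	batch := b.pending
	b.pending = nil
	b.lastFlush = time.Now()
	return flush(batch)
}

func main() {
	// Small maxBatch so the flush fires in this demo; the commit uses 100
	// entries and 1 minute.
	b := newBatcher(2, time.Minute)
	flush := func(batch []string) error {
		fmt.Printf("flushing %d entries\n", len(batch))
		return nil
	}
	_ = b.add("insight-1", flush)
	_ = b.add("insight-2", flush) // triggers the flush
}
```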

Fixes CC-26215

Demo: https://www.loom.com/share/ecf706d41155484eb2fd532414a4f7b6

Release note: None

Co-authored-by: maryliag <[email protected]>
craig[bot] and maryliag committed Dec 4, 2023
2 parents 53f28e3 + 9646291 commit 8b246d0
Showing 3 changed files with 128 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pkg/obsservice/cmd/obsservice/main.go
@@ -263,7 +263,7 @@ func makeStatementInsightsPipeline(
}

processor, err := process.NewMemQueueProcessor[*obspb.StatementInsightsStatistics](
memQueue, &process.StmtInsightsProcessor{SinkPGURL: sinkPGURL})
memQueue, process.NewStmtInsightsProcessor(sinkPGURL))
if err != nil {
return nil, nil, err
}
2 changes: 2 additions & 0 deletions pkg/obsservice/obslib/process/BUILD.bazel
@@ -18,6 +18,8 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgx_v5//pgxpool",
"@com_github_jackc_pgx_v5//stdlib",
176 changes: 125 additions & 51 deletions pkg/obsservice/obslib/process/stmt_insights_processor.go
@@ -11,26 +11,125 @@ package process
import (
"context"
"fmt"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

type StmtInsightsProcessor struct {
SinkPGURL string

mu struct {
syncutil.RWMutex

lastExportTs time.Time
insights []*obspb.StatementInsightsStatistics
}
}

func (t *StmtInsightsProcessor) Process(
const InsightsBatchMax = 100

func NewStmtInsightsProcessor(sinkPGURL string) *StmtInsightsProcessor {
p := StmtInsightsProcessor{SinkPGURL: sinkPGURL}
p.mu.Lock()
defer p.mu.Unlock()

p.mu.lastExportTs = timeutil.Now()
return &p
}

func (p *StmtInsightsProcessor) Process(
ctx context.Context, stmtInsight *obspb.StatementInsightsStatistics,
) error {
db, err := OpenDBSync(ctx, t.SinkPGURL)
// TODO(maryliag): create a pool for the connections
if err != nil {
return err
p.addInsight(stmtInsight)
insightsSize, lastExportTs := p.getInsightsInfo()

if insightsSize >= InsightsBatchMax || lastExportTs > time.Minute {
insertStmt, args := p.prepareInsightExport()
err := p.exportInsights(ctx, insertStmt, args)
if err != nil {
return err
}
}
defer db.Close()

insertStmt := `INSERT INTO obsservice.statement_execution_insights (
return nil
}

func (p *StmtInsightsProcessor) addInsight(stmtInsight *obspb.StatementInsightsStatistics) {
p.mu.Lock()
defer p.mu.Unlock()
p.mu.insights = append(p.mu.insights, stmtInsight)
}

func (p *StmtInsightsProcessor) getInsightsInfo() (int, time.Duration) {
p.mu.RLock()
defer p.mu.RUnlock()
insightsSize := len(p.mu.insights)
lastExportTs := timeutil.Since(p.mu.lastExportTs)

return insightsSize, lastExportTs
}

// prepareInsightExport creates the insert and args for the INSERT and clears the insights mutex.
func (p *StmtInsightsProcessor) prepareInsightExport() (string, []interface{}) {
columnCount := 31
var rows []string
var args []interface{}

p.mu.Lock()
defer p.mu.Unlock()
for insightIdx, stmtInsight := range p.mu.insights {
args = append(args,
stmtInsight.EventInfo.Timestamp,
stmtInsight.EventInfo.OrgID,
stmtInsight.EventInfo.ClusterID,
stmtInsight.EventInfo.TenantID,
stmtInsight.EventInfo.EventID,
stmtInsight.SessionID,
stmtInsight.TransactionID,
fmt.Sprint(stmtInsight.TxnFingerprintID),
stmtInsight.ID,
fmt.Sprint(stmtInsight.FingerprintID),
stmtInsight.Problem,
stmtInsight.Causes,
stmtInsight.Query,
stmtInsight.Status,
stmtInsight.StartTime,
stmtInsight.EndTime,
stmtInsight.FullScan,
stmtInsight.User,
stmtInsight.ApplicationName,
stmtInsight.UserPriority,
stmtInsight.Database,
stmtInsight.PlanGist,
stmtInsight.Retries,
stmtInsight.AutoRetryReason,
stmtInsight.Nodes,
stmtInsight.IndexRecommendations,
stmtInsight.ImplicitTxn,
stmtInsight.CPUSQLNanos,
stmtInsight.ErrorCode,
stmtInsight.Contention,
// Details column. This column is null for now, but exists in case there is extra information
// we might want to add in the future, this would be the place to easily add without the need for a creation
// of a new column.
tree.DNull,
)
var placeholders []any
for i := 1; i <= columnCount; i++ {
placeholders = append(placeholders, columnCount*insightIdx+i)
}

rows = append(rows, fmt.Sprintf(`($%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v,
$%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v, $%v)`,
placeholders...))
}

insertStmt := fmt.Sprintf(`INSERT INTO obsservice.statement_execution_insights (
timestamp,
org_id,
cluster_id,
@@ -62,50 +62,25 @@ func (t *StmtInsightsProcessor) Process(
error_code,
contention_time,
details
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13,
$14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24,
$25, $26, $27, $28, $29, $30, $31
)`

if _, err := db.ExecContext(
ctx,
insertStmt,
stmtInsight.EventInfo.Timestamp,
stmtInsight.EventInfo.OrgID,
stmtInsight.EventInfo.ClusterID,
stmtInsight.EventInfo.TenantID,
stmtInsight.EventInfo.EventID,
stmtInsight.SessionID,
stmtInsight.TransactionID,
fmt.Sprint(stmtInsight.TxnFingerprintID),
stmtInsight.ID,
fmt.Sprint(stmtInsight.FingerprintID),
stmtInsight.Problem,
stmtInsight.Causes,
stmtInsight.Query,
stmtInsight.Status,
stmtInsight.StartTime,
stmtInsight.EndTime,
stmtInsight.FullScan,
stmtInsight.User,
stmtInsight.ApplicationName,
stmtInsight.UserPriority,
stmtInsight.Database,
stmtInsight.PlanGist,
stmtInsight.Retries,
stmtInsight.AutoRetryReason,
stmtInsight.Nodes,
stmtInsight.IndexRecommendations,
stmtInsight.ImplicitTxn,
stmtInsight.CPUSQLNanos,
stmtInsight.ErrorCode,
stmtInsight.Contention,
// Details column. This column is null for now, but exists in case there is extra information
// we might want to add in the future, this would be the place to easily add without the need for a creation
// of a new column.
tree.DNull,
); err != nil {
) VALUES %s`, strings.Join(rows, ", "))

p.mu.lastExportTs = timeutil.Now()
p.mu.insights = nil

return insertStmt, args
}

func (p *StmtInsightsProcessor) exportInsights(
ctx context.Context, insertStmt string, args []interface{},
) error {
db, err := OpenDBSync(ctx, p.SinkPGURL)
// TODO(maryliag): create a pool for the connections
if err != nil {
return err
}
defer db.Close()

if _, err = db.ExecContext(ctx, insertStmt, args...); err != nil {
return err
}

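The `prepareInsightExport` change above builds one multi-row `INSERT` by numbering placeholders per row: for row `i` (0-based) and column `j` (1-based), the placeholder index is `columnCount*i + j`. A standalone sketch of that placeholder-numbering technique (the helper, table, and column names here are made up for illustration):

```go
package main

import (
	"fmt"
	"strings"
)

// buildMultiRowInsert illustrates the placeholder numbering used in
// prepareInsightExport: row 0 uses $1..$N, row 1 uses $N+1..$2N, and so on,
// where N is the number of columns.
func buildMultiRowInsert(table string, columns []string, rowCount int) string {
	columnCount := len(columns)
	rows := make([]string, 0, rowCount)
	for i := 0; i < rowCount; i++ {
		placeholders := make([]string, 0, columnCount)
		for j := 1; j <= columnCount; j++ {
			placeholders = append(placeholders, fmt.Sprintf("$%d", columnCount*i+j))
		}
		rows = append(rows, "("+strings.Join(placeholders, ", ")+")")
	}
	return fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
		table, strings.Join(columns, ", "), strings.Join(rows, ", "))
}

func main() {
	// Prints: INSERT INTO t (a, b, c) VALUES ($1, $2, $3), ($4, $5, $6)
	fmt.Println(buildMultiRowInsert("t", []string{"a", "b", "c"}, 2))
}
```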
