From c9cd0b05bcdb8b699b847ec4cb89ceb529627dc2 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Wed, 20 Nov 2024 13:59:51 -0600 Subject: [PATCH] Add primitive logic to read and group transactions together Signed-off-by: Florent Poinsard --- go/data/query.go | 33 ++- go/data/query_log_parse.go | 4 +- go/data/query_log_parse_test.go | 6 +- go/data/typ.go | 4 +- go/data/vtgate_log_parse.go | 4 +- go/keys/keys.go | 33 +-- go/keys/keys_test.go | 4 +- go/keys/schemaInfo.go | 40 +-- go/keys/schemaInfo_test.go | 2 +- go/testdata/small-slow-query-log | 42 +++ go/testdata/small-slow-query-log.json | 28 ++ go/tester/tester.go | 2 +- go/transactions/transactions.go | 398 ++++++++++++++++++++------ go/transactions/transactions_test.go | 24 ++ 14 files changed, 476 insertions(+), 148 deletions(-) create mode 100644 go/testdata/small-slow-query-log create mode 100644 go/testdata/small-slow-query-log.json diff --git a/go/data/query.go b/go/data/query.go index 17d4c4e..3fce6e4 100644 --- a/go/data/query.go +++ b/go/data/query.go @@ -53,6 +53,37 @@ type ( } ) +// ForeachSQLQuery reads a query log file and calls the provided function for each normal SQL query in the log. +// If the query log contains directives, they will be read and queries will be skipped as necessary. +func ForeachSQLQuery(loader IteratorLoader, f func(Query) error) error { + skip := false + for { + query, kontinue := loader.Next() + if !kontinue { + break + } + + switch query.Type { + case Skip, Error, VExplain: + skip = true + case Unknown: + return fmt.Errorf("unknown command type: %s", query.Type) + case Comment, CommentWithCommand, EmptyLine, WaitForAuthoritative, SkipIfBelowVersion: + // no-op for keys + case SQLQuery: + if skip { + skip = false + continue + } + if err := f(query); err != nil { + return err + } + } + } + + return nil +} + // for a single query, it has some prefix. Prefix mapps to a query type. // e.g query_vertical maps to typ.Q_QUERY_VERTICAL func (q *Query) getQueryType(qu string) error { @@ -64,7 +95,7 @@ func (q *Query) getQueryType(qu string) error { if q.Type != CommentWithCommand { // A query that will sent to vitess q.Query = qu - q.Type = QueryT + q.Type = SQLQuery } else { log.WithFields(log.Fields{"line": q.Line, "command": q.FirstWord, "arguments": q.Query}).Error("invalid command") return fmt.Errorf("invalid command %s", q.FirstWord) diff --git a/go/data/query_log_parse.go b/go/data/query_log_parse.go index 23cec63..eddab64 100644 --- a/go/data/query_log_parse.go +++ b/go/data/query_log_parse.go @@ -147,7 +147,7 @@ func (s *mysqlLogReaderState) finalizeQuery() Query { query := Query{ Query: s.prevQuery, Line: s.queryStart, - Type: QueryT, + Type: SQLQuery, ConnectionID: s.prevConnectionID, } s.prevQuery = "" @@ -159,7 +159,7 @@ func (s *mysqlLogReaderState) processQuery(matches []string) Query { query := Query{ Query: s.prevQuery, Line: s.queryStart, - Type: QueryT, + Type: SQLQuery, ConnectionID: s.prevConnectionID, } s.prevQuery = "" diff --git a/go/data/query_log_parse_test.go b/go/data/query_log_parse_test.go index b7497d6..22a7f32 100644 --- a/go/data/query_log_parse_test.go +++ b/go/data/query_log_parse_test.go @@ -41,12 +41,12 @@ func TestSmallSnippet(t *testing.T) { { Query: "SET GLOBAL log_output = 'FILE'", Line: 4, - Type: QueryT, + Type: SQLQuery, ConnectionID: 32, }, { Query: "show databases", Line: 5, - Type: QueryT, + Type: SQLQuery, ConnectionID: 32, }, { Query: `UPDATE _vt.schema_migrations @@ -74,7 +74,7 @@ WHERE ) LIMIT 1`, Line: 6, - Type: QueryT, + Type: SQLQuery, ConnectionID: 24, }, } diff --git a/go/data/typ.go b/go/data/typ.go index 20b6282..7fdc6b5 100644 --- a/go/data/typ.go +++ b/go/data/typ.go @@ -21,7 +21,7 @@ import "strings" type CmdType int const ( - QueryT CmdType = iota + SQLQuery CmdType = iota Error Skip Unknown @@ -37,7 +37,7 @@ const ( ) var commandMap = map[string]CmdType{ //nolint:gochecknoglobals // this is instead of a const - "query": QueryT, + "query": SQLQuery, "error": Error, "skip": Skip, "skip_if_below_version": SkipIfBelowVersion, diff --git a/go/data/vtgate_log_parse.go b/go/data/vtgate_log_parse.go index e0ef595..ab89563 100644 --- a/go/data/vtgate_log_parse.go +++ b/go/data/vtgate_log_parse.go @@ -112,7 +112,7 @@ func (s *vtgateLogReaderState) Next() (Query, bool) { return Query{ Query: query, Line: s.lineNumber, - Type: QueryT, + Type: SQLQuery, ConnectionID: connectionID, }, true } @@ -137,7 +137,7 @@ func (s *vtgateLogReaderState) Next() (Query, bool) { return Query{ Query: parsedQuery, Line: s.lineNumber, - Type: QueryT, + Type: SQLQuery, ConnectionID: connectionID, }, true } diff --git a/go/keys/keys.go b/go/keys/keys.go index bc5b995..8a4116a 100644 --- a/go/keys/keys.go +++ b/go/keys/keys.go @@ -77,8 +77,8 @@ func Run(cfg Config) error { } func run(out io.Writer, cfg Config) error { - si := &schemaInfo{ - tables: make(map[string]columns), + si := &SchemaInfo{ + Tables: make(map[string]Columns), } ql := &queryList{ queries: make(map[string]*QueryAnalysisResult), @@ -86,28 +86,11 @@ func run(out io.Writer, cfg Config) error { } loader := cfg.Loader.Load(cfg.FileName) - skip := false - for { - query, kontinue := loader.Next() - if !kontinue { - break - } - switch query.Type { - case data.Skip, data.Error, data.VExplain: - skip = true - case data.Unknown: - return fmt.Errorf("unknown command type: %s", query.Type) - case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion: - // no-op for keys - case data.QueryT: - if skip { - skip = false - continue - } - process(query, si, ql) - } - } + _ = data.ForeachSQLQuery(loader, func(query data.Query) error { + process(query, si, ql) + return nil + }) closeErr := loader.Close() jsonWriteErr := ql.writeJSONTo(out) @@ -115,7 +98,7 @@ func run(out io.Writer, cfg Config) error { return errors.Join(closeErr, jsonWriteErr) } -func process(q data.Query, si *schemaInfo, ql *queryList) { +func process(q data.Query, si *SchemaInfo, ql *queryList) { ast, bv, err := sqlparser.NewTestParser().Parse2(q.Query) if err != nil { ql.addFailedQuery(q, err) @@ -130,7 +113,7 @@ func process(q data.Query, si *schemaInfo, ql *queryList) { } } -func (ql *queryList) processQuery(si *schemaInfo, ast sqlparser.Statement, q data.Query, bv sqlparser.BindVars) { +func (ql *queryList) processQuery(si *SchemaInfo, ast sqlparser.Statement, q data.Query, bv sqlparser.BindVars) { // handle panics defer func() { if r := recover(); r != nil { diff --git a/go/keys/keys_test.go b/go/keys/keys_test.go index f588634..e644aba 100644 --- a/go/keys/keys_test.go +++ b/go/keys/keys_test.go @@ -82,9 +82,9 @@ func TestKeys(t *testing.T) { func TestKeysNonAuthoritativeTable(t *testing.T) { q := data.Query{ Query: "select id from user where id = 20", - Type: data.QueryT, + Type: data.SQLQuery, } - si := &schemaInfo{} + si := &SchemaInfo{} ql := &queryList{ queries: make(map[string]*QueryAnalysisResult), failed: make(map[string]*QueryFailedResult), diff --git a/go/keys/schemaInfo.go b/go/keys/schemaInfo.go index 5416a8c..417901e 100644 --- a/go/keys/schemaInfo.go +++ b/go/keys/schemaInfo.go @@ -27,42 +27,42 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" ) -var _ semantics.SchemaInformation = (*schemaInfo)(nil) +var _ semantics.SchemaInformation = (*SchemaInfo)(nil) type ( - schemaInfo struct { - ksName string - tables map[string]columns + SchemaInfo struct { + KsName string + Tables map[string]Columns } - columns []vindexes.Column + Columns []vindexes.Column ) -func (s *schemaInfo) handleCreateTable(create *sqlparser.CreateTable) { - columns := make(columns, 0, len(create.TableSpec.Columns)) +func (s *SchemaInfo) handleCreateTable(create *sqlparser.CreateTable) { + columns := make(Columns, 0, len(create.TableSpec.Columns)) for _, col := range create.TableSpec.Columns { columns = append(columns, vindexes.Column{ Name: col.Name, Type: col.Type.SQLType(), }) } - s.tables[create.Table.Name.String()] = columns + s.Tables[create.Table.Name.String()] = columns } -func (s *schemaInfo) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodata.TabletType, key.Destination, error) { +func (s *SchemaInfo) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodata.TabletType, key.Destination, error) { var tbl *vindexes.Table ks := tablename.Qualifier.String() if ks == "" { - ks = s.ksName + ks = s.KsName } - if !tablename.Qualifier.NotEmpty() || tablename.Qualifier.String() == s.ksName { + if !tablename.Qualifier.NotEmpty() || tablename.Qualifier.String() == s.KsName { // This is a table from our keyspace. We should be able to find it - columns, found := s.tables[tablename.Name.String()] + columns, found := s.Tables[tablename.Name.String()] if found { tbl = &vindexes.Table{ Name: tablename.Name, - Keyspace: &vindexes.Keyspace{Name: s.ksName, Sharded: true}, + Keyspace: &vindexes.Keyspace{Name: s.KsName, Sharded: true}, Columns: columns, ColumnListAuthoritative: true, } @@ -81,30 +81,30 @@ func (s *schemaInfo) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes return tbl, nil, ks, topodata.TabletType_REPLICA, nil, nil } -func (s *schemaInfo) ConnCollation() collations.ID { +func (s *SchemaInfo) ConnCollation() collations.ID { return collations.CollationBinaryID } -func (s *schemaInfo) Environment() *vtenv.Environment { +func (s *SchemaInfo) Environment() *vtenv.Environment { return vtenv.NewTestEnv() } -func (s *schemaInfo) ForeignKeyMode(string) (vschemapb.Keyspace_ForeignKeyMode, error) { +func (s *SchemaInfo) ForeignKeyMode(string) (vschemapb.Keyspace_ForeignKeyMode, error) { return vschemapb.Keyspace_unmanaged, nil } -func (s *schemaInfo) GetForeignKeyChecksState() *bool { +func (s *SchemaInfo) GetForeignKeyChecksState() *bool { return nil } -func (s *schemaInfo) KeyspaceError(string) error { +func (s *SchemaInfo) KeyspaceError(string) error { return nil } -func (s *schemaInfo) GetAggregateUDFs() []string { +func (s *SchemaInfo) GetAggregateUDFs() []string { return nil // TODO: maybe this should be a flag? } -func (s *schemaInfo) FindMirrorRule(sqlparser.TableName) (*vindexes.MirrorRule, error) { +func (s *SchemaInfo) FindMirrorRule(sqlparser.TableName) (*vindexes.MirrorRule, error) { return nil, nil } diff --git a/go/keys/schemaInfo_test.go b/go/keys/schemaInfo_test.go index 30ca4fb..77bfa8a 100644 --- a/go/keys/schemaInfo_test.go +++ b/go/keys/schemaInfo_test.go @@ -29,7 +29,7 @@ import ( func TestSchemaInfo(t *testing.T) { parser := sqlparser.NewTestParser() - si := &schemaInfo{tables: make(map[string]columns)} + si := &SchemaInfo{Tables: make(map[string]Columns)} ast, err := parser.Parse(`CREATE TABLE IF NOT EXISTS warehouse ( w_id INT NOT NULL, diff --git a/go/testdata/small-slow-query-log b/go/testdata/small-slow-query-log new file mode 100644 index 0000000..2464cd9 --- /dev/null +++ b/go/testdata/small-slow-query-log @@ -0,0 +1,42 @@ +/bin/mysqld, Version: 8.0.26 (Source distribution). started with: +Tcp port: 3306 Unix socket: /tmp/mysql.sock +# Time: 2023-08-01T12:00:01.852861Z +# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060 +# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0 +SET timestamp=1690891201; +begin; +# Time: 2023-08-01T12:00:01.852861Z +# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060 +# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0 +SET timestamp=1690891201; +update tblA set apa = 'toto' where foo = 12 and id = 43; +# Time: 2023-08-01T12:00:01.852861Z +# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060 +# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0 +SET timestamp=1690891201; +update tblB set monkey = 'pippi' where bar = 12 and id = 44; +# Time: 2023-08-01T12:00:01.852861Z +# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060 +# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0 +SET timestamp=1690891201; +commit; +# Time: 2023-08-01T12:00:01.852861Z +# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060 +# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0 +SET timestamp=1690891201; +begin; +# Time: 2023-08-01T12:00:01.852861Z +# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060 +# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0 +SET timestamp=1690891201; +update tblA set apa = 'toto' where foo = 43 and id = 5; +# Time: 2023-08-01T12:00:01.852861Z +# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060 +# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0 +SET timestamp=1690891201; +update tblB set monkey = 'pippi' where bar = 43 and id = 16; +# Time: 2023-08-01T12:00:01.852861Z +# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060 +# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0 +SET timestamp=1690891201; +commit; diff --git a/go/testdata/small-slow-query-log.json b/go/testdata/small-slow-query-log.json new file mode 100644 index 0000000..4254e3f --- /dev/null +++ b/go/testdata/small-slow-query-log.json @@ -0,0 +1,28 @@ +[ + { + "Queries": [ + "update tblA set apa = 'toto' where foo = :1 and id = :2", + "update tblB set monkey = 'pippi' where bar = :1 and id = :3" + ], + "Count": 0, + "Predicates": [ + "tblA.foo = :1", + "tblA.id = :2", + "tblB.bar = :1", + "tblB.id = :3" + ] + }, + { + "Queries": [ + "update tblA set apa = 'toto' where foo = :1 and id = :2", + "update tblB set monkey = 'pippi' where bar = :1 and id = :3" + ], + "Count": 0, + "Predicates": [ + "tblA.foo = :1", + "tblA.id = :2", + "tblB.bar = :1", + "tblB.id = :3" + ] + } +] diff --git a/go/tester/tester.go b/go/tester/tester.go index f654a0e..10ff819 100644 --- a/go/tester/tester.go +++ b/go/tester/tester.go @@ -189,7 +189,7 @@ func (t *Tester) handleQuery(q data.Query) { t.prepareVExplain(q.Query) case data.WaitForAuthoritative: t.waitAuthoritative(q.Query) - case data.QueryT: + case data.SQLQuery: if t.vexplain == "" { t.runQuery(q) return diff --git a/go/transactions/transactions.go b/go/transactions/transactions.go index 9004fd6..1c935ed 100644 --- a/go/transactions/transactions.go +++ b/go/transactions/transactions.go @@ -17,11 +17,18 @@ limitations under the License. package transactions import ( + "encoding/json" + "fmt" + "io" + "os" + "strconv" "strings" - "github.com/vitessio/vt/go/data" - "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/semantics" + + "github.com/vitessio/vt/go/data" + "github.com/vitessio/vt/go/keys" ) type ( @@ -31,15 +38,290 @@ type ( } Connection struct { - Transaction []data.Query + Transaction []sqlparser.Statement Autocommit bool } - TxSignature struct{} + TxSignature struct { + Queries []string + Count int + Predicates []predicateInfo + } + + predicateInfo struct { + Table string + Col string + Op sqlparser.ComparisonExprOperator + Val string + } ) +func (pi predicateInfo) String() string { + return fmt.Sprintf("%s.%s %s %s", pi.Table, pi.Col, pi.Op.ToString(), pi.Val) +} + +func (tx TxSignature) MarshalJSON() ([]byte, error) { + // Transform Predicates to an array of strings + predicateStrings := make([]string, len(tx.Predicates)) + for i, predicate := range tx.Predicates { + predicateStrings[i] = predicate.String() + } + + return json.Marshal(struct { + Queries []string + Count int + Predicates []string + }{ + Queries: tx.Queries, + Count: tx.Count, + Predicates: predicateStrings, + }) +} + func Run(cfg Config) { + run(os.Stdout, cfg) +} + +//nolint:funlen,gocognit,gocyclo,cyclop // this is dirty WIP +func run(out io.Writer, cfg Config) { + defaultAutocommit := GetAutocommitGuess(cfg) + transactions := map[int]*Connection{} + + loader := cfg.Loader.Load(cfg.FileName) + ch := make(chan []sqlparser.Statement, 1000) + + _ = data.ForeachSQLQuery(loader, func(query data.Query) error { + stmt, err := sqlparser.NewTestParser().Parse(query.Query) + if err != nil { + fmt.Println(err.Error()) + return nil + } + switch stmt := stmt.(type) { + case *sqlparser.Begin: + case *sqlparser.Commit: + connection := transactions[query.ConnectionID] + ch <- connection.Transaction + connection.Transaction = nil + case *sqlparser.Set: + for _, expr := range stmt.Exprs { + if expr.Var.Name.Lowered() == "autocommit" { + val, ok := expr.Expr.(*sqlparser.Literal) + if !ok { + continue + } + val2 := strings.ToLower(val.Val) + if val2 == "1" || val2 == "on" || val2 == "true" { + transactions[query.ConnectionID].Autocommit = true + } else { + transactions[query.ConnectionID].Autocommit = false + } + } + } + default: + if !sqlparser.IsDMLStatement(stmt) { + return nil + } + connection, ok := transactions[query.ConnectionID] + if !ok { + connection = &Connection{Autocommit: defaultAutocommit} + transactions[query.ConnectionID] = connection + } + if connection.Autocommit { + ch <- []sqlparser.Statement{stmt} + } else { + connection.Transaction = append(connection.Transaction, stmt) + } + } + return nil + }) + + // WIP: dummy data + // TODO: Use real schema information data with the 'vt schema' JSON output + si := &keys.SchemaInfo{ + Tables: map[string]keys.Columns{ + "tblA": { + {Name: sqlparser.NewIdentifierCI("apa")}, + {Name: sqlparser.NewIdentifierCI("foo")}, + {Name: sqlparser.NewIdentifierCI("id")}, + }, + "tblB": { + {Name: sqlparser.NewIdentifierCI("monkey")}, + {Name: sqlparser.NewIdentifierCI("bar")}, + {Name: sqlparser.NewIdentifierCI("id")}, + }, + "user": { + {Name: sqlparser.NewIdentifierCI("id")}, + {Name: sqlparser.NewIdentifierCI("name")}, + }, + "user_extra": { + {Name: sqlparser.NewIdentifierCI("user_id")}, + {Name: sqlparser.NewIdentifierCI("age")}, + }, + }, + } + + var txs []TxSignature +outer: + for { + select { + // TODO: when a transaction has the exact same signature, increment its usage count instead of adding a new one + case queries := <-ch: + var tx TxSignature + idToLiteral := make(map[string]int) + nextID := 1 + for _, query := range queries { + st, err := semantics.Analyze(query, "ks", si) + if err != nil { + panic(err) + } + + switch query := query.(type) { + case *sqlparser.Update: + // Step 0: + // We want to find all the predicates that can impact our vindex choice in the query. + // TODO: Implement more types of predicates, right now only comparisons with 1 column and 1 literal are handled. + // TODO: This whole step can actually be re-used for DELETE. + //nolint:nestif // this is dirty WIP + if query.Where != nil { + // Step 1: + // Find all predicates in the where clause that use a column and a literal + var predicates []predicateInfo + wheres := sqlparser.SplitAndExpression(nil, query.Where.Expr) + for _, where := range wheres { + if cmp, ok := where.(*sqlparser.ComparisonExpr); ok { + lhs, lhsOK := cmp.Left.(*sqlparser.ColName) + rhs, rhsOK := cmp.Right.(*sqlparser.ColName) + + if rhsStr := exprToString(cmp.Right); lhsOK && rhsStr != "" { + predicates = append(predicates, createPredicateInfo(st, lhs, cmp.Operator, rhsStr)) + } + + if lhsStr := exprToString(cmp.Left); rhsOK && lhsStr != "" { + switchedOp, ok := cmp.Operator.SwitchSides() + if ok { + predicates = append(predicates, createPredicateInfo(st, rhs, switchedOp, lhsStr)) + } + } + } + } + + // Step 2: + // Now that we have all the predicates, let's replace their literals with an ID + for i, predicate := range predicates { + id, ok := idToLiteral[predicate.Val] + if !ok { + idToLiteral[predicate.Val] = nextID + id = nextID + nextID++ + } + predicates[i].Val = fmt.Sprintf(":%d", id) + var foundOne bool + for _, txPred := range tx.Predicates { + if txPred == predicate { + foundOne = true + break + } + } + if !foundOne { + tx.Predicates = append(tx.Predicates, predicates[i]) + } + } + } + + // Step 3: + // Normalize the AST our own way: + // - Replace the value in SET by "v" + // - Replace the literals found in where clause comparisons by the corresponding ID we got earlier + normalizedAST := sqlparser.Rewrite(query, func(cursor *sqlparser.Cursor) bool { + switch node := cursor.Node().(type) { + case *sqlparser.SetExpr: + cursor.Replace(&sqlparser.SetExpr{ + Var: node.Var, + Expr: sqlparser.NewArgument("v"), + }) + case *sqlparser.Where: + var newWhere sqlparser.Where + wheres := sqlparser.SplitAndExpression(nil, query.Where.Expr) + for _, where := range wheres { + switch cmp := where.(type) { + case *sqlparser.ComparisonExpr: + lhs, lhsOK := cmp.Left.(*sqlparser.Literal) + rhs, rhsOK := cmp.Right.(*sqlparser.Literal) + if !lhsOK && !rhsOK || lhsOK && rhsOK { + newWhere.Expr = sqlparser.AndExpressions(newWhere.Expr) + continue + } + + var newCmp sqlparser.ComparisonExpr + newCmp.Operator = cmp.Operator + if lhsOK { + id, ok := idToLiteral[lhs.Val] + if !ok { + panic("we must be able to find a corresponding id") + } + newCmp.Left = sqlparser.NewArgument(strconv.Itoa(id)) + newCmp.Right = cmp.Right + } else { + id, ok := idToLiteral[rhs.Val] + if !ok { + panic("we must be able to find a corresponding id") + } + newCmp.Right = sqlparser.NewArgument(strconv.Itoa(id)) + newCmp.Left = cmp.Left + } + newWhere.Expr = sqlparser.AndExpressions(newWhere.Expr, &newCmp) + default: + newWhere.Expr = sqlparser.AndExpressions(newWhere.Expr, where) + } + } + cursor.Replace(&newWhere) + } + return true + }, nil) + tx.Queries = append(tx.Queries, sqlparser.String(normalizedAST)) + default: + panic("not supported for now") + } + } + txs = append(txs, tx) + default: + break outer + } + } + + txsJSON, err := json.MarshalIndent(txs, "", " ") + if err != nil { + panic(err) + } + fmt.Fprintf(out, "%s\n", string(txsJSON)) +} + +func createPredicateInfo(st *semantics.SemTable, expr *sqlparser.ColName, op sqlparser.ComparisonExprOperator, value string) predicateInfo { + tableInfo, err := st.TableInfoForExpr(expr) + if err != nil { + panic(err) + } + table := tableInfo.GetVindexTable() + if table == nil { + panic("table not found") + } + return predicateInfo{ + Table: table.Name.String(), + Col: expr.Name.String(), + Op: op, + Val: value, + } +} + +func exprToString(expr sqlparser.Expr) string { + if v, ok := expr.(*sqlparser.Literal); ok { + return v.Val + } + return "" +} + +func GetAutocommitGuess(cfg Config) bool { // Figure out if autocommit is enabled // If we see: // 1. BEGIN we can assume autocommit is disabled @@ -49,98 +331,36 @@ func Run(cfg Config) { count := 1000 defaultAutocommit := true loader := cfg.Loader.Load(cfg.FileName) - for { + defer func() { + err := loader.Close() + if err != nil { + panic(err.Error()) + } + }() + _ = data.ForeachSQLQuery(loader, func(query data.Query) error { count-- if count == 0 { // enough already. we'll assume autocommit is enabled because that is the default - break - } - query, kontinue := loader.Next() - if !kontinue { - break + return io.EOF } - switch query.Type { - case data.Skip, data.Error, data.VExplain, data.Unknown: - panic("unexpected query type") - case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion: - // no-op for keys - case data.QueryT: - stmt, err := sqlparser.NewTestParser().Parse(query.Query) - if err != nil { - continue - } - switch stmt.(type) { - case *sqlparser.Begin: - defaultAutocommit = false - break - case *sqlparser.Commit: - break - } + stmt, err := sqlparser.NewTestParser().Parse(query.Query) + if err != nil { + fmt.Println(err.Error()) + return nil } - } - err := loader.Close() - if err != nil { - panic(err.Error()) - } - transactions := map[int]*Connection{} - - loader = cfg.Loader.Load(cfg.FileName) - ch := make(chan []data.Query, 1000) - - for { - query, kontinue := loader.Next() - if !kontinue { - break + switch stmt.(type) { + case *sqlparser.Begin: + // BEGIN seen, so autocommit is disabled + return io.EOF + case *sqlparser.Commit: + defaultAutocommit = false + // no BEGIN seen, so autocommit is disabled + return io.EOF } - switch query.Type { - case data.Skip, data.Error, data.VExplain, data.Unknown: - panic("unexpected query type") - case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion: - // no-op for keys - case data.QueryT: - stmt, err := sqlparser.NewTestParser().Parse(query.Query) - if err != nil { - continue - } - switch stmt.(type) { - case *sqlparser.Begin: - case *sqlparser.Commit: - connection := transactions[query.ConnectionID] - ch <- connection.Transaction - connection.Transaction = nil - case *sqlparser.Set: - set := stmt.(*sqlparser.Set) - for _, expr := range set.Exprs { - if expr.Var.Name.Lowered() == "autocommit" { - val, ok := expr.Expr.(*sqlparser.Literal) - if !ok { - continue - } - val2 := strings.ToLower(val.Val) - if val2 == "1" || val2 == "on" || val2 == "true" { - transactions[query.ConnectionID].Autocommit = true - } else { - transactions[query.ConnectionID].Autocommit = false - } - } - } - default: - if sqlparser.IsDMLStatement(stmt) { - connection, ok := transactions[query.ConnectionID] - if !ok { - connection = &Connection{Autocommit: defaultAutocommit} - transactions[query.ConnectionID] = connection - } - if connection.Autocommit { - ch <- []data.Query{query} - } else { - connection.Transaction = append(connection.Transaction, query) - } - } - } - } - } + return nil + }) + return defaultAutocommit } diff --git a/go/transactions/transactions_test.go b/go/transactions/transactions_test.go index ccd1d35..e0eb317 100644 --- a/go/transactions/transactions_test.go +++ b/go/transactions/transactions_test.go @@ -15,3 +15,27 @@ limitations under the License. */ package transactions + +import ( + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/vitessio/vt/go/data" +) + +func TestRun(t *testing.T) { + sb := &strings.Builder{} + run(sb, Config{ + FileName: "../testdata/small-slow-query-log", + Loader: data.SlowQueryLogLoader{}, + }) + + out, err := os.ReadFile("../testdata/small-slow-query-log.json") + require.NoError(t, err) + + assert.Equal(t, string(out), sb.String()) +}