diff --git a/go/transactions/transactions.go b/go/transactions/transactions.go index eb4e004..9004fd6 100644 --- a/go/transactions/transactions.go +++ b/go/transactions/transactions.go @@ -17,7 +17,10 @@ limitations under the License. package transactions import ( + "strings" + "github.com/vitessio/vt/go/data" + "vitess.io/vitess/go/vt/sqlparser" ) @@ -28,16 +31,12 @@ type ( } Connection struct { - // The connection ID - ID int - - buf []data.Query + Transaction []data.Query Autocommit bool } - TxSignature struct { - } + TxSignature struct{} ) func Run(cfg Config) { @@ -85,10 +84,63 @@ func Run(cfg Config) { panic(err.Error()) } - connections := map[int]*Connection{} + transactions := map[int]*Connection{} loader = cfg.Loader.Load(cfg.FileName) + ch := make(chan []data.Query, 1000) + for { + query, kontinue := loader.Next() + if !kontinue { + break + } + switch query.Type { + case data.Skip, data.Error, data.VExplain, data.Unknown: + panic("unexpected query type") + case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion: + // no-op for keys + case data.QueryT: + stmt, err := sqlparser.NewTestParser().Parse(query.Query) + if err != nil { + continue + } + switch stmt.(type) { + case *sqlparser.Begin: + case *sqlparser.Commit: + connection := transactions[query.ConnectionID] + ch <- connection.Transaction + connection.Transaction = nil + case *sqlparser.Set: + set := stmt.(*sqlparser.Set) + for _, expr := range set.Exprs { + if expr.Var.Name.Lowered() == "autocommit" { + val, ok := expr.Expr.(*sqlparser.Literal) + if !ok { + continue + } + val2 := strings.ToLower(val.Val) + if val2 == "1" || val2 == "on" || val2 == "true" { + transactions[query.ConnectionID].Autocommit = true + } else { + transactions[query.ConnectionID].Autocommit = false + } + } + } + default: + if sqlparser.IsDMLStatement(stmt) { + connection, ok := transactions[query.ConnectionID] + if !ok { + connection = &Connection{Autocommit: defaultAutocommit} + transactions[query.ConnectionID] = connection + } + if connection.Autocommit { + ch <- []data.Query{query} + } else { + connection.Transaction = append(connection.Transaction, query) + } + } + } + } } }