Skip to content

Commit

Permalink
Add logic for reading query log, batching queries related to a transa…
Browse files Browse the repository at this point in the history
…ction, handle autocommit default/explicit setting. Queries sent to a channgel. No tests

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Nov 20, 2024
1 parent 8ce83b7 commit e974ee8
Showing 1 changed file with 59 additions and 7 deletions.
66 changes: 59 additions & 7 deletions go/transactions/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package transactions

import (
"strings"

"github.com/vitessio/vt/go/data"

"vitess.io/vitess/go/vt/sqlparser"
)

Expand All @@ -28,16 +31,12 @@ type (
}

Connection struct {
// The connection ID
ID int

buf []data.Query
Transaction []data.Query

Autocommit bool
}

TxSignature struct {
}
TxSignature struct{}
)

func Run(cfg Config) {
Expand Down Expand Up @@ -85,10 +84,63 @@ func Run(cfg Config) {
panic(err.Error())
}

connections := map[int]*Connection{}
transactions := map[int]*Connection{}

loader = cfg.Loader.Load(cfg.FileName)
ch := make(chan []data.Query, 1000)

for {
query, kontinue := loader.Next()
if !kontinue {
break
}

switch query.Type {
case data.Skip, data.Error, data.VExplain, data.Unknown:
panic("unexpected query type")
case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion:
// no-op for keys
case data.QueryT:
stmt, err := sqlparser.NewTestParser().Parse(query.Query)
if err != nil {
continue
}
switch stmt.(type) {
case *sqlparser.Begin:
case *sqlparser.Commit:
connection := transactions[query.ConnectionID]
ch <- connection.Transaction
connection.Transaction = nil
case *sqlparser.Set:
set := stmt.(*sqlparser.Set)
for _, expr := range set.Exprs {
if expr.Var.Name.Lowered() == "autocommit" {
val, ok := expr.Expr.(*sqlparser.Literal)
if !ok {
continue
}
val2 := strings.ToLower(val.Val)
if val2 == "1" || val2 == "on" || val2 == "true" {
transactions[query.ConnectionID].Autocommit = true
} else {
transactions[query.ConnectionID].Autocommit = false
}
}
}
default:
if sqlparser.IsDMLStatement(stmt) {
connection, ok := transactions[query.ConnectionID]
if !ok {
connection = &Connection{Autocommit: defaultAutocommit}
transactions[query.ConnectionID] = connection
}
if connection.Autocommit {
ch <- []data.Query{query}
} else {
connection.Transaction = append(connection.Transaction, query)
}
}
}
}
}
}

0 comments on commit e974ee8

Please sign in to comment.