Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Nov 20, 2024
1 parent b33e5d1 commit 06118b4
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 79 deletions.
33 changes: 32 additions & 1 deletion go/data/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,37 @@ type (
}
)

// ForeachSQLQuery reads a query log file and calls the provided function for each normal SQL query in the log.
// If the query log contains directives, they will be read and queries will be skipped as necessary.
func ForeachSQLQuery(loader IteratorLoader, f func(Query) error) error {
skip := false
for {
query, kontinue := loader.Next()
if !kontinue {
break
}

switch query.Type {
case Skip, Error, VExplain:
skip = true
case Unknown:
return fmt.Errorf("unknown command type: %s", query.Type)
case Comment, CommentWithCommand, EmptyLine, WaitForAuthoritative, SkipIfBelowVersion:
// no-op for keys
case SQLQuery:
if skip {
skip = false
continue
}
if err := f(query); err != nil {
return err
}
}
}

return nil
}

// for a single query, it has some prefix. Prefix mapps to a query type.
// e.g query_vertical maps to typ.Q_QUERY_VERTICAL
func (q *Query) getQueryType(qu string) error {
Expand All @@ -64,7 +95,7 @@ func (q *Query) getQueryType(qu string) error {
if q.Type != CommentWithCommand {
// A query that will sent to vitess
q.Query = qu
q.Type = QueryT
q.Type = SQLQuery
} else {
log.WithFields(log.Fields{"line": q.Line, "command": q.FirstWord, "arguments": q.Query}).Error("invalid command")
return fmt.Errorf("invalid command %s", q.FirstWord)
Expand Down
4 changes: 2 additions & 2 deletions go/data/query_log_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *mysqlLogReaderState) finalizeQuery() Query {
query := Query{
Query: s.prevQuery,
Line: s.queryStart,
Type: QueryT,
Type: SQLQuery,
ConnectionID: s.prevConnectionID,
}
s.prevQuery = ""
Expand All @@ -159,7 +159,7 @@ func (s *mysqlLogReaderState) processQuery(matches []string) Query {
query := Query{
Query: s.prevQuery,
Line: s.queryStart,
Type: QueryT,
Type: SQLQuery,
ConnectionID: s.prevConnectionID,
}
s.prevQuery = ""
Expand Down
6 changes: 3 additions & 3 deletions go/data/query_log_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ func TestSmallSnippet(t *testing.T) {
{
Query: "SET GLOBAL log_output = 'FILE'",
Line: 4,
Type: QueryT,
Type: SQLQuery,
ConnectionID: 32,
}, {
Query: "show databases",
Line: 5,
Type: QueryT,
Type: SQLQuery,
ConnectionID: 32,
}, {
Query: `UPDATE _vt.schema_migrations
Expand Down Expand Up @@ -74,7 +74,7 @@ WHERE
)
LIMIT 1`,
Line: 6,
Type: QueryT,
Type: SQLQuery,
ConnectionID: 24,
},
}
Expand Down
4 changes: 2 additions & 2 deletions go/data/typ.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import "strings"
type CmdType int

const (
QueryT CmdType = iota
SQLQuery CmdType = iota
Error
Skip
Unknown
Expand All @@ -37,7 +37,7 @@ const (
)

var commandMap = map[string]CmdType{ //nolint:gochecknoglobals // this is instead of a const
"query": QueryT,
"query": SQLQuery,
"error": Error,
"skip": Skip,
"skip_if_below_version": SkipIfBelowVersion,
Expand Down
4 changes: 2 additions & 2 deletions go/data/vtgate_log_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *vtgateLogReaderState) Next() (Query, bool) {
return Query{
Query: query,
Line: s.lineNumber,
Type: QueryT,
Type: SQLQuery,
ConnectionID: connectionID,
}, true
}
Expand All @@ -137,7 +137,7 @@ func (s *vtgateLogReaderState) Next() (Query, bool) {
return Query{
Query: parsedQuery,
Line: s.lineNumber,
Type: QueryT,
Type: SQLQuery,
ConnectionID: connectionID,
}, true
}
Expand Down
25 changes: 4 additions & 21 deletions go/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,28 +86,11 @@ func run(out io.Writer, cfg Config) error {
}

loader := cfg.Loader.Load(cfg.FileName)
skip := false
for {
query, kontinue := loader.Next()
if !kontinue {
break
}

switch query.Type {
case data.Skip, data.Error, data.VExplain:
skip = true
case data.Unknown:
return fmt.Errorf("unknown command type: %s", query.Type)
case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion:
// no-op for keys
case data.QueryT:
if skip {
skip = false
continue
}
process(query, si, ql)
}
}
_ = data.ForeachSQLQuery(loader, func(query data.Query) error {
process(query, si, ql)
return nil
})

closeErr := loader.Close()
jsonWriteErr := ql.writeJSONTo(out)
Expand Down
2 changes: 1 addition & 1 deletion go/keys/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestKeys(t *testing.T) {
func TestKeysNonAuthoritativeTable(t *testing.T) {
q := data.Query{
Query: "select id from user where id = 20",
Type: data.QueryT,
Type: data.SQLQuery,
}
si := &schemaInfo{}
ql := &queryList{
Expand Down
2 changes: 1 addition & 1 deletion go/tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (t *Tester) handleQuery(q data.Query) {
t.prepareVExplain(q.Query)
case data.WaitForAuthoritative:
t.waitAuthoritative(q.Query)
case data.QueryT:
case data.SQLQuery:
if t.vexplain == "" {
t.runQuery(q)
return
Expand Down
93 changes: 47 additions & 46 deletions go/transactions/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package transactions

import (
"io"
"strings"

"github.com/vitessio/vt/go/data"
Expand All @@ -40,53 +41,10 @@ type (
)

func Run(cfg Config) {
// Figure out if autocommit is enabled
// If we see:
// 1. BEGIN we can assume autocommit is disabled
// 2. COMMIT and no BEGIN we can assume autocommit is enabled
// 3. ROLLBACK and no BEGIN we can assume autocommit is enabled
// 4. SET autocommit = 1/0
count := 1000
defaultAutocommit := true
loader := cfg.Loader.Load(cfg.FileName)
for {
count--
if count == 0 {
// enough already. we'll assume autocommit is enabled because that is the default
break
}
query, kontinue := loader.Next()
if !kontinue {
break
}

switch query.Type {
case data.Skip, data.Error, data.VExplain, data.Unknown:
panic("unexpected query type")
case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion:
// no-op for keys
case data.QueryT:
stmt, err := sqlparser.NewTestParser().Parse(query.Query)
if err != nil {
continue
}
switch stmt.(type) {
case *sqlparser.Begin:
defaultAutocommit = false
break
case *sqlparser.Commit:
break
}
}
}
err := loader.Close()
if err != nil {
panic(err.Error())
}

defaultAutocommit := GetAutocommitGuess(cfg)
transactions := map[int]*Connection{}

loader = cfg.Loader.Load(cfg.FileName)
loader := cfg.Loader.Load(cfg.FileName)
ch := make(chan []data.Query, 1000)

for {
Expand All @@ -100,7 +58,7 @@ func Run(cfg Config) {
panic("unexpected query type")
case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion:
// no-op for keys
case data.QueryT:
case data.SQLQuery:
stmt, err := sqlparser.NewTestParser().Parse(query.Query)
if err != nil {
continue
Expand Down Expand Up @@ -144,3 +102,46 @@ func Run(cfg Config) {
}
}
}

func GetAutocommitGuess(cfg Config) bool {
// Figure out if autocommit is enabled
// If we see:
// 1. BEGIN we can assume autocommit is disabled
// 2. COMMIT and no BEGIN we can assume autocommit is enabled
// 3. ROLLBACK and no BEGIN we can assume autocommit is enabled
// 4. SET autocommit = 1/0
count := 1000
defaultAutocommit := true
loader := cfg.Loader.Load(cfg.FileName)
defer func() {
err := loader.Close()
if err != nil {
panic(err.Error())
}
}()
_ = data.ForeachSQLQuery(loader, func(query data.Query) error {
count--
if count == 0 {
// enough already. we'll assume autocommit is enabled because that is the default
return io.EOF
}

stmt, err := sqlparser.NewTestParser().Parse(query.Query)
if err != nil {
return nil
}

switch stmt.(type) {
case *sqlparser.Begin:
// BEGIN seen, so autocommit is disabled
return io.EOF
case *sqlparser.Commit:
defaultAutocommit = false
// no BEGIN seen, so autocommit is disabled
return io.EOF
}

return nil
})
return defaultAutocommit
}

0 comments on commit 06118b4

Please sign in to comment.