Skip to content

Commit

Permalink
Add primitive logic to read and group transactions together
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui committed Nov 20, 2024
1 parent b33e5d1 commit c9cd0b0
Show file tree
Hide file tree
Showing 14 changed files with 476 additions and 148 deletions.
33 changes: 32 additions & 1 deletion go/data/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,37 @@ type (
}
)

// ForeachSQLQuery reads a query log file and calls the provided function for each normal SQL query in the log.
// If the query log contains directives, they will be read and queries will be skipped as necessary.
func ForeachSQLQuery(loader IteratorLoader, f func(Query) error) error {
skip := false
for {
query, kontinue := loader.Next()
if !kontinue {
break
}

switch query.Type {
case Skip, Error, VExplain:
skip = true
case Unknown:
return fmt.Errorf("unknown command type: %s", query.Type)
case Comment, CommentWithCommand, EmptyLine, WaitForAuthoritative, SkipIfBelowVersion:
// no-op for keys
case SQLQuery:
if skip {
skip = false
continue
}
if err := f(query); err != nil {
return err
}
}
}

return nil
}

// for a single query, it has some prefix. Prefix mapps to a query type.
// e.g query_vertical maps to typ.Q_QUERY_VERTICAL
func (q *Query) getQueryType(qu string) error {
Expand All @@ -64,7 +95,7 @@ func (q *Query) getQueryType(qu string) error {
if q.Type != CommentWithCommand {
// A query that will sent to vitess
q.Query = qu
q.Type = QueryT
q.Type = SQLQuery
} else {
log.WithFields(log.Fields{"line": q.Line, "command": q.FirstWord, "arguments": q.Query}).Error("invalid command")
return fmt.Errorf("invalid command %s", q.FirstWord)
Expand Down
4 changes: 2 additions & 2 deletions go/data/query_log_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *mysqlLogReaderState) finalizeQuery() Query {
query := Query{
Query: s.prevQuery,
Line: s.queryStart,
Type: QueryT,
Type: SQLQuery,
ConnectionID: s.prevConnectionID,
}
s.prevQuery = ""
Expand All @@ -159,7 +159,7 @@ func (s *mysqlLogReaderState) processQuery(matches []string) Query {
query := Query{
Query: s.prevQuery,
Line: s.queryStart,
Type: QueryT,
Type: SQLQuery,
ConnectionID: s.prevConnectionID,
}
s.prevQuery = ""
Expand Down
6 changes: 3 additions & 3 deletions go/data/query_log_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ func TestSmallSnippet(t *testing.T) {
{
Query: "SET GLOBAL log_output = 'FILE'",
Line: 4,
Type: QueryT,
Type: SQLQuery,
ConnectionID: 32,
}, {
Query: "show databases",
Line: 5,
Type: QueryT,
Type: SQLQuery,
ConnectionID: 32,
}, {
Query: `UPDATE _vt.schema_migrations
Expand Down Expand Up @@ -74,7 +74,7 @@ WHERE
)
LIMIT 1`,
Line: 6,
Type: QueryT,
Type: SQLQuery,
ConnectionID: 24,
},
}
Expand Down
4 changes: 2 additions & 2 deletions go/data/typ.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import "strings"
type CmdType int

const (
QueryT CmdType = iota
SQLQuery CmdType = iota
Error
Skip
Unknown
Expand All @@ -37,7 +37,7 @@ const (
)

var commandMap = map[string]CmdType{ //nolint:gochecknoglobals // this is instead of a const
"query": QueryT,
"query": SQLQuery,
"error": Error,
"skip": Skip,
"skip_if_below_version": SkipIfBelowVersion,
Expand Down
4 changes: 2 additions & 2 deletions go/data/vtgate_log_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *vtgateLogReaderState) Next() (Query, bool) {
return Query{
Query: query,
Line: s.lineNumber,
Type: QueryT,
Type: SQLQuery,
ConnectionID: connectionID,
}, true
}
Expand All @@ -137,7 +137,7 @@ func (s *vtgateLogReaderState) Next() (Query, bool) {
return Query{
Query: parsedQuery,
Line: s.lineNumber,
Type: QueryT,
Type: SQLQuery,
ConnectionID: connectionID,
}, true
}
Expand Down
33 changes: 8 additions & 25 deletions go/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,45 +77,28 @@ func Run(cfg Config) error {
}

func run(out io.Writer, cfg Config) error {
si := &schemaInfo{
tables: make(map[string]columns),
si := &SchemaInfo{
Tables: make(map[string]Columns),
}
ql := &queryList{
queries: make(map[string]*QueryAnalysisResult),
failed: make(map[string]*QueryFailedResult),
}

loader := cfg.Loader.Load(cfg.FileName)
skip := false
for {
query, kontinue := loader.Next()
if !kontinue {
break
}

switch query.Type {
case data.Skip, data.Error, data.VExplain:
skip = true
case data.Unknown:
return fmt.Errorf("unknown command type: %s", query.Type)
case data.Comment, data.CommentWithCommand, data.EmptyLine, data.WaitForAuthoritative, data.SkipIfBelowVersion:
// no-op for keys
case data.QueryT:
if skip {
skip = false
continue
}
process(query, si, ql)
}
}
_ = data.ForeachSQLQuery(loader, func(query data.Query) error {
process(query, si, ql)
return nil
})

closeErr := loader.Close()
jsonWriteErr := ql.writeJSONTo(out)

return errors.Join(closeErr, jsonWriteErr)
}

func process(q data.Query, si *schemaInfo, ql *queryList) {
func process(q data.Query, si *SchemaInfo, ql *queryList) {
ast, bv, err := sqlparser.NewTestParser().Parse2(q.Query)
if err != nil {
ql.addFailedQuery(q, err)
Expand All @@ -130,7 +113,7 @@ func process(q data.Query, si *schemaInfo, ql *queryList) {
}
}

func (ql *queryList) processQuery(si *schemaInfo, ast sqlparser.Statement, q data.Query, bv sqlparser.BindVars) {
func (ql *queryList) processQuery(si *SchemaInfo, ast sqlparser.Statement, q data.Query, bv sqlparser.BindVars) {
// handle panics
defer func() {
if r := recover(); r != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/keys/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func TestKeys(t *testing.T) {
func TestKeysNonAuthoritativeTable(t *testing.T) {
q := data.Query{
Query: "select id from user where id = 20",
Type: data.QueryT,
Type: data.SQLQuery,
}
si := &schemaInfo{}
si := &SchemaInfo{}
ql := &queryList{
queries: make(map[string]*QueryAnalysisResult),
failed: make(map[string]*QueryFailedResult),
Expand Down
40 changes: 20 additions & 20 deletions go/keys/schemaInfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,42 @@ import (
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

var _ semantics.SchemaInformation = (*schemaInfo)(nil)
var _ semantics.SchemaInformation = (*SchemaInfo)(nil)

type (
schemaInfo struct {
ksName string
tables map[string]columns
SchemaInfo struct {
KsName string
Tables map[string]Columns
}

columns []vindexes.Column
Columns []vindexes.Column
)

func (s *schemaInfo) handleCreateTable(create *sqlparser.CreateTable) {
columns := make(columns, 0, len(create.TableSpec.Columns))
func (s *SchemaInfo) handleCreateTable(create *sqlparser.CreateTable) {
columns := make(Columns, 0, len(create.TableSpec.Columns))
for _, col := range create.TableSpec.Columns {
columns = append(columns, vindexes.Column{
Name: col.Name,
Type: col.Type.SQLType(),
})
}
s.tables[create.Table.Name.String()] = columns
s.Tables[create.Table.Name.String()] = columns
}

func (s *schemaInfo) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodata.TabletType, key.Destination, error) {
func (s *SchemaInfo) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodata.TabletType, key.Destination, error) {
var tbl *vindexes.Table
ks := tablename.Qualifier.String()
if ks == "" {
ks = s.ksName
ks = s.KsName
}

if !tablename.Qualifier.NotEmpty() || tablename.Qualifier.String() == s.ksName {
if !tablename.Qualifier.NotEmpty() || tablename.Qualifier.String() == s.KsName {
// This is a table from our keyspace. We should be able to find it
columns, found := s.tables[tablename.Name.String()]
columns, found := s.Tables[tablename.Name.String()]
if found {
tbl = &vindexes.Table{
Name: tablename.Name,
Keyspace: &vindexes.Keyspace{Name: s.ksName, Sharded: true},
Keyspace: &vindexes.Keyspace{Name: s.KsName, Sharded: true},
Columns: columns,
ColumnListAuthoritative: true,
}
Expand All @@ -81,30 +81,30 @@ func (s *schemaInfo) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes
return tbl, nil, ks, topodata.TabletType_REPLICA, nil, nil
}

func (s *schemaInfo) ConnCollation() collations.ID {
func (s *SchemaInfo) ConnCollation() collations.ID {
return collations.CollationBinaryID
}

func (s *schemaInfo) Environment() *vtenv.Environment {
func (s *SchemaInfo) Environment() *vtenv.Environment {
return vtenv.NewTestEnv()
}

func (s *schemaInfo) ForeignKeyMode(string) (vschemapb.Keyspace_ForeignKeyMode, error) {
func (s *SchemaInfo) ForeignKeyMode(string) (vschemapb.Keyspace_ForeignKeyMode, error) {
return vschemapb.Keyspace_unmanaged, nil
}

func (s *schemaInfo) GetForeignKeyChecksState() *bool {
func (s *SchemaInfo) GetForeignKeyChecksState() *bool {
return nil
}

func (s *schemaInfo) KeyspaceError(string) error {
func (s *SchemaInfo) KeyspaceError(string) error {
return nil
}

func (s *schemaInfo) GetAggregateUDFs() []string {
func (s *SchemaInfo) GetAggregateUDFs() []string {
return nil // TODO: maybe this should be a flag?
}

func (s *schemaInfo) FindMirrorRule(sqlparser.TableName) (*vindexes.MirrorRule, error) {
func (s *SchemaInfo) FindMirrorRule(sqlparser.TableName) (*vindexes.MirrorRule, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion go/keys/schemaInfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func TestSchemaInfo(t *testing.T) {
parser := sqlparser.NewTestParser()

si := &schemaInfo{tables: make(map[string]columns)}
si := &SchemaInfo{Tables: make(map[string]Columns)}

ast, err := parser.Parse(`CREATE TABLE IF NOT EXISTS warehouse (
w_id INT NOT NULL,
Expand Down
42 changes: 42 additions & 0 deletions go/testdata/small-slow-query-log
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/bin/mysqld, Version: 8.0.26 (Source distribution). started with:
Tcp port: 3306 Unix socket: /tmp/mysql.sock
# Time: 2023-08-01T12:00:01.852861Z
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1690891201;
begin;
# Time: 2023-08-01T12:00:01.852861Z
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1690891201;
update tblA set apa = 'toto' where foo = 12 and id = 43;
# Time: 2023-08-01T12:00:01.852861Z
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1690891201;
update tblB set monkey = 'pippi' where bar = 12 and id = 44;
# Time: 2023-08-01T12:00:01.852861Z
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1690891201;
commit;
# Time: 2023-08-01T12:00:01.852861Z
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1690891201;
begin;
# Time: 2023-08-01T12:00:01.852861Z
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1690891201;
update tblA set apa = 'toto' where foo = 43 and id = 5;
# Time: 2023-08-01T12:00:01.852861Z
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1690891201;
update tblB set monkey = 'pippi' where bar = 43 and id = 16;
# Time: 2023-08-01T12:00:01.852861Z
# User@Host: user[user] @ [XXX.XXX.XXX.XXX] Id: 779060
# Query_time: 0.000043 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1690891201;
commit;
28 changes: 28 additions & 0 deletions go/testdata/small-slow-query-log.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[
{
"Queries": [
"update tblA set apa = 'toto' where foo = :1 and id = :2",
"update tblB set monkey = 'pippi' where bar = :1 and id = :3"
],
"Count": 0,
"Predicates": [
"tblA.foo = :1",
"tblA.id = :2",
"tblB.bar = :1",
"tblB.id = :3"
]
},
{
"Queries": [
"update tblA set apa = 'toto' where foo = :1 and id = :2",
"update tblB set monkey = 'pippi' where bar = :1 and id = :3"
],
"Count": 0,
"Predicates": [
"tblA.foo = :1",
"tblA.id = :2",
"tblB.bar = :1",
"tblB.id = :3"
]
}
]
2 changes: 1 addition & 1 deletion go/tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (t *Tester) handleQuery(q data.Query) {
t.prepareVExplain(q.Query)
case data.WaitForAuthoritative:
t.waitAuthoritative(q.Query)
case data.QueryT:
case data.SQLQuery:
if t.vexplain == "" {
t.runQuery(q)
return
Expand Down
Loading

0 comments on commit c9cd0b0

Please sign in to comment.