Skip to content

Commit

Permalink
feat: add row listener + fix logs
Browse files Browse the repository at this point in the history
  • Loading branch information
vitorsalgado committed Feb 20, 2024
1 parent 181901d commit 464dd74
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 19 deletions.
89 changes: 89 additions & 0 deletions even_handler_row_only.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

import (
"context"
"log/slog"

"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
)

var _ canal.EventHandler = (*RowOnlyEventHandler)(nil)

type RowOnlyEventHandler struct {
logger *slog.Logger
}

func (e *RowOnlyEventHandler) OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
return nil
}

func (e *RowOnlyEventHandler) OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error {
return nil
}

func (e *RowOnlyEventHandler) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error {
return nil
}

func (e *RowOnlyEventHandler) OnRawEvent(event *replication.BinlogEvent) error {
return nil
}

func (e *RowOnlyEventHandler) OnRotate(header *replication.EventHeader, r *replication.RotateEvent) error {
return nil
}

func (e *RowOnlyEventHandler) OnRow(evt *canal.RowsEvent) error {
ctx := context.Background()
attr := make([]slog.Attr, 0)
attr = append(attr,
slog.String("event_type", evt.Header.EventType.String()),
slog.Any("rows", evt.Rows),
slog.String("action", evt.Action))

if evt.Table == nil {
e.logger.LogAttrs(ctx, slog.LevelInfo, "OnRow", attr...)
return nil
}

columns := make([]slog.Attr, len(evt.Table.Columns))
for _, v := range evt.Table.Columns {
columns = append(columns,
slog.String("name", v.Name),
slog.String("raw_type", v.RawType))
}

attr = append(attr,
slog.String("table", evt.Table.String()),
slog.Group("columns", slog.Attr{Value: slog.GroupValue(columns...)}))

e.logger.LogAttrs(ctx, slog.LevelInfo, "OnRow", attr...)

return nil
}

func (e *RowOnlyEventHandler) OnTableChanged(header *replication.EventHeader, schema string, table string) error {
e.logger.Info("OnTableChanged",
slog.String("event_type", header.EventType.String()),
slog.String("table", table),
slog.String("schema", schema))

return nil
}

func (e *RowOnlyEventHandler) OnUnmarshal(data []byte) (interface{}, error) {
e.logger.Info("OnUnmarshal",
slog.String("data", string(data)))

return nil, nil
}

func (e *RowOnlyEventHandler) OnXID(header *replication.EventHeader, pos mysql.Position) error {
return nil
}

func (e *RowOnlyEventHandler) String() string {
return "binL.RowOnlyEventHandler"
}
8 changes: 5 additions & 3 deletions event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"context"
"log/slog"

"github.com/go-mysql-org/go-mysql/canal"
Expand Down Expand Up @@ -67,14 +68,15 @@ func (e *EventHandler) OnRotate(header *replication.EventHeader, r *replication.
}

func (e *EventHandler) OnRow(evt *canal.RowsEvent) error {
ctx := context.Background()
attr := make([]slog.Attr, 0)
attr = append(attr,
slog.String("event_type", evt.Header.EventType.String()),
slog.Any("rows", evt.Rows),
slog.String("action", evt.Action))

if evt.Table == nil {
e.logger.Info("OnRow", attr)
e.logger.LogAttrs(ctx, slog.LevelInfo, "OnRow", attr...)
return nil
}

Expand All @@ -87,9 +89,9 @@ func (e *EventHandler) OnRow(evt *canal.RowsEvent) error {

attr = append(attr,
slog.String("table", evt.Table.String()),
slog.Group("columns", columns))
slog.Group("columns", slog.Attr{Value: slog.GroupValue(columns...)}))

e.logger.Info("OnRow", attr)
e.logger.LogAttrs(ctx, slog.LevelInfo, "OnRow", attr...)

return nil
}
Expand Down
24 changes: 12 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func main() {
defer cancel()

var db *sql.DB
limit := 3
backoff := 2 * time.Second
limit := 5
backoff := 3 * time.Second
factor := 2
for i := 0; i < limit; i++ {
if i > 0 {
Expand Down Expand Up @@ -56,25 +56,25 @@ func main() {
return
}

cfg := canal.NewDefaultConfig()
cfg.Addr = conf.CanalAddr
cfg.User = conf.CanalUser
cfg.Password = conf.CanalPassword
cfg.ServerID = 1
cfg.Flavor = "mysql"
cfg.Dump = canal.DumpConfig{}
cfg.IncludeTableRegex = conf.CanalTableRegex
canalConf := canal.NewDefaultConfig()
canalConf.Dump = canal.DumpConfig{ExecutionPath: ""}
canalConf.Addr = conf.CanalAddr
canalConf.User = conf.CanalUser
canalConf.Password = conf.CanalPassword
canalConf.ServerID = 1
canalConf.Flavor = "mysql"
canalConf.IncludeTableRegex = conf.CanalTableRegex

logger.Info(fmt.Sprintf("connecting to the database on: %s", conf.ConnString))

can, err := canal.NewCanal(cfg)
can, err := canal.NewCanal(canalConf)
if err != nil {
logger.Error("error creating the canal", slog.String("error", err.Error()))
os.Exit(1)
return
}

can.SetEventHandler(&EventHandler{logger})
can.SetEventHandler(&RowOnlyEventHandler{logger})

go func() {
<-ctx.Done()
Expand Down
28 changes: 24 additions & 4 deletions scripts/mysql/1_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,30 @@ CREATE DATABASE IF NOT EXISTS binl;
USE binl;

CREATE TABLE IF NOT EXISTS metrics (
id INT NOT NULL AUTO_INCREMENT,
`description` VARCHAR(50) NULL,
`value` DOUBLE NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
id int not null AUTO_INCREMENT,
`description` varchar(50) null,
`value` DOUBLE not null,
created_at timestamp not null default CURRENT_TIMESTAMP,

PRIMARY KEY (id)
);

CREATE TABLE IF NOT EXISTS customers (
id int not null AUTO_INCREMENT,
`name` varchar(100) null,
created_at timestamp not null default CURRENT_TIMESTAMP,

PRIMARY KEY (id)
);

CREATE TABLE IF NOT EXISTS customer_addresses (
id int not null AUTO_INCREMENT,
customer_id int not null,
city varchar(100) not null,

PRIMARY KEY (id, customer_id),
INDEX idx_customer_id (customer_id),
FOREIGN KEY (customer_id)
REFERENCES customers(id)
ON DELETE CASCADE
);

0 comments on commit 464dd74

Please sign in to comment.