diff --git a/states/scan_binlog.go b/states/scan_binlog.go index 6e7192a..ad9ecf3 100644 --- a/states/scan_binlog.go +++ b/states/scan_binlog.go @@ -175,8 +175,8 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara err = s.scanBinlogs(pkObject, fieldObjects, func(pk storage.PrimaryKey, offset int, values map[int64]any) error { pkv := pk.GetValue() + ts := values[1].(int64) if !p.IgnoreDelete { - ts := values[1].(int64) if deletedRecords[pkv] > uint64(ts) { return nil } @@ -188,6 +188,8 @@ func (s *InstanceState) ScanBinlogCommand(ctx context.Context, p *ScanBinlogPara env := lo.MapKeys(values, func(_ any, fid int64) string { return fields[fid].Name }) + env["$pk"] = pkv + env["$timestamp"] = ts program, err := expr.Compile(p.Expr, expr.Env(env)) if err != nil { return err diff --git a/states/scan_deltalog.go b/states/scan_deltalog.go index 1807ba8..b3aaf79 100644 --- a/states/scan_deltalog.go +++ b/states/scan_deltalog.go @@ -132,8 +132,8 @@ func (s *InstanceState) ScanDeltalogCommand(ctx context.Context, p *ScanDeltalog }() if len(p.Expr) != 0 { env := map[string]any{ - "pk": pk.GetValue(), - "ts": ts, + "$pk": pk.GetValue(), + "$timestamp": ts, } program, err := expr.Compile(p.Expr, expr.Env(env)) if err != nil {