schemastore: fix drop database and add more unit tests (#725)
* small refactor

* add unit test framework

* fix bug and add more tests

* add more tests
lidezhu authored Dec 24, 2024
1 parent 4295a64 commit 7cb6ef3
Showing 6 changed files with 485 additions and 215 deletions.
logservice/logpuller/region_request_worker.go (13 changes: 5 additions & 8 deletions)

@@ -156,7 +156,7 @@ func (s *regionRequestWorker) run(ctx context.Context, credential *security.Cred
     }()

     g, gctx := errgroup.WithContext(ctx)
-    cc, err := Connect(gctx, credential, s.store.storeAddr)
+    conn, err := Connect(gctx, credential, s.store.storeAddr)
     if err != nil {
         log.Warn("region request worker create grpc stream failed",
             zap.Uint64("workerID", s.workerID),
@@ -166,22 +166,19 @@ func (s *regionRequestWorker) run(ctx context.Context, credential *security.Cred
         return isCanceled()
     }
     defer func() {
-        _ = cc.Conn.Close()
+        _ = conn.Conn.Close()
     }()

     g.Go(func() error {
-        return s.receiveAndDispatchChangeEvents(gctx, cc)
+        return s.receiveAndDispatchChangeEvents(conn)
     })
-    g.Go(func() error { return s.processRegionSendTask(gctx, cc) })
+    g.Go(func() error { return s.processRegionSendTask(gctx, conn) })
     _ = g.Wait()
     return isCanceled()
 }

 // receiveAndDispatchChangeEventsToProcessor receives events from the grpc stream and dispatches them to ds.
-func (s *regionRequestWorker) receiveAndDispatchChangeEvents(
-    ctx context.Context,
-    conn *ConnAndClient,
-) error {
+func (s *regionRequestWorker) receiveAndDispatchChangeEvents(conn *ConnAndClient) error {
     for {
         changeEvent, err := conn.Client.Recv()
         if err != nil {
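The small refactor above renames cc to conn and drops the context parameter that receiveAndDispatchChangeEvents never used. A minimal, self-contained sketch of the same pattern follows (hypothetical stand-in types, not the repository's ConnAndClient or change event definitions): the receive loop can live without its own context because Recv is expected to fail once the connection created under gctx is torn down, while the send task keeps taking gctx.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

// event and stream are hypothetical stand-ins for the change event type and
// the gRPC client stream wrapped by ConnAndClient; they are not the real types.
type event struct{ regionID uint64 }

type stream interface {
    Recv() (*event, error)
}

// fakeStream unblocks Recv with an error once its context is done, mimicking
// how a gRPC stream fails after the underlying connection is closed.
type fakeStream struct{ ctx context.Context }

func (s *fakeStream) Recv() (*event, error) {
    select {
    case <-s.ctx.Done():
        return nil, errors.New("stream closed")
    case <-time.After(10 * time.Millisecond):
        return &event{regionID: 1}, nil
    }
}

// receiveLoop mirrors the simplified receiveAndDispatchChangeEvents: no
// context parameter, it just runs until Recv returns an error.
func receiveLoop(s stream) error {
    for {
        ev, err := s.Recv()
        if err != nil {
            return err
        }
        fmt.Println("dispatch change event for region", ev.regionID)
    }
}

func main() {
    g, gctx := errgroup.WithContext(context.Background())
    s := &fakeStream{ctx: gctx}

    g.Go(func() error { return receiveLoop(s) })
    g.Go(func() error {
        // Stand-in for processRegionSendTask, which still takes gctx; its
        // error cancels gctx and thereby ends the receive loop too.
        <-time.After(35 * time.Millisecond)
        return errors.New("send task exited")
    })
    _ = g.Wait()
    fmt.Println("worker run loop exited")
}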
logservice/schemastore/disk_format.go (1 change: 1 addition & 0 deletions)

@@ -280,6 +280,7 @@ func loadAndApplyDDLHistory(
             &ddlEvent,
             databaseMap,
             tableMap,
+            partitionMap,
             tablesDDLHistory,
             tableTriggerDDLHistory); err != nil {
             log.Panic("updateDDLHistory error", zap.Error(err))
logservice/schemastore/persist_storage.go (33 changes: 23 additions & 10 deletions)

@@ -252,6 +252,10 @@ func (p *persistentStorage) initializeFromDisk() {
     }
 }

+func (p *persistentStorage) close() error {
+    return p.db.Close()
+}
+
 // getAllPhysicalTables returns all physical tables in the snapshot
 // caller must ensure current resolve ts is larger than snapTs
 func (p *persistentStorage) getAllPhysicalTables(snapTs uint64, tableFilter filter.Filter) ([]commonEvent.Table, error) {
@@ -664,13 +668,6 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {
     }

     p.mu.Unlock()
-    // log.Info("handle resolved ddl event",
-    //     zap.Int64("schemaID", ddlEvent.CurrentSchemaID),
-    //     zap.Int64("tableID", ddlEvent.CurrentTableID),
-    //     zap.Uint64("finishedTs", ddlEvent.FinishedTs),
-    //     zap.Int64("schemaVersion", ddlEvent.SchemaVersion),
-    //     zap.String("ddlType", model.ActionType(ddlEvent.Type).String()),
-    //     zap.String("query", ddlEvent.Query))

     // Note: need write ddl event to disk before update ddl history,
     // becuase other goroutines may read ddl events from disk according to ddl history
@@ -685,6 +682,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {
         &ddlEvent,
         p.databaseMap,
         p.tableMap,
+        p.partitionMap,
         p.tablesDDLHistory,
         p.tableTriggerDDLHistory); err != nil {
         p.mu.Unlock()
@@ -792,8 +790,16 @@ func buildPersistedDDLEventFromJob(
         return tableInfo.SchemaID
     }

-    // TODO: handle err
-    query, _ := transformDDLJobQuery(job)
+    var query string
+    // only in unit test, job.Query is empty
+    if job.Query != "" {
+        var err error
+        query, err = transformDDLJobQuery(job)
+        if err != nil {
+            log.Panic("transformDDLJobQuery failed",
+                zap.Error(err))
+        }
+    }

     event := PersistedDDLEvent{
         ID: job.ID,
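For illustration, a small sketch of the new query handling in buildPersistedDDLEventFromJob, using a hypothetical job struct rather than TiDB's model.Job: jobs built directly by unit tests carry an empty Query and skip transformation, while a transformation failure on a real job is now treated as fatal instead of being discarded as the old TODO did.

package main

import (
    "fmt"
    "log"
    "strings"
)

// job is a hypothetical stand-in for model.Job with only the fields needed here.
type job struct {
    ID    int64
    Query string
}

// transformQuery stands in for transformDDLJobQuery.
func transformQuery(q string) (string, error) {
    return strings.TrimSpace(q), nil
}

// buildQuery mirrors the guarded logic: skip empty (test-only) queries and
// panic if a real query cannot be transformed.
func buildQuery(j *job) string {
    var query string
    if j.Query != "" {
        var err error
        query, err = transformQuery(j.Query)
        if err != nil {
            log.Panicf("transform query failed: %v", err)
        }
    }
    return query
}

func main() {
    fmt.Printf("real job: %q\n", buildQuery(&job{ID: 1, Query: " DROP DATABASE test "}))
    fmt.Printf("test job: %q\n", buildQuery(&job{ID: 2, Query: ""}))
}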
@@ -970,6 +976,7 @@ func updateDDLHistory(
     ddlEvent *PersistedDDLEvent,
     databaseMap map[int64]*BasicDatabaseInfo,
     tableMap map[int64]*BasicTableInfo,
+    partitionMap map[int64]BasicPartitionInfo,
     tablesDDLHistory map[int64][]uint64,
     tableTriggerDDLHistory []uint64,
 ) ([]uint64, error) {
@@ -988,7 +995,13 @@ func updateDDLHistory(
     case model.ActionDropSchema:
         tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs)
         for tableID := range databaseMap[ddlEvent.CurrentSchemaID].Tables {
-            appendTableHistory(tableID)
+            if partitionInfo, ok := partitionMap[tableID]; ok {
+                for id := range partitionInfo {
+                    appendTableHistory(id)
+                }
+            } else {
+                appendTableHistory(tableID)
+            }
         }
     case model.ActionCreateTable, model.ActionRecoverTable,
         model.ActionDropTable:
(Diffs for the remaining 3 changed files are not shown here.)
