Skip to content

Commit

Permalink
add replicate meta to store the barrier meta msg
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Jan 6, 2025
1 parent 369b60f commit 85f50a1
Show file tree
Hide file tree
Showing 26 changed files with 1,672 additions and 98 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ static-check:
@echo "Running go-lint check:"
@(env bash $(PWD)/scripts/run_go_lint.sh)

CORE_API := DataHandler MessageManager MetaOp Reader ChannelManager TargetAPI Writer FactoryCreator
CORE_API := DataHandler MessageManager MetaOp Reader ChannelManager TargetAPI Writer FactoryCreator ReplicateStore ReplicateMeta
SERVER_API := MetaStore MetaStoreFactory CDCService

generate-mockery:
Expand Down
1 change: 1 addition & 0 deletions core/api/replicate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type ReplicateAPIEvent struct {
ReplicateInfo *commonpb.ReplicateInfo
ReplicateParam ReplicateParam
TaskID string
MsgID string
Error error
}

Expand Down
35 changes: 35 additions & 0 deletions core/api/replicate_meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* //
* http://www.apache.org/licenses/LICENSE-2.0
* //
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package api

import "context"

type ReplicateStore interface {
Get(ctx context.Context, key string, withPrefix bool) ([]MetaMsg, error)
Put(ctx context.Context, key string, value MetaMsg) error
Remove(ctx context.Context, key string) error
}

type ReplicateMeta interface {
UpdateTaskDropCollectionMsg(ctx context.Context, msg TaskDropCollectionMsg) (bool, error)
GetTaskDropCollectionMsg(ctx context.Context, taskID string, msgID string) ([]TaskDropCollectionMsg, error)
UpdateTaskDropPartitionMsg(ctx context.Context, msg TaskDropPartitionMsg) (bool, error)
GetTaskDropPartitionMsg(ctx context.Context, taskID string, msgID string) ([]TaskDropPartitionMsg, error)
RemoveTaskMsg(ctx context.Context, taskID string, msgID string) error
}
145 changes: 145 additions & 0 deletions core/api/task_msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* //
* http://www.apache.org/licenses/LICENSE-2.0
* //
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package api

import (
"encoding/json"
"fmt"
"sort"

"github.com/cockroachdb/errors"
"github.com/mitchellh/mapstructure"
)

type MetaMsgType int

const (
DropCollectionMetaMsgType MetaMsgType = iota + 1
DropPartitionMetaMsgType
)

type BaseTaskMsg struct {
TaskID string `json:"task_id"`
MsgID string `json:"msg_id"`
TargetChannels []string `json:"target_channels"`
ReadyChannels []string `json:"ready_channels"`
}

func (msg BaseTaskMsg) IsReady() bool {
if len(msg.TargetChannels) != len(msg.ReadyChannels) {
return false
}
sort.Strings(msg.TargetChannels)
sort.Strings(msg.ReadyChannels)
for i := range msg.TargetChannels {
if msg.TargetChannels[i] != msg.ReadyChannels[i] {
return false
}
}
return true
}

type MetaMsg struct {
Base BaseTaskMsg `json:"base"`
Type MetaMsgType `json:"type"`
Data map[string]interface{} `json:"data"`
}

func (msg MetaMsg) ToJSON() (string, error) {
bs, err := json.Marshal(msg)
if err != nil {
return "", err
}
return string(bs), nil
}

type TaskDropCollectionMsg struct {
Base BaseTaskMsg `mapstructure:"-"`
DatabaseName string `mapstructure:"database_name"`
CollectionName string `mapstructure:"collection_name"`
DropTS uint64 `mapstructure:"drop_ts"`
}

func (msg TaskDropCollectionMsg) ConvertToMetaMsg() (MetaMsg, error) {
var m map[string]interface{}
err := mapstructure.Decode(msg, &m)
if err != nil {
return MetaMsg{}, err
}
return MetaMsg{
Base: msg.Base,
Type: DropCollectionMetaMsgType,
Data: m,
}, nil
}

func GetTaskDropCollectionMsg(msg MetaMsg) (TaskDropCollectionMsg, error) {
if msg.Type != DropCollectionMetaMsgType {
return TaskDropCollectionMsg{}, errors.Newf("type %d is not DropCollectionMetaMsg", msg.Type)
}
var m TaskDropCollectionMsg
err := mapstructure.Decode(msg.Data, &m)
if err != nil {
return TaskDropCollectionMsg{}, err
}
m.Base = msg.Base
return m, nil
}

func GetDropCollectionMsgID(collectionID int64) string {
return fmt.Sprintf("drop-collection-%d", collectionID)
}

type TaskDropPartitionMsg struct {
Base BaseTaskMsg `mapstructure:"-"`
DatabaseName string `mapstructure:"database_name"`
CollectionName string `mapstructure:"collection_name"`
PartitionName string `mapstructure:"partition_name"`
DropTS uint64 `mapstructure:"drop_ts"`
}

func (msg TaskDropPartitionMsg) ConvertToMetaMsg() (MetaMsg, error) {
var m map[string]interface{}
err := mapstructure.Decode(msg, &m)
if err != nil {
return MetaMsg{}, err
}
return MetaMsg{
Base: msg.Base,
Type: DropPartitionMetaMsgType,
Data: m,
}, nil
}

func GetTaskDropPartitionMsg(msg MetaMsg) (TaskDropPartitionMsg, error) {
if msg.Type != DropPartitionMetaMsgType {
return TaskDropPartitionMsg{}, errors.Newf("type %d is not DropPartitionMetaMsg", msg.Type)
}
var m TaskDropPartitionMsg
err := mapstructure.Decode(msg.Data, &m)
if err != nil {
return TaskDropPartitionMsg{}, err
}
m.Base = msg.Base
return m, nil
}

func GetDropPartitionMsgID(collectionID int64, partitionID int64) string {
return fmt.Sprintf("drop-partition-%d-%d", collectionID, partitionID)
}
94 changes: 94 additions & 0 deletions core/api/task_msg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* //
* http://www.apache.org/licenses/LICENSE-2.0
* //
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package api

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestTaskDropCollectionMsg(t *testing.T) {
dropCollectionMsg := TaskDropCollectionMsg{
Base: BaseTaskMsg{
TaskID: "1001",
MsgID: "1002",
TargetChannels: []string{
"c1", "c2",
},
ReadyChannels: []string{
"c1",
},
},
CollectionName: "test_collection",
DatabaseName: "test_db",
}

metaMsg, err := dropCollectionMsg.ConvertToMetaMsg()
if err != nil {
t.Errorf("TaskDropCollectionMsg.ConvertToMetaMsg() failed: %v", err)
}
assert.Equal(t, DropCollectionMetaMsgType, metaMsg.Type)
assert.Equal(t, "1001", metaMsg.Base.TaskID)
assert.Equal(t, "1002", metaMsg.Base.MsgID)
assert.Equal(t, []string{"c1", "c2"}, metaMsg.Base.TargetChannels)
assert.Equal(t, []string{"c1"}, metaMsg.Base.ReadyChannels)
assert.Equal(t, "test_collection", metaMsg.Data["collection_name"])
assert.Equal(t, "test_db", metaMsg.Data["database_name"])
assert.Equal(t, 3, len(metaMsg.Data))

// test convert to TaskDropCollectionMsg
taskMsg, err := GetTaskDropCollectionMsg(metaMsg)
if err != nil {
t.Errorf("GetTaskDropCollectionMsg() failed: %v", err)
}
assert.Equal(t, "1001", taskMsg.Base.TaskID)
assert.Equal(t, "1002", taskMsg.Base.MsgID)
assert.Equal(t, []string{"c1", "c2"}, taskMsg.Base.TargetChannels)
assert.Equal(t, []string{"c1"}, taskMsg.Base.ReadyChannels)
assert.Equal(t, "test_collection", taskMsg.CollectionName)
assert.Equal(t, "test_db", taskMsg.DatabaseName)
}

func TestMetaMsgToJson(t *testing.T) {
metaMsg := MetaMsg{
Base: BaseTaskMsg{
TaskID: "1001",
MsgID: "1002",
TargetChannels: []string{
"c1", "c2",
},
ReadyChannels: []string{
"c1",
},
},
Type: DropCollectionMetaMsgType,
Data: map[string]interface{}{
"collection_name": "test_collection",
"database_name": "test_db",
},
}

jsonStr, err := metaMsg.ToJSON()
if err != nil {
t.Errorf("MetaMsg.ToJSON() failed: %v", err)
}
assert.NotEmpty(t, jsonStr)
t.Logf("jsonStr = %s", jsonStr)
}
6 changes: 6 additions & 0 deletions core/api/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Writer interface {
HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error
HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) ([]byte, []byte, error)
HandleOpMessagePack(ctx context.Context, msgPack *msgstream.MsgPack) ([]byte, error)
RecoveryMetaMsg(ctx context.Context, taskID string) error
}

type DefaultWriter struct{}
Expand All @@ -50,3 +51,8 @@ func (d *DefaultWriter) HandleOpMessagePack(ctx context.Context, msgPack *msgstr
log.Warn("HandleOpMessagePack is not implemented, please check it")
return nil, nil
}

func (d *DefaultWriter) RecoveryMetaMsg(ctx context.Context, taskID string) error {
log.Warn("RecoveryMetaMsg is not implemented, please check it")
return nil
}
1 change: 1 addition & 0 deletions core/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ require (
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions core/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,8 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk=
Expand Down
Loading

0 comments on commit 85f50a1

Please sign in to comment.