Skip to content

Commit

Permalink
Sync TiCDC after TiDB (#4171) (#4178)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Sep 3, 2021
1 parent f0d553e commit aac2b9e
Show file tree
Hide file tree
Showing 15 changed files with 162 additions and 109 deletions.
14 changes: 7 additions & 7 deletions pkg/controller/tidbcluster/tidb_cluster_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,6 @@ func (c *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster)
return err
}

// - waiting for the pd cluster available(pd cluster is in quorum)
// - create or update ticdc deployment
// - sync ticdc cluster status from pd to TidbCluster object
if err := c.ticdcMemberManager.Sync(tc); err != nil {
return err
}

// works that should be done to make the pd cluster current state match the desired state:
// - create or update the pd service
// - create or update the pd headless service
Expand Down Expand Up @@ -226,6 +219,13 @@ func (c *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster)
return err
}

// - waiting for the pd cluster available(pd cluster is in quorum)
// - create or update ticdc deployment
// - sync ticdc cluster status from pd to TidbCluster object
if err := c.ticdcMemberManager.Sync(tc); err != nil {
return err
}

// syncing the labels from Pod to PVC and PV, these labels include:
// - label.StoreIDLabelKey
// - label.MemberIDLabelKey
Expand Down
9 changes: 3 additions & 6 deletions pkg/manager/member/pd_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,9 @@ func (u *pdUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Stat
if !tc.Status.PD.Synced {
return fmt.Errorf("tidbcluster: [%s/%s]'s pd status sync failed, can not to be upgraded", ns, tcName)
}
if tc.Status.TiCDC.Phase == v1alpha1.UpgradePhase ||
tc.PDScaling() {
klog.Infof("TidbCluster: [%s/%s]'s ticdc status is %v, "+
"pd status is %v, can not upgrade pd",
ns, tcName, tc.Status.TiCDC.Phase,
tc.Status.PD.Phase)
if tc.PDScaling() {
klog.Infof("TidbCluster: [%s/%s]'s pd status is %v, can not upgrade pd",
ns, tcName, tc.Status.PD.Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
return err
Expand Down
15 changes: 0 additions & 15 deletions pkg/manager/member/pd_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,21 +258,6 @@ func TestPDUpgraderUpgrade(t *testing.T) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(2)))
},
},
{
name: "pd can not upgrade when cdc is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.TiCDC.Phase = v1alpha1.UpgradePhase
tc.Status.PD.Synced = true
},
changePods: nil,
errExpectFn: func(g *GomegaWithT, err error) {
g.Expect(err).NotTo(HaveOccurred())
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.NormalPhase))
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3)))
},
},
}

for i := range tests {
Expand Down
9 changes: 4 additions & 5 deletions pkg/manager/member/pump_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,13 @@ func (m *pumpMemberManager) syncPumpStatefulSetForTidbCluster(tc *v1alpha1.TidbC
}

// Wait for PD & TiKV upgrading done
if tc.Status.TiCDC.Phase == v1alpha1.UpgradePhase ||
tc.Status.TiFlash.Phase == v1alpha1.UpgradePhase ||
if tc.Status.TiFlash.Phase == v1alpha1.UpgradePhase ||
tc.Status.PD.Phase == v1alpha1.UpgradePhase ||
tc.Status.TiKV.Phase == v1alpha1.UpgradePhase {
klog.Infof("TidbCluster: [%s/%s]'s ticdc status is %s, tiflash status is %s, "+
klog.Infof("TidbCluster: [%s/%s]'s tiflash status is %s, "+
"pd status is %s, tikv status is %s, can not upgrade pump",
tc.Namespace, tc.Name, tc.Status.TiCDC.Phase, tc.Status.TiFlash.Phase,
tc.Status.PD.Phase, tc.Status.TiKV.Phase)
tc.Namespace, tc.Name,
tc.Status.TiFlash.Phase, tc.Status.PD.Phase, tc.Status.TiKV.Phase)
return nil
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/manager/member/ticdc_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,9 @@ func (m *ticdcMemberManager) syncTiCDCConfigMap(tc *v1alpha1.TidbCluster, set *a

// Sync fulfills the manager.Manager interface
func (m *ticdcMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
ns := tc.GetNamespace()
tcName := tc.GetName()

if tc.Spec.TiCDC == nil {
return nil
}
if tc.Spec.Paused {
klog.Infof("TidbCluster %s/%s is paused, skip syncing ticdc deployment", ns, tcName)
return nil
}

// Sync CDC Headless Service
if err := m.syncCDCHeadlessService(tc); err != nil {
Expand Down Expand Up @@ -158,6 +151,11 @@ func (m *ticdcMemberManager) syncStatefulSet(tc *v1alpha1.TidbCluster) error {
ns, tcName, err)
}

if tc.Spec.Paused {
klog.Infof("TidbCluster %s/%s is paused, skip syncing ticdc statefulset", tc.GetNamespace(), tc.GetName())
return nil
}

cm, err := m.syncTiCDCConfigMap(tc, oldSts)
if err != nil {
return err
Expand Down Expand Up @@ -246,6 +244,11 @@ func (m *ticdcMemberManager) syncTiCDCStatus(tc *v1alpha1.TidbCluster, sts *apps
}

func (m *ticdcMemberManager) syncCDCHeadlessService(tc *v1alpha1.TidbCluster) error {
if tc.Spec.Paused {
klog.Infof("TidbCluster %s/%s is paused, skip syncing ticdc service", tc.GetNamespace(), tc.GetName())
return nil
}

ns := tc.GetNamespace()
tcName := tc.GetName()

Expand Down
19 changes: 19 additions & 0 deletions pkg/manager/member/ticdc_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@ func (u *ticdcUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulS
ns := tc.GetNamespace()
tcName := tc.GetName()

if tc.Status.PD.Phase == v1alpha1.UpgradePhase ||
tc.Status.TiKV.Phase == v1alpha1.UpgradePhase ||
tc.Status.TiFlash.Phase == v1alpha1.UpgradePhase ||
tc.Status.Pump.Phase == v1alpha1.UpgradePhase ||
tc.Status.TiDB.Phase == v1alpha1.UpgradePhase {
klog.Infof("TidbCluster: [%s/%s]'s pd status is %s, "+
"tikv status is %s, tiflash status is %s, pump status is %s, "+
"tidb status is %s, can not upgrade ticdc",
ns, tcName,
tc.Status.PD.Phase, tc.Status.TiKV.Phase, tc.Status.TiFlash.Phase,
tc.Status.Pump.Phase, tc.Status.TiDB.Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
return err
}
newSet.Spec.Template.Spec = *podSpec
return nil
}

tc.Status.TiCDC.Phase = v1alpha1.UpgradePhase
if !templateEqual(newSet, oldSet) {
return nil
Expand Down
95 changes: 95 additions & 0 deletions pkg/manager/member/ticdc_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,101 @@ func TestTiCDCUpgrader_Upgrade(t *testing.T) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
},
},
{
name: "ticdc can not upgrade when pd is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Phase = v1alpha1.UpgradePhase
tc.Status.TiFlash.Phase = v1alpha1.NormalPhase
tc.Status.TiKV.Phase = v1alpha1.NormalPhase
tc.Status.Pump.Phase = v1alpha1.NormalPhase
tc.Status.TiDB.Phase = v1alpha1.NormalPhase
tc.Status.TiCDC.Synced = true
},
changeOldSet: func(oldSet *apps.StatefulSet) {
SetStatefulSetLastAppliedConfigAnnotation(oldSet)
},
errorExpect: false,
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.TiCDC.Phase).To(Equal(v1alpha1.NormalPhase))
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(1)))
},
},
{
name: "ticdc can not upgrade when tiflash is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Phase = v1alpha1.NormalPhase
tc.Status.TiFlash.Phase = v1alpha1.UpgradePhase
tc.Status.TiKV.Phase = v1alpha1.NormalPhase
tc.Status.Pump.Phase = v1alpha1.NormalPhase
tc.Status.TiDB.Phase = v1alpha1.NormalPhase
tc.Status.TiCDC.Synced = true
},
changeOldSet: func(oldSet *apps.StatefulSet) {
SetStatefulSetLastAppliedConfigAnnotation(oldSet)
},
errorExpect: false,
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.TiCDC.Phase).To(Equal(v1alpha1.NormalPhase))
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(1)))
},
},
{
name: "ticdc can not upgrade when tikv is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Phase = v1alpha1.NormalPhase
tc.Status.TiFlash.Phase = v1alpha1.NormalPhase
tc.Status.TiKV.Phase = v1alpha1.UpgradePhase
tc.Status.Pump.Phase = v1alpha1.NormalPhase
tc.Status.TiDB.Phase = v1alpha1.NormalPhase
tc.Status.TiCDC.Synced = true
},
changeOldSet: func(oldSet *apps.StatefulSet) {
SetStatefulSetLastAppliedConfigAnnotation(oldSet)
},
errorExpect: false,
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.TiCDC.Phase).To(Equal(v1alpha1.NormalPhase))
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(1)))
},
},
{
name: "ticdc can not upgrade when pump is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Phase = v1alpha1.NormalPhase
tc.Status.TiFlash.Phase = v1alpha1.NormalPhase
tc.Status.TiKV.Phase = v1alpha1.NormalPhase
tc.Status.Pump.Phase = v1alpha1.UpgradePhase
tc.Status.TiDB.Phase = v1alpha1.NormalPhase
tc.Status.TiCDC.Synced = true
},
changeOldSet: func(oldSet *apps.StatefulSet) {
SetStatefulSetLastAppliedConfigAnnotation(oldSet)
},
errorExpect: false,
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.TiCDC.Phase).To(Equal(v1alpha1.NormalPhase))
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(1)))
},
},
{
name: "ticdc can not upgrade when tidb is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Phase = v1alpha1.NormalPhase
tc.Status.TiFlash.Phase = v1alpha1.NormalPhase
tc.Status.TiKV.Phase = v1alpha1.NormalPhase
tc.Status.Pump.Phase = v1alpha1.NormalPhase
tc.Status.TiDB.Phase = v1alpha1.UpgradePhase
tc.Status.TiCDC.Synced = true
},
changeOldSet: func(oldSet *apps.StatefulSet) {
SetStatefulSetLastAppliedConfigAnnotation(oldSet)
},
errorExpect: false,
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(tc.Status.TiCDC.Phase).To(Equal(v1alpha1.NormalPhase))
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(1)))
},
},
}

for _, test := range tests {
Expand Down
7 changes: 3 additions & 4 deletions pkg/manager/member/tidb_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,15 @@ func (u *tidbUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSe
ns := tc.GetNamespace()
tcName := tc.GetName()

if tc.Status.TiCDC.Phase == v1alpha1.UpgradePhase ||
tc.Status.PD.Phase == v1alpha1.UpgradePhase ||
if tc.Status.PD.Phase == v1alpha1.UpgradePhase ||
tc.Status.TiKV.Phase == v1alpha1.UpgradePhase ||
tc.Status.TiFlash.Phase == v1alpha1.UpgradePhase ||
tc.Status.Pump.Phase == v1alpha1.UpgradePhase ||
tc.TiDBScaling() {
klog.Infof("TidbCluster: [%s/%s]'s ticdc status is %s, pd status is %s, "+
klog.Infof("TidbCluster: [%s/%s]'s pd status is %s, "+
"tikv status is %s, tiflash status is %s, pump status is %s, "+
"tidb status is %s, can not upgrade tidb",
ns, tcName, tc.Status.TiCDC.Phase,
ns, tcName,
tc.Status.PD.Phase, tc.Status.TiKV.Phase, tc.Status.TiFlash.Phase,
tc.Status.Pump.Phase, tc.Status.TiDB.Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
Expand Down
10 changes: 0 additions & 10 deletions pkg/manager/member/tidb_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,6 @@ func TestTiDBUpgrader_Upgrade(t *testing.T) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
},
},
{
name: "cdc is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.TiCDC.Phase = v1alpha1.UpgradePhase
},
getLastAppliedConfigErr: false,
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet) {
g.Expect(newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(pointer.Int32Ptr(1)))
},
},
{
name: "pump is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
Expand Down
9 changes: 4 additions & 5 deletions pkg/manager/member/tiflash_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ func (u *tiflashUpgrader) Upgrade(tc *v1alpha1.TidbCluster, oldSet *apps.Statefu
ns := tc.GetNamespace()
tcName := tc.GetName()

if tc.Status.TiCDC.Phase == v1alpha1.UpgradePhase ||
tc.Status.PD.Phase == v1alpha1.UpgradePhase ||
if tc.Status.PD.Phase == v1alpha1.UpgradePhase ||
tc.TiFlashScaling() {
klog.Infof("TidbCluster: [%s/%s]'s ticdc status is %s, "+
"pd status is %s, tiflash status is %s, can not upgrade tiflash", ns, tcName,
tc.Status.TiCDC.Phase, tc.Status.PD.Phase, tc.Status.TiFlash.Phase)
klog.Infof("TidbCluster: [%s/%s]'s pd status is %s, tiflash status is %s, can not upgrade tiflash",
ns, tcName,
tc.Status.PD.Phase, tc.Status.TiFlash.Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
return err
Expand Down
22 changes: 1 addition & 21 deletions pkg/manager/member/tiflash_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,26 +214,6 @@ func TestTiFlashUpgraderUpgrade(t *testing.T) {
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3)))
},
},
{
name: "tiflash can not upgrade when cdc is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.TiCDC.Phase = v1alpha1.UpgradePhase
tc.Status.TiFlash.Phase = v1alpha1.NormalPhase
tc.Status.TiFlash.Synced = true
},
changeOldSet: func(oldSet *apps.StatefulSet) {
SetStatefulSetLastAppliedConfigAnnotation(oldSet)
},
changePods: nil,
updatePodErr: false,
errExpectFn: func(g *GomegaWithT, err error) {
g.Expect(err).NotTo(HaveOccurred())
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, pods map[string]*corev1.Pod) {
g.Expect(tc.Status.TiFlash.Phase).To(Equal(v1alpha1.NormalPhase))
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3)))
},
},
{
name: "tiflash can not upgrade when pd is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
Expand All @@ -257,7 +237,7 @@ func TestTiFlashUpgraderUpgrade(t *testing.T) {
{
name: "get last apply config error",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.TiCDC.Phase = v1alpha1.UpgradePhase
tc.Status.PD.Phase = v1alpha1.UpgradePhase
tc.Status.TiFlash.Phase = v1alpha1.NormalPhase
tc.Status.TiFlash.Synced = true
},
Expand Down
10 changes: 4 additions & 6 deletions pkg/manager/member/tikv_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@ func (u *tikvUpgrader) Upgrade(meta metav1.Object, oldSet *apps.StatefulSet, new
var status *v1alpha1.TiKVStatus
switch meta := meta.(type) {
case *v1alpha1.TidbCluster:
if meta.Status.TiCDC.Phase == v1alpha1.UpgradePhase ||
meta.Status.TiFlash.Phase == v1alpha1.UpgradePhase ||
if meta.Status.TiFlash.Phase == v1alpha1.UpgradePhase ||
meta.Status.PD.Phase == v1alpha1.UpgradePhase ||
meta.TiKVScaling() {
klog.Infof("TidbCluster: [%s/%s]'s ticdc status is %v, "+
"tiflash status is %v, pd status is %v, "+
klog.Infof("TidbCluster: [%s/%s]'s tiflash status is %v, pd status is %v, "+
"tikv status is %v, can not upgrade tikv",
ns, tcName, meta.Status.TiCDC.Phase, meta.Status.TiFlash.Phase,
meta.Status.PD.Phase, meta.Status.TiKV.Phase)
ns, tcName,
meta.Status.TiFlash.Phase, meta.Status.PD.Phase, meta.Status.TiKV.Phase)
_, podSpec, err := GetLastAppliedConfig(oldSet)
if err != nil {
return err
Expand Down
22 changes: 0 additions & 22 deletions pkg/manager/member/tikv_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,28 +307,6 @@ func TestTiKVUpgraderUpgrade(t *testing.T) {
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3)))
},
},
{
name: "tikv can not upgrade when cdc is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
tc.Status.TiCDC.Phase = v1alpha1.UpgradePhase
tc.Status.TiKV.Phase = v1alpha1.NormalPhase
tc.Status.TiKV.Synced = true
},
changeOldSet: func(oldSet *apps.StatefulSet) {
SetStatefulSetLastAppliedConfigAnnotation(oldSet)
},
changePods: nil,
beginEvictLeaderErr: false,
endEvictLeaderErr: false,
updatePodErr: false,
errExpectFn: func(g *GomegaWithT, err error) {
g.Expect(err).NotTo(HaveOccurred())
},
expectFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, pods map[string]*corev1.Pod) {
g.Expect(tc.Status.TiKV.Phase).To(Equal(v1alpha1.NormalPhase))
g.Expect(*newSet.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(int32(3)))
},
},
{
name: "tikv can not upgrade when tiflash is upgrading",
changeFn: func(tc *v1alpha1.TidbCluster) {
Expand Down
Loading

0 comments on commit aac2b9e

Please sign in to comment.