Skip to content

Commit

Permalink
set primary with force
Browse files Browse the repository at this point in the history
* part ten, prev. commit: cdf797a

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 31, 2025
1 parent cb5b64d commit 373d60f
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 85 deletions.
2 changes: 1 addition & 1 deletion ais/clustermap.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package ais provides core functionality for the AIStore object storage.
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
*/
package ais

Expand Down
39 changes: 12 additions & 27 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2673,6 +2673,18 @@ func (p *proxy) httpdaeget(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
if checkReady := r.Header.Get(apc.HdrReadyToJoinClu); checkReady != "" {
err := p.pready(smap, true)
if err == nil {
if cmn.GCO.Get().Rebalance.Enabled && smap.CountTargets() > 1 {
err = errors.New(p.String() + ": please disable global rebalance for the duration of the critical (force-join) operation")
}
}
if err != nil {
p.writeErr(w, r, err)
return
}
}
p.writeJSON(w, r, smap, what)
default:
p.htrun.httpdaeget(w, r, query, nil /*htext*/)
Expand Down Expand Up @@ -2834,33 +2846,6 @@ func (p *proxy) httpdaepost(w http.ResponseWriter, r *http.Request) {
}
}

func (p *proxy) smapFromURL(baseURL string) (smap *smapX, err error) {
cargs := allocCargs()
{
cargs.req = cmn.HreqArgs{
Method: http.MethodGet,
Base: baseURL,
Path: apc.URLPathDae.S,
Query: url.Values{apc.QparamWhat: []string{apc.WhatSmap}},
}
cargs.timeout = apc.DefaultTimeout
cargs.cresv = cresjGeneric[smapX]{} // -> smapX
}
res := p.call(cargs, p.owner.smap.get())
if res.err != nil {
err = res.errorf("failed to get Smap from %s", baseURL)
} else {
smap = res.v.(*smapX)
if err = smap.validate(); err != nil {
err = fmt.Errorf("%s: invalid %s from %s: %v", p, smap, baseURL, err)
smap = nil
}
}
freeCargs(cargs)
freeCR(res)
return
}

func (p *proxy) ensureConfigURLs() (config *globalConfig, err error) {
config, err = p.owner.config.modify(&configModifier{pre: p._configURLs})
if err != nil {
Expand Down
160 changes: 110 additions & 50 deletions ais/psetforce.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package ais provides core functionality for the AIStore object storage.
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
*/
package ais

Expand All @@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"runtime"
"strings"
"time"

"github.com/NVIDIA/aistore/api/apc"
Expand All @@ -21,16 +22,19 @@ import (
"github.com/NVIDIA/aistore/core/meta"
)

// set new primary, scenarios including joining entire (split-brain) cluster
// Set new primary, including:
// - simple re-designation within the same cluster (b)
// - joining entire (split-brain) cluster (a)
// The latter is advanced usage only!

//
// cluster
// cluster -------------------------------------------------------------------
//

// validation followed by either a) or b) below
func (p *proxy) cluSetPrimary(w http.ResponseWriter, r *http.Request) {
const (
warn = "setting new primary is already in progress"
warnInProgress = "setting new primary is already in progress"
)
apiItems, err := p.parseURL(w, r, apc.URLPathCluProxy.L, 1, false)
if err != nil {
Expand All @@ -42,7 +46,7 @@ func (p *proxy) cluSetPrimary(w http.ResponseWriter, r *http.Request) {
return
}
if !smap.IsPrimary(p.si) {
err := fmt.Errorf("%s is (already?) not primary, please review cluster map for changes (%s) and, possibly, try again", p, smap.StringEx())
err := fmt.Errorf("%s is (already?) not primary, please review cluster map for changes (%s) and try again", p, smap.StringEx())
debug.AssertNoErr(err)
p.writeErr(w, r, err)
return
Expand All @@ -56,20 +60,29 @@ func (p *proxy) cluSetPrimary(w http.ResponseWriter, r *http.Request) {

npsi := smap.GetProxy(npid)

// a) another cluster, with force?
// a) another cluster, with force
if npsi == nil {
query := r.URL.Query()
if !cos.IsParseBool(query.Get(apc.QparamForce)) {
err := &errNodeNotFound{msg: "set-primary failure (force == false):", id: npid, si: p.si, smap: smap}
q := r.URL.Query()
if !cos.IsParseBool(q.Get(apc.QparamForce)) {
msg := fmt.Sprintf("set-primary failure ('%s' query is false):", apc.QparamForce)
err := &errNodeNotFound{msg: msg, id: npid, si: p.si, smap: smap}
p.writeErr(w, r, err, http.StatusNotFound)
return
}
newPrimaryURL := q.Get(apc.QparamPrimaryCandidate)
if newPrimaryURL == "" {
msg := fmt.Sprintf("set-primary failure ('%s' query is empty):", apc.QparamPrimaryCandidate)
err := &errNodeNotFound{msg: msg, id: npid, si: p.si, smap: smap}
p.writeErr(w, r, err, http.StatusNotFound)
return
}
if !p.settingNewPrimary.CAS(false, true) {
p.writeErr(w, r, errors.New(warn+", cannot use force"))
p.writeErr(w, r, errors.New(warnInProgress+", cannot use force"))
return
}
// join with force
p.forceJoin(w, r, npid, query)
// main method, more checks inside
p.forceJoin(w, r, npid, q)
p.settingNewPrimary.Store(false)
return
}
Expand All @@ -83,32 +96,34 @@ func (p *proxy) cluSetPrimary(w http.ResponseWriter, r *http.Request) {
p._setPrimary(w, r, npsi)
p.settingNewPrimary.Store(false)
} else {
nlog.Warningln(warn)
nlog.Warningln(warnInProgress)
}
}

func _checkFlags(npsi *meta.Snode) error {
if npsi.InMaintOrDecomm() {
switch {
case npsi.InMaintOrDecomm():
s := "under maintenance"
if !npsi.InMaint() {
s = "being decommissioned"
}
return fmt.Errorf("%s cannot become a new primary as it is currently %s", npsi, s)
}
if npsi.Flags.IsSet(meta.SnodeNonElectable) {
return fmt.Errorf("%s is non-electable and cannot become a new primary", npsi)
return errors.New(npsi.StringEx() + " cannot become new primary as it is currently " + s)
case npsi.Flags.IsSet(meta.SnodeNonElectable):
return errors.New(npsi.StringEx() + " is non-electable and cannot become new primary")
default:
return nil
}
return nil
}

func (p *proxy) _setPrimary(w http.ResponseWriter, r *http.Request, npsi *meta.Snode) {
//
// (I.1) Prepare phase - inform other nodes.
//
urlPath := apc.URLPathDaeProxy.Join(npsi.ID())
q := make(url.Values, 1)
q.Set(apc.QparamPrepare, "true")
args := allocBcArgs()
var (
urlPath = apc.URLPathDaeProxy.Join(npsi.ID())
q = url.Values{apc.QparamPrepare: []string{"true"}}
args = allocBcArgs()
)
args.req = cmn.HreqArgs{Method: http.MethodPut, Path: urlPath, Query: q}

cluMeta, errM := p.cluMeta(cmetaFillOpt{skipSmap: true, skipPrimeTime: true})
Expand Down Expand Up @@ -182,23 +197,14 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
act = "force-join"
)
// 1. validate args
if p.SID() == npid {
nlog.Warningln(p.String(), "(self) is the", tag, "- nothing to do")
return
}
var (
smap = p.owner.smap.get()
psi = smap.GetProxy(npid)
newPrimaryURL = q.Get(apc.QparamPrimaryCandidate)
nurl = newPrimaryURL
)
if psi == nil && newPrimaryURL == "" {
msg := act + " failure (w/ empty destination URL):"
err := &errNodeNotFound{msg: msg, id: npid, si: p.si, smap: smap}
p.writeErr(w, r, err, http.StatusNotFound)
return
}
if nurl == "" {
// (daeForceJoin => forceJoin) use case must be revisited
nurl = cos.Left(psi.ControlNet.URL, psi.PubNet.URL)
}
if nurl == "" {
Expand All @@ -207,10 +213,16 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
return
}

// 2. get destination Smap (henceforth, nsmap) and validate even more
nsmap, ern := p.smapFromURL(nurl)
// 2. the first inter-cluster network call:
// get and validate destination Smap, henceforth nsmap
nsmap, ern := p._getSmapCheckReady(nurl)
if _destinationNotReady(ern) {
p.writeErr(w, r, ern)
return
}
if ern != nil && psi != nil && psi.PubNet.URL != psi.ControlNet.URL {
nsmap, ern = p.smapFromURL(psi.PubNet.URL)
nlog.Errorln(ern)
nsmap, ern = p._getSmapCheckReady(psi.PubNet.URL)
}
if ern != nil {
err := fmt.Errorf("%s %s to %s[%q, %q]: %s", act, p, tag, npid, newPrimaryURL, ern)
Expand All @@ -228,23 +240,24 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
p.writeErr(w, r, err)
return
}
npsi := nsmap.Primary
npname := npsi.StringEx()

var (
npsi = nsmap.Primary
npname = npsi.StringEx()
)
if npid != npsi.ID() {
err := fmt.Errorf("%s: according to the destination %s %s %s is not _the_ primary", p, nsmap.StringEx(), tag, npname)
p.writeErr(w, r, err)
return
}
if nurl != npsi.ControlNet.URL {
//
// must be reachable via its own `nsmap`, with control-plane network preferable
//
if _, e := p.smapFromURL(npsi.ControlNet.URL); e != nil {
// must be reachable via its own `nsmap`, preferably via control net
if _, e := p._getSmapCheckReady(npsi.ControlNet.URL); e != nil {
nlog.Warningln(tag, npname, "is not reachable via its ctrl URL", npsi.ControlNet.URL)
//
// TODO: require "double-force" or "super-force" flag to proceed
//
if _, e = p.smapFromURL(npsi.PubNet.URL); e != nil {
if _, e = p._getSmapCheckReady(npsi.PubNet.URL); e != nil {
err := fmt.Errorf("%s: %s %s is no reachable via both (pub %q, ctrl %q) URLs", p, tag, npname, npsi.PubNet.URL, npsi.ControlNet.URL)
p.writeErr(w, r, err)
return
Expand Down Expand Up @@ -393,6 +406,44 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
freeCargs(cargs)
}

// TODO: introduce error code or http status code
func _destinationNotReady(err error) bool {
msg := err.Error()
return strings.Contains(msg, "ready") || strings.Contains(msg, "rebalance")
}

func (p *proxy) _getSmapCheckReady(baseURL string) (smap *smapX, err error) {
cargs := allocCargs()
{
cargs.req = cmn.HreqArgs{
Method: http.MethodGet,
Base: baseURL,
Path: apc.URLPathDae.S,
Header: http.Header{apc.HdrReadyToJoinClu: []string{"true"}},
Query: url.Values{apc.QparamWhat: []string{apc.WhatSmap}},
}
cargs.timeout = apc.DefaultTimeout
cargs.cresv = cresjGeneric[smapX]{} // -> smapX
}
res := p.call(cargs, p.owner.smap.get())
if res.err != nil {
err = res.err
if !_destinationNotReady(err) {
err = res.errorf("failed to get Smap from %s", baseURL)
}
} else {
smap = res.v.(*smapX)
if err = smap.validate(); err != nil {
debug.AssertNoErr(err)
err = fmt.Errorf("%s: invalid %s from %s: %v", p, smap, baseURL, err)
smap = nil
}
}
freeCargs(cargs)
freeCR(res)
return
}

// (see _commitForceJoin counterpart)
func (p *proxy) _cluJoinSelf(npsi *meta.Snode, nurl string) error {
joinURL, secondURL := npsi.ControlNet.URL, npsi.PubNet.URL
Expand Down Expand Up @@ -430,9 +481,12 @@ func (p *proxy) _bcastPrepForceJoin(w http.ResponseWriter, r *http.Request, nsma
bargs := allocBcArgs()
{
aimsg := p.newAmsgActVal(apc.ActPrimaryForce, nsmap)
q := make(url.Values, 1)
q.Set(apc.QparamPrepare, "true")
bargs.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathDaeForceJoin.S, Query: q, Body: cos.MustMarshal(aimsg)}
bargs.req = cmn.HreqArgs{
Method: http.MethodPost,
Path: apc.URLPathDaeForceJoin.S,
Query: url.Values{apc.QparamPrepare: []string{"true"}},
Body: cos.MustMarshal(aimsg),
}
bargs.to = core.AllNodes
}
results := p.bcastGroup(bargs)
Expand All @@ -449,17 +503,21 @@ func (p *proxy) _bcastPrepForceJoin(w http.ResponseWriter, r *http.Request, nsma
return true
}

// (see _commitForceJoin counterpart)
// TODO: rewrite as begin/(commit|abort)
// see _commitForceJoin counterpart
func (p *proxy) _bcastCommitForceJoin(w http.ResponseWriter, r *http.Request, smap *smapX, ncm *cluMeta) bool /*ok*/ {
if smap.CountActiveTs() == 0 && smap.CountActivePs() == 1 {
return true // nothing to do
}
bargs := allocBcArgs()
{
aimsg := p.newAmsgActVal(apc.ActPrimaryForce, ncm)
q := make(url.Values, 1)
q.Set(apc.QparamPrepare, "false") // TODO -- FIXME: begin/commit/abort
bargs.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathDaeForceJoin.S, Query: q, Body: cos.MustMarshal(aimsg)}
bargs.req = cmn.HreqArgs{
Method: http.MethodPost,
Path: apc.URLPathDaeForceJoin.S,
Query: url.Values{apc.QparamPrepare: []string{"false"}},
Body: cos.MustMarshal(aimsg),
}
bargs.to = core.SelectedNodes
bargs.nodes = make([]meta.NodeMap, 0, 2)
if len(smap.Tmap) > 0 {
Expand Down Expand Up @@ -603,7 +661,7 @@ func (p *proxy) _becomeFinal(ctx *smapModifier, clone *smapX) {
}

//
// all nodes except primary
// all nodes except primary -----------------------------------------------------------------------
//

// (primary forceJoin() calling)
Expand Down Expand Up @@ -742,6 +800,8 @@ func (p *proxy) daeSetPrimary(w http.ResponseWriter, r *http.Request) {
p.forceJoin(w, r, proxyID, query) // TODO -- FIXME: test
return
}

// regular flow
prepare, err := cos.ParseBool(query.Get(apc.QparamPrepare))
if err != nil {
p.writeErrf(w, r, "failed to parse URL query %q: %v", apc.QparamPrepare, err)
Expand Down
Loading

0 comments on commit 373d60f

Please sign in to comment.