Skip to content

Commit

Permalink
Merge #117537
Browse files Browse the repository at this point in the history
117537: multitenant: check service mode during authorization r=dt a=stevendanna

When a virtual cluster is in service mode `none`, we do not expect to
receive requests from its SQL nodes, so such requests are rejected.

Note that during an `ALTER VIRTUAL CLUSTER <NAME> STOP SERVICE` we may
reject some requests issued by a virtual cluster while it is shutting
down gracefully. We are OK with this at the moment since service for a
virtual cluster is really only stopped if we expect to reset its data.

Epic: none

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Jan 22, 2024
2 parents 3c837c2 + 8a93f6e commit e568bb2
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 117 deletions.
34 changes: 0 additions & 34 deletions pkg/ccl/serverccl/server_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,37 +607,3 @@ func TestServerControllerLoginLogout(t *testing.T) {
require.ElementsMatch(t, []string{"session", "tenant"}, cookieNames)
require.ElementsMatch(t, []string{"", ""}, cookieValues)
}

// TestServiceShutdownUsesGracefulDrain starts a shared-process tenant, stops
// its service through SQL, and then waits for the tenant server to report a
// drain. The tenant is started with RequireGracefulDrain set.
func TestServiceShutdownUsesGracefulDrain(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
		DefaultTestTenant: base.TestControlsTenantsExplicitly,
	})
	defer srv.Stopper().Stop(ctx)

	// Channel handed to the tenant server through its testing knobs; the
	// test blocks on it at the end.
	drained := make(chan struct{})
	tenantArgs := base.TestSharedProcessTenantArgs{
		TenantName: "hello",
		Knobs: base.TestingKnobs{
			Server: &server.TestingKnobs{
				RequireGracefulDrain: true,
				DrainReportCh:        drained,
			},
		},
	}

	// Start a shared process server.
	_, _, err := srv.TenantController().StartSharedProcessTenant(ctx, tenantArgs)
	require.NoError(t, err)

	_, err = sqlDB.Exec("ALTER TENANT hello STOP SERVICE")
	require.NoError(t, err)

	// Wait for the server to shut down. Receiving from the channel is also
	// the assertion that the graceful drain has occurred.
	<-drained
}
4 changes: 4 additions & 0 deletions pkg/multitenant/tenantcapabilities/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ import (
// Reader provides access to the global tenant capability state. The global
// tenant capability state may be arbitrarily stale.
type Reader interface {
// GetInfo returns the tenant information for the specified tenant. found
// reports whether an entry for id is known to the Reader.
// NOTE(review): the meaning of the returned channel is not visible from
// this declaration; presumably it signals that the returned information
// has become outdated — confirm against the implementation.
GetInfo(id roachpb.TenantID) (_ Entry, _ <-chan struct{}, found bool)

// GetCapabilities returns the tenant capabilities for the specified tenant.
// found reports whether capabilities for id are known to the Reader.
GetCapabilities(id roachpb.TenantID) (_ *tenantcapabilitiespb.TenantCapabilities, found bool)

// GetGlobalCapabilityState returns the capability state for all tenants,
// keyed by tenant ID.
GetGlobalCapabilityState() map[roachpb.TenantID]*tenantcapabilitiespb.TenantCapabilities
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvpb",
"//pkg/multitenant/mtinfopb",
"//pkg/multitenant/tenantcapabilities",
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiespb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiespb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -106,10 +106,13 @@ func (a *Authorizer) HasCapabilityForBatch(
return nil
}

cp, mode := a.getMode(ctx, tenID)
entry, mode := a.getMode(ctx, tenID)
switch mode {
case authorizerModeOn:
return a.capCheckForBatch(ctx, tenID, ba, cp)
if entry.ServiceMode == mtinfopb.ServiceModeNone {
return errors.Newf("operation not allowed when in service mode %q", entry.ServiceMode)
}
return a.capCheckForBatch(ctx, tenID, ba, entry)
case authorizerModeAllowAll:
return nil
case authorizerModeV222:
Expand Down Expand Up @@ -149,7 +152,7 @@ func (a *Authorizer) capCheckForBatch(
ctx context.Context,
tenID roachpb.TenantID,
ba *kvpb.BatchRequest,
cp *tenantcapabilitiespb.TenantCapabilities,
entry tenantcapabilities.Entry,
) error {
for _, ru := range ba.Requests {
request := ru.GetInner()
Expand All @@ -158,7 +161,7 @@ func (a *Authorizer) capCheckForBatch(
continue
}
if !hasCap || requiredCap == onlySystemTenant ||
!tenantcapabilities.MustGetBoolByID(cp, requiredCap) {
!tenantcapabilities.MustGetBoolByID(entry.TenantCapabilities, requiredCap) {
// All allowable request types must be explicitly opted into the
// reqMethodToCap map. If a request type is missing from the map
// (!hasCap), we must be conservative and assume it is
Expand All @@ -175,6 +178,13 @@ func newTenantDoesNotHaveCapabilityError(cap tenantcapabilities.ID, req kvpb.Req
return errors.Newf("client tenant does not have capability %q (%T)", cap, req)
}

// Errors returned by the Authorizer's capability checks when a secondary
// tenant lacks the capability required for the requested operation. Each
// one is shared by every denial path of its corresponding Has*Capability
// method.
var (
errCannotQueryMetadata = errors.New("client tenant does not have capability to query cluster node metadata")
errCannotQueryTSDB = errors.New("client tenant does not have capability to query timeseries data")
errCannotUseNodelocal = errors.New("client tenant does not have capability to use nodelocal storage")
errCannotDebugProcess = errors.New("client tenant does not have capability to debug the process")
)

var reqMethodToCap = map[kvpb.Method]tenantcapabilities.ID{
// The following requests are authorized for all workloads.
kvpb.AddSSTable: noCapCheckNeeded,
Expand Down Expand Up @@ -249,27 +259,24 @@ func (a *Authorizer) HasNodeStatusCapability(ctx context.Context, tenID roachpb.
if tenID.IsSystem() {
return nil
}
errFn := func() error {
return errors.New("client tenant does not have capability to query cluster node metadata")
}
cp, mode := a.getMode(ctx, tenID)
entry, mode := a.getMode(ctx, tenID)
switch mode {
case authorizerModeOn:
break // fallthrough to the next check.
case authorizerModeAllowAll:
return nil
case authorizerModeV222:
return errFn()
return errCannotQueryMetadata
default:
err := errors.AssertionFailedf("unknown authorizer mode: %d", mode)
logcrash.ReportOrPanic(ctx, &a.settings.SV, "%v", err)
return err
}

if !tenantcapabilities.MustGetBoolByID(
cp, tenantcapabilities.CanViewNodeInfo,
entry.TenantCapabilities, tenantcapabilities.CanViewNodeInfo,
) {
return errFn()
return errCannotQueryMetadata
}
return nil
}
Expand All @@ -278,28 +285,25 @@ func (a *Authorizer) HasTSDBQueryCapability(ctx context.Context, tenID roachpb.T
if tenID.IsSystem() {
return nil
}
errFn := func() error {
return errors.Newf("client tenant does not have capability to query timeseries data")
}

cp, mode := a.getMode(ctx, tenID)
entry, mode := a.getMode(ctx, tenID)
switch mode {
case authorizerModeOn:
break // fallthrough to the next check.
case authorizerModeAllowAll:
return nil
case authorizerModeV222:
return errFn()
return errCannotQueryTSDB
default:
err := errors.AssertionFailedf("unknown authorizer mode: %d", mode)
logcrash.ReportOrPanic(ctx, &a.settings.SV, "%v", err)
return err
}

if !tenantcapabilities.MustGetBoolByID(
cp, tenantcapabilities.CanViewTSDBMetrics,
entry.TenantCapabilities, tenantcapabilities.CanViewTSDBMetrics,
) {
return errFn()
return errCannotQueryTSDB
}
return nil
}
Expand All @@ -310,27 +314,24 @@ func (a *Authorizer) HasNodelocalStorageCapability(
if tenID.IsSystem() {
return nil
}
errFn := func() error {
return errors.Newf("client tenant does not have capability to use nodelocal storage")
}
cp, mode := a.getMode(ctx, tenID)
entry, mode := a.getMode(ctx, tenID)
switch mode {
case authorizerModeOn:
break // fallthrough to the next check.
case authorizerModeAllowAll:
return nil
case authorizerModeV222:
return errFn()
return errCannotUseNodelocal
default:
err := errors.AssertionFailedf("unknown authorizer mode: %d", mode)
logcrash.ReportOrPanic(ctx, &a.settings.SV, "%v", err)
return err
}

if !tenantcapabilities.MustGetBoolByID(
cp, tenantcapabilities.CanUseNodelocalStorage,
entry.TenantCapabilities, tenantcapabilities.CanUseNodelocalStorage,
) {
return errFn()
return errCannotUseNodelocal
}
return nil
}
Expand All @@ -340,7 +341,7 @@ func (a *Authorizer) IsExemptFromRateLimiting(ctx context.Context, tenID roachpb
if tenID.IsSystem() {
return true
}
cp, mode := a.getMode(ctx, tenID)
entry, mode := a.getMode(ctx, tenID)
switch mode {
case authorizerModeOn:
break // fallthrough to the next check.
Expand All @@ -354,13 +355,39 @@ func (a *Authorizer) IsExemptFromRateLimiting(ctx context.Context, tenID roachpb
return false
}

return tenantcapabilities.MustGetBoolByID(cp, tenantcapabilities.ExemptFromRateLimiting)
return tenantcapabilities.MustGetBoolByID(entry.TenantCapabilities, tenantcapabilities.ExemptFromRateLimiting)
}

// HasProcessDebugCapability returns nil if the tenant identified by tenID is
// allowed to debug the process, and an error otherwise. The system tenant is
// always allowed.
func (a *Authorizer) HasProcessDebugCapability(ctx context.Context, tenID roachpb.TenantID) error {
	if tenID.IsSystem() {
		return nil
	}

	info, authMode := a.getMode(ctx, tenID)
	switch authMode {
	case authorizerModeAllowAll:
		// Every tenant is permitted; capabilities are not consulted.
		return nil
	case authorizerModeV222:
		// Secondary tenants are always denied in this mode.
		return errCannotDebugProcess
	case authorizerModeOn:
		// The decision rests on the tenant's CanDebugProcess capability.
		allowed := tenantcapabilities.MustGetBoolByID(
			info.TenantCapabilities, tenantcapabilities.CanDebugProcess,
		)
		if allowed {
			return nil
		}
		return errCannotDebugProcess
	default:
		err := errors.AssertionFailedf("unknown authorizer mode: %d", authMode)
		logcrash.ReportOrPanic(ctx, &a.settings.SV, "%v", err)
		return err
	}
}

// getMode retrieves the authorization mode.
func (a *Authorizer) getMode(
ctx context.Context, tid roachpb.TenantID,
) (cp *tenantcapabilitiespb.TenantCapabilities, selectedMode authorizerModeType) {
) (entry tenantcapabilities.Entry, selectedMode authorizerModeType) {
// We prioritize what the cluster setting tells us.
selectedMode = authorizerModeType(authorizerMode.Get(&a.settings.SV))

Expand All @@ -380,7 +407,7 @@ func (a *Authorizer) getMode(
} else {
// We have a reader. Did we get data from the rangefeed yet?
var found bool
cp, found = reader.GetCapabilities(tid)
entry, _, found = reader.GetInfo(tid)
if !found {
// No data from the rangefeed yet. Assume caps are still
// unavailable.
Expand All @@ -389,36 +416,8 @@ func (a *Authorizer) getMode(
tid)
selectedMode = authorizerModeV222
}
}
}
return cp, selectedMode
}

func (a *Authorizer) HasProcessDebugCapability(ctx context.Context, tenID roachpb.TenantID) error {
if tenID.IsSystem() {
return nil
}
errFn := func() error {
return errors.New("client tenant does not have capability to debug the process")
}
cp, mode := a.getMode(ctx, tenID)
switch mode {
case authorizerModeOn:
break // fallthrough to the next check.
case authorizerModeAllowAll:
return nil
case authorizerModeV222:
return errFn()
default:
err := errors.AssertionFailedf("unknown authorizer mode: %d", mode)
logcrash.ReportOrPanic(ctx, &a.settings.SV, "%v", err)
return err
}

if !tenantcapabilities.MustGetBoolByID(
cp, tenantcapabilities.CanDebugProcess,
) {
return errFn()
}
}
return nil
return entry, selectedMode
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestDataDriven(t *testing.T) {
datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
clusterSettings := cluster.MakeTestingClusterSettings()
ctx := context.Background()
mockReader := mockReader(make(map[roachpb.TenantID]*tenantcapabilitiespb.TenantCapabilities))
mockReader := mockReader(make(map[roachpb.TenantID]*tenantcapabilities.Entry))
authorizer := New(clusterSettings, nil /* TestingKnobs */)
authorizer.BindReader(mockReader)

Expand All @@ -86,17 +86,12 @@ func TestDataDriven(t *testing.T) {
}
switch d.Cmd {
case "upsert":
_, caps, err := tenantcapabilitiestestutils.ParseTenantCapabilityUpsert(t, d)
entry, err := tenantcapabilitiestestutils.ParseTenantCapabilityUpsert(t, d)
if err != nil {
return err.Error()
}
mockReader.updateState([]*tenantcapabilities.Update{
{
Entry: tenantcapabilities.Entry{
TenantID: tenID,
TenantCapabilities: caps,
},
},
{Entry: entry},
})
case "delete":
update := tenantcapabilitiestestutils.ParseTenantCapabilityDelete(t, d)
Expand Down Expand Up @@ -138,7 +133,7 @@ func TestDataDriven(t *testing.T) {
})
}

type mockReader map[roachpb.TenantID]*tenantcapabilitiespb.TenantCapabilities
type mockReader map[roachpb.TenantID]*tenantcapabilities.Entry

var _ tenantcapabilities.Reader = mockReader{}

Expand All @@ -147,22 +142,37 @@ func (m mockReader) updateState(updates []*tenantcapabilities.Update) {
if update.Deleted {
delete(m, update.TenantID)
} else {
m[update.TenantID] = update.TenantCapabilities
m[update.TenantID] = &update.Entry
}
}
}

// unused is the placeholder channel that mockReader.GetInfo returns for
// every lookup. It is typed receive-only, so it can be neither sent on nor
// closed through this variable.
var unused = make(<-chan struct{})

// GetInfo implements the tenantcapabilities.Reader interface. It returns a
// copy of the stored entry for id, or the zero Entry with found == false when
// the mock holds nothing for that tenant. The returned channel is always the
// shared placeholder channel.
func (m mockReader) GetInfo(id roachpb.TenantID) (tenantcapabilities.Entry, <-chan struct{}, bool) {
	if stored, ok := m[id]; ok {
		return *stored, unused, true
	}
	return tenantcapabilities.Entry{}, unused, false
}

// GetCapabilities implements the tenantcapabilities.Reader interface.
//
// It returns the capabilities stored for id together with whether an entry
// exists. When the mock holds no entry for id the map lookup yields a nil
// *Entry; dereferencing it to read TenantCapabilities would panic, so that
// case is answered with (nil, false) instead.
func (m mockReader) GetCapabilities(
	id roachpb.TenantID,
) (*tenantcapabilitiespb.TenantCapabilities, bool) {
	entry, found := m[id]
	if entry == nil {
		// Missing tenant (found == false) or an explicitly stored nil entry:
		// there are no capabilities to hand back.
		return nil, found
	}
	return entry.TenantCapabilities, found
}

// GetGlobalCapabilityState implements the tenantcapabilities.Reader interface.
func (m mockReader) GetGlobalCapabilityState() map[roachpb.TenantID]*tenantcapabilitiespb.TenantCapabilities {
return m
ret := make(map[roachpb.TenantID]*tenantcapabilitiespb.TenantCapabilities, len(m))
for id, entry := range m {
ret[id] = entry.TenantCapabilities
}
return ret
}

func TestAllBatchCapsAreBoolean(t *testing.T) {
Expand Down
Loading

0 comments on commit e568bb2

Please sign in to comment.