Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: introduce max_user_connections #59197

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(241), session.CurrentBootstrapVersion)
require.Equal(t, int64(242), session.CurrentBootstrapVersion)
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2961,6 +2961,11 @@ error = '''
Aborted connection %d to db: '%-.192s' user: '%-.48s' host: '%-.255s' (%-.64s)
'''

["server:1203"]
error = '''
User %-.64s has exceeded the 'max_user_connections' resource
'''

["server:1251"]
error = '''
Client does not support authentication protocol requested by server; consider upgrading MySQL client
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect
google.golang.org/protobuf v1.36.1
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apimachinery v0.29.11 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
Expand Down
2 changes: 1 addition & 1 deletion pkg/errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrCrashedOnRepair: mysql.Message("Table '%-.192s' is marked as crashed and last (automatic?) repair failed", nil),
ErrWarningNotCompleteRollback: mysql.Message("Some non-transactional changed tables couldn't be rolled back", nil),
ErrTransCacheFull: mysql.Message("Multi-statement transaction required more than 'maxBinlogCacheSize' bytes of storage; increase this mysqld variable and try again", nil),
ErrTooManyUserConnections: mysql.Message("User %-.64s already has more than 'maxUserConnections' active connections", nil),
ErrTooManyUserConnections: mysql.Message("User %-.64s has exceeded the 'max_user_connections' resource", nil),
ErrSetConstantsOnly: mysql.Message("You may only use constant expressions with SET", nil),
ErrLockWaitTimeout: mysql.Message("Lock wait timeout exceeded; try restarting transaction", nil),
ErrLockTableFull: mysql.Message("The total number of locks exceeds the lock table size", nil),
Expand Down
13 changes: 10 additions & 3 deletions pkg/executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,8 @@ func (e *ShowExec) fetchShowCreateUser(ctx context.Context) error {
`SELECT plugin, Account_locked, user_attributes->>'$.metadata', Token_issuer,
Password_reuse_history, Password_reuse_time, Password_expired, Password_lifetime,
user_attributes->>'$.Password_locking.failed_login_attempts',
user_attributes->>'$.Password_locking.password_lock_time_days', authentication_string
user_attributes->>'$.Password_locking.password_lock_time_days', authentication_string,
Max_user_connections
FROM %n.%n WHERE User=%? AND Host=%?`,
mysql.SystemDB, mysql.UserTable, userName, strings.ToLower(hostName))
if err != nil {
Expand Down Expand Up @@ -1831,6 +1832,12 @@ func (e *ShowExec) fetchShowCreateUser(ctx context.Context) error {
}
authData := rows[0].GetString(10)

maxUserConnections := rows[0].GetInt64(11)
maxUserConnectionsStr := ""
if maxUserConnections > 0 {
maxUserConnectionsStr = fmt.Sprintf(" WITH MAX_USER_CONNECTIONS %d", maxUserConnections)
}

rows, _, err = exec.ExecRestrictedSQL(ctx, nil, `SELECT Priv FROM %n.%n WHERE User=%? AND Host=%?`, mysql.SystemDB, mysql.GlobalPrivTable, userName, hostName)
if err != nil {
return errors.Trace(err)
Expand All @@ -1853,8 +1860,8 @@ func (e *ShowExec) fetchShowCreateUser(ctx context.Context) error {
}

// FIXME: the returned string is not escaped safely
showStr := fmt.Sprintf("CREATE USER '%s'@'%s' IDENTIFIED WITH '%s'%s REQUIRE %s%s %s ACCOUNT %s PASSWORD HISTORY %s PASSWORD REUSE INTERVAL %s%s%s%s",
e.User.Username, e.User.Hostname, authPlugin, authStr, require, tokenIssuer, passwordExpiredStr, accountLocked, passwordHistory, passwordReuseInterval, failedLoginAttempts, passwordLockTimeDays, userAttributes)
showStr := fmt.Sprintf("CREATE USER '%s'@'%s' IDENTIFIED WITH '%s'%s REQUIRE %s%s%s %s ACCOUNT %s PASSWORD HISTORY %s PASSWORD REUSE INTERVAL %s%s%s%s",
e.User.Username, e.User.Hostname, authPlugin, authStr, require, tokenIssuer, maxUserConnectionsStr, passwordExpiredStr, accountLocked, passwordHistory, passwordReuseInterval, failedLoginAttempts, passwordLockTimeDays, userAttributes)
e.appendRow([]any{showStr})
return nil
}
Expand Down
63 changes: 60 additions & 3 deletions pkg/executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ type SimpleExec struct {
staleTxnStartTS uint64
}

type resourceOptionsInfo struct {
maxQueriesPerHour int64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the maxQueriesPerHour and the other 2 resource limits required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These three parameters are reserved and are not currently implemented

maxUpdatesPerHour int64
maxConnectionsPerHour int64
maxUserConnections int64
}

type passwordOrLockOptionsInfo struct {
lockAccount string
passwordExpired string
Expand Down Expand Up @@ -817,6 +824,22 @@ func (e *SimpleExec) executeRollback(s *ast.RollbackStmt) error {
return nil
}

func (info *resourceOptionsInfo) loadResourceOptions(userResource []*ast.ResourceOption) error {
for _, option := range userResource {
switch option.Type {
case ast.MaxQueriesPerHour:
info.maxQueriesPerHour = min(option.Count, math.MaxInt16)
case ast.MaxUpdatesPerHour:
info.maxUpdatesPerHour = min(option.Count, math.MaxInt16)
case ast.MaxConnectionsPerHour:
info.maxConnectionsPerHour = min(option.Count, math.MaxInt16)
case ast.MaxUserConnections:
info.maxUserConnections = min(option.Count, math.MaxInt16)
}
}
return nil
}

func whetherSavePasswordHistory(plOptions *passwordOrLockOptionsInfo) bool {
var passwdSaveNum, passwdSaveTime int64
// If the user specifies a default, read the global variable.
Expand Down Expand Up @@ -1046,6 +1069,18 @@ func (e *SimpleExec) executeCreateUser(ctx context.Context, s *ast.CreateUserStm
return err
}

userResource := &resourceOptionsInfo{
maxQueriesPerHour: 0,
maxUpdatesPerHour: 0,
maxConnectionsPerHour: 0,
maxUserConnections: 0,
}

err = userResource.loadResourceOptions(s.ResourceOptions)
if err != nil {
return err
}

plOptions := &passwordOrLockOptionsInfo{
lockAccount: "N",
passwordExpired: "N",
Expand Down Expand Up @@ -1114,8 +1149,8 @@ func (e *SimpleExec) executeCreateUser(ctx context.Context, s *ast.CreateUserStm
passwordInit := true
// Get changed user password reuse info.
savePasswdHistory := whetherSavePasswordHistory(plOptions)
sqlTemplate := "INSERT INTO %n.%n (Host, User, authentication_string, plugin, user_attributes, Account_locked, Token_issuer, Password_expired, Password_lifetime, Password_reuse_time, Password_reuse_history) VALUES "
valueTemplate := "(%?, %?, %?, %?, %?, %?, %?, %?, %?"
sqlTemplate := "INSERT INTO %n.%n (Host, User, authentication_string, plugin, user_attributes, Account_locked, Token_issuer, Password_expired, Password_lifetime, Max_user_connections, Password_reuse_time, Password_reuse_history) VALUES "
valueTemplate := "(%?, %?, %?, %?, %?, %?, %?, %?, %?, %?"

sqlescape.MustFormatSQL(sql, sqlTemplate, mysql.SystemDB, mysql.UserTable)
if savePasswdHistory {
Expand Down Expand Up @@ -1200,7 +1235,7 @@ func (e *SimpleExec) executeCreateUser(ctx context.Context, s *ast.CreateUserStm
}

hostName := strings.ToLower(spec.User.Hostname)
sqlescape.MustFormatSQL(sql, valueTemplate, hostName, spec.User.Username, pwd, authPlugin, userAttributesStr, plOptions.lockAccount, recordTokenIssuer, plOptions.passwordExpired, plOptions.passwordLifetime)
sqlescape.MustFormatSQL(sql, valueTemplate, hostName, spec.User.Username, pwd, authPlugin, userAttributesStr, plOptions.lockAccount, recordTokenIssuer, plOptions.passwordExpired, plOptions.passwordLifetime, userResource.maxUserConnections)
// add Password_reuse_time value.
if plOptions.passwordReuseIntervalChange && (plOptions.passwordReuseInterval != notSpecified) {
sqlescape.MustFormatSQL(sql, `, %?`, plOptions.passwordReuseInterval)
Expand Down Expand Up @@ -1695,6 +1730,20 @@ func (e *SimpleExec) executeAlterUser(ctx context.Context, s *ast.AlterUserStmt)
s.Specs = []*ast.UserSpec{spec}
}

userResource := &resourceOptionsInfo{
maxQueriesPerHour: 0,
maxUpdatesPerHour: 0,
maxConnectionsPerHour: 0,
// can't set 0 to maxUserConnections as default, because user could set 0 to this field.
// -1(invalid value) as a default parameter.
maxUserConnections: -1,
}

err = userResource.loadResourceOptions(s.ResourceOptions)
if err != nil {
return err
}

plOptions := passwordOrLockOptionsInfo{
lockAccount: "",
passwordExpired: "",
Expand Down Expand Up @@ -1924,6 +1973,14 @@ func (e *SimpleExec) executeAlterUser(ctx context.Context, s *ast.AlterUserStmt)
fields = append(fields, alterField{"password_lifetime=%?", plOptions.passwordLifetime})
}

if userResource.maxUserConnections >= 0 {
// need `CREATE USER` privilege for the operation of modifying max_user_connections.
if !hasCreateUserPriv {
return plannererrors.ErrSpecificAccessDenied.GenWithStackByArgs("CREATE USER")
}
fields = append(fields, alterField{"max_user_connections=%?", userResource.maxUserConnections})
}

var newAttributes []string
if s.CommentOrAttributeOption != nil {
if s.CommentOrAttributeOption.Type == ast.UserCommentType {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/test/simpletest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 11,
shard_count = 12,
deps = [
"//pkg/config",
"//pkg/parser/ast",
Expand Down
61 changes: 61 additions & 0 deletions pkg/executor/test/simpletest/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,67 @@ func TestRole(t *testing.T) {
tk.MustExec("SET ROLE NONE")
}

func TestMaxUserConnections(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

// test global variables max_user_connections.
result := tk.MustQuery(`show variables like 'max_user_connections'`)
result.Check(testkit.Rows("max_user_connections 0"))
tk.MustExec(`set global max_user_connections = 3;`)
tk.MustQuery(`show variables like 'max_user_connections'`).Check(testkit.Rows("max_user_connections 3"))
// if the value < 0, set 0 to max_user_connections.
tk.MustExec(`set global max_user_connections = -1;`)
tk.MustQuery(`show variables like 'max_user_connections'`).Check(testkit.Rows("max_user_connections 0"))
// if the value > 100000, set 100000 to max_user_connections.
tk.MustExec(`set global max_user_connections = 100001;`)
tk.MustQuery(`show variables like 'max_user_connections'`).Check(testkit.Rows("max_user_connections 100000"))
tk.MustExec(`set global max_user_connections = 0;`)
tk.MustQuery(`show variables like 'max_user_connections'`).Check(testkit.Rows("max_user_connections 0"))

// create user with the default max_user_connections 0
createUserSQL := `CREATE USER 'test'@'localhost';`
tk.MustExec(createUserSQL)
result = tk.MustQuery(`select user, max_user_connections from mysql.user`)
result.Check(testkit.Rows("root 0", "test 0"))

// create user with max_user_connections 3
createUserSQL = `CREATE USER 'test1'@'localhost' WITH MAX_USER_CONNECTIONS 3;`
tk.MustExec(createUserSQL)
result = tk.MustQuery(`select user, max_user_connections from mysql.user WHERE User="test1"`)
result.Check(testkit.Rows("test1 3"))

// test alter user with MAX_USER_CONNECTIONS
alterUserSQL := `ALTER USER 'test1'@'localhost' WITH MAX_USER_CONNECTIONS 4;`
tk.MustExec(alterUserSQL)
result = tk.MustQuery(`select user, max_user_connections from mysql.user WHERE User="test1"`)
result.Check(testkit.Rows("test1 4"))
alterUserSQL = `ALTER USER 'test1'@'localhost' WITH MAX_USER_CONNECTIONS -2;`
_, err := tk.Exec(alterUserSQL)
require.Error(t, err)
require.Equal(t, err.Error(), "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 58 near \"-2;\" ")
alterUserSQL = `ALTER USER 'test1'@'localhost' WITH MAX_USER_CONNECTIONS 0;`
tk.MustExec(alterUserSQL)
result = tk.MustQuery(`select user, max_user_connections from mysql.user WHERE User="test1"`)
result.Check(testkit.Rows("test1 0"))

// grant the privilege of 'create user' to 'test1'@'localhost'
tkTest1 := testkit.NewTestKit(t, store)
require.NoError(t, tkTest1.Session().Auth(&auth.UserIdentity{Username: "test1", Hostname: "localhost"}, nil, nil, nil))
_, err = tkTest1.Exec(`ALTER USER 'test1'@'localhost' WITH MAX_USER_CONNECTIONS 2`)
require.Error(t, err)
require.EqualError(t, err, "[planner:1227]Access denied; you need (at least one of) the CREATE USER privilege(s) for this operation")
tk.MustExec(`GRANT CREATE USER ON *.* TO 'test1'@'localhost'`)
_, err = tkTest1.Exec(`ALTER USER 'test1'@'localhost' WITH MAX_USER_CONNECTIONS 2`)
require.Nil(t, err)

// revert the privilege of 'create user' for 'test1'@'localhost'
tk.MustExec(`REVOKE CREATE USER ON *.* FROM 'test1'@'localhost'`)
_, err = tkTest1.Exec(`ALTER USER 'test1'@'localhost' WITH MAX_USER_CONNECTIONS 2`)
require.Error(t, err)
require.EqualError(t, err, "[planner:1227]Access denied; you need (at least one of) the CREATE USER privilege(s) for this operation")
}

func TestUser(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
15 changes: 13 additions & 2 deletions pkg/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23289,8 +23289,19 @@ yynewstate:
case 2517:
{
parser.yyVAL.item = yyS[yypt-0].item
yylex.AppendError(yylex.Errorf("TiDB does not support WITH ConnectionOptions now, they would be parsed but ignored."))
parser.lastErrorAsWarn()
needWarning := false
for _, option := range yyS[yypt-0].item.([]*ast.ResourceOption) {
switch option.Type {
case ast.MaxUserConnections:
// do nothing.
default:
needWarning = true
}
}
if needWarning {
yylex.AppendError(yylex.Errorf("TiDB does not support WITH ConnectionOptions but MAX_USER_CONNECTIONS now, they would be parsed but ignored."))
parser.lastErrorAsWarn()
}
}
case 2518:
{
Expand Down
15 changes: 13 additions & 2 deletions pkg/parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -13770,8 +13770,19 @@ ConnectionOptions:
| "WITH" ConnectionOptionList
{
$$ = $2
yylex.AppendError(yylex.Errorf("TiDB does not support WITH ConnectionOptions now, they would be parsed but ignored."))
parser.lastErrorAsWarn()
Comment on lines -13773 to -13774
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we only support MAX_USER_CONNECTIONS, maybe keep reporting error for all others?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we only support MAX_USER_CONNECTIONS, maybe keep reporting error for all others?

Good comments. I have modify it. If enter other WITH xxx but WITH MAX_USER_CONNECTIONS, it wiil report warning.

needWarning := false
for _, option := range $2.([]*ast.ResourceOption) {
switch option.Type {
case ast.MaxUserConnections:
// do nothing.
default:
needWarning = true
}
}
if needWarning {
yylex.AppendError(yylex.Errorf("TiDB does not support WITH ConnectionOptions but MAX_USER_CONNECTIONS now, they would be parsed but ignored."))
parser.lastErrorAsWarn()
}
}

ConnectionOptionList:
Expand Down
3 changes: 3 additions & 0 deletions pkg/privilege/privilege.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type Manager interface {

// GetAuthPluginForConnection gets the authentication plugin used in connection establishment.
GetAuthPluginForConnection(ctx context.Context, user, host string) (string, error)

//GetUserResources gets the max user connections for the account identified by the user and host
GetUserResources(user, host string) (int64, error)
}

const key keyType = 0
Expand Down
5 changes: 4 additions & 1 deletion pkg/privilege/privileges/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ const (
References_priv,Alter_priv,Execute_priv,Index_priv,Create_view_priv,Show_view_priv,
Create_role_priv,Drop_role_priv,Create_tmp_table_priv,Lock_tables_priv,Create_routine_priv,
Alter_routine_priv,Event_priv,Shutdown_priv,Reload_priv,File_priv,Config_priv,Repl_client_priv,Repl_slave_priv,
Account_locked,Plugin,Token_issuer,User_attributes,password_expired,password_last_changed,password_lifetime FROM mysql.user`
Account_locked,Plugin,Token_issuer,User_attributes,password_expired,password_last_changed,password_lifetime,max_user_connections FROM mysql.user`
sqlLoadGlobalGrantsTable = `SELECT HIGH_PRIORITY Host,User,Priv,With_Grant_Option FROM mysql.global_grants`
)

Expand Down Expand Up @@ -120,6 +120,7 @@ type UserRecord struct {
PasswordExpired bool
PasswordLastChanged time.Time
PasswordLifeTime int64
MaxUserConnections int64
ResourceGroup string
}

Expand Down Expand Up @@ -998,6 +999,8 @@ func (p *immutable) decodeUserTableRow(row chunk.Row, fs []*resolve.ResultField)
continue
}
value.PasswordLifeTime = row.GetInt64(i)
case f.ColumnAsName.L == "max_user_connections":
value.MaxUserConnections = row.GetInt64(i)
case f.Column.GetType() == mysql.TypeEnum:
if row.GetEnum(i).String() != "Y" {
continue
Expand Down
15 changes: 15 additions & 0 deletions pkg/privilege/privileges/privileges.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,21 @@ func (p *UserPrivileges) isValidHash(record *UserRecord) bool {
return false
}

// GetUserResources gets the maximum number of connections for the current user
func (p *UserPrivileges) GetUserResources(user, host string) (int64, error) {
mysqlPriv := p.Handle.Get()
record := mysqlPriv.connectionVerification(user, host)
if record == nil {
logutil.BgLogger().Error("get user privilege record fail",
zap.String("user", user), zap.String("host", host))
return 0, errors.New("Failed to get user record")
}
if p.isValidHash(record) {
return record.MaxUserConnections, nil
}
return 0, errors.New("Failed to get max user connections")
}

// GetAuthPluginForConnection gets the authentication plugin used in connection establishment.
func (p *UserPrivileges) GetAuthPluginForConnection(ctx context.Context, user, host string) (string, error) {
if SkipWithGrant {
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"rpc_server.go",
"server.go",
"stat.go",
"user_connections.go",
],
importpath = "github.com/pingcap/tidb/pkg/server",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -149,6 +150,7 @@ go_test(
"stat_test.go",
"tidb_library_test.go",
"tidb_test.go",
"user_connections_test.go",
],
data = glob(["testdata/**"]),
embed = [":server"],
Expand Down
Loading