Skip to content

Commit

Permalink
Add flag to set max-bg-workers argument
Browse files Browse the repository at this point in the history
Users may have reason to change the number of background workers
that are used and therefore should be able to override the default
(8) that timescaledb-tune suggests. This PR adds a --max-bg-workers
flag and adjusts the relevant parameters accordingly.
  • Loading branch information
Jeff Lambert authored and RobAtticus committed Nov 24, 2020
1 parent 2d69d7c commit 80ad061
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 149 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
*.dll
*.so
*.dylib
timescaledb-tune

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out
coverage.txt

# Popular IDEs
.idea/
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ If you want recommendations for a specific amount of memory and/or CPUs:
$ timescaledb-tune --memory="4GB" --cpus=2
```

If you want to set a specific number of background workers (`timescaledb.max_background_workers`):
```bash
$ timescaledb-tune --max-bg-workers=16
```

If you have a dedicated disk for WAL, or want to specify how much of a
shared disk should be used for WAL:
```bash
Expand Down
2 changes: 2 additions & 0 deletions cmd/timescaledb-tune/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"runtime"
"strings"

"github.com/timescale/timescaledb-tune/pkg/pgtune"
"github.com/timescale/timescaledb-tune/pkg/tstune"
)

Expand All @@ -35,6 +36,7 @@ func init() {
flag.StringVar(&f.PGVersion, "pg-version", "", "Major version of PostgreSQL to base recommendations on. Default is determined via pg_config. Valid values: "+strings.Join(tstune.ValidPGVersions, ", "))
flag.StringVar(&f.WALDiskSize, "wal-disk-size", "", "Size of the disk where the WAL resides, in PostgreSQL format <int value><units>, e.g., 4GB. Using this flag helps tune WAL behavior.")
flag.Uint64Var(&f.MaxConns, "max-conns", 0, "Max number of connections for the database. Default is equal to our best recommendation")
flag.IntVar(&f.MaxBGWorkers, "max-bg-workers", pgtune.MaxBackgroundWorkersDefault, "Max number of background workers")
flag.StringVar(&f.ConfPath, "conf-path", "", "Path to postgresql.conf. If blank, heuristics will be used to find it")
flag.StringVar(&f.DestPath, "out-path", "", "Path to write the new configuration file. If blank, will use the same file that is read from")
flag.StringVar(&f.PGConfig, "pg-config", "pg_config", "Path to the pg_config binary")
Expand Down
7 changes: 3 additions & 4 deletions pkg/pgtune/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,19 @@ import (
// unaffected by number of CPUs and max connections; the exception is work_mem,
// so the adjustment is done in the init function
var memoryToBaseVals = map[uint64]map[string]uint64{
10 * parse.Gigabyte: map[string]uint64{
10 * parse.Gigabyte: {
SharedBuffersKey: 2560 * parse.Megabyte,
EffectiveCacheKey: 7680 * parse.Megabyte,
MaintenanceWorkMemKey: 1280 * parse.Megabyte,
WorkMemKey: 64 * parse.Megabyte,
},
12 * parse.Gigabyte: map[string]uint64{
12 * parse.Gigabyte: {
SharedBuffersKey: 3 * parse.Gigabyte,
EffectiveCacheKey: 9 * parse.Gigabyte,
MaintenanceWorkMemKey: 1536 * parse.Megabyte,
WorkMemKey: 78643 * parse.Kilobyte,
},
32 * parse.Gigabyte: map[string]uint64{

32 * parse.Gigabyte: {
SharedBuffersKey: 8 * parse.Gigabyte,
EffectiveCacheKey: 24 * parse.Gigabyte,
MaintenanceWorkMemKey: maintenanceWorkMemLimit,
Expand Down
3 changes: 3 additions & 0 deletions pkg/pgtune/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const (
// MaxConnectionsDefault is the recommended default value for max_connections.
const MaxConnectionsDefault uint64 = 100

// MaxBackgroundWorkersDefault is the recommended default value for timescaledb.max_background_workers.
const MaxBackgroundWorkersDefault int = 8

// getMaxConns gives a default amount of connections based on a memory step
// function.
func getMaxConns(totalMemory uint64) uint64 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgtune/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestMiscRecommenderRecommendPanic(t *testing.T) {
func TestMiscSettingsGroup(t *testing.T) {
for totalMemory, outerMatrix := range miscSettingsMatrix {
for maxConns, matrix := range outerMatrix {
config, err := NewSystemConfig(totalMemory, 8, "10", walDiskUnset, maxConns)
config, err := NewSystemConfig(totalMemory, 8, "10", walDiskUnset, maxConns, MaxBackgroundWorkersDefault)
if err != nil {
t.Errorf("unexpected error on system config creation: got %v", err)
}
Expand Down
27 changes: 16 additions & 11 deletions pkg/pgtune/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ const (
MaxParallelWorkersGatherKey = "max_parallel_workers_per_gather"
MaxParallelWorkers = "max_parallel_workers" // pg10+

defaultMaxBackgroundWorkers = 8 // This may be more dynamic in the future
minBuiltInProcesses = 3 // at least checkpointer, WALwriter, vacuum
minBuiltInProcesses = 3 // at least checkpointer, WALwriter, vacuum

errOneCPU = "cannot make recommendations with just 1 CPU"
errOneCPU = "cannot make recommendations with just 1 CPU"
errWorkers = "cannot make recommendations with less than %d workers"
)

// ParallelLabel is the label used to refer to the parallelism settings group
Expand All @@ -31,13 +31,14 @@ var ParallelKeys = []string{

// ParallelRecommender gives recommendations for ParallelKeys based on system resources.
type ParallelRecommender struct {
cpus int
cpus int
maxBGWorkers int
}

// NewParallelRecommender returns a ParallelRecommender that recommends based on
// the given number of cpus.
func NewParallelRecommender(cpus int) *ParallelRecommender {
return &ParallelRecommender{cpus}
func NewParallelRecommender(cpus, maxBGWorkers int) *ParallelRecommender {
return &ParallelRecommender{cpus, maxBGWorkers}
}

// IsAvailable returns whether this Recommender is usable given the system
Expand All @@ -53,17 +54,20 @@ func (r *ParallelRecommender) Recommend(key string) string {
if r.cpus <= 1 {
panic(errOneCPU)
}
if r.maxBGWorkers < MaxBackgroundWorkersDefault {
panic(fmt.Sprintf(errWorkers, MaxBackgroundWorkersDefault))
}
if key == MaxWorkerProcessesKey {
// Need enough processes to handle built-ins (e.g., autovacuum),
// TimescaleDB background workers, and the number of parallel workers
// (equal to the number of CPUs).
val = fmt.Sprintf("%d", minBuiltInProcesses+defaultMaxBackgroundWorkers+r.cpus)
val = fmt.Sprintf("%d", minBuiltInProcesses+r.maxBGWorkers+r.cpus)
} else if key == MaxParallelWorkers {
val = fmt.Sprintf("%d", r.cpus)
} else if key == MaxParallelWorkersGatherKey {
val = fmt.Sprintf("%d", int(math.Round(float64(r.cpus)/2.0)))
} else if key == MaxBackgroundWorkers {
val = fmt.Sprintf("%d", defaultMaxBackgroundWorkers)
val = fmt.Sprintf("%d", r.maxBGWorkers)
} else {
panic(fmt.Sprintf("unknown key: %s", key))
}
Expand All @@ -72,8 +76,9 @@ func (r *ParallelRecommender) Recommend(key string) string {

// ParallelSettingsGroup is the SettingsGroup to represent parallelism settings.
type ParallelSettingsGroup struct {
pgVersion string
cpus int
pgVersion string
cpus int
maxBGWorkers int
}

// Label should always return the value ParallelLabel.
Expand All @@ -89,5 +94,5 @@ func (sg *ParallelSettingsGroup) Keys() []string {

// GetRecommender should return a new ParallelRecommender.
func (sg *ParallelSettingsGroup) GetRecommender() Recommender {
return NewParallelRecommender(sg.cpus)
return NewParallelRecommender(sg.cpus, sg.maxBGWorkers)
}
142 changes: 95 additions & 47 deletions pkg/pgtune/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,71 +4,105 @@ import (
"fmt"
"math/rand"
"testing"
"time"

"github.com/timescale/timescaledb-tune/pkg/pgutils"
)

// parallelSettingsMatrix stores the test cases for ParallelRecommender along
// with the expected values for its keys
var parallelSettingsMatrix = map[int]map[string]string{
2: map[string]string{
MaxBackgroundWorkers: fmt.Sprintf("%d", defaultMaxBackgroundWorkers),
MaxWorkerProcessesKey: fmt.Sprintf("%d", 2+minBuiltInProcesses+defaultMaxBackgroundWorkers),
MaxParallelWorkersGatherKey: "1",
MaxParallelWorkers: "2",
var parallelSettingsMatrix = map[int]map[int]map[string]string{
2: {
MaxBackgroundWorkersDefault: {
MaxBackgroundWorkers: fmt.Sprintf("%d", MaxBackgroundWorkersDefault),
MaxWorkerProcessesKey: fmt.Sprintf("%d", 2+minBuiltInProcesses+MaxBackgroundWorkersDefault),
MaxParallelWorkersGatherKey: "1",
MaxParallelWorkers: "2",
},
MaxBackgroundWorkersDefault * 2: {
MaxBackgroundWorkers: fmt.Sprintf("%d", MaxBackgroundWorkersDefault*2),
MaxWorkerProcessesKey: fmt.Sprintf("%d", 2+minBuiltInProcesses+MaxBackgroundWorkersDefault*2),
MaxParallelWorkersGatherKey: "1",
MaxParallelWorkers: "2",
},
},
4: map[string]string{
MaxBackgroundWorkers: fmt.Sprintf("%d", defaultMaxBackgroundWorkers),
MaxWorkerProcessesKey: fmt.Sprintf("%d", 4+minBuiltInProcesses+defaultMaxBackgroundWorkers),
MaxParallelWorkersGatherKey: "2",
MaxParallelWorkers: "4",
4: {
MaxBackgroundWorkersDefault: {
MaxBackgroundWorkers: fmt.Sprintf("%d", MaxBackgroundWorkersDefault),
MaxWorkerProcessesKey: fmt.Sprintf("%d", 4+minBuiltInProcesses+MaxBackgroundWorkersDefault),
MaxParallelWorkersGatherKey: "2",
MaxParallelWorkers: "4",
},
MaxBackgroundWorkersDefault * 4: {
MaxBackgroundWorkers: fmt.Sprintf("%d", MaxBackgroundWorkersDefault*4),
MaxWorkerProcessesKey: fmt.Sprintf("%d", 4+minBuiltInProcesses+MaxBackgroundWorkersDefault*4),
MaxParallelWorkersGatherKey: "2",
MaxParallelWorkers: "4",
},
},
5: map[string]string{
MaxBackgroundWorkers: fmt.Sprintf("%d", defaultMaxBackgroundWorkers),
MaxWorkerProcessesKey: fmt.Sprintf("%d", 5+minBuiltInProcesses+defaultMaxBackgroundWorkers),
MaxParallelWorkersGatherKey: "3",
MaxParallelWorkers: "5",
5: {
MaxBackgroundWorkersDefault: {
MaxBackgroundWorkers: fmt.Sprintf("%d", MaxBackgroundWorkersDefault),
MaxWorkerProcessesKey: fmt.Sprintf("%d", 5+minBuiltInProcesses+MaxBackgroundWorkersDefault),
MaxParallelWorkersGatherKey: "3",
MaxParallelWorkers: "5",
},
MaxBackgroundWorkersDefault * 5: {
MaxBackgroundWorkers: fmt.Sprintf("%d", MaxBackgroundWorkersDefault*5),
MaxWorkerProcessesKey: fmt.Sprintf("%d", 5+minBuiltInProcesses+MaxBackgroundWorkersDefault*5),
MaxParallelWorkersGatherKey: "3",
MaxParallelWorkers: "5",
},
},
}

func TestNewParallelRecommender(t *testing.T) {
rand.Seed(time.Now().UnixNano())
for i := 0; i < 1000000; i++ {
cpus := rand.Intn(128)
r := NewParallelRecommender(cpus)
// ensure a minimum of background workers
workers := rand.Intn(128-MaxBackgroundWorkersDefault+1) + MaxBackgroundWorkersDefault
r := NewParallelRecommender(cpus, workers)
if r == nil {
t.Errorf("unexpected nil recommender")
}
if got := r.cpus; got != cpus {
t.Errorf("recommender has incorrect cpus: got %d want %d", got, cpus)
}
if got := r.maxBGWorkers; got != workers {
t.Errorf("recommender has incorrect workers: got %d want %d", got, workers)
}
}
}

func TestParallelRecommenderIsAvailable(t *testing.T) {
if r := NewParallelRecommender(0); r.IsAvailable() {
if r := NewParallelRecommender(0, MaxBackgroundWorkersDefault); r.IsAvailable() {
t.Errorf("unexpectedly available for 0 cpus")
}
if r := NewParallelRecommender(1); r.IsAvailable() {
if r := NewParallelRecommender(1, MaxBackgroundWorkersDefault); r.IsAvailable() {
t.Errorf("unexpectedly available for 1 cpus")
}

for i := 2; i < 1000; i++ {
if r := NewParallelRecommender(i); !r.IsAvailable() {
if r := NewParallelRecommender(i, MaxBackgroundWorkersDefault); !r.IsAvailable() {
t.Errorf("unexpected UNavailable for %d cpus", i)
}
}
}

func TestParallelRecommenderRecommend(t *testing.T) {
for cpus, matrix := range parallelSettingsMatrix {
r := &ParallelRecommender{cpus}
testRecommender(t, r, ParallelKeys, matrix)
for cpus, tempMatrix := range parallelSettingsMatrix {
for workers, matrix := range tempMatrix {
r := &ParallelRecommender{cpus, workers}
testRecommender(t, r, ParallelKeys, matrix)
}
}
}

func TestParallelRecommenderRecommendPanics(t *testing.T) {
// test invalid key panic
func() {
r := &ParallelRecommender{5}
r := &ParallelRecommender{5, MaxBackgroundWorkersDefault}
defer func() {
if re := recover(); re == nil {
t.Errorf("did not panic when should")
Expand All @@ -77,44 +111,58 @@ func TestParallelRecommenderRecommendPanics(t *testing.T) {
r.Recommend("foo")
}()

// test invalid CPU panic
func() {
r := &ParallelRecommender{1}
defer func() {
if re := recover(); re == nil {
t.Errorf("did not panic when should")
}
}()
r := &ParallelRecommender{1, MaxBackgroundWorkersDefault}
r.Recommend("foo")
}()

// test invalid worker panic
func() {
defer func() {
if re := recover(); re == nil {
t.Errorf("did not panic when should")
}
}()
r := &ParallelRecommender{5, MaxBackgroundWorkersDefault - 1}
r.Recommend("foo")
}()
}

func TestParallelSettingsGroup(t *testing.T) {
keyCount := len(ParallelKeys)
for cpus, matrix := range parallelSettingsMatrix {
config := getDefaultTestSystemConfig(t)
config.CPUs = cpus
config.PGMajorVersion = pgutils.MajorVersion96 // 9.6 lacks one key
sg := GetSettingsGroup(ParallelLabel, config)
if got := len(sg.Keys()); got != keyCount-1 {
t.Errorf("incorrect number of keys for PG %s: got %d want %d", pgutils.MajorVersion96, got, keyCount-1)
}
testSettingGroup(t, sg, matrix, ParallelLabel, ParallelKeys)
for cpus, tempMatrix := range parallelSettingsMatrix {
for workers, matrix := range tempMatrix {
config := getDefaultTestSystemConfig(t)
config.CPUs = cpus
config.PGMajorVersion = pgutils.MajorVersion96 // 9.6 lacks one key
config.MaxBGWorkers = workers
sg := GetSettingsGroup(ParallelLabel, config)
if got := len(sg.Keys()); got != keyCount-1 {
t.Errorf("incorrect number of keys for PG %s: got %d want %d", pgutils.MajorVersion96, got, keyCount-1)
}
testSettingGroup(t, sg, matrix, ParallelLabel, ParallelKeys)

// PG10 adds a key
config.PGMajorVersion = pgutils.MajorVersion10
sg = GetSettingsGroup(ParallelLabel, config)
if got := len(sg.Keys()); got != keyCount {
t.Errorf("incorrect number of keys for PG %s: got %d want %d", pgutils.MajorVersion10, got, keyCount)
}
testSettingGroup(t, sg, matrix, ParallelLabel, ParallelKeys)
// PG10 adds a key
config.PGMajorVersion = pgutils.MajorVersion10
sg = GetSettingsGroup(ParallelLabel, config)
if got := len(sg.Keys()); got != keyCount {
t.Errorf("incorrect number of keys for PG %s: got %d want %d", pgutils.MajorVersion10, got, keyCount)
}
testSettingGroup(t, sg, matrix, ParallelLabel, ParallelKeys)

config.PGMajorVersion = pgutils.MajorVersion11
sg = GetSettingsGroup(ParallelLabel, config)
if got := len(sg.Keys()); got != keyCount {
t.Errorf("incorrect number of keys for PG %s: got %d want %d", pgutils.MajorVersion11, got, keyCount)
config.PGMajorVersion = pgutils.MajorVersion11
sg = GetSettingsGroup(ParallelLabel, config)
if got := len(sg.Keys()); got != keyCount {
t.Errorf("incorrect number of keys for PG %s: got %d want %d", pgutils.MajorVersion11, got, keyCount)
}
testSettingGroup(t, sg, matrix, ParallelLabel, ParallelKeys)
}
testSettingGroup(t, sg, matrix, ParallelLabel, ParallelKeys)

}

}
Loading

0 comments on commit 80ad061

Please sign in to comment.