Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of ActiveDefrag to reduce latencies #1242

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->timeEventNextId = 1;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
Expand Down
5 changes: 3 additions & 2 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3201,10 +3201,11 @@ standardConfig static_configs[] = {
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("tcp-keepalive", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tcpkeepalive, 300, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-migration-barrier", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_migration_barrier, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */
createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */
createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
createIntConfig("active-defrag-threshold-lower", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_lower, 10, INTEGER_CONFIG, NULL, NULL), /* Default: don't defrag when fragmentation is below 10% */
createIntConfig("active-defrag-threshold-upper", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_upper, 100, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: maximum defrag force at 100% fragmentation */
createIntConfig("active-defrag-cycle-us", NULL, MODIFIABLE_CONFIG, 0, 100000, server.active_defrag_cycle_us, 500, INTEGER_CONFIG, NULL, updateDefragConfiguration),
createIntConfig("lfu-log-factor", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_log_factor, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("lfu-decay-time", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_decay_time, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("replica-priority", "slave-priority", MODIFIABLE_CONFIG, 0, INT_MAX, server.replica_priority, 100, INTEGER_CONFIG, NULL, NULL),
Expand Down
1,042 changes: 630 additions & 412 deletions src/defrag.c

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {

/* Reallocate the dictEntry, key and value allocations in a bucket using the
* provided allocation functions in order to defrag them. */
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns, void *privdata) {
static void dictDefragBucket(dictEntry **bucketref, const dictDefragFunctions *defragfns, void *privdata) {
dictDefragAllocFunction *defragalloc = defragfns->defragAlloc;
dictDefragAllocFunction *defragkey = defragfns->defragKey;
dictDefragAllocFunction *defragval = defragfns->defragVal;
Expand Down Expand Up @@ -1487,7 +1487,7 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri
* where NULL means that no reallocation happened and the old memory is still
* valid. */
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata) {
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata) {
int htidx0, htidx1;
const dictEntry *de, *next;
unsigned long m0, m1;
Expand Down
2 changes: 1 addition & 1 deletion src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ void dictSetHashFunctionSeed(uint8_t *seed);
uint8_t *dictGetHashFunctionSeed(void);
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);

Expand Down
23 changes: 18 additions & 5 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
int didx,
unsigned long v,
dictScanFunction *fn,
dictDefragFunctions *defragfns,
const dictDefragFunctions *defragfns,
void *privdata) {
dict *d = kvstoreGetDict(kvs, didx);
if (!d) return 0;
Expand All @@ -748,14 +748,27 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
* within dict, it only reallocates the memory used by the dict structure itself using
* the provided allocation function. This feature was added for the active defrag feature.
*
* The 'defragfn' callback is called with a reference to the dict
* that callback can reallocate. */
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) {
for (int didx = 0; didx < kvs->num_dicts; didx++) {
* With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time
* to execute. A "cursor" is used to perform the operation iteratively. When first called, a
* cursor value of 0 should be provided. The return value is an updated cursor which should be
* provided on the next iteration. The operation is complete when 0 is returned.
*
* The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) {
for (int didx = cursor; didx < kvs->num_dicts; didx++) {
dict **d = kvstoreGetDictRef(kvs, didx), *newd;
if (!*d) continue;

listNode *rehashing_node = NULL;
if (listLength(kvs->rehashing) > 0) {
rehashing_node = ((kvstoreDictMetadata *)dictMetadata(*d))->rehashing_node;
}

if ((newd = defragfn(*d))) *d = newd;
if (rehashing_node) listNodeValue(rehashing_node) = *d;
return (didx + 1);
}
return 0;
}

uint64_t kvstoreGetHash(kvstore *kvs, const void *key) {
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
int didx,
unsigned long v,
dictScanFunction *fn,
dictDefragFunctions *defragfns,
const dictDefragFunctions *defragfns,
void *privdata);
typedef dict *(kvstoreDictLUTDefragFunction)(dict *d);
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn);
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn);
void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key);
dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key);
dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing);
Expand Down
26 changes: 4 additions & 22 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1062,8 +1062,8 @@ void databasesCron(void) {
}
}

/* Defrag keys gradually. */
activeDefragCycle();
/* Start active defrag cycle or adjust defrag CPU if needed. */
monitorActiveDefrag();

/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
Expand Down Expand Up @@ -1532,22 +1532,6 @@ void whileBlockedCron(void) {
mstime_t latency;
latencyStartMonitor(latency);

/* In some cases we may be called with big intervals, so we may need to do
* extra work here. This is because some of the functions in serverCron rely
* on the fact that it is performed every 10 ms or so. For instance, if
* activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we
* need to call it multiple times. */
long hz_ms = 1000 / server.hz;
while (server.blocked_last_cron < server.mstime) {
/* Defrag keys gradually. */
activeDefragCycle();

server.blocked_last_cron += hz_ms;

/* Increment cronloop so that run_with_period works. */
server.cronloops++;
}

/* Other cron jobs do not need to be done in a loop. No need to check
* server.blocked_last_cron since we have an early exit at the top. */

Expand Down Expand Up @@ -2041,7 +2025,7 @@ void initServerConfig(void) {
server.aof_flush_postponed_start = 0;
server.aof_last_incr_size = 0;
server.aof_last_incr_fsync_offset = 0;
server.active_defrag_running = 0;
server.active_defrag_cpu_percent = 0;
server.active_defrag_configuration_changed = 0;
server.notify_keyspace_events = 0;
server.blocked_clients = 0;
Expand Down Expand Up @@ -2655,8 +2639,6 @@ void initServer(void) {
server.db[j].watched_keys = dictCreate(&keylistDictType);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
listSetFreeMethod(server.db[j].defrag_later, (void (*)(void *))sdsfree);
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
/* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which
Expand Down Expand Up @@ -5610,7 +5592,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"mem_aof_buffer:%zu\r\n", mh->aof_buffer,
"mem_allocator:%s\r\n", ZMALLOC_LIB,
"mem_overhead_db_hashtable_rehashing:%zu\r\n", mh->overhead_db_hashtable_rehashing,
"active_defrag_running:%d\r\n", server.active_defrag_running,
"active_defrag_running:%d\r\n", server.active_defrag_cpu_percent,
"lazyfree_pending_objects:%zu\r\n", lazyfreeGetPendingObjectsCount(),
"lazyfreed_objects:%zu\r\n", lazyfreeGetFreedObjectsCount()));
freeMemoryOverheadData(mh);
Expand Down
10 changes: 5 additions & 5 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,6 @@ typedef struct serverDb {
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} serverDb;

/* forward declaration for functions ctx */
Expand Down Expand Up @@ -1669,7 +1668,7 @@ struct valkeyServer {
int last_sig_received; /* Indicates the last SIGNAL received, if any (e.g., SIGINT or SIGTERM). */
int shutdown_flags; /* Flags passed to prepareForShutdown(). */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
int active_defrag_cpu_percent; /* Current desired CPU percentage for active defrag */
char *pidfile; /* PID file path */
int arch_bits; /* 32 or 64 depending on sizeof(long) */
int cronloops; /* Number of times the cron function run */
Expand Down Expand Up @@ -1868,8 +1867,9 @@ struct valkeyServer {
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
int active_defrag_cpu_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cpu_max; /* maximal effort for defrag in CPU percentage */
int active_defrag_cycle_us; /* standard duration of defrag cycle */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from
within the main dict scan */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
Expand Down Expand Up @@ -3312,7 +3312,7 @@ void bytesToHuman(char *s, size_t size, unsigned long long n);
void enterExecutionUnit(int update_cached_time, long long us);
void exitExecutionUnit(void);
void resetServerStats(void);
void activeDefragCycle(void);
void monitorActiveDefrag(void);
unsigned int getLRUClock(void);
unsigned int LRU_CLOCK(void);
const char *evictPolicyToString(void);
Expand Down
19 changes: 16 additions & 3 deletions tests/unit/memefficiency.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ run_solo {defrag} {
r config set active-defrag-cycle-min 65
r config set active-defrag-cycle-max 75

after 1000 ;# Give defrag time to work (might be multiple cycles)

# Wait for the active defrag to stop working.
wait_for_condition 2000 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -138,12 +140,13 @@ run_solo {defrag} {
r config resetstat
r config set key-load-delay -25 ;# sleep on average 1/25 usec
r debug loadaof
after 1000 ;# give defrag a chance to work before turning it off
r config set activedefrag no

# measure hits and misses right after aof loading
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]

after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
set max_latency 0
foreach event [r latency latest] {
Expand Down Expand Up @@ -203,7 +206,7 @@ run_solo {defrag} {
$rd read ; # Discard script load replies
$rd read ; # Discard set replies
}
after 120 ;# serverCron only updates the info once in 100ms
after 1000 ;# give defrag some time to work
if {$::verbose} {
puts "used [s allocator_allocated]"
puts "rss [s allocator_active]"
Expand Down Expand Up @@ -239,6 +242,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag time to work (might be multiple cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -361,6 +366,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -430,7 +437,6 @@ run_solo {defrag} {
$rd read ; # Discard set replies
}

after 120 ;# serverCron only updates the info once in 100ms
if {$::verbose} {
puts "used [s allocator_allocated]"
puts "rss [s allocator_active]"
Expand Down Expand Up @@ -466,6 +472,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand All @@ -475,6 +483,7 @@ run_solo {defrag} {
puts [r memory malloc-stats]
fail "defrag didn't stop."
}
r config set activedefrag no ;# disable before we accidentally create more frag

# test the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
Expand Down Expand Up @@ -561,6 +570,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down Expand Up @@ -685,6 +696,8 @@ run_solo {defrag} {
fail "defrag not started."
}

after 1000 ;# Give defrag some time to work (it may run several cycles)

# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
Expand Down
16 changes: 11 additions & 5 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2300,9 +2300,8 @@ rdb-save-incremental-fsync yes
# Fragmentation is a natural process that happens with every allocator (but
# less so with Jemalloc, fortunately) and certain workloads. Normally a server
# restart is needed in order to lower the fragmentation, or at least to flush
# away all the data and create it again. However thanks to this feature
# implemented by Oran Agra, this process can happen at runtime
JimB123 marked this conversation as resolved.
Show resolved Hide resolved
# in a "hot" way, while the server is running.
# away all the data and create it again. However thanks to this feature, this
# process can happen at runtime in a "hot" way, while the server is running.
#
# Basically when the fragmentation is over a certain level (see the
# configuration options below) the server will start to create new copies of the
Expand Down Expand Up @@ -2341,17 +2340,24 @@ rdb-save-incremental-fsync yes
# active-defrag-threshold-upper 100

# Minimal effort for defrag in CPU percentage, to be used when the lower
# threshold is reached
# threshold is reached.
# Note: this is not actually a cycle time, but is an overall CPU percentage
# active-defrag-cycle-min 1

# Maximal effort for defrag in CPU percentage, to be used when the upper
# threshold is reached
# threshold is reached.
# Note: this is not actually a cycle time, but is an overall CPU percentage
# active-defrag-cycle-max 25

# Maximum number of set/hash/zset/list fields that will be processed from
# the main dictionary scan
# active-defrag-max-scan-fields 1000

# The time spent (in microseconds) of the periodic active defrag process. This
# affects the latency impact of active defrag on client commands. Smaller numbers
# will result in less latency impact at the cost of increased defrag overhead.
# active-defrag-cycle-us 500

# Jemalloc background thread for purging will be enabled by default
jemalloc-bg-thread yes

Expand Down
Loading