From e02bf1e5f7755257c6135bda7eccca581fe4a956 Mon Sep 17 00:00:00 2001 From: idning Date: Sat, 11 Oct 2014 11:39:24 +0800 Subject: [PATCH] not correct on repl_set_master --- src/Makefile | 2 +- src/ndb.c | 6 ++-- src/ndb_command.c | 8 ++--- src/ndb_cursor.h | 2 -- src/ndb_job.c | 9 ++++-- src/ndb_oplog.c | 2 +- src/ndb_repl.c | 71 +++++++++++++++++++++++++++++++++-------- test/unit/test_oplog.py | 66 ++++++++++++++++++++++++++++++++++---- 8 files changed, 130 insertions(+), 36 deletions(-) diff --git a/src/Makefile b/src/Makefile index da173fd..333366c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -69,7 +69,7 @@ test/test_oplog.o: ndb_oplog.c test/test_oplog.c #test-repl TEST_REPL_BIN=test_repl -TEST_REPL_OBJ=$(UTIL_OBJ) test/test_repl.o ndb_leveldb.o ndb_oplog.o +TEST_REPL_OBJ=$(UTIL_OBJ) test/test_repl.o ndb_leveldb.o ndb_oplog.o ndb_job.o $(TEST_REPL_BIN): $(TEST_REPL_OBJ) $(LD) $(LDFLAGS) -o $@ $^ $(LDLIBS) test/test_repl.o: ndb_repl.c test/test_repl.c diff --git a/src/ndb.c b/src/ndb.c index d415b61..d5a7e73 100644 --- a/src/ndb.c +++ b/src/ndb.c @@ -184,13 +184,13 @@ ndb_init(instance_t *instance) return status; } - status = server_init(instance, &instance->srv, - ndb_conn_recv_done, ndb_conn_send_done); + status = job_init(instance); if (status != NC_OK) { return status; } - status = job_init(instance); + status = server_init(instance, &instance->srv, + ndb_conn_recv_done, ndb_conn_send_done); if (status != NC_OK) { return status; } diff --git a/src/ndb_command.c b/src/ndb_command.c index 6cbe007..6a030c8 100644 --- a/src/ndb_command.c +++ b/src/ndb_command.c @@ -678,12 +678,8 @@ command_process_slaveof(struct conn *conn, msg_t *msg) repl_set_master(repl, master); } - status = job_signal(JOB_REPL); - if (status == NC_OK) { - return command_reply_ok(conn); - } else { - return command_reply_err(conn, "-ERR repl already running\r\n"); - } + return command_reply_ok(conn); + } /* diff --git a/src/ndb_cursor.h b/src/ndb_cursor.h index 1addf4a..5db141c 100644 --- a/src/ndb_cursor.h +++ b/src/ndb_cursor.h @@ -26,5 +26,3 @@ sds cursor_next_key(cursor_t *cursor); rstatus_t cursor_next(cursor_t *cursor, sds *key, sds *val, uint64_t *expire); #endif -/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */ - diff --git a/src/ndb_job.c b/src/ndb_job.c index 5408b2f..60f78bc 100644 --- a/src/ndb_job.c +++ b/src/ndb_job.c @@ -95,6 +95,11 @@ job_run(void *arg) { job_t *job = arg; + if (job->type == JOB_REPL) { + job_run_repl(job); + return; + } + while (1) { pthread_mutex_lock(&job->mutex); job->running = 0; @@ -109,10 +114,8 @@ job_run(void *arg) case JOB_COMPACT: job_run_compact(job); break; - case JOB_REPL: /* TODO: jog->run(job) */ - job_run_repl(job); - break; default: + log_warn("unknown job: %d", job->type); break; } } diff --git a/src/ndb_oplog.c b/src/ndb_oplog.c index c3cc023..e1c41b8 100644 --- a/src/ndb_oplog.c +++ b/src/ndb_oplog.c @@ -479,7 +479,7 @@ oplog_range(oplog_t *oplog, uint64_t *first, uint64_t *last) if (oplog->opid == 0) { *first = 0; *last = 0; - return; + return NC_OK; } seg0 = array_get(oplog->segments, 0); diff --git a/src/ndb_repl.c b/src/ndb_repl.c index d7821d9..cdc678c 100644 --- a/src/ndb_repl.c +++ b/src/ndb_repl.c @@ -11,10 +11,9 @@ rstatus_t repl_init(void *owner, repl_t *repl) { - repl->owner = owner; + repl->master = NULL; - /* do nothing, what we want to init should in repl_start */ return NC_OK; } @@ -24,13 +23,6 @@ repl_deinit(repl_t *repl) return NC_OK; } - -rstatus_t -repl_start(repl_t *repl) -{ - return NC_OK; -} - /** * connect to master with retry */ @@ -43,6 +35,8 @@ repl_connect(repl_t *repl) struct timeval timeout = {0, 500000 }; /* default 500 ms */ uint32_t retry; + ASSERT(repl->master != NULL); + host = strdup(repl->master); if (host == NULL) { log_warn("nomem on strdup"); @@ -83,8 +77,11 @@ repl_connect(repl_t *repl) static void repl_disconnect(repl_t *repl) { - redisFree(repl->conn); - repl->conn = NULL; + log_info("disconnected from %s", repl->master); + if (repl->conn) { + redisFree(repl->conn); + repl->conn = NULL; + } } /* PING master */ @@ -349,7 +346,9 @@ repl_sync(repl_t *repl) return status; } - if (!(repl->repl_opid >= range[0] && repl->repl_opid <= range[1])) { + /*TODO: think carefully about this */ + if (repl->repl_opid == 0 || + repl->repl_opid < range[0] || repl->repl_opid > range[1]) { /* init sync or outof sync, need a full resync */ status = repl_sync_full(repl); if (status != NC_OK) { @@ -386,6 +385,12 @@ repl_run(repl_t *repl) rstatus_t status; while (true) { + if (repl->master == NULL) { + log_debug("no repl->master is set, repl wait"); + usleep(repl->sleep_time * 1000); + continue; + } + status = repl_connect(repl); if (status != NC_OK) { log_error("can not connect to master (%s), error: %s\n", repl->master, strerror(errno)); @@ -413,14 +418,54 @@ repl_info_flush(repl_t *repl) return NC_OK; } +/** + * change repl->master + * + * set repl->master to NULL means it's not a slave. + * + * TODO: need thread safe + * + */ rstatus_t repl_set_master(repl_t *repl, char *master) { + instance_t *instance = repl->owner; + store_t *store = &instance->store; + + if (master == NULL && repl->master == NULL) { + log_info("master not change"); + return NC_OK; + } + + if (master && repl->master && + 0 == nc_strncmp(master, repl->master, strlen(master))) { + log_info("master not change"); + return NC_OK; + } + + /* There was no previous master or the user specified a different one, + * we can continue. */ + + /* + * + * TODO: we should set repl.newmaster here. + * and in the repl thread, we check repl.newmaster and if it's set, do disconnect and reconnect + * + * - how about the newmaster is NULL? + */ log_info("set master from %s to %s", repl->master, master); if (repl->master) { + repl_disconnect(repl); + repl->repl_opid = 0; + sdsfree(repl->master); + repl->master = NULL; + } + + if (master) { + store_drop(store); + repl->master = sdsnew(master); } - repl->master = sdsnew(master); return NC_OK; } diff --git a/test/unit/test_oplog.py b/test/unit/test_oplog.py index eb3a2a1..130387e 100644 --- a/test/unit/test_oplog.py +++ b/test/unit/test_oplog.py @@ -8,16 +8,19 @@ import StringIO ndb2 = NDB('127.0.0.5', 5529, '/tmp/r/ndb-5529/', {'loglevel': T_VERBOSE}) +ndb3 = NDB('127.0.0.5', 5530, '/tmp/r/ndb-5530/', {'loglevel': T_VERBOSE}) def _setup(): print 'xxxxx _setup' - ndb2.deploy() - ndb2.start() + for ndb in [ndb2, ndb3]: + ndb.deploy() + ndb.start() def _teardown(): print 'xxxxx _teardown' - assert(ndb2._alive()) - ndb2.stop() + for ndb in [ndb2, ndb3]: + assert(ndb._alive()) + ndb.stop() def test_oplog(): k = 'kkkkk' @@ -92,7 +95,7 @@ def test_repl(): conn.set(k, v) conn.expire(k, 100) - conn2.slaveof('%s:%s' % (ndb.host(), ndb.port())) + conn2.slaveof(ndb.host(), ndb.port()) time.sleep(2) print _get_all_keys(conn) @@ -124,7 +127,7 @@ def test_repl_slave_readonly(): conn.set(k, v) conn.expire(k, 100) - conn2.slaveof('%s:%s' % (ndb.host(), ndb.port())) + conn2.slaveof(ndb.host(), ndb.port()) time.sleep(2) for k, v in kv.items(): @@ -136,6 +139,55 @@ def test_repl_slave_readonly(): assert_fail("READONLY", conn2.delete, 'key') assert_fail("READONLY", conn2.flushdb) +@with_setup(_setup, _teardown) +def test_repl_bad_master(): + conn2 = get_conn(ndb2) + conn2.slaveof(ndb.host(), '500') # 500 is a bad port + for i in range(3): + time.sleep(1) + assert_fail("READONLY", conn2.set, 'k', 'v') + assert(_get_all_keys(conn2) == []) + +''' + +ndb2 slaveof +NULL -> ndb -> ndb3 -> NULL + +''' +@with_setup(_setup, _teardown) +def test_repl_switch_master(): + conn = get_conn() + conn2 = get_conn(ndb2) + conn3 = get_conn(ndb3) + + kv = {'kkk-%s' % i : 'vvv-%s' % i for i in range(12)} + for k, v in kv.items(): + conn.set(k, v) + + kv = {'kkk-%s' % i : 'vvv-%s' % i for i in range(20, 22)} + for k, v in kv.items(): + conn3.set(k, v) + + # slave of ndb + conn2.slaveof(ndb.host(), ndb.port()) + time.sleep(2) + assert(_get_all_keys(conn) == _get_all_keys(conn2)) + + # slave of ndb3 + conn2.slaveof(ndb3.host(), ndb3.port()) + time.sleep(2) + assert(_get_all_keys(conn3) == _get_all_keys(conn2)) + + # slave of NO ONE + conn2.slaveof('no', 'one') + time.sleep(2) + assert(_get_all_keys(conn3) == _get_all_keys(conn2)) # still same as ndb3 + + + conn3.set('new-key', 'new-val') + time.sleep(1) + assert(_get_all_keys(conn3) == _get_all_keys(conn2)) + @with_setup(_setup, _teardown) def test_repl_master_restart(): conn = get_conn() @@ -146,7 +198,7 @@ def test_repl_master_restart(): conn.set(k, v) conn.expire(k, 100) - conn2.slaveof('%s:%s' % (ndb.host(), ndb.port())) + conn2.slaveof(ndb.host(), ndb.port()) time.sleep(2) assert(_get_all_keys(conn) == _get_all_keys(conn2))