Skip to content

Commit

Permalink
not correct on repl_set_master
Browse files Browse the repository at this point in the history
  • Loading branch information
idning committed Oct 11, 2014
1 parent 310e71f commit e02bf1e
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ test/test_oplog.o: ndb_oplog.c test/test_oplog.c

#test-repl
TEST_REPL_BIN=test_repl
TEST_REPL_OBJ=$(UTIL_OBJ) test/test_repl.o ndb_leveldb.o ndb_oplog.o
TEST_REPL_OBJ=$(UTIL_OBJ) test/test_repl.o ndb_leveldb.o ndb_oplog.o ndb_job.o
$(TEST_REPL_BIN): $(TEST_REPL_OBJ)
$(LD) $(LDFLAGS) -o $@ $^ $(LDLIBS)
test/test_repl.o: ndb_repl.c test/test_repl.c
Expand Down
6 changes: 3 additions & 3 deletions src/ndb.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ ndb_init(instance_t *instance)
return status;
}

status = server_init(instance, &instance->srv,
ndb_conn_recv_done, ndb_conn_send_done);
status = job_init(instance);
if (status != NC_OK) {
return status;
}

status = job_init(instance);
status = server_init(instance, &instance->srv,
ndb_conn_recv_done, ndb_conn_send_done);
if (status != NC_OK) {
return status;
}
Expand Down
8 changes: 2 additions & 6 deletions src/ndb_command.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,12 +678,8 @@ command_process_slaveof(struct conn *conn, msg_t *msg)
repl_set_master(repl, master);
}

status = job_signal(JOB_REPL);
if (status == NC_OK) {
return command_reply_ok(conn);
} else {
return command_reply_err(conn, "-ERR repl already running\r\n");
}
return command_reply_ok(conn);

}

/*
Expand Down
2 changes: 0 additions & 2 deletions src/ndb_cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,3 @@ sds cursor_next_key(cursor_t *cursor);
rstatus_t cursor_next(cursor_t *cursor, sds *key, sds *val, uint64_t *expire);

#endif
/* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */

9 changes: 6 additions & 3 deletions src/ndb_job.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ job_run(void *arg)
{
job_t *job = arg;

if (job->type == JOB_REPL) {
job_run_repl(job);
return;
}

while (1) {
pthread_mutex_lock(&job->mutex);
job->running = 0;
Expand All @@ -109,10 +114,8 @@ job_run(void *arg)
case JOB_COMPACT:
job_run_compact(job);
break;
case JOB_REPL: /* TODO: jog->run(job) */
job_run_repl(job);
break;
default:
log_warn("unknown job: %d", job->type);
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/ndb_oplog.c
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ oplog_range(oplog_t *oplog, uint64_t *first, uint64_t *last)
if (oplog->opid == 0) {
*first = 0;
*last = 0;
return;
return NC_OK;
}

seg0 = array_get(oplog->segments, 0);
Expand Down
71 changes: 58 additions & 13 deletions src/ndb_repl.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
/*
 * Attach a replication context to its owning instance.
 *
 * Stores `owner` on the context and clears the master address, so the
 * context starts out with no master configured. Always returns NC_OK.
 */
rstatus_t
repl_init(void *owner, repl_t *repl)
{
    /* no master configured yet */
    repl->master = NULL;
    repl->owner = owner;

    return NC_OK;
}

Expand All @@ -24,13 +23,6 @@ repl_deinit(repl_t *repl)
return NC_OK;
}


/*
 * Placeholder start hook for a replication context.
 *
 * Currently performs no work: `repl` is not touched and NC_OK is always
 * returned.
 */
rstatus_t
repl_start(repl_t *repl)
{
return NC_OK;
}

/**
* connect to master with retry
*/
Expand All @@ -43,6 +35,8 @@ repl_connect(repl_t *repl)
struct timeval timeout = {0, 500000 }; /* default 500 ms */
uint32_t retry;

ASSERT(repl->master != NULL);

host = strdup(repl->master);
if (host == NULL) {
log_warn("nomem on strdup");
Expand Down Expand Up @@ -83,8 +77,11 @@ repl_connect(repl_t *repl)
/*
 * Drop the connection to the current master, if there is one.
 *
 * Safe to call when no connection is open: the hiredis context is released
 * only when repl->conn is set, and repl->conn is cleared afterwards so a
 * second call cannot free it again.
 */
static void
repl_disconnect(repl_t *repl)
{
    /* repl->master may be NULL (not a slave); never hand NULL to %s */
    log_info("disconnected from %s",
             repl->master != NULL ? repl->master : "(null)");

    if (repl->conn != NULL) {
        redisFree(repl->conn);
        repl->conn = NULL;
    }
}

/* PING master */
Expand Down Expand Up @@ -349,7 +346,9 @@ repl_sync(repl_t *repl)
return status;
}

if (!(repl->repl_opid >= range[0] && repl->repl_opid <= range[1])) {
/*TODO: think carefully about this */
if (repl->repl_opid == 0 ||
repl->repl_opid < range[0] || repl->repl_opid > range[1]) {
/* init sync or outof sync, need a full resync */
status = repl_sync_full(repl);
if (status != NC_OK) {
Expand Down Expand Up @@ -386,6 +385,12 @@ repl_run(repl_t *repl)
rstatus_t status;

while (true) {
if (repl->master == NULL) {
log_debug("no repl->master is set, repl wait");
usleep(repl->sleep_time * 1000);
continue;
}

status = repl_connect(repl);
if (status != NC_OK) {
log_error("can not connect to master (%s), error: %s\n", repl->master, strerror(errno));
Expand Down Expand Up @@ -413,14 +418,54 @@ repl_info_flush(repl_t *repl)
return NC_OK;
}

/**
 * Change the master this instance replicates from.
 *
 * @param repl    replication context to update
 * @param master  address of the new master, or NULL to stop being a slave
 * @return        NC_OK
 *
 * repl->master == NULL means the instance is not a slave. If the requested
 * master equals the current one nothing happens. Otherwise the current
 * connection is dropped, repl_opid is reset and the old address is freed;
 * when a new master is given the local store is dropped before the new
 * address is recorded.
 *
 * TODO: not thread safe - repl->master is modified here without any
 * synchronization.
 */
rstatus_t
repl_set_master(repl_t *repl, char *master)
{
    instance_t *instance = repl->owner;
    store_t *store = &instance->store;

    if (master == NULL && repl->master == NULL) {
        log_info("master not change");
        return NC_OK;
    }

    /*
     * Compare the whole strings. Limiting the comparison to strlen(master)
     * bytes would treat a new address that is merely a prefix of the current
     * one (e.g. "host:55" vs "host:5529") as unchanged.
     */
    if (master != NULL && repl->master != NULL &&
        strcmp(master, repl->master) == 0) {
        log_info("master not change");
        return NC_OK;
    }

    /* There was no previous master or the user specified a different one,
     * we can continue. */

    /*
     * TODO: we should set repl.newmaster here, and let the repl thread check
     * repl.newmaster and, if it is set, disconnect and reconnect.
     *
     * - what should happen when the new master is NULL?
     */
    log_info("set master from %s to %s",
             repl->master != NULL ? repl->master : "(null)",
             master != NULL ? master : "(null)");

    if (repl->master != NULL) {
        repl_disconnect(repl);
        repl->repl_opid = 0;

        sdsfree(repl->master);
        repl->master = NULL;
    }

    /*
     * Record the new master only when one was given. Assigning
     * sdsnew(master) unconditionally would leak the string set here and
     * would turn "no master" (NULL) into a non-NULL value.
     */
    if (master != NULL) {
        store_drop(store);
        repl->master = sdsnew(master);
    }

    return NC_OK;
}
Expand Down
66 changes: 59 additions & 7 deletions test/unit/test_oplog.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@
import StringIO

ndb2 = NDB('127.0.0.5', 5529, '/tmp/r/ndb-5529/', {'loglevel': T_VERBOSE})
ndb3 = NDB('127.0.0.5', 5530, '/tmp/r/ndb-5530/', {'loglevel': T_VERBOSE})

def _setup():
    """Deploy and start each extra NDB instance (ndb2, ndb3) exactly once."""
    print('xxxxx _setup')
    # Use a loop variable that does not shadow the module-level `ndb`.
    for node in [ndb2, ndb3]:
        node.deploy()
        node.start()

def _teardown():
    """Check that each extra NDB instance is still alive, then stop it."""
    print('xxxxx _teardown')
    # Each instance is checked and stopped exactly once; asserting liveness
    # after an instance has already been stopped would always fail.
    for node in [ndb2, ndb3]:
        assert node._alive()
        node.stop()

def test_oplog():
k = 'kkkkk'
Expand Down Expand Up @@ -92,7 +95,7 @@ def test_repl():
conn.set(k, v)
conn.expire(k, 100)

conn2.slaveof('%s:%s' % (ndb.host(), ndb.port()))
conn2.slaveof(ndb.host(), ndb.port())

time.sleep(2)
print _get_all_keys(conn)
Expand Down Expand Up @@ -124,7 +127,7 @@ def test_repl_slave_readonly():
conn.set(k, v)
conn.expire(k, 100)

conn2.slaveof('%s:%s' % (ndb.host(), ndb.port()))
conn2.slaveof(ndb.host(), ndb.port())
time.sleep(2)

for k, v in kv.items():
Expand All @@ -136,6 +139,55 @@ def test_repl_slave_readonly():
assert_fail("READONLY", conn2.delete, 'key')
assert_fail("READONLY", conn2.flushdb)

# Make ndb2 a slave of an address on port 500 (described below as a bad
# port), then check writes to ndb2 and its key listing.
# NOTE(review): assert_fail presumably checks that the call fails with an
# error containing "READONLY" - confirm against its definition.
@with_setup(_setup, _teardown)
def test_repl_bad_master():
conn2 = get_conn(ndb2)
conn2.slaveof(ndb.host(), '500') # 500 is a bad port
# wait in one-second steps, three iterations in total
for i in range(3):
time.sleep(1)
assert_fail("READONLY", conn2.set, 'k', 'v')
# ndb2 is expected to hold no keys
assert(_get_all_keys(conn2) == [])

'''
ndb2 slaveof
NULL -> ndb -> ndb3 -> NULL
'''
# Switch ndb2 between masters (ndb, then ndb3, then no master) and compare
# the key listings after each switch.
@with_setup(_setup, _teardown)
def test_repl_switch_master():
conn = get_conn()
conn2 = get_conn(ndb2)
conn3 = get_conn(ndb3)

# 12 keys written through conn (get_conn() with no argument)
kv = {'kkk-%s' % i : 'vvv-%s' % i for i in range(12)}
for k, v in kv.items():
conn.set(k, v)

# 2 different keys written to ndb3
kv = {'kkk-%s' % i : 'vvv-%s' % i for i in range(20, 22)}
for k, v in kv.items():
conn3.set(k, v)

# slave of ndb
conn2.slaveof(ndb.host(), ndb.port())
time.sleep(2)
assert(_get_all_keys(conn) == _get_all_keys(conn2))

# slave of ndb3
conn2.slaveof(ndb3.host(), ndb3.port())
time.sleep(2)
assert(_get_all_keys(conn3) == _get_all_keys(conn2))

# slave of NO ONE
conn2.slaveof('no', 'one')
time.sleep(2)
assert(_get_all_keys(conn3) == _get_all_keys(conn2)) # still same as ndb3


# NOTE(review): ndb2 was detached with SLAVEOF NO ONE above, yet the check
# below expects it to match ndb3 after a new key is written to ndb3 -
# confirm whether this should assert inequality instead.
conn3.set('new-key', 'new-val')
time.sleep(1)
assert(_get_all_keys(conn3) == _get_all_keys(conn2))

@with_setup(_setup, _teardown)
def test_repl_master_restart():
conn = get_conn()
Expand All @@ -146,7 +198,7 @@ def test_repl_master_restart():
conn.set(k, v)
conn.expire(k, 100)

conn2.slaveof('%s:%s' % (ndb.host(), ndb.port()))
conn2.slaveof(ndb.host(), ndb.port())

time.sleep(2)
assert(_get_all_keys(conn) == _get_all_keys(conn2))
Expand Down

0 comments on commit e02bf1e

Please sign in to comment.