diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index ce3ef4b05..bf14ff257 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -48,6 +48,7 @@ -define(AUTO_DISCOVERY_MAXIMUM_SECONDS, 900). -define(AUTO_DISCOVERY_MINIMUM_SECONDS, 60). + -record(state, {discovery_peers = [] :: list(discovery_peer())}). %%%============================================================================ @@ -86,17 +87,8 @@ init([]) -> DefaultQueue = app_helper:get_env(riak_kv, replrtq_sinkqueue), SnkQueuePeerInfo = riak_kv_replrtq_snk:tokenise_peers(DefaultQueue, SinkPeers), - - MinDelay = - application:get_env(riak_kv, - replrtq_prompt_min_seconds, - ?AUTO_DISCOVERY_MINIMUM_SECONDS), - - lists:foreach( - fun({QueueName, _PeerInfo}) -> - _ = schedule_discovery(QueueName, self(), MinDelay) - end, - SnkQueuePeerInfo), + erlang:send_after( + riak_kv_util:ngr_initial_timeout(), self(), deferred_start), {ok, #state{discovery_peers = SnkQueuePeerInfo}}; false -> {ok, #state{}} @@ -131,6 +123,33 @@ handle_cast({prompt_discovery, QueueName}, State) -> _ = do_discovery(QueueName, PeerInfo, regular), {noreply, State}. 
+%% Deferred startup: begin scheduling peer discovery only once the riak_kv
+%% service is registered on this node; otherwise re-arm the timer and retry.
+handle_info(deferred_start, State) ->
+    case riak_kv_util:kv_ready() of
+        true ->
+            MinDelay =
+                application:get_env(
+                    riak_kv,
+                    replrtq_prompt_min_seconds,
+                    ?AUTO_DISCOVERY_MINIMUM_SECONDS),
+            lists:foreach(
+                fun({QueueName, PeerInfo}) ->
+                    _ = schedule_discovery(QueueName, self(), MinDelay),
+                    ?LOG_INFO(
+                        "Initiated real-time repl peer ~p for queue ~p",
+                        [PeerInfo, QueueName])
+                end,
+                State#state.discovery_peers);
+        false ->
+            RetryMS = riak_kv_util:ngr_initial_timeout(), % delay used AND logged
+            ?LOG_INFO(
+                "Real-time repl peer discovery waiting ~w ms "
+                "to initialise as riak_kv not ready",
+                [RetryMS]),
+            erlang:send_after(RetryMS, self(), deferred_start)
+    end,
+    {noreply, State};
 handle_info({scheduled_discovery, QueueName}, State) ->
     ok = prompt_discovery(QueueName),
     MinDelay =
@@ -152,6 +171,7 @@ handle_info({Ref, {error, HTTPClientError}}, State) when is_reference(Ref) ->
         [HTTPClientError]),
     {noreply, State}.
 
+
 terminate(_Reason, _State) ->
     ok.
 
diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl
index ad7d6ab1b..df9a1f81b 100644
--- a/src/riak_kv_replrtq_snk.erl
+++ b/src/riak_kv_replrtq_snk.erl
@@ -68,7 +68,6 @@
 -define(STARTING_DELAYMS, 8).
 -define(MAX_SUCCESS_DELAYMS, 1024).
 -define(ON_ERROR_DELAYMS, 65536).
--define(INITIAL_TIMEOUT_MS, 60000).
 -define(DEFAULT_WORKERCOUNT, 1).
 
 -record(sink_work, {queue_name :: queue_name(),
@@ -410,9 +409,21 @@ handle_cast({requeue_work, WorkItem}, State) ->
             {noreply, State}
     end.
-handle_info(deferred_start, State) ->
-    prompt_work(),
-    erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats),
+handle_info(deferred_start, State) -> % deferred startup readiness check
+    case riak_kv_util:kv_ready() of
+        true -> % riak_kv ready: prompt work and start the log_stats timer
+            ?LOG_INFO("Initiated real-time repl sink"),
+            prompt_work(),
+            erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats);
+        false -> % not ready: log, then re-send deferred_start to self
+            ?LOG_INFO(
+                "Real-time repl sink waiting ~w ms "
+                "to initialise as riak_kv not ready",
+                [riak_kv_util:ngr_initial_timeout()]
+            ),
+            erlang:send_after(
+                riak_kv_util:ngr_initial_timeout(), self(), deferred_start)
+    end,
     {noreply, State};
 handle_info(log_stats, State) ->
     erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats),
@@ -453,7 +464,8 @@ handle_continue(initialise_work, State) ->
             {SnkQueueName, Iteration, SnkW}
         end,
     Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo),
-    erlang:send_after(?INITIAL_TIMEOUT_MS, self(), deferred_start),
+    erlang:send_after( % delay now comes from riak_kv_util:ngr_initial_timeout/0
+        riak_kv_util:ngr_initial_timeout(), self(), deferred_start),
     {noreply, State#state{enabled = true, work = Work}}.
 
 terminate(_Reason, State) ->
@@ -678,7 +690,7 @@ close_pbc_client(PBC) ->
 %% @doc
 %% For an item of work which has been removed from the work queue, spawn a
 %% snk worker (using the repl_fetcher fun) to manage that item of work. The
-%% worker must ensure the wortk_item is delivered back on completion.
+%% worker must ensure the work_item is delivered back on completion.
 -spec do_work(sink_work()) -> sink_work().
 do_work({QueueName, Iteration, SinkWork}) ->
     WorkQueue = SinkWork#sink_work.work_queue,
diff --git a/src/riak_kv_ttaaefs_manager.erl b/src/riak_kv_ttaaefs_manager.erl
index 696dd01e7..40ef8879e 100644
--- a/src/riak_kv_ttaaefs_manager.erl
+++ b/src/riak_kv_ttaaefs_manager.erl
@@ -57,9 +57,6 @@
 -include_lib("kernel/include/logger.hrl").
 
 -define(SECONDS_IN_DAY, 86400).
--define(INITIAL_TIMEOUT, 60000).
- % Wait a minute before the first allocation is considered, Lot may be - % going on at a node immeidately at startup -define(LOOP_TIMEOUT, 15000). % Always wait at least 15s after completing an action before % prompting another @@ -331,9 +328,9 @@ init([]) -> queue_name = SrcQueueName, peer_queue_name = PeerQueueName, check_window = CheckWindow}, - - ?LOG_INFO("Initiated Tictac AAE Full-Sync Mgr with scope=~w", [Scope]), - {ok, State2, ?INITIAL_TIMEOUT}. + erlang:send_after( + riak_kv_util:ngr_initial_timeout(), self(), deferred_start), + {ok, State2}. handle_call(pause, _From, State) -> case State#state.is_paused of @@ -356,7 +353,7 @@ handle_call(pause, _From, State) -> slice_allocations = [], slice_set_start = undefined, is_paused = true}, - ?INITIAL_TIMEOUT} + riak_kv_util:ngr_initial_timeout()} end; handle_call(resume, _From, State) -> case State#state.is_paused of @@ -369,16 +366,19 @@ handle_call(resume, _From, State) -> is_paused = false, slice_allocations = [], slice_set_start = undefined}, - ?INITIAL_TIMEOUT}; + riak_kv_util:ngr_initial_timeout()}; false -> - {reply, {error, not_paused}, State, ?INITIAL_TIMEOUT} + {reply, + {error, not_paused}, + State, + riak_kv_util:ngr_initial_timeout()} end; handle_call({set_sink, Protocol, PeerIP, PeerPort}, _From, State) -> State0 = State#state{peer_ip = PeerIP, peer_port = PeerPort, peer_protocol = Protocol}, - {reply, ok, State0, ?INITIAL_TIMEOUT}; + {reply, ok, State0, riak_kv_util:ngr_initial_timeout()}; handle_call({set_queuename, QueueName}, _From, State) -> {reply, ok, State#state{queue_name = QueueName}}; handle_call({set_allsync, LocalNVal, RemoteNVal}, _From, State) -> @@ -638,7 +638,23 @@ handle_cast({auto_check, ReqID, From, Now}, State) -> end, {noreply, State}. 
-
+handle_info(deferred_start, State) -> % deferred startup readiness check
+    case riak_kv_util:kv_ready() of
+        true -> % riak_kv ready: log, then run the timeout clause directly
+            ?LOG_INFO(
+                "Initiated Tictac AAE Full-Sync Mgr with scope=~w",
+                [State#state.scope]),
+            handle_info(timeout, State);
+        false -> % not ready: log, then re-send deferred_start to self
+            ?LOG_INFO(
+                "Tictac AAE Full-Sync Mgr waiting ~w ms "
+                "to initialise as riak_kv not ready",
+                [riak_kv_util:ngr_initial_timeout()]
+            ),
+            erlang:send_after(
+                riak_kv_util:ngr_initial_timeout(), self(), deferred_start),
+            {noreply, State}
+    end;
 handle_info(timeout, State) ->
     SlotInfoFun = State#state.slot_info_fun,
     SlotInfo = SlotInfoFun(),
diff --git a/src/riak_kv_util.erl b/src/riak_kv_util.erl
index 30b2ad7da..b5ef6d782 100644
--- a/src/riak_kv_util.erl
+++ b/src/riak_kv_util.erl
@@ -50,7 +50,10 @@
          overload_reply/1,
          get_backend_config/3,
          is_modfun_allowed/2,
-         shuffle_list/1]).
+         shuffle_list/1,
+         kv_ready/0,
+         ngr_initial_timeout/0
+         ]).
 -export([report_hashtree_tokens/0, reset_hashtree_tokens/2]).
 
 -include_lib("kernel/include/logger.hrl").
@@ -214,7 +217,16 @@ get_write_once(Bucket) ->
             Err
     end.
 
+-spec kv_ready() -> boolean().
+kv_ready() -> % true when riak_kv is in the node watcher's service list for node()
+    lists:member(riak_kv, riak_core_node_watcher:services(node())).
 
+%% @doc
+%% Replication services may wait a period on startup to ensure stability before
+%% commencing. Default 60s. Normally only modified in test.
+-spec ngr_initial_timeout() -> pos_integer().
+ngr_initial_timeout() -> % value is used as milliseconds by erlang:send_after/3 callers
+    application:get_env(riak_kv, ngr_initial_timeout, 60000).
 
 %% ===================================================================
 %% Hashtree token management functions