Skip to content
This repository has been archived by the owner on Feb 3, 2025. It is now read-only.

Commit

Permalink
Check KV ready before starting nextgenrepl (#23)
Browse files Browse the repository at this point in the history
* Check KV ready before starting nextgenrepl

If riak_kv is not ready, and riak_kv_replrtq_snk starts work it will fetch from queues on the remote cluster, but not be able to put those changes into this cluster.

This defers the prompting of work on startup until after riak_kv is ready.  The `riak_kv_replrtq_peer` can also prompt work - so this too  is delayed until riak_kv has started.

The riak_kv_ttaaefs_manager will not function correctly without riak_kv being started, so this also has startup deferred should riak_kv not be ready after the initial startup timeout.

* Standardise controls over initial timeout

Allows for timeout to be reduced in test and aligned with waits in test.
  • Loading branch information
martinsumner authored Mar 28, 2024
1 parent daa619b commit b7b690f
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 29 deletions.
42 changes: 31 additions & 11 deletions src/riak_kv_replrtq_peer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
-define(AUTO_DISCOVERY_MAXIMUM_SECONDS, 900).
-define(AUTO_DISCOVERY_MINIMUM_SECONDS, 60).


-record(state, {discovery_peers = [] :: list(discovery_peer())}).

%%%============================================================================
Expand Down Expand Up @@ -86,17 +87,8 @@ init([]) ->
DefaultQueue = app_helper:get_env(riak_kv, replrtq_sinkqueue),
SnkQueuePeerInfo =
riak_kv_replrtq_snk:tokenise_peers(DefaultQueue, SinkPeers),

MinDelay =
application:get_env(riak_kv,
replrtq_prompt_min_seconds,
?AUTO_DISCOVERY_MINIMUM_SECONDS),

lists:foreach(
fun({QueueName, _PeerInfo}) ->
_ = schedule_discovery(QueueName, self(), MinDelay)
end,
SnkQueuePeerInfo),
erlang:send_after(
riak_kv_util:ngr_initial_timeout(), self(), deferred_start),
{ok, #state{discovery_peers = SnkQueuePeerInfo}};
false ->
{ok, #state{}}
Expand Down Expand Up @@ -131,6 +123,33 @@ handle_cast({prompt_discovery, QueueName}, State) ->
_ = do_discovery(QueueName, PeerInfo, regular),
{noreply, State}.

handle_info(deferred_start, State) ->
MinDelay =
application:get_env(
riak_kv,
replrtq_prompt_min_seconds,
?AUTO_DISCOVERY_MINIMUM_SECONDS),
case riak_kv_util:kv_ready() of
true ->
lists:foreach(
fun({QueueName, PeerInfo}) ->
_ = schedule_discovery(QueueName, self(), MinDelay),
?LOG_INFO(
"Initiated real-time repl peer ~p for queue ~p",
[PeerInfo, QueueName])
end,
State#state.discovery_peers),
{noreply, State};
false ->
?LOG_INFO(
"Real-time repl peer discovery waiting ~w ms "
"to initialise as riak_kv not ready",
[MinDelay]
),
erlang:send_after(
riak_kv_util:ngr_initial_timeout(), self(), deferred_start),
{noreply, State}
end;
handle_info({scheduled_discovery, QueueName}, State) ->
ok = prompt_discovery(QueueName),
MinDelay =
Expand All @@ -152,6 +171,7 @@ handle_info({Ref, {error, HTTPClientError}}, State) when is_reference(Ref) ->
[HTTPClientError]),
{noreply, State}.


terminate(_Reason, _State) ->
ok.

Expand Down
24 changes: 18 additions & 6 deletions src/riak_kv_replrtq_snk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
-define(STARTING_DELAYMS, 8).
-define(MAX_SUCCESS_DELAYMS, 1024).
-define(ON_ERROR_DELAYMS, 65536).
-define(INITIAL_TIMEOUT_MS, 60000).
-define(DEFAULT_WORKERCOUNT, 1).

-record(sink_work, {queue_name :: queue_name(),
Expand Down Expand Up @@ -410,9 +409,21 @@ handle_cast({requeue_work, WorkItem}, State) ->
{noreply, State}
end.

handle_info(deferred_start, State) ->
prompt_work(),
erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats),
handle_info(deferred_start, State) ->
case riak_kv_util:kv_ready() of
true ->
?LOG_INFO("Initiated real-time repl sink"),
prompt_work(),
erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats);
false ->
?LOG_INFO(
"Real-time repl sink waiting ~w ms "
"to initialise as riak_kv not ready",
[riak_kv_util:ngr_initial_timeout()]
),
erlang:send_after(
riak_kv_util:ngr_initial_timeout(), self(), deferred_start)
end,
{noreply, State};
handle_info(log_stats, State) ->
erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats),
Expand Down Expand Up @@ -453,7 +464,8 @@ handle_continue(initialise_work, State) ->
{SnkQueueName, Iteration, SnkW}
end,
Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo),
erlang:send_after(?INITIAL_TIMEOUT_MS, self(), deferred_start),
erlang:send_after(
riak_kv_util:ngr_initial_timeout(), self(), deferred_start),
{noreply, State#state{enabled = true, work = Work}}.

terminate(_Reason, State) ->
Expand Down Expand Up @@ -678,7 +690,7 @@ close_pbc_client(PBC) ->
%% @doc
%% For an item of work which has been removed from the work queue, spawn a
%% snk worker (using the repl_fetcher fun) to manage that item of work. The
%% worker must ensure the wortk_item is delivered back on completion.
%% worker must ensure the work_item is delivered back on completion.
-spec do_work(sink_work()) -> sink_work().
do_work({QueueName, Iteration, SinkWork}) ->
WorkQueue = SinkWork#sink_work.work_queue,
Expand Down
38 changes: 27 additions & 11 deletions src/riak_kv_ttaaefs_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@
-include_lib("kernel/include/logger.hrl").

-define(SECONDS_IN_DAY, 86400).
-define(INITIAL_TIMEOUT, 60000).
% Wait a minute before the first allocation is considered, Lot may be
% going on at a node immeidately at startup
-define(LOOP_TIMEOUT, 15000).
% Always wait at least 15s after completing an action before
% prompting another
Expand Down Expand Up @@ -331,9 +328,9 @@ init([]) ->
queue_name = SrcQueueName,
peer_queue_name = PeerQueueName,
check_window = CheckWindow},

?LOG_INFO("Initiated Tictac AAE Full-Sync Mgr with scope=~w", [Scope]),
{ok, State2, ?INITIAL_TIMEOUT}.
erlang:send_after(
riak_kv_util:ngr_initial_timeout(), self(), deferred_start),
{ok, State2}.

handle_call(pause, _From, State) ->
case State#state.is_paused of
Expand All @@ -356,7 +353,7 @@ handle_call(pause, _From, State) ->
slice_allocations = [],
slice_set_start = undefined,
is_paused = true},
?INITIAL_TIMEOUT}
riak_kv_util:ngr_initial_timeout()}
end;
handle_call(resume, _From, State) ->
case State#state.is_paused of
Expand All @@ -369,16 +366,19 @@ handle_call(resume, _From, State) ->
is_paused = false,
slice_allocations = [],
slice_set_start = undefined},
?INITIAL_TIMEOUT};
riak_kv_util:ngr_initial_timeout()};
false ->
{reply, {error, not_paused}, State, ?INITIAL_TIMEOUT}
{reply,
{error, not_paused},
State,
riak_kv_util:ngr_initial_timeout()}
end;
handle_call({set_sink, Protocol, PeerIP, PeerPort}, _From, State) ->
State0 =
State#state{peer_ip = PeerIP,
peer_port = PeerPort,
peer_protocol = Protocol},
{reply, ok, State0, ?INITIAL_TIMEOUT};
{reply, ok, State0, riak_kv_util:ngr_initial_timeout()};
handle_call({set_queuename, QueueName}, _From, State) ->
{reply, ok, State#state{queue_name = QueueName}};
handle_call({set_allsync, LocalNVal, RemoteNVal}, _From, State) ->
Expand Down Expand Up @@ -638,7 +638,23 @@ handle_cast({auto_check, ReqID, From, Now}, State) ->
end,
{noreply, State}.


handle_info(deferred_start, State) ->
case riak_kv_util:kv_ready() of
true ->
?LOG_INFO(
"Initiated Tictac AAE Full-Sync Mgr with scope=~w",
[State#state.scope]),
handle_info(timeout, State);
false ->
?LOG_INFO(
"Tictac AAE Full-Sync Mgr waiting ~w ms "
"to initialise as riak_kv not ready",
[riak_kv_util:ngr_initial_timeout()]
),
erlang:send_after(
riak_kv_util:ngr_initial_timeout(), self(), deferred_start),
{noreply, State}
end;
handle_info(timeout, State) ->
SlotInfoFun = State#state.slot_info_fun,
SlotInfo = SlotInfoFun(),
Expand Down
14 changes: 13 additions & 1 deletion src/riak_kv_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
overload_reply/1,
get_backend_config/3,
is_modfun_allowed/2,
shuffle_list/1]).
shuffle_list/1,
kv_ready/0,
ngr_initial_timeout/0
]).
-export([report_hashtree_tokens/0, reset_hashtree_tokens/2]).

-include_lib("kernel/include/logger.hrl").
Expand Down Expand Up @@ -214,7 +217,16 @@ get_write_once(Bucket) ->
Err
end.

-spec kv_ready() -> boolean().
kv_ready() ->
lists:member(riak_kv, riak_core_node_watcher:services(node())).

%% @doc
%% Replication services may wait a period on startup to ensure stability before
%% commencing. Default 60s. Normally only modified in test.
-spec ngr_initial_timeout() -> pos_integer().
ngr_initial_timeout() ->
application:get_env(riak_kv, ngr_initial_timeout, 60000).

%% ===================================================================
%% Hashtree token management functions
Expand Down

0 comments on commit b7b690f

Please sign in to comment.