diff --git a/src/riak_repl2_fscoordinator.erl b/src/riak_repl2_fscoordinator.erl index fba95217..39120553 100644 --- a/src/riak_repl2_fscoordinator.erl +++ b/src/riak_repl2_fscoordinator.erl @@ -195,7 +195,7 @@ node_dirty(Node) -> Leader -> Fullsyncs = riak_repl2_fscoordinator_sup:started(Leader), [riak_repl2_fscoordinator:node_dirty(Pid, Node) || - {_, Pid} <- Fullsyncs] + {_, Pid} <- Fullsyncs, Pid =/= self()] end. node_dirty(Pid, Node) -> @@ -350,6 +350,7 @@ handle_cast(start_fullsync, State) -> lager:info("Starting fullsync (source) with max_fssource_node=~p and max_fssource_cluster=~p", [MaxSource, MaxCluster]), {ok, Ring} = riak_core_ring_manager:get_my_ring(), + check_nodes_for_rt_dirty(Ring), N = largest_n(Ring), Partitions = sort_partitions(Ring), State2 = State#state{ @@ -1007,8 +1008,17 @@ notify_rt_dirty_nodes(State = #state{dirty_nodes = DirtyNodes, NodesToNotify = lists:subtract(AllNodesList, ordsets:to_list(DirtyNodesDuringFS)), lager:debug("Notifying nodes ~p", [ NodesToNotify]), - _ = rpc:multicall(NodesToNotify, riak_repl_stats, clear_rt_dirty, []), - State#state{dirty_nodes=ordsets:new()}; + {_, BadNodes} = rpc:multicall(NodesToNotify, + riak_repl_stats, + clear_rt_dirty, []), + case BadNodes of + [] -> + %% all nodes notified, clear rt_dirty state + State#state{dirty_nodes=ordsets:new()}; + Nodes -> + lager:warning("Failed to clear rt_dirty on ~p", [Nodes]), + State + end; false -> lager:debug("No dirty nodes before fullsync started"), State @@ -1063,3 +1073,12 @@ flush_exit_message(Pid) -> ok end. +%% check all nodes in the cluster for existing rt_dirty files +%% and reset their rt_dirty flag if it exists +check_nodes_for_rt_dirty(Ring) -> + Owners = riak_core_ring:all_owners(Ring), + [ case rpc:call(Node, riak_repl_stats, is_rt_dirty,[]) of + false -> ok; + _ -> riak_repl2_fscoordinator:node_dirty(Node) + end || {_Part, Node} <- Owners ]. 
+ diff --git a/src/riak_repl_stats.erl b/src/riak_repl_stats.erl index f654ce49..25c433c2 100644 --- a/src/riak_repl_stats.erl +++ b/src/riak_repl_stats.erl @@ -139,9 +139,11 @@ rt_dirty() -> try riak_repl2_fscoordinator:node_dirty(node()) catch - _:_ -> - lager:debug("Failed to notify coordinator of rt_dirty status") - end + W:Y -> + %% This could be triggered on startup if the + %% fscoordinator isn't running + lager:warning("Failed to notify coordinator of rt_dirty status due to ~p:~p.", [W,Y]) + end end), ok; false -> ok