From b000c6e689376ab0d26c3023d53ab6210aea1c5e Mon Sep 17 00:00:00 2001 From: GilbertWong Date: Mon, 21 Mar 2022 19:41:06 +0800 Subject: [PATCH 1/2] Provide an user-friendly behaviour `eetcd_watcher` --- erlang_ls.config | 6 + include/eetcd.hrl | 4 + src/eetcd_watcher.erl | 277 +++++++++++++++++++++++++++++++++ test/eetcd_watcher_SUITE.erl | 99 ++++++++++++ test/eetcd_watcher_example.erl | 47 ++++++ 5 files changed, 433 insertions(+) create mode 100644 erlang_ls.config create mode 100644 src/eetcd_watcher.erl create mode 100644 test/eetcd_watcher_SUITE.erl create mode 100644 test/eetcd_watcher_example.erl diff --git a/erlang_ls.config b/erlang_ls.config new file mode 100644 index 0000000..6019e9f --- /dev/null +++ b/erlang_ls.config @@ -0,0 +1,6 @@ +apps_dirs: + - "_build/default/lib/*" + +include_dirs: + - "include" + - "_build/default/lib/*/include" \ No newline at end of file diff --git a/include/eetcd.hrl b/include/eetcd.hrl index 76db3d8..61ccf28 100644 --- a/include/eetcd.hrl +++ b/include/eetcd.hrl @@ -34,4 +34,8 @@ -define(ETCD_CONNS, eetcd_conns). -record(eetcd_conn, {id, gun, conn, token = []}). + +-define(ETCD_WATCHERS, eetcd_watchers). +-record(eetcd_watcher, {name :: atom(), pid :: pid()}). + -endif. diff --git a/src/eetcd_watcher.erl b/src/eetcd_watcher.erl new file mode 100644 index 0000000..b3df259 --- /dev/null +++ b/src/eetcd_watcher.erl @@ -0,0 +1,277 @@ +-module(eetcd_watcher). + +-behaviour(gen_statem). + +-include("eetcd.hrl"). + +%% API +-export([start_link/3, stop/1]). +-export([watch/2, watch/3, unwatch/1, unwatch/2]). + +%% gen_statem callbacks +-export([callback_mode/0, init/1, ready/3, watching/3, recovering/3, terminate/3, code_change/4]). + +-define(SERVER, ?MODULE). + +-export_type([watcher/0, options/0, response/0, event/0, watch_req/0]). + +%% Types +-type reg_name() :: atom(). + +-type watch_id() :: integer(). + +-type options() :: #{retry_watch_interval_ms => integer(), + retry_watch_max_times => non_neg_integer(), + reg_name => reg_name(), %% register name for watcher process + client => name() %% watcher name for eetcd watch process + }. + +-type revision() :: integer(). + +-type watch_req() :: context(). + +-type watching_item() :: {watch_req(), revision()}. + +-type watching() :: #{watch_id() := watching_item()}. + +-type statedata() :: #{callback_mod := module(), + watching := watching(), + retry_watch_interval_ms := integer(), + retry_watch_max_times := non_neg_integer(), + retry_times := non_neg_integer(), + client := name(), + conn := undefined | eetcd_watch:watch_conn() + }. + +-type retry_times() :: non_neg_integer(). + +-type retry_item() :: {watch_req(), revision(), retry_times()}. + +-type response() :: router_pb:'Etcd.WatchResponse'(). + +-type event() :: router_pb:'mvccpb.Event'(). + +-type state() :: ready | watching | recovering. + +-type watcher() :: gen_statem:server_ref(). + +-define(DEFAULT_RETRY_WATCH_INTERVAL_MS, 3000). +-define(DEFAULT_RETRY_WATCH_MAX, infinity). +-define(DEFAULT_TIMEOUT, 5000). +-define(DEFAULT_WATCH_TIMEOUT, 3000). +-define(DEFAULT_UNWATCH_TIMEOUT, 3000). + +-callback handle_unwatch([response()], [response()]) -> ok. + +-callback handle_watch_events([event()]) -> ok. + +-optional_callbacks([ handle_unwatch/2 ]). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc send the watch request to the watcher. +-spec watch(watcher(), watch_req()) -> ok | {error, any()}. +watch(Watcher, WatchReq) -> + watch(Watcher, WatchReq, ?DEFAULT_WATCH_TIMEOUT). + +-spec watch(watcher(), watch_req(), timeout()) -> ok | {error, any()}. +watch(Watcher, WatchReq, Timeout) -> + gen_statem:call(Watcher, {watch, WatchReq, Timeout}, infinity). + +%% @doc send the unwatch request to the watcher. +%% Notice: it would unwatch all requests in +-spec unwatch(watcher()) -> ok | {error, any()}. +unwatch(Watcher) -> + unwatch(Watcher, ?DEFAULT_UNWATCH_TIMEOUT). + +-spec unwatch(watcher(), timeout()) -> ok | {error, any()}. +unwatch(Watcher, Timeout) -> + gen_statem:call(Watcher, {unwatch, Timeout}, infinity). + +%% @doc stop the eetcd watcher. +-spec stop(watcher()) -> ok. +stop(Watcher) -> + gen_statem:stop(Watcher). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% +%% Create a watcher process and start watching the keys in `WatchReq'. +%% +%% @end +%%-------------------------------------------------------------------- +-spec start_link(module(), name(), options()) -> gen_statem:start_ret(). +start_link(Mod, Client, Options) -> + RegName = maps:get(reg_name, Options, Mod), + gen_statem:start_link({local, RegName}, ?MODULE, {Mod, Client, Options}, []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Define the callback_mode() for this callback module. +%% @end +%%-------------------------------------------------------------------- +-spec callback_mode() -> gen_statem:callback_mode_result(). +callback_mode() -> state_functions. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initialize the watcher state. +%% @end +%%-------------------------------------------------------------------- +-spec init({module(), name(), options()}) -> {ok, ready, statedata()}. +init({Mod, Client, Options}) -> + process_flag(trap_exit, true), + RetryWatchInterval = maps:get(retry_watch_interval_ms, Options, ?DEFAULT_RETRY_WATCH_INTERVAL_MS), + RetryWatchMax = maps:get(retry_watch_max_times, Options, ?DEFAULT_RETRY_WATCH_MAX), + StateData = #{callback_mod => Mod, + watching => #{}, + retry_watch_interval_ms => RetryWatchInterval, + retry_watch_max_times => RetryWatchMax, + conn => undefined, + retry_times => 0, + client => Client + }, + {ok, ready, StateData}. + +-spec ready({call, gen_statem:from()}, {watch, watch_req(), timeout()}, statedata()) -> + keep_state_and_data | {next_state, watching}. +ready({call, Caller}, WatchReq = {watch, #{key := _Key}, _Timeout}, StateData) -> + {next_state, watching, StateData, [{next_event, {call, Caller}, WatchReq}]}; +ready(EventType, Msg, _StateData) -> + ?LOG_ERROR("Receive unexpected msg ~p of event type: ~p in ready state", [Msg, EventType]), + keep_state_and_data. + +-spec watching({call, gen_statem:from()}, {watch, watch_req(), timeout()}, statedata()) -> + keep_state_and_data | {next_state, watching}. +watching({call, Caller}, {watch, WatchReq, Timeout}, + StateData = #{watching := Watching, conn := WatchConn, client := Client}) -> + case eetcd_watch:watch(Client, WatchReq, WatchConn, Timeout) of + {ok, NewWatchConn = #{watch_ids := WatchIds}, WatchId} -> + #{revision := Revision} = maps:get(WatchId, WatchIds), + NewWatching = Watching#{WatchId => {WatchReq, Revision}}, + NewStateData = StateData#{conn => NewWatchConn, watching => NewWatching}, + {keep_state, NewStateData, {reply, Caller, ok}}; + Err = {error, _Reason} -> + {keep_state_and_data, {reply, Caller, Err}} + end; + +watching({call, Caller}, {unwatch, Timeout}, StateData = #{callback_mod := Mod, conn := Conn}) -> + {Reply, Responses, Events} = case eetcd_watch:unwatch(Conn, Timeout) of + {ok, Responses0, OtherEvents} -> + {ok, Responses0, OtherEvents}; + {error, Reason, Responses0, OtherEvents} -> + {{error, Reason}, Responses0, OtherEvents} + end, + try Mod:handle_unwatch(Responses, Events) + catch _Class:_Reason -> ok + end, + NewStateData = StateData#{conn => undefined, watching => #{}}, + {next_state, ready, NewStateData, {reply, Caller, Reply}}; +watching(info, Msg, StateData = #{conn := Conn, + watching := Watching, + callback_mod := Mod}) -> + case eetcd_watch:watch_stream(Conn, Msg) of + {ok, NewConn, WatchEvent = #{ events := Events }} -> + try Mod:handle_watch_events(Events) catch _C:_R -> ok end, + NewWatching = update_revision(Watching, WatchEvent), + {keep_state, StateData#{conn => NewConn, watching => NewWatching}}; + {more, NewConn} -> + NewStateData = StateData#{conn => NewConn}, + {keep_state, NewStateData}; + Err -> + ?LOG_ERROR("Watching stream failed for ~p", [Err]), + %% Due to the watch strategy is reuse the same stream by default. + %% If one `watch_stream' failed, all watch requests should be retried. + RetryItems = maps:values(Watching), + NewStateData = StateData#{conn => undefined, watching => #{}}, + {next_state, recovering, NewStateData, {state_timeout, 0, RetryItems}} + end; +watching(EventType, EventContent, _StateData) -> + ?LOG_ERROR("Receive unexpected msg ~p of event type ~p in watching state", + [EventContent, EventType]), + keep_state_and_data. + + +-spec recovering(state_timeout, [retry_item()], statedata()) -> + keep_state_and_data | + {next_state, ready, statedata()} | + {next_state, watching, statedata()} | + {keep_state, statedata(), {state_timeout, non_neg_integer(), non_neg_integer()}} | + {keep_state_and_data, {reply, gen_statem:from(), {error, recovering}}}. +recovering(state_timeout, _RetryItems , StateData = #{retry_watch_max_times := RetryWatchMaxTimes, + retry_times := RetryTimes}) + when RetryTimes >= RetryWatchMaxTimes -> + ?LOG_WARNING("Exceeded max retry times: ~p", [RetryWatchMaxTimes]), + {next_state, ready, StateData}; +recovering(state_timeout, RetryItems, StateData = #{retry_watch_interval_ms := RetryInterval, + retry_times := RetryTimes}) -> + case retry_watch(RetryItems, StateData) of + {ok, NewStateData} -> + {next_state, watching, NewStateData}; + {error, Reason} -> + ?LOG_ERROR("Retry watch error for ~p", [Reason]), + NewStateData = StateData#{rety_times => RetryTimes + 1}, + {keep_state, NewStateData, {state_timeout, RetryInterval, RetryItems}} + end; +recovering({call, Caller}, _Request, _StateData) -> + {keep_state_and_data, {reply, Caller, {error, recovering}}}; +recovering(EventType, EventContent, _StateData) -> + ?LOG_ERROR("Receive unexpected msg ~p of event type ~p in recovering state", + [EventType, EventContent]), + keep_state_and_data. + + +-spec terminate(any(), state(), statedata()) -> + any(). +terminate(_Reason, _State, _Data) -> + void. + +-spec code_change( + OldVsn :: term() | {down,term()}, + State :: state(), Data :: statedata(), Extra :: term()) -> + {ok, NewState :: term(), NewData :: term()} | + (Reason :: term()). +code_change(_OldVsn, State, Data, _Extra) -> + {ok, State, Data}. + + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% @doc If one request failed, it must be `stream_error' or `conn_error' or `http2_down' and +%% all requests should be retried. +-spec retry_watch([retry_item()], statedata()) -> {ok, statedata()} | {error, any()}. +retry_watch([], StateData) -> + {ok, StateData}; +retry_watch([{WatchReq, Revision} | RestRetryItem], StateData = #{conn := Conn, + client := Client, + watching := Watching + }) -> + NewWatchReq = eetcd_watch:with_start_revision(WatchReq, Revision + 1), + case eetcd_watch:watch(Client, NewWatchReq, Conn, ?DEFAULT_WATCH_TIMEOUT) of + {ok, NewWatchConn = #{watch_ids := WatchIds}, WatchId} -> + #{revision := NewRevision} = maps:get(WatchId, WatchIds), + NewWatching = Watching#{WatchId => {NewWatchReq, NewRevision}}, + NewStateData = StateData#{conn => NewWatchConn, watching => NewWatching}, + retry_watch(RestRetryItem, NewStateData); + {error, Reason} -> + {error, Reason} + end. + +-spec update_revision(watching(), router_pb:'Etcd.WatchResponse'()) -> watching(). +update_revision(Watching, #{header := #{revision := Rev}, watch_id := WatchId}) -> + maps:update_with(WatchId, fun({Key, _}) -> {Key, Rev} end, Watching). + diff --git a/test/eetcd_watcher_SUITE.erl b/test/eetcd_watcher_SUITE.erl new file mode 100644 index 0000000..59a52f0 --- /dev/null +++ b/test/eetcd_watcher_SUITE.erl @@ -0,0 +1,99 @@ +-module(eetcd_watcher_SUITE). + +-include_lib("eunit/include/eunit.hrl"). + +-export([all/0, suite/0, init_per_suite/1, end_per_suite/1]). + +-export([ watch/1, client_down/1 ]). + +-define(Name, ?MODULE). + +-define(Endpoints, ["127.0.0.1:2379", "127.0.0.1:2479", "127.0.0.1:2579"]). + +suite() -> + [{timetrap, {minutes, 2}}]. + +all() -> + [watch, client_down]. + + +init_per_suite(Config) -> + application:ensure_all_started(eetcd), + {ok, _Pid} = eetcd:open(?Name, ?Endpoints), + Config. + +end_per_suite(_Config) -> + eetcd:close(?Name), + application:stop(eetcd). + +watch(_Config) -> + %% register the name to receive messages from watcher example + register(?MODULE, self()), + + Key = <<"etcd_key">>, Value = <<"etcd_value">>, + {ok, _Pid} = eetcd_watcher_example:start_link(?Name, #{}), + WatchReq0 = eetcd_watch:new(), + WatchReq = eetcd_watch:with_key(WatchReq0, Key), + ok = eetcd_watcher_example:watch(WatchReq), + + eetcd_kv:put(?Name, Key, Value), + eetcd_kv:delete(?Name, Key), + + ?assertMatch([#{kv := #{key := Key, value := Value}, + type := 'PUT'}, + #{kv := #{key := Key}, type := 'DELETE'} + ], receive_msg(2)), + + ok = eetcd_watcher_example:unwatch(), + + ?assertMatch([{[#{canceled := true}], []}], receive_msg(1, 200)), + + eetcd_kv:put(?Name, Key, Value), + + ?assertEqual([], receive_msg(1, 200)), + + eetcd_watcher_example:stop(), + unregister(?MODULE). + +client_down(_Config) -> + register(?MODULE, self()), + eetcd_watcher_example:start_link(?Name, #{retry_watch_interval_ms => 100, + retry_watch_max => 10}), + + eetcd:open(helper, ?Endpoints), + + Key = <<"client_down">>, + WatchReq0 = eetcd_watch:new(), + WatchReq = eetcd_watch:with_key(WatchReq0, Key), + ok = eetcd_watcher_example:watch(WatchReq), + eetcd_kv:put(helper, Key, <<"ping1">>), + + eetcd:close(?Name), + + eetcd_kv:put(helper, Key, <<"ping2">>), + eetcd_kv:delete(helper, Key), + + eetcd:open(?Name, ?Endpoints), + timer:sleep(100), + Results = receive_msg(2, 300), + ct:pal("Results: ~p", [Results]), + + eetcd_watcher_example:stop(), + + unregister(?MODULE). + +receive_msg(Count) -> + receive_msg(Count, 100). + +receive_msg(Count, Timeout) -> + Results = receive_msg(Count, Timeout, []), + lists:flatten(lists:reverse(Results)). + +receive_msg(0, _Timeout, Acc) -> + Acc; +receive_msg(Count, Timeout, Acc) -> + receive Msg -> + receive_msg(Count - 1, Timeout, [Msg | Acc]) + after Timeout -> + Acc + end. diff --git a/test/eetcd_watcher_example.erl b/test/eetcd_watcher_example.erl new file mode 100644 index 0000000..aed9cd4 --- /dev/null +++ b/test/eetcd_watcher_example.erl @@ -0,0 +1,47 @@ +-module(eetcd_watcher_example). + +-behaviour(eetcd_watcher). + +-include("eetcd.hrl"). + +-export([ start_link/2, watch/1, unwatch/0, stop/0 ]). + +-export([ handle_watch_events/1, handle_unwatch/2 ]). + +-type options() :: eetcd_watcher:options(). + +-type response() :: eetcd_watcher:response(). + +-type event() :: eetcd_watcher:event(). + +-type watch_req() :: eetcd_watcher:watch_req(). + +-spec watch(watch_req()) -> ok. +watch(WatchReq) -> + eetcd_watcher:watch(?MODULE, WatchReq). + +-spec unwatch() -> ok. +unwatch() -> + eetcd_watcher:unwatch(?MODULE). + +-spec start_link(name(), options()) -> ok. +start_link(Client, Options) -> + eetcd_watcher:start_link(?MODULE, Client, Options). + +-spec stop() -> ok. +stop() -> + eetcd_watcher:stop(?MODULE). + +-spec handle_watch_events([event()]) -> ok. +handle_watch_events(Events) -> + ct:pal("Receive events: ~p", [Events]), + eetcd_watcher_SUITE ! Events, + ok. + +-spec handle_unwatch([response()], [event()]) -> ok. +handle_unwatch(Responses, Events) -> + ct:pal("Unwatch Responses:"), + ct:pal("Responses: ~p", [Responses]), + ct:pal("Events: ~p", [Events]), + eetcd_watcher_SUITE ! {Responses, Events}, + ok. From 9850089ee3ab8507ef6d73f74e138cb1a061aedf Mon Sep 17 00:00:00 2001 From: GilbertWong Date: Mon, 21 Mar 2022 20:33:01 +0800 Subject: [PATCH 2/2] Give `eetcd_watcher` behaviour example in README.md --- README.md | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/README.md b/README.md index ddd3c0c..f555d38 100644 --- a/README.md +++ b/README.md @@ -278,6 +278,55 @@ We can use a single stream for multiplex watches, see [example](/test/eetcd_watc - `Active` is normal connection. - `Freeze` is a broken connection who try to reconnect after `ReconnectSecond`. + +##### Watcher - Watcher which use `eetcd_watcher` behaviour + +``` erlang +-module(eetcd_watcher_example). + +-include_lib("eetcd/include/eetcd.hrl"). +-behaviour(eetcd_watcher). + +-export([ start_link/2, watch/1, unwatch/0, stop/0 ]). + +-export([ handle_watch_events/1, handle_unwatch_response/2 ]). + + +-type options() :: eetcd_watcher:options(). + +-type response() :: eetcd_watcher:response(). + +-type event() :: eetcd_watcher:event(). + +-type watch_req() :: eetcd_watcher:watch_req(). + +-spec watch(watch_req()) -> ok. +watch(WatchReq) -> + eetcd_watcher:watch(?MODULE, WatchReq). + +-spec unwatch() -> ok. +unwatch() -> + eetcd_watcher:unwatch(?MODULE). + +-spec start_link(name(), options()) -> ok. +start_link(Client, Options) -> + eetcd_watcher:start_link(?MODULE, Client, Options). + +-spec stop() -> ok. +stop() -> + eetcd_watcher:stop(?MODULE). + +-spec handle_watch_events([event()]) -> ok. +handle_watch_events(Events) -> + %% Handle watch events ... + ok. + +-spec handle_unwatch([response()], [event()]) -> ok. +handle_unwatch(Responses, Events) -> + %% Handle unwatch response ... + ok. +``` + Test -----