diff --git a/include/ekaf_definitions.hrl b/include/ekaf_definitions.hrl index b07c224..b99bc6f 100644 --- a/include/ekaf_definitions.hrl +++ b/include/ekaf_definitions.hrl @@ -9,6 +9,7 @@ -define(EKAF_DEFAULT_PARTITION_STRATEGY , random). -define(EKAF_DEFAULT_PULL_FOR_CHANGES_TIMEOUT , 60000). -define(EKAF_SYNC_TIMEOUT , 5000). +-define(EKAF_CONNECT_TIMEOUT , 5000). %%====================================================================== %% ekaf specific constants diff --git a/src/ekaf_demo.erl b/src/ekaf_demo.erl index 4fc93b9..80fbffb 100644 --- a/src/ekaf_demo.erl +++ b/src/ekaf_demo.erl @@ -164,7 +164,7 @@ tuple_to_statsd_key(X) when is_binary(X) -> %% Public helper functions %%==================================================================== get_workers(Topic)-> - pg2:get_local_members(Topic). + pg2l:get_local_members(Topic). get_dead_workers(Topic) when is_binary(Topic)-> Workers = get_workers(Topic), diff --git a/src/ekaf_fsm.erl b/src/ekaf_fsm.erl index b42b7db..86e3acb 100644 --- a/src/ekaf_fsm.erl +++ b/src/ekaf_fsm.erl @@ -120,7 +120,7 @@ ready({produce_async, _Messages} = Async, PrevState)-> ready({produce_async_batched, _Messages}= Async, PrevState)-> ekaf_lib:handle_async_as_batch(true, Async, PrevState); ready(ping, #ekaf_fsm{ topic = Topic } = State)-> - pg2:join(Topic,self()), + pg2l:join(Topic,self()), gproc:send({n,l,Topic}, {worker, up, self(), ready, State, undefined}), fsm_next_state(ready,State); ready({timeout, Timer, <<"refresh">>}, #ekaf_fsm{ buffer = Buffer, max_buffer_size = MaxBufferSize, buffer_ttl = BufferTTL, cor_id = PrevCorId, last_known_size = LastKnownSize} = PrevState)-> @@ -293,7 +293,7 @@ terminate(Reason, StateName, #ekaf_fsm{ id = WorkerId, socket = Socket, topic = io:format("~n ~p stopping since ~p when buffer had ~p items",[Self, Reason, Buffer]), gproc:send({n,l,Topic}, {add, queue, Buffer}) end, - pg2:leave(Topic,self()), + pg2l:leave(Topic,self()), ok. 
%%-------------------------------------------------------------------- %% Func: code_change/4 diff --git a/src/ekaf_lib.erl b/src/ekaf_lib.erl index 9b7626a..6a0908e 100644 --- a/src/ekaf_lib.erl +++ b/src/ekaf_lib.erl @@ -29,13 +29,14 @@ flush_messages_callback/1, flushed_messages_replied_callback/2 ]). -prepare(Topic)-> - ekaf_sup:start_child(ekaf_sup, - {Topic, {ekaf_server, start_link, [[Topic]]}, - transient, infinity, worker, []} - ), +prepare(Topic)-> Pid = (catch gproc:where({n,l,Topic})), case Pid of + undefined -> + ekaf_sup:start_child(ekaf_sup, + {Topic, {ekaf_server, start_link, [[Topic]]}, + transient, infinity, worker, []} + ); SomePid when is_pid(SomePid)-> gen_fsm:sync_send_event(Pid, prepare); _E -> @@ -54,7 +55,7 @@ prepare(Topic, Callback)-> {ok, Pid} -> Callback(Pid); _ -> - ok + {error, not_prepared} end. %% Pick a worker, and pass it into the callback diff --git a/src/ekaf_picker.erl b/src/ekaf_picker.erl index 7d82db1..8dcd244 100644 --- a/src/ekaf_picker.erl +++ b/src/ekaf_picker.erl @@ -69,7 +69,7 @@ pick_sync(_Topic, _Callback, ketama, _Attempt)-> error; %% if strategy is sticky_round_robin or strict_round_robin or random pick_sync(Topic, Callback, _Strategy, _Attempt)-> - case pg2:get_closest_pid(Topic) of + case pg2l:get_closest_pid(Topic) of PoolPid when is_pid(PoolPid) -> handle_callback(Callback,PoolPid); {error, {no_process,_}}-> @@ -88,10 +88,10 @@ handle_callback(Callback, Pid)-> end. join_group_if_not_present(PG, Pid)-> - Pids = pg2:get_members(PG), + Pids = pg2l:get_members(PG), case lists:member(Pid, Pids) of true -> ok; _ -> - pg2:join(PG, Pid) + pg2l:join(PG, Pid) end. 
diff --git a/src/ekaf_server.erl b/src/ekaf_server.erl index 7ff3a39..b8f0471 100644 --- a/src/ekaf_server.erl +++ b/src/ekaf_server.erl @@ -65,7 +65,7 @@ init([Topic])-> Self = self(), State = generic_init(Topic), gproc:reg({n,l,Topic},[]), - pg2:create(Topic), + pg2l:create(Topic), ekaf_picker:join_group_if_not_present(Topic, self()), gen_fsm:send_event(self(), connect), {ok, downtime, State#ekaf_server{topic = Topic, worker = Self}}; @@ -178,7 +178,7 @@ ready(connect, #ekaf_server{ broker = Broker } = State)-> %% connection good, ask for metadata gen_fsm:send_event(self(), {metadata, req, Socket}); _ -> - gen_fsm:start_timer(5000, <<"reconnect">>) + gen_fsm:start_timer(?EKAF_CONNECT_TIMEOUT, <<"reconnect">>) end, fsm_next_state(ready, State); ready({metadata, req, Socket}, State) -> @@ -188,9 +188,8 @@ ready({metadata, req, Socket}, State) -> ready({metadata, resp, _Metadata} = Event, State)-> Next = ekaf_server_lib:handle_metadata_during_bootstrapping(Event, State), fsm_next_state(ready, Next#ekaf_server{ ongoing_metadata = false }); -ready({timeout, Timer, <<"reconnect">> = TimeoutKey}, State)-> +ready({timeout, Timer, <<"reconnect">> }, State)-> gen_fsm:cancel_timer(Timer), - gen_fsm:start_timer(5000, TimeoutKey), gen_fsm:send_event(self(), connect), fsm_next_state(ready, State); ready({timeout, Timer, <<"refresh">> = TimeoutKey}, #ekaf_server{ @@ -213,7 +212,7 @@ ready({timeout, Timer, <<"refresh">> = TimeoutKey}, #ekaf_server{ {error,_}-> State#ekaf_server{ ctr = 0 }; {NextWorker, NextState} when Strategy =:= strict_round_robin-> - Members = pg2:get_members(Topic), + Members = pg2l:get_members(Topic), NextWorkers = case Workers of [] -> Members; _ -> case State#ekaf_server.workers -- Members of [] -> Workers; _ -> Members end end, NextState#ekaf_server{ ctr = 0, worker = NextWorker, workers = NextWorkers}; {NextWorker, NextState} -> @@ -239,13 +238,12 @@ ready(Msg, State) -> %%-------------------------------------------------------------------- ready(info, 
_From, State)-> Reply = State, - {reply, Reply, State}; + {reply, Reply, ready, State}; ready({produce_sync, Messages}, _From, State)-> ekaf_server_lib:save_messages(ready, State, Messages); -ready(prepare, From, #ekaf_server{ kv = KV } = State)-> - %got prepare during downtime - %let reply_to_prepares handle these when its ready - fsm_next_state(ready, State#ekaf_server{ kv = dict:append(prepare, From, KV)}); +ready(prepare, From, #ekaf_server{ worker = Worker } = State)-> + gen_fsm:reply(From, {ok, Worker}), + fsm_next_state(ready, State); ready(metadata, From, #ekaf_server{ kv = KV} = State)-> % got metadata during ready % this means that regular workers arent ready yet, @@ -268,19 +266,11 @@ ready(Msg, _From, State)-> downtime(info, _From, State)-> Reply = State, - {reply, Reply, State}; -downtime(prepare, From, #ekaf_server{ kv = KV, socket = Socket } = State)-> - %got prepare during downtime - case Socket of - undefined -> - %% downtime since server down - gen_fsm:reply(From, {ok,self()}), - fsm_next_state(downtime, State); - _ -> - %% downtime since topic is still being queries - %% let reply_to_prepares handle these when its ready - fsm_next_state(downtime, State#ekaf_server{ kv = dict:append(prepare, From, KV)}) - end; + {reply, Reply, downtime, State}; +downtime(prepare, From, State)-> + %% downtime since server down + gen_fsm:reply(From, {ok,self()}), + fsm_next_state(downtime, State); downtime({produce_sync, Messages}, From, State)-> gen_fsm:reply(From, {error, downtime}), ekaf_server_lib:save_messages(downtime, State, Messages); @@ -374,7 +364,7 @@ handle_info({worker, down, WorkerDown, WorkerId, WorkerDownStateName, WorkerDown fsm_next_state(ready, State#ekaf_server{ ongoing_metadata = true, workers = NextWorkers, worker = NextWorker, time = os:timestamp() } ) end; handle_info({worker, up, WorkerUp, WorkerUpStateName, WorkerUpState, _}, StateName, #ekaf_server { topic = Topic, messages = OfflineMessages } = State) -> - pg2:leave(Topic, self()), + 
pg2l:leave(Topic, self()), case StateName of ready -> ekaf_server_lib:send_messages(StateName, State, lists:reverse(OfflineMessages)); @@ -389,7 +379,7 @@ handle_info({worker, up, WorkerUp, WorkerUpStateName, WorkerUpState, _}, StateNa ok end, Next = ekaf_server_lib:reply_to_prepares(WorkerUp, State), - fsm_next_state(StateName, Next#ekaf_server{ worker = WorkerUp, messages = [], workers = pg2:get_members(Topic)}); + fsm_next_state(StateName, Next#ekaf_server{ worker = WorkerUp, messages = [], workers = pg2l:get_members(Topic)}); handle_info({set, strategy, Value}, ready, State)-> Next = State#ekaf_server{ strategy = Value }, fsm_next_state(ready, Next); @@ -409,6 +399,12 @@ handle_info({add, queue, Messages}, StateName, #ekaf_server{ messages = OfflineM %% when a partition worker dies, its queue gets added to the downtime queue %% the downtime queue is flushed when it the connection is good to go again fsm_next_state(StateName, State#ekaf_server{ messages = lists:append( OfflineMessages, Messages) }); + +handle_info(purge_messages, StateName, #ekaf_server{ messages = OfflineMessages } = State)-> + ?INFO_MSG("Purge ~p messages for topic in process ~p~n", + [length(OfflineMessages), self()]), + fsm_next_state(StateName, State#ekaf_server{ messages = [] }); + handle_info(_Info, StateName, State) -> ?INFO_MSG("dont know how to handle ~p during ~p",[_Info, StateName]), fsm_next_state(StateName, State). @@ -419,7 +415,7 @@ handle_info(_Info, StateName, State) -> %% Returns: any %%-------------------------------------------------------------------- terminate(_Reason, _StateName, #ekaf_server{ topic = Topic }) -> - pg2:delete(Topic), + pg2l:delete(Topic), ok. %%-------------------------------------------------------------------- diff --git a/src/ekaf_server_lib.erl b/src/ekaf_server_lib.erl index ee9e906..207de50 100644 --- a/src/ekaf_server_lib.erl +++ b/src/ekaf_server_lib.erl @@ -83,7 +83,7 @@ handle_pick(Pick, _From, State) -> {Error, State}. 
reconnect_attempt()-> - gen_fsm:start_timer(1000,<<"reconnect">>). + gen_fsm:start_timer(?EKAF_CONNECT_TIMEOUT, <<"reconnect">>). save_messages(StateName, #ekaf_server{ messages = OfflineMessages, worker = Worker } = State, Messages)-> case StateName of diff --git a/src/ekaf_socket.erl b/src/ekaf_socket.erl index 816e601..dde4bf6 100644 --- a/src/ekaf_socket.erl +++ b/src/ekaf_socket.erl @@ -25,8 +25,9 @@ open({Host,Port}) when is_binary(Host)-> open({ ekaf_utils:btoa(Host),Port}); open({Host,Port})-> - gen_tcp:connect(Host, Port, [binary,{packet, 4} - ,{sndbuf, 10000000}]). + gen_tcp:connect(Host, Port, + [binary,{packet, 4},{sndbuf, 10000000}], + ?EKAF_CONNECT_TIMEOUT). close(undefined)-> ok; diff --git a/src/pg2l.erl b/src/pg2l.erl new file mode 100644 index 0000000..a56b244 --- /dev/null +++ b/src/pg2l.erl @@ -0,0 +1,336 @@ +%% +%% Pg2l creates and manages process groups on the local node +%% for client library scenarios. +%% It is a local process group module, and fully compatible with +%% (copied from) the distributed one pg2. +%% + +-module(pg2l). +-author('eric.l.2046@gmail.com'). + +-export([create/1, delete/1, join/2, leave/2]). +-export([get_members/1, get_local_members/1]). +-export([get_closest_pid/1, which_groups/0]). +-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2, + terminate/2]). + +%%% +%%% Exported functions +%%% + +-spec start_link() -> {'ok', pid()} | {'error', any()}. + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec start() -> {'ok', pid()} | {'error', any()}. + +start() -> + ensure_started(). + +-type name() :: any(). + +-spec create(Name :: name()) -> 'ok'. + +create(Name) -> + _ = ensure_started(), + case ets:member(pg2l_table, {group, Name}) of + false -> + gen_server:call(?MODULE, {create, Name}), + ok; + true -> + ok + end. + +-spec delete(Name :: name()) -> 'ok'. + +delete(Name) -> + _ = ensure_started(), + gen_server:call(?MODULE, {delete, Name}), + ok.
+%% Add Pid to group Name (every join is counted); error if the group is unknown.
+-spec join(Name, Pid :: pid()) -> 'ok' | {'error', {'no_such_group', Name}}
+      when Name :: name().
+
+join(Name, Pid) when is_pid(Pid) ->
+    _ = ensure_started(),
+    case ets:member(pg2l_table, {group, Name}) of
+        false ->
+            {error, {no_such_group, Name}};
+        true ->
+            gen_server:call(?MODULE,{join, Name, Pid}),  % table is protected: only the server writes
+            ok
+    end.
+%% Undo one join of Pid in group Name; error if the group is unknown.
+-spec leave(Name, Pid :: pid()) -> 'ok' | {'error', {'no_such_group', Name}}
+      when Name :: name().
+
+leave(Name, Pid) when is_pid(Pid) ->
+    _ = ensure_started(),
+    case ets:member(pg2l_table, {group, Name}) of
+        false ->
+            {error, {no_such_group, Name}};
+        true ->
+            gen_server:call(?MODULE,{leave, Name, Pid}),
+            ok
+    end.
+%% Return the members of group Name, one list entry per join.
+-spec get_members(Name) -> [pid()] | {'error', {'no_such_group', Name}}
+      when Name :: name().
+
+get_members(Name) ->
+    _ = ensure_started(),
+    case ets:member(pg2l_table, {group, Name}) of
+        true ->
+            group_members(Name);
+        false ->
+            {error, {no_such_group, Name}}
+    end.
+%% Same result as get_members/1; kept so callers written against pg2 still work.
+-spec get_local_members(Name) -> [pid()] | {'error', {'no_such_group', Name}}
+      when Name :: name().
+
+get_local_members(Name) ->
+    _ = ensure_started(),
+    case ets:member(pg2l_table, {group, Name}) of
+        true ->
+            local_group_members(Name);
+        false ->
+            {error, {no_such_group, Name}}
+    end.
+%% List the names of all existing groups.
+-spec which_groups() -> [Name :: name()].
+
+which_groups() ->
+    _ = ensure_started(),
+    all_groups().
+%% Return the only member of Name, or a pseudo-randomly picked one if there are several.
+-spec get_closest_pid(Name) -> pid() | {'error', Reason} when
+      Name :: name(),
+      Reason ::  {'no_process', Name} | {'no_such_group', Name}.
+
+get_closest_pid(Name) ->
+    case get_local_members(Name) of
+        [Pid] ->
+            Pid;
+        [] ->
+            {error, {no_process, Name}};
+        Members when is_list(Members) ->
+            {_,_,X} = os:timestamp(),  % microseconds as a cheap index; erlang:now/0 is deprecated
+            lists:nth((X rem length(Members))+1, Members);
+        Else ->  % {error, {no_such_group, Name}} passed through from get_local_members/1
+            Else
+    end.
+
+%%%
+%%% Callback functions from gen_server
+%%%
+
+-record(state, {}).
+
+-type state() :: #state{}.
+%% Create the named, protected ETS table that holds all pg2l bookkeeping.
+-spec init(Arg :: []) -> {'ok', state()}.
+
+init([]) ->
+    pg2l_table = ets:new(pg2l_table, [ordered_set, protected, named_table]),
+    {ok, #state{}}.
+%% All table mutations are serialised here; an unknown request gets an error reply.
+-spec handle_call(Call :: {'create', Name}
+                        | {'delete', Name}
+                        | {'join', Name, Pid :: pid()}
+                        | {'leave', Name, Pid :: pid()},
+                  From :: {pid(),Tag :: any()},
+                  State :: state()) -> {'reply', 'ok' | {'error', term()}, state()}
+      when Name :: name().
+
+handle_call({create, Name}, _From, S) ->
+    assure_group(Name),
+    {reply, ok, S};
+handle_call({join, Name, Pid}, _From, S) ->
+    ets:member(pg2l_table, {group, Name}) andalso join_group(Name, Pid),  % re-check in the server; replies ok either way
+    {reply, ok, S};
+handle_call({leave, Name, Pid}, _From, S) ->
+    ets:member(pg2l_table, {group, Name}) andalso leave_group(Name, Pid),
+    {reply, ok, S};
+handle_call({delete, Name}, _From, S) ->
+    delete_group(Name),
+    {reply, ok, S};
+handle_call(Request, From, S) ->
+    error_logger:warning_msg("The pg2l server received an unexpected message:\n"
+                             "handle_call(~p, ~p, _)\n",
+                             [Request, From]),
+    {reply, {error, {unknown_call, Request}}, S}.  % reply, so the caller does not block until its call timeout
+%% Every cast is dropped; the message shapes in the spec are carried over from pg2.
+-spec handle_cast(Cast :: {'exchange', node(), Names :: [[Name,...]]}
+                        | {'del_member', Name, Pid :: pid()},
+                  State :: state()) -> {'noreply', state()}
+      when Name :: name().
+
+handle_cast(_, S) ->
+    %% Ignore {exchange, Name, Pid}.
+    %% Ignore {del_member, Name, Pid}.
+    {noreply, S}.
+%% A 'DOWN' for a monitored member removes all of its memberships; anything else is ignored.
+-spec handle_info(Tuple :: tuple(), State :: state()) ->
+    {'noreply', state()}.
+
+handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
+    member_died(MonitorRef),
+    {noreply, S};
+handle_info(_, S) ->
+    {noreply, S}.
+%% Delete the bookkeeping table when the server stops.
+-spec terminate(Reason :: any(), State :: state()) -> 'ok'.
+
+terminate(_Reason, _S) ->
+    true = ets:delete(pg2l_table),
+    ok.
+
+%%%
+%%% Local functions
+%%%
+
+%%% One ETS table, pg2l_table, is used for bookkeeping. The type of the
+%%% table is ordered_set, and the fast matching of partially
+%%% instantiated keys is used extensively.
+%%%
+%%% {{group, Name}}
+%%%    Process group Name.
+%%% {{ref, Pid}, RPid, MonitorRef, Counter}
+%%% {{ref, MonitorRef}, Pid}
+%%%    Each process has one monitor. Sometimes a process is spawned to
+%%%    monitor the pid (RPid). Counter is incremented when the Pid joins
+%%%    some group.
+%%% {{member, Name, Pid}, GroupCounter}
+%%%    Pid is a member of group Name, GroupCounter is incremented when the
+%%%    Pid joins the group Name.
+%%% {{pid, Pid, Name}}
+%%%    Pid is a member of group Name.
+%% Insert the {group, Name} marker row unless it is already present.
+assure_group(Name) ->
+    Key = {group, Name},
+    ets:member(pg2l_table, Key) orelse true =:= ets:insert(pg2l_table, {Key}).
+%% Remove every membership of group Name, then the group marker row itself.
+delete_group(Name) ->
+    _ = [leave_group(Name, Pid) || Pid <- group_members(Name)],
+    true = ets:delete(pg2l_table, {group, Name}),
+    ok.
+%% Map a monitor reference back to its pid and undo each of that pid's joins.
+member_died(Ref) ->
+    [{{ref, Ref}, Pid}] = ets:lookup(pg2l_table, {ref, Ref}),
+    Names = member_groups(Pid),
+    [leave_group(Name, P) ||
+        Name <- Names,
+        P <- member_in_group(Pid, Name)],  % one entry per join, so one leave per join
+    ok.
+%% Bump (or create) the per-pid monitor row, then the per-group membership counter.
+join_group(Name, Pid) ->
+    Ref_Pid = {ref, Pid},
+    try _ = ets:update_counter(pg2l_table, Ref_Pid, {4, +1}), true
+    catch _:_ ->  % presumably badarg because Pid has no row yet: start monitoring it
+            {RPid, Ref} = do_monitor(Pid),
+            true = ets:insert(pg2l_table, {Ref_Pid, RPid, Ref, 1}),
+            true = ets:insert(pg2l_table, {{ref, Ref}, Pid})
+    end,
+    Member_Name_Pid = {member, Name, Pid},
+    try _ = ets:update_counter(pg2l_table, Member_Name_Pid, {2, +1})
+    catch _:_ ->  % presumably badarg because this is Pid's first join of Name
+            true = ets:insert(pg2l_table, {Member_Name_Pid, 1}),
+            true = ets:insert(pg2l_table, {{pid, Pid, Name}})
+    end.
+%% Undo one join of Pid in Name; drop the monitor once Pid's overall counter reaches 0.
+leave_group(Name, Pid) ->
+    Member_Name_Pid = {member, Name, Pid},
+    try ets:update_counter(pg2l_table, Member_Name_Pid, {2, -1}) of
+        N ->
+            if
+                N =:= 0 ->
+                    true = ets:delete(pg2l_table, {pid, Pid, Name}),
+                    true = ets:delete(pg2l_table, Member_Name_Pid);
+                true ->
+                    ok
+            end,
+            Ref_Pid = {ref, Pid},
+            case ets:update_counter(pg2l_table, Ref_Pid, {4, -1}) of
+                0 ->
+                    [{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2l_table, Ref_Pid),
+                    true = ets:delete(pg2l_table, {ref, Ref}),
+                    true = ets:delete(pg2l_table, Ref_Pid),
+                    true = erlang:demonitor(Ref, [flush]),  % flush removes a 'DOWN' already in the mailbox
+                    kill_monitor_proc(RPid, Pid);
+                _ ->
+                    ok
+            end
+    catch _:_ ->  % presumably badarg because Pid is not a member of Name: nothing to undo
+            ok
+    end.
+%% Members of group Name, each pid repeated once per join (its counter value N).
+group_members(Name) ->
+    [P ||
+        [P, N] <- ets:match(pg2l_table, {{member, Name, '$1'},'$2'}),
+        _ <- lists:seq(1, N)].
+%% Identical to group_members/1.
+local_group_members(Name) ->
+    group_members(Name).
+%% Pid repeated once per join of group Name; [] when Pid is not a member.
+member_in_group(Pid, Name) ->
+    case ets:lookup(pg2l_table, {member, Name, Pid}) of
+        [] -> [];
+        [{{member, Name, Pid}, N}] ->
+            lists:duplicate(N, Pid)
+    end.
+%% Names of all groups that Pid is a member of.
+member_groups(Pid) ->
+    [Name || [Name] <- ets:match(pg2l_table, {{pid, Pid, '$1'}})].
+%% Names of all groups.
+all_groups() ->
+    [N || [N] <- ets:match(pg2l_table, {{group,'$1'}})].
+%% Return {ok, Pid} of the registered server, starting it on first use.
+ensure_started() ->
+    case whereis(?MODULE) of
+        undefined ->
+            C = {pg2l, {?MODULE, start_link, []}, permanent,
+                 1000, worker, [?MODULE]},
+            supervisor:start_child(kernel_safe_sup, C);  % NOTE(review): child of a kernel supervisor, as pg2 does - confirm intended
+        Pg2lPid ->
+            {ok, Pg2lPid}
+    end.
+
+%% Kill the helper process RPid unless the member itself was monitored directly.
+kill_monitor_proc(RPid, Pid) ->
+    RPid =:= Pid orelse exit(RPid, kill).
+%% Monitor Pid; returns {MonitoringPid, Ref}, with MonitoringPid =:= Pid when monitored directly.
+%% When/if erlang:monitor() returns before trying to connect to the
+%% other node this function can be removed.
+do_monitor(Pid) ->
+    case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of
+        true ->
+            %% Assume the node is still up
+            {Pid, erlang:monitor(process, Pid)};
+        false ->
+            F = fun() ->  % helper that monitors Pid and exits when Pid goes down
+                        Ref = erlang:monitor(process, Pid),
+                        receive
+                            {'DOWN', Ref, process, Pid, _Info} ->
+                                exit(normal)
+                        end
+                end,
+            erlang:spawn_monitor(F)
+    end.
+
+%%%%
+
+-ifdef(TEST).
+-define(TEST_TOPIC,<<"ekaf">>).
+-include_lib("eunit/include/eunit.hrl").
+%% Smoke test: create a group, join it, look the member up, delete the group.
+simple_test() ->
+    T = ?TEST_TOPIC,
+    pg2l:create(T),
+    pg2l:join(T, self()),
+    ?assertEqual([self()], pg2l:get_local_members(T)),  % EUnit order is (Expect, Expr)
+    ?assertEqual(self(), pg2l:get_closest_pid(T)),
+    pg2l:delete(T).
+
+-endif.
diff --git a/test/ekaf_tests.erl b/test/ekaf_tests.erl index 200fc89..e21bb05 100644 --- a/test/ekaf_tests.erl +++ b/test/ekaf_tests.erl @@ -105,7 +105,7 @@ pick_test_() -> ,{spawn, ?_test(?debugVal(t_produce_async_multi_in_batch_to_topic()))} , ?_test(t_is_clean()) - , {spawn, ?_test(?debugVal(t_restart_kafka_broker()))} + , {timeout, 10, {spawn, ?_test(?debugVal(t_restart_kafka_broker()))}} , ?_test(t_is_clean()) ,{spawn, ?_test(?debugVal(t_change_kafka_config()))} , ?_test(t_is_clean())