diff --git a/include/ekaf_definitions.hrl b/include/ekaf_definitions.hrl index b07c224..b99bc6f 100644 --- a/include/ekaf_definitions.hrl +++ b/include/ekaf_definitions.hrl @@ -9,6 +9,7 @@ -define(EKAF_DEFAULT_PARTITION_STRATEGY , random). -define(EKAF_DEFAULT_PULL_FOR_CHANGES_TIMEOUT , 60000). -define(EKAF_SYNC_TIMEOUT , 5000). +-define(EKAF_CONNECT_TIMEOUT , 5000). %%====================================================================== %% ekaf specific constants diff --git a/src/ekaf_lib.erl b/src/ekaf_lib.erl index 9b7626a..6a0908e 100644 --- a/src/ekaf_lib.erl +++ b/src/ekaf_lib.erl @@ -29,13 +29,14 @@ flush_messages_callback/1, flushed_messages_replied_callback/2 ]). -prepare(Topic)-> - ekaf_sup:start_child(ekaf_sup, - {Topic, {ekaf_server, start_link, [[Topic]]}, - transient, infinity, worker, []} - ), +prepare(Topic)-> Pid = (catch gproc:where({n,l,Topic})), case Pid of + undefined -> + ekaf_sup:start_child(ekaf_sup, + {Topic, {ekaf_server, start_link, [[Topic]]}, + transient, infinity, worker, []} + ); SomePid when is_pid(SomePid)-> gen_fsm:sync_send_event(Pid, prepare); _E -> @@ -54,7 +55,7 @@ prepare(Topic, Callback)-> {ok, Pid} -> Callback(Pid); _ -> - ok + {error, not_prepared} end. %% Pick a worker, and pass it into the callback diff --git a/src/ekaf_server.erl b/src/ekaf_server.erl index 7ff3a39..7d98967 100644 --- a/src/ekaf_server.erl +++ b/src/ekaf_server.erl @@ -178,7 +178,7 @@ ready(connect, #ekaf_server{ broker = Broker } = State)-> %% connection good, ask for metadata gen_fsm:send_event(self(), {metadata, req, Socket}); _ -> - gen_fsm:start_timer(5000, <<"reconnect">>) + gen_fsm:start_timer(?EKAF_CONNECT_TIMEOUT, <<"reconnect">>) end, fsm_next_state(ready, State); ready({metadata, req, Socket}, State) -> @@ -188,9 +188,8 @@ ready({metadata, req, Socket}, State) -> ready({metadata, resp, _Metadata} = Event, State)-> Next = ekaf_server_lib:handle_metadata_during_bootstrapping(Event, State), fsm_next_state(ready, Next#ekaf_server{ ongoing_metadata = false }); -ready({timeout, Timer, <<"reconnect">> = TimeoutKey}, State)-> +ready({timeout, Timer, <<"reconnect">> }, State)-> gen_fsm:cancel_timer(Timer), - gen_fsm:start_timer(5000, TimeoutKey), gen_fsm:send_event(self(), connect), fsm_next_state(ready, State); ready({timeout, Timer, <<"refresh">> = TimeoutKey}, #ekaf_server{ @@ -239,13 +238,12 @@ ready(Msg, State) -> %%-------------------------------------------------------------------- ready(info, _From, State)-> Reply = State, - {reply, Reply, State}; + {reply, Reply, ready, State}; ready({produce_sync, Messages}, _From, State)-> ekaf_server_lib:save_messages(ready, State, Messages); -ready(prepare, From, #ekaf_server{ kv = KV } = State)-> - %got prepare during downtime - %let reply_to_prepares handle these when its ready - fsm_next_state(ready, State#ekaf_server{ kv = dict:append(prepare, From, KV)}); +ready(prepare, From, #ekaf_server{ worker = Worker } = State)-> + gen_fsm:reply(From, {ok, Worker}), + fsm_next_state(ready, State); ready(metadata, From, #ekaf_server{ kv = KV} = State)-> % got metadata during ready % this means that regular workers arent ready yet, @@ -268,19 +266,11 @@ ready(Msg, _From, State)-> downtime(info, _From, State)-> Reply = State, - {reply, Reply, State}; -downtime(prepare, From, #ekaf_server{ kv = KV, socket = Socket } = State)-> - %got prepare during downtime - case Socket of - undefined -> - %% downtime since server down - gen_fsm:reply(From, {ok,self()}), - fsm_next_state(downtime, State); - _ -> - %% downtime since topic is still being queries - %% let reply_to_prepares handle these when its ready - fsm_next_state(downtime, State#ekaf_server{ kv = dict:append(prepare, From, KV)}) - end; + {reply, Reply, downtime, State}; +downtime(prepare, From, State)-> + %% downtime since server down + gen_fsm:reply(From, {ok,self()}), + fsm_next_state(downtime, State); downtime({produce_sync, Messages}, From, State)-> gen_fsm:reply(From, {error, downtime}), ekaf_server_lib:save_messages(downtime, State, Messages); @@ -409,6 +399,12 @@ handle_info({add, queue, Messages}, StateName, #ekaf_server{ messages = OfflineM %% when a partition worker dies, its queue gets added to the downtime queue %% the downtime queue is flushed when it the connection is good to go again fsm_next_state(StateName, State#ekaf_server{ messages = lists:append( OfflineMessages, Messages) }); + +handle_info(purge_messages, StateName, #ekaf_server{ messages = OfflineMessages } = State)-> + ?INFO_MSG("Purge ~p messages for topic in process ~p~n", + [length(OfflineMessages), self()]), + fsm_next_state(StateName, State#ekaf_server{ messages = [] }); + handle_info(_Info, StateName, State) -> ?INFO_MSG("dont know how to handle ~p during ~p",[_Info, StateName]), fsm_next_state(StateName, State). diff --git a/src/ekaf_server_lib.erl b/src/ekaf_server_lib.erl index ee9e906..207de50 100644 --- a/src/ekaf_server_lib.erl +++ b/src/ekaf_server_lib.erl @@ -83,7 +83,7 @@ handle_pick(Pick, _From, State) -> {Error, State}. reconnect_attempt()-> - gen_fsm:start_timer(1000,<<"reconnect">>). + gen_fsm:start_timer(?EKAF_CONNECT_TIMEOUT, <<"reconnect">>). save_messages(StateName, #ekaf_server{ messages = OfflineMessages, worker = Worker } = State, Messages)-> case StateName of diff --git a/src/ekaf_socket.erl b/src/ekaf_socket.erl index 816e601..dde4bf6 100644 --- a/src/ekaf_socket.erl +++ b/src/ekaf_socket.erl @@ -25,8 +25,9 @@ open({Host,Port}) when is_binary(Host)-> open({ ekaf_utils:btoa(Host),Port}); open({Host,Port})-> - gen_tcp:connect(Host, Port, [binary,{packet, 4} - ,{sndbuf, 10000000}]). + gen_tcp:connect(Host, Port, + [binary,{packet, 4},{sndbuf, 10000000}], + ?EKAF_CONNECT_TIMEOUT). close(undefined)-> ok;