From 7be453eb7b48c50b1ad30065f525a082bee7a839 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 9 Apr 2015 14:18:16 +0800 Subject: [PATCH 1/7] prepare will run in an idempotent mode --- src/ekaf_lib.erl | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/ekaf_lib.erl b/src/ekaf_lib.erl index 9b7626a..9f327f7 100644 --- a/src/ekaf_lib.erl +++ b/src/ekaf_lib.erl @@ -29,13 +29,14 @@ flush_messages_callback/1, flushed_messages_replied_callback/2 ]). -prepare(Topic)-> - ekaf_sup:start_child(ekaf_sup, - {Topic, {ekaf_server, start_link, [[Topic]]}, - transient, infinity, worker, []} - ), +prepare(Topic)-> Pid = (catch gproc:where({n,l,Topic})), case Pid of + undefined -> + ekaf_sup:start_child(ekaf_sup, + {Topic, {ekaf_server, start_link, [[Topic]]}, + transient, infinity, worker, []} + ); SomePid when is_pid(SomePid)-> gen_fsm:sync_send_event(Pid, prepare); _E -> From bac46d5855c793cabad842ad7d9a436193415a89 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 9 Apr 2015 19:34:56 +0800 Subject: [PATCH 2/7] will reply never but later since no worker-up message after server gets ready --- src/ekaf_lib.erl | 2 +- src/ekaf_server.erl | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/ekaf_lib.erl b/src/ekaf_lib.erl index 9f327f7..6a0908e 100644 --- a/src/ekaf_lib.erl +++ b/src/ekaf_lib.erl @@ -55,7 +55,7 @@ prepare(Topic, Callback)-> {ok, Pid} -> Callback(Pid); _ -> - ok + {error, not_prepared} end. %% Pick a worker, and pass it into the callback diff --git a/src/ekaf_server.erl b/src/ekaf_server.erl index 7ff3a39..d0869fd 100644 --- a/src/ekaf_server.erl +++ b/src/ekaf_server.erl @@ -242,10 +242,9 @@ ready(info, _From, State)-> {reply, Reply, State}; ready({produce_sync, Messages}, _From, State)-> ekaf_server_lib:save_messages(ready, State, Messages); -ready(prepare, From, #ekaf_server{ kv = KV } = State)-> - %got prepare during downtime - %let reply_to_prepares handle these when its ready - fsm_next_state(ready, State#ekaf_server{ kv = dict:append(prepare, From, KV)}); +ready(prepare, From, #ekaf_server{ worker = Worker } = State)-> + gen_fsm:reply(From, {ok, Worker}), + fsm_next_state(ready, State); ready(metadata, From, #ekaf_server{ kv = KV} = State)-> % got metadata during ready % this means that regular workers arent ready yet, From 02fa0ab79475880b4fb186552db9a6fdeb8b8880 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 9 Apr 2015 19:36:23 +0800 Subject: [PATCH 3/7] no need to reply to prepares during downtime --- src/ekaf_server.erl | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/src/ekaf_server.erl b/src/ekaf_server.erl index d0869fd..b6a5db0 100644 --- a/src/ekaf_server.erl +++ b/src/ekaf_server.erl @@ -268,18 +268,10 @@ ready(Msg, _From, State)-> downtime(info, _From, State)-> Reply = State, {reply, Reply, State}; -downtime(prepare, From, #ekaf_server{ kv = KV, socket = Socket } = State)-> - %got prepare during downtime - case Socket of - undefined -> - %% downtime since server down - gen_fsm:reply(From, {ok,self()}), - fsm_next_state(downtime, State); - _ -> - %% downtime since topic is still being queries - %% let reply_to_prepares handle these when its ready - fsm_next_state(downtime, State#ekaf_server{ kv = dict:append(prepare, From, KV)}) - end; +downtime(prepare, From, State)-> + %% downtime since server down + gen_fsm:reply(From, {ok,self()}), + fsm_next_state(downtime, State); downtime({produce_sync, Messages}, From, State)-> gen_fsm:reply(From, {error, downtime}), ekaf_server_lib:save_messages(downtime, State, Messages); From 756b7333805cb5e60b7bcbf783b47219dfaa57ed Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 9 Apr 2015 20:21:33 +0800 Subject: [PATCH 4/7] purge messages in case too many messages buffered in memory --- src/ekaf_server.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/ekaf_server.erl b/src/ekaf_server.erl index b6a5db0..0e4d176 100644 --- a/src/ekaf_server.erl +++ b/src/ekaf_server.erl @@ -400,6 +400,12 @@ handle_info({add, queue, Messages}, StateName, #ekaf_server{ messages = OfflineM %% when a partition worker dies, its queue gets added to the downtime queue %% the downtime queue is flushed when it the connection is good to go again fsm_next_state(StateName, State#ekaf_server{ messages = lists:append( OfflineMessages, Messages) }); + +handle_info(purge_messages, StateName, #ekaf_server{ messages = OfflineMessages } = State)-> + ?INFO_MSG("Purge ~p messages for topic in process ~p~n", + [length(OfflineMessages), self()]), + fsm_next_state(StateName, State#ekaf_server{ messages = [] }); + handle_info(_Info, StateName, State) -> ?INFO_MSG("dont know how to handle ~p during ~p",[_Info, StateName]), fsm_next_state(StateName, State). From 19eda2fbd6089d69a2c2b2d79b9498c1fbe460c2 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 9 Apr 2015 20:35:05 +0800 Subject: [PATCH 5/7] should not start new timer if connect message was sent --- src/ekaf_server.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ekaf_server.erl b/src/ekaf_server.erl index 0e4d176..267fa23 100644 --- a/src/ekaf_server.erl +++ b/src/ekaf_server.erl @@ -188,9 +188,8 @@ ready({metadata, req, Socket}, State) -> ready({metadata, resp, _Metadata} = Event, State)-> Next = ekaf_server_lib:handle_metadata_during_bootstrapping(Event, State), fsm_next_state(ready, Next#ekaf_server{ ongoing_metadata = false }); -ready({timeout, Timer, <<"reconnect">> = TimeoutKey}, State)-> +ready({timeout, Timer, <<"reconnect">> }, State)-> gen_fsm:cancel_timer(Timer), - gen_fsm:start_timer(5000, TimeoutKey), gen_fsm:send_event(self(), connect), fsm_next_state(ready, State); ready({timeout, Timer, <<"refresh">> = TimeoutKey}, #ekaf_server{ From 2b2082e0c4a9464cdea39772a18db9f1bb15bdfc Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 9 Apr 2015 20:42:15 +0800 Subject: [PATCH 6/7] add connect timeout to avoid reconnect action overflow --- include/ekaf_definitions.hrl | 1 + src/ekaf_server.erl | 2 +- src/ekaf_server_lib.erl | 2 +- src/ekaf_socket.erl | 5 +++-- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/include/ekaf_definitions.hrl b/include/ekaf_definitions.hrl index b07c224..b99bc6f 100644 --- a/include/ekaf_definitions.hrl +++ b/include/ekaf_definitions.hrl @@ -9,6 +9,7 @@ -define(EKAF_DEFAULT_PARTITION_STRATEGY , random). -define(EKAF_DEFAULT_PULL_FOR_CHANGES_TIMEOUT , 60000). -define(EKAF_SYNC_TIMEOUT , 5000). +-define(EKAF_CONNECT_TIMEOUT , 5000). %%====================================================================== %% ekaf specific constants diff --git a/src/ekaf_server.erl b/src/ekaf_server.erl index 267fa23..fa6239e 100644 --- a/src/ekaf_server.erl +++ b/src/ekaf_server.erl @@ -178,7 +178,7 @@ ready(connect, #ekaf_server{ broker = Broker } = State)-> %% connection good, ask for metadata gen_fsm:send_event(self(), {metadata, req, Socket}); _ -> - gen_fsm:start_timer(5000, <<"reconnect">>) + gen_fsm:start_timer(?EKAF_CONNECT_TIMEOUT, <<"reconnect">>) end, fsm_next_state(ready, State); ready({metadata, req, Socket}, State) -> diff --git a/src/ekaf_server_lib.erl b/src/ekaf_server_lib.erl index ee9e906..207de50 100644 --- a/src/ekaf_server_lib.erl +++ b/src/ekaf_server_lib.erl @@ -83,7 +83,7 @@ handle_pick(Pick, _From, State) -> {Error, State}. reconnect_attempt()-> - gen_fsm:start_timer(1000,<<"reconnect">>). + gen_fsm:start_timer(?EKAF_CONNECT_TIMEOUT, <<"reconnect">>). save_messages(StateName, #ekaf_server{ messages = OfflineMessages, worker = Worker } = State, Messages)-> case StateName of diff --git a/src/ekaf_socket.erl b/src/ekaf_socket.erl index 816e601..dde4bf6 100644 --- a/src/ekaf_socket.erl +++ b/src/ekaf_socket.erl @@ -25,8 +25,9 @@ open({Host,Port}) when is_binary(Host)-> open({ ekaf_utils:btoa(Host),Port}); open({Host,Port})-> - gen_tcp:connect(Host, Port, [binary,{packet, 4} - ,{sndbuf, 10000000}]). + gen_tcp:connect(Host, Port, + [binary,{packet, 4},{sndbuf, 10000000}], + ?EKAF_CONNECT_TIMEOUT). close(undefined)-> ok; From 8ec0619f1ee82fdf3878faac18209162aaef9c86 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 9 Apr 2015 21:00:00 +0800 Subject: [PATCH 7/7] fix bug on return type of info --- src/ekaf_server.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ekaf_server.erl b/src/ekaf_server.erl index fa6239e..7d98967 100644 --- a/src/ekaf_server.erl +++ b/src/ekaf_server.erl @@ -238,7 +238,7 @@ ready(Msg, State) -> %%-------------------------------------------------------------------- ready(info, _From, State)-> Reply = State, - {reply, Reply, State}; + {reply, Reply, ready, State}; ready({produce_sync, Messages}, _From, State)-> ekaf_server_lib:save_messages(ready, State, Messages); ready(prepare, From, #ekaf_server{ worker = Worker } = State)-> @@ -266,7 +266,7 @@ ready(Msg, _From, State)-> downtime(info, _From, State)-> Reply = State, - {reply, Reply, State}; + {reply, Reply, downtime, State}; downtime(prepare, From, State)-> %% downtime since server down gen_fsm:reply(From, {ok,self()}),