Skip to content
1 change: 1 addition & 0 deletions include/ekaf_definitions.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-define(EKAF_DEFAULT_PARTITION_STRATEGY , random).
-define(EKAF_DEFAULT_PULL_FOR_CHANGES_TIMEOUT , 60000).
-define(EKAF_SYNC_TIMEOUT , 5000).
-define(EKAF_CONNECT_TIMEOUT , 5000).

%%======================================================================
%% ekaf specific constants
Expand Down
2 changes: 1 addition & 1 deletion src/ekaf_demo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ tuple_to_statsd_key(X) when is_binary(X) ->
%% Public helper functions
%%====================================================================
get_workers(Topic)->
pg2:get_local_members(Topic).
pg2l:get_local_members(Topic).

get_dead_workers(Topic) when is_binary(Topic)->
Workers = get_workers(Topic),
Expand Down
4 changes: 2 additions & 2 deletions src/ekaf_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ ready({produce_async, _Messages} = Async, PrevState)->
ready({produce_async_batched, _Messages}= Async, PrevState)->
ekaf_lib:handle_async_as_batch(true, Async, PrevState);
ready(ping, #ekaf_fsm{ topic = Topic } = State)->
pg2:join(Topic,self()),
pg2l:join(Topic,self()),
gproc:send({n,l,Topic}, {worker, up, self(), ready, State, undefined}),
fsm_next_state(ready,State);
ready({timeout, Timer, <<"refresh">>}, #ekaf_fsm{ buffer = Buffer, max_buffer_size = MaxBufferSize, buffer_ttl = BufferTTL, cor_id = PrevCorId, last_known_size = LastKnownSize} = PrevState)->
Expand Down Expand Up @@ -293,7 +293,7 @@ terminate(Reason, StateName, #ekaf_fsm{ id = WorkerId, socket = Socket, topic =
io:format("~n ~p stopping since ~p when buffer had ~p items",[Self, Reason, Buffer]),
gproc:send({n,l,Topic}, {add, queue, Buffer})
end,
pg2:leave(Topic,self()),
pg2l:leave(Topic,self()),
ok.
%%--------------------------------------------------------------------
%% Func: code_change/4
Expand Down
13 changes: 7 additions & 6 deletions src/ekaf_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
flush_messages_callback/1, flushed_messages_replied_callback/2
]).

prepare(Topic)->
ekaf_sup:start_child(ekaf_sup,
{Topic, {ekaf_server, start_link, [[Topic]]},
transient, infinity, worker, []}
),
prepare(Topic)->
Pid = (catch gproc:where({n,l,Topic})),
case Pid of
undefined ->
ekaf_sup:start_child(ekaf_sup,
{Topic, {ekaf_server, start_link, [[Topic]]},
transient, infinity, worker, []}
);
SomePid when is_pid(SomePid)->
gen_fsm:sync_send_event(Pid, prepare);
_E ->
Expand All @@ -54,7 +55,7 @@ prepare(Topic, Callback)->
{ok, Pid} ->
Callback(Pid);
_ ->
ok
{error, not_prepared}
end.

%% Pick a worker, and pass it into the callback
Expand Down
6 changes: 3 additions & 3 deletions src/ekaf_picker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pick_sync(_Topic, _Callback, ketama, _Attempt)->
error;
%% if strategy is sticky_round_robin or strict_round_robin or random
pick_sync(Topic, Callback, _Strategy, _Attempt)->
case pg2:get_closest_pid(Topic) of
case pg2l:get_closest_pid(Topic) of
PoolPid when is_pid(PoolPid) ->
handle_callback(Callback,PoolPid);
{error, {no_process,_}}->
Expand All @@ -88,10 +88,10 @@ handle_callback(Callback, Pid)->
end.

join_group_if_not_present(PG, Pid)->
Pids = pg2:get_members(PG),
Pids = pg2l:get_members(PG),
case lists:member(Pid, Pids) of
true ->
ok;
_ ->
pg2:join(PG, Pid)
pg2l:join(PG, Pid)
end.
48 changes: 22 additions & 26 deletions src/ekaf_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ init([Topic])->
Self = self(),
State = generic_init(Topic),
gproc:reg({n,l,Topic},[]),
pg2:create(Topic),
pg2l:create(Topic),
ekaf_picker:join_group_if_not_present(Topic, self()),
gen_fsm:send_event(self(), connect),
{ok, downtime, State#ekaf_server{topic = Topic, worker = Self}};
Expand Down Expand Up @@ -178,7 +178,7 @@ ready(connect, #ekaf_server{ broker = Broker } = State)->
%% connection good, ask for metadata
gen_fsm:send_event(self(), {metadata, req, Socket});
_ ->
gen_fsm:start_timer(5000, <<"reconnect">>)
gen_fsm:start_timer(?EKAF_CONNECT_TIMEOUT, <<"reconnect">>)
end,
fsm_next_state(ready, State);
ready({metadata, req, Socket}, State) ->
Expand All @@ -188,9 +188,8 @@ ready({metadata, req, Socket}, State) ->
ready({metadata, resp, _Metadata} = Event, State)->
Next = ekaf_server_lib:handle_metadata_during_bootstrapping(Event, State),
fsm_next_state(ready, Next#ekaf_server{ ongoing_metadata = false });
ready({timeout, Timer, <<"reconnect">> = TimeoutKey}, State)->
ready({timeout, Timer, <<"reconnect">> }, State)->
gen_fsm:cancel_timer(Timer),
gen_fsm:start_timer(5000, TimeoutKey),
gen_fsm:send_event(self(), connect),
fsm_next_state(ready, State);
ready({timeout, Timer, <<"refresh">> = TimeoutKey}, #ekaf_server{
Expand All @@ -213,7 +212,7 @@ ready({timeout, Timer, <<"refresh">> = TimeoutKey}, #ekaf_server{
{error,_}->
State#ekaf_server{ ctr = 0 };
{NextWorker, NextState} when Strategy =:= strict_round_robin->
Members = pg2:get_members(Topic),
Members = pg2l:get_members(Topic),
NextWorkers = case Workers of [] -> Members; _ -> case State#ekaf_server.workers -- Members of [] -> Workers; _ -> Members end end,
NextState#ekaf_server{ ctr = 0, worker = NextWorker, workers = NextWorkers};
{NextWorker, NextState} ->
Expand All @@ -239,13 +238,12 @@ ready(Msg, State) ->
%%--------------------------------------------------------------------
ready(info, _From, State)->
Reply = State,
{reply, Reply, State};
{reply, Reply, ready, State};
ready({produce_sync, Messages}, _From, State)->
ekaf_server_lib:save_messages(ready, State, Messages);
ready(prepare, From, #ekaf_server{ kv = KV } = State)->
%got prepare during downtime
%let reply_to_prepares handle these when its ready
fsm_next_state(ready, State#ekaf_server{ kv = dict:append(prepare, From, KV)});
ready(prepare, From, #ekaf_server{ worker = Worker } = State)->
gen_fsm:reply(From, {ok, Worker}),
fsm_next_state(ready, State);
ready(metadata, From, #ekaf_server{ kv = KV} = State)->
% got metadata during ready
% this means that regular workers arent ready yet,
Expand All @@ -268,19 +266,11 @@ ready(Msg, _From, State)->

downtime(info, _From, State)->
Reply = State,
{reply, Reply, State};
downtime(prepare, From, #ekaf_server{ kv = KV, socket = Socket } = State)->
%got prepare during downtime
case Socket of
undefined ->
%% downtime since server down
gen_fsm:reply(From, {ok,self()}),
fsm_next_state(downtime, State);
_ ->
%% downtime since topic is still being queries
%% let reply_to_prepares handle these when its ready
fsm_next_state(downtime, State#ekaf_server{ kv = dict:append(prepare, From, KV)})
end;
{reply, Reply, downtime, State};
downtime(prepare, From, State)->
%% downtime since server down
gen_fsm:reply(From, {ok,self()}),
fsm_next_state(downtime, State);
downtime({produce_sync, Messages}, From, State)->
gen_fsm:reply(From, {error, downtime}),
ekaf_server_lib:save_messages(downtime, State, Messages);
Expand Down Expand Up @@ -374,7 +364,7 @@ handle_info({worker, down, WorkerDown, WorkerId, WorkerDownStateName, WorkerDown
fsm_next_state(ready, State#ekaf_server{ ongoing_metadata = true, workers = NextWorkers, worker = NextWorker, time = os:timestamp() } )
end;
handle_info({worker, up, WorkerUp, WorkerUpStateName, WorkerUpState, _}, StateName, #ekaf_server { topic = Topic, messages = OfflineMessages } = State) ->
pg2:leave(Topic, self()),
pg2l:leave(Topic, self()),
case StateName of
ready ->
ekaf_server_lib:send_messages(StateName, State, lists:reverse(OfflineMessages));
Expand All @@ -389,7 +379,7 @@ handle_info({worker, up, WorkerUp, WorkerUpStateName, WorkerUpState, _}, StateNa
ok
end,
Next = ekaf_server_lib:reply_to_prepares(WorkerUp, State),
fsm_next_state(StateName, Next#ekaf_server{ worker = WorkerUp, messages = [], workers = pg2:get_members(Topic)});
fsm_next_state(StateName, Next#ekaf_server{ worker = WorkerUp, messages = [], workers = pg2l:get_members(Topic)});
handle_info({set, strategy, Value}, ready, State)->
Next = State#ekaf_server{ strategy = Value },
fsm_next_state(ready, Next);
Expand All @@ -409,6 +399,12 @@ handle_info({add, queue, Messages}, StateName, #ekaf_server{ messages = OfflineM
%% when a partition worker dies, its queue gets added to the downtime queue
%% the downtime queue is flushed when it the connection is good to go again
fsm_next_state(StateName, State#ekaf_server{ messages = lists:append( OfflineMessages, Messages) });

handle_info(purge_messages, StateName, #ekaf_server{ messages = OfflineMessages } = State)->
?INFO_MSG("Purge ~p messages for topic in process ~p~n",
[length(OfflineMessages), self()]),
fsm_next_state(StateName, State#ekaf_server{ messages = [] });

handle_info(_Info, StateName, State) ->
?INFO_MSG("dont know how to handle ~p during ~p",[_Info, StateName]),
fsm_next_state(StateName, State).
Expand All @@ -419,7 +415,7 @@ handle_info(_Info, StateName, State) ->
%% Returns: any
%%--------------------------------------------------------------------
terminate(_Reason, _StateName, #ekaf_server{ topic = Topic }) ->
pg2:delete(Topic),
pg2l:delete(Topic),
ok.

%%--------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/ekaf_server_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ handle_pick(Pick, _From, State) ->
{Error, State}.

reconnect_attempt()->
gen_fsm:start_timer(1000,<<"reconnect">>).
gen_fsm:start_timer(?EKAF_CONNECT_TIMEOUT, <<"reconnect">>).

save_messages(StateName, #ekaf_server{ messages = OfflineMessages, worker = Worker } = State, Messages)->
case StateName of
Expand Down
5 changes: 3 additions & 2 deletions src/ekaf_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ open({Host,Port}) when is_binary(Host)->
open({ ekaf_utils:btoa(Host),Port});

open({Host,Port})->
gen_tcp:connect(Host, Port, [binary,{packet, 4}
,{sndbuf, 10000000}]).
gen_tcp:connect(Host, Port,
[binary,{packet, 4},{sndbuf, 10000000}],
?EKAF_CONNECT_TIMEOUT).

close(undefined)->
ok;
Expand Down
Loading