Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 99 additions & 82 deletions src/ered_command.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
%% async. It could be handled by wrapping the callback fun in another
%% fun that unpacks it. Not sure what is worse..
-type redis_command() ::
{redis_command, single, binary()} |
{redis_command, pipeline, [binary()]}.
{redis_command, {single, response_class()}, binary()} |
{redis_command, {pipeline, [response_class()]}, binary()}.


-type raw_command() :: [binary()].
Expand Down Expand Up @@ -58,24 +58,28 @@ convert_to([]) ->
error({badarg, []});

convert_to(RawCommands = [E|_]) when is_list(E) ->
Commands = [command_to_bin(RawCommand) || RawCommand <- RawCommands],
{redis_command, pipeline, Commands};
{Bin, Classes} =
lists:foldl(fun(RawCommand, {BinAcc, ClassAcc}) ->
command_to_bin(RawCommand, BinAcc, ClassAcc)
end, {<<>>, []}, RawCommands),
{redis_command, {pipeline, lists:reverse(Classes)}, Bin};

convert_to(RawCommand) ->
Command = command_to_bin(RawCommand),
{redis_command, single, Command}.

command_to_bin(RawCommand) ->
Len = integer_to_list(length(RawCommand)),
Elements = [[$$, integer_to_list(size(Bin)), $\r, $\n, Bin, $\r, $\n] || Bin <- RawCommand],
%% Maybe this could be kept as an iolist?
%% TODO profile this.
%% Since this is copied around a bit between processes it might be cheaper to keep it as a binary
%% since then it will be heap allocated if big. Just pure speculation..
iolist_to_binary([$*, Len, $\r, $\n, Elements]).
{Command, [Class]} = command_to_bin(RawCommand, <<>>, []),
{redis_command, {single, Class}, Command}.

command_to_bin(RawCommand, BinAcc, ClassAcc) ->
Len = integer_to_binary(length(RawCommand)),
Header = <<BinAcc/binary, "*", Len/binary, "\r\n">>,
Bin = lists:foldl(fun(Arg, Acc) ->
Size = integer_to_binary(byte_size(Arg)),
<<Acc/binary, $$, Size/binary, "\r\n", Arg/binary, "\r\n">>
end, Header, RawCommand),
Class = resp_class(RawCommand),
{Bin, [Class | ClassAcc]}.

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec get_data(redis_command()) -> binary() | [binary()].
-spec get_data(redis_command()) -> binary().
%%
%% Returns the command binary data to send to the socket.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Expand All @@ -90,64 +94,47 @@ get_data({redis_command, _, Data}) ->
%% mapping responses to the commands. Special handling is needed for
%% some pubsub commands.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
get_response_class({redis_command, single, Data}) ->
resp_class(Data);
get_response_class({redis_command, pipeline, Data}) ->
lists:map(fun resp_class/1, Data).
get_response_class({redis_command, {single, Class}, _}) -> Class;
get_response_class({redis_command, {pipeline, Classes}, _}) ->
Classes.

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec resp_class(binary()) -> response_class().
-spec resp_class(raw_command()) -> response_class().
%%
%% Given a RESP-formatted command, returns a classification which can
%% be used to interpret the response(s) from Redis, particularly for
%% pubsub commands that don't return anything but expect certain push
%% messages to indicate success.
%% Given a raw command (list of binaries), returns a classification
%% which can be used to interpret the response(s) from Redis,
%% particularly for pubsub commands that don't return anything but
%% expect certain push messages to indicate success.
%%
%% If the command name ends in "subscribe", returns a tuple
%% {CommandName, NumChannels}. Returns 'normal' otherwise.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
resp_class(<<"*", _, "\r\n$", X, "\r\n", _/binary>>)
when X =/= $9 -> % Shorter than "subscribe"
normal; % Quick path for most commands.
resp_class(<<"*", _, "\r\n$", X, Y, "\r\n", _/binary>>)
when X > $1; Y > $2 -> % Longer than "punsubscribe"
normal; % Quick path.
resp_class(<<"*", N, "\r\n$9\r\n", _/binary>> = Subj) ->
resp_class_helper(Subj, 8, 9, N - $0);
resp_class(<<"*", N, "\r\n$1", X, "\r\n", _/binary>> = Subj)
when X >= $0, X =< $2, N > 1 ->
resp_class_helper(Subj, 9, 10 + X - $0, N - $0);
resp_class(<<"*", M, N, "\r\n$9\r\n", _/binary>> = Subj) ->
resp_class_helper(Subj, 9, 9, (M - $0) * 10 + N - $0);
resp_class(<<"*", M, N, "\r\n$1", X, "\r\n", _/binary>> = Subj)
when X >= $0, X =< $2->
resp_class_helper(Subj, 10, 10 + X - $0, (M - $0) * 10 + N - $0);
resp_class(<<"*", L, M, N, "\r\n$9\r\n", _/binary>> = Subj) ->
resp_class_helper(Subj, 10, 9, (L - $0) * 100 + (M - $0) * 10 + N - $0);
resp_class(<<"*", L, M, N, "\r\n$1", X, "\r\n", _/binary>> = Subj)
when X >= $0, X =< $2->
resp_class_helper(Subj, 11, 10 + X - $0, (L - $0) * 100 + (M - $0) * 10 + N - $0);
resp_class(_) ->
normal.

%% Returns response class when we know that the command starts at Offset within
%% Subject and is of Length 9-12 chars.
resp_class_helper(Subject, Offset, Length, Argc) ->
case binary:at(Subject, Offset + Length - 2) of
B when B =:= $b; B =:= $B ->
%% The B in SUBSCRIBE is at the right position (the penultimate
%% letter). This check eliminates all regular commands of length
%% 9-12 except the ones that end with "subscribe". Now do the slow
%% case-insensitive check to be sure.
case string:lowercase(binary:part(Subject, Offset, Length)) of
<<_:(Length - 9)/binary, "subscribe">> = LowercaseCmd ->
{LowercaseCmd, Argc - 1};
resp_class([CmdName | Args]) ->
Len = byte_size(CmdName),
if Len < 9; Len > 12 ->
%% Quick path for most commands.
%% Shorter than "subscribe" or longer than "punsubscribe".
normal;
true ->
case binary:at(CmdName, Len - 2) of
B when B =:= $b; B =:= $B ->
%% The B in SUBSCRIBE is at the right position (the
%% penultimate letter). This check eliminates all regular
%% commands of length 9-12 except the ones that end with
%% "subscribe". Now do the slow case-insensitive check to
%% be sure.
case string:lowercase(CmdName) of
<<_:(Len - 9)/binary, "subscribe">> = LowercaseCmd ->
{LowercaseCmd, length(Args)};
_ ->
normal
end;
_ ->
normal
end;
_ ->
normal
end.
end
end;
resp_class(_) ->
normal.

%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-spec check_result(ok_result() | any()) ->
Expand Down Expand Up @@ -195,23 +182,54 @@ parse_host_and_port(Bin) ->
%% Add ASKING for commands that got an ASK error. The Redis result is needed
%% to filter out what commands to keep.
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
add_asking(_, {redis_command, single, Command}) ->
{redis_command, pipeline, [[<<"ASKING\r\n">>], Command]};

add_asking({ok, OldReplies}, {redis_command, pipeline, Commands}) ->
%% Extract only the commands with ASK redirection. Even if all commands go to
%% the same slot it does not have to be the same key. When using hash tags
%% for instance.
AskCommands = add_asking_pipeline(OldReplies, Commands),
%% ASK commands + ASKING
{redis_command, pipeline, AskCommands}.

add_asking_pipeline([], _) ->
add_asking(_, {redis_command, {single, Class}, Command}) ->
AskingBin = <<"ASKING\r\n">>,
Bin = <<AskingBin/binary, Command/binary>>,
{redis_command, {pipeline, [normal, Class]}, Bin};

add_asking({ok, OldReplies}, {redis_command, {pipeline, Classes}, PipelineBin}) ->
%% Split the concatenated binary back into individual commands,
%% then pick only the ones with ASK redirection.
Commands = split_pipeline_bin(PipelineBin),
{AskCommands, AskClasses} = add_asking_pipeline(OldReplies, Commands, Classes),
NewBin = lists:foldl(fun(C, Acc) -> <<Acc/binary, C/binary>> end, <<>>, AskCommands),
{redis_command, {pipeline, AskClasses}, NewBin}.

add_asking_pipeline([], _, _) ->
{[], []};
add_asking_pipeline([{error, <<"ASK ", _/binary>>} | Replies], [Command | Commands], [Class | Classes]) ->
AskingBin = <<"ASKING\r\n">>,
{RestCmds, RestClasses} = add_asking_pipeline(Replies, Commands, Classes),
{[AskingBin, Command | RestCmds], [normal, Class | RestClasses]};
add_asking_pipeline([_ | Replies], [_ | Commands], [_ | Classes]) ->
add_asking_pipeline(Replies, Commands, Classes).

%% Split a concatenated pipeline binary into individual RESP command binaries.
split_pipeline_bin(<<>>) ->
[];
add_asking_pipeline([{error, <<"ASK ", _/binary>>} |Replies], [Command |Commands]) ->
[[<<"ASKING\r\n">>], Command | add_asking_pipeline(Replies, Commands)];
add_asking_pipeline([_ |Replies], [_ |Commands]) ->
add_asking_pipeline(Replies, Commands).
split_pipeline_bin(Bin) ->
{Cmd, Rest} = split_one_command(Bin),
[Cmd | split_pipeline_bin(Rest)].

split_one_command(<<"*", Rest/binary>>) ->
{ArgcStr, Rest1} = read_until_crlf(Rest),
Argc = binary_to_integer(ArgcStr),
split_args(Argc, Rest1, <<"*", ArgcStr/binary, "\r\n">>).

split_args(0, Rest, Acc) ->
{Acc, Rest};
split_args(N, <<"$", Rest/binary>>, Acc) ->
{LenStr, Rest1} = read_until_crlf(Rest),
Len = binary_to_integer(LenStr),
<<ArgData:Len/binary, "\r\n", Rest2/binary>> = Rest1,
split_args(N - 1, Rest2, <<Acc/binary, $$, LenStr/binary, "\r\n", ArgData/binary, "\r\n">>).

read_until_crlf(Bin) ->
case binary:match(Bin, <<"\r\n">>) of
{Pos, 2} ->
<<Str:Pos/binary, "\r\n", Rest/binary>> = Bin,
{Str, Rest}
end.


%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Expand All @@ -233,4 +251,3 @@ merge_ask_result([{error, <<"ASK ", _/binary>>} | Replies1], [_AskOk, Reply | Re
[Reply | merge_ask_result(Replies1, Replies2)];
merge_ask_result([Reply | Replies1], Replies2) ->
[Reply | merge_ask_result(Replies1, Replies2)].

2 changes: 1 addition & 1 deletion test/ered_connection_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ trailing_reply_test() ->
?debugFmt("~w", [Conn1]),
ered_connection:command_async(Conn1, [<<"ping">>], ping1),
receive sent_big_nasty -> ok end,
MalformedCommand = {redis_command, pipeline, [undefined]},
MalformedCommand = {redis_command, {pipeline, [normal]}, undefined},
ered_connection:command_async(Conn1, MalformedCommand, no_ref),

%% make sure the ping is received before the connection is shut down
Expand Down
Loading