diff --git a/src/ered_command.erl b/src/ered_command.erl index 0f21df6..54e6446 100644 --- a/src/ered_command.erl +++ b/src/ered_command.erl @@ -26,8 +26,8 @@ %% async. It could be handled by wrapping the callback fun in another %% fun that unpacks it. Not sure what is worse.. -type redis_command() :: - {redis_command, single, binary()} | - {redis_command, pipeline, [binary()]}. + {redis_command, {single, response_class()}, binary()} | + {redis_command, {pipeline, [response_class()]}, binary()}. -type raw_command() :: [binary()]. @@ -58,24 +58,28 @@ convert_to([]) -> error({badarg, []}); convert_to(RawCommands = [E|_]) when is_list(E) -> - Commands = [command_to_bin(RawCommand) || RawCommand <- RawCommands], - {redis_command, pipeline, Commands}; + {Bin, Classes} = + lists:foldl(fun(RawCommand, {BinAcc, ClassAcc}) -> + command_to_bin(RawCommand, BinAcc, ClassAcc) + end, {<<>>, []}, RawCommands), + {redis_command, {pipeline, lists:reverse(Classes)}, Bin}; convert_to(RawCommand) -> - Command = command_to_bin(RawCommand), - {redis_command, single, Command}. - -command_to_bin(RawCommand) -> - Len = integer_to_list(length(RawCommand)), - Elements = [[$$, integer_to_list(size(Bin)), $\r, $\n, Bin, $\r, $\n] || Bin <- RawCommand], - %% Maybe this could be kept as an iolist? - %% TODO profile this. - %% Since this is copied around a bit between processes it might be cheaper to keep it as a binary - %% since then it will be heap allocated if big. Just pure speculation.. - iolist_to_binary([$*, Len, $\r, $\n, Elements]). + {Command, [Class]} = command_to_bin(RawCommand, <<>>, []), + {redis_command, {single, Class}, Command}. + +command_to_bin(RawCommand, BinAcc, ClassAcc) -> + Len = integer_to_binary(length(RawCommand)), + Header = <>, + Bin = lists:foldl(fun(Arg, Acc) -> + Size = integer_to_binary(byte_size(Arg)), + <> + end, Header, RawCommand), + Class = resp_class(RawCommand), + {Bin, [Class | ClassAcc]}. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec get_data(redis_command()) -> binary() | [binary()]. +-spec get_data(redis_command()) -> binary(). %% %% Returns the command binary data to send to the socket. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -90,64 +94,47 @@ get_data({redis_command, _, Data}) -> %% mapping responses to the commands. Special handling is needed for %% some pubsub commands. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -get_response_class({redis_command, single, Data}) -> - resp_class(Data); -get_response_class({redis_command, pipeline, Data}) -> - lists:map(fun resp_class/1, Data). +get_response_class({redis_command, {single, Class}, _}) -> Class; +get_response_class({redis_command, {pipeline, Classes}, _}) -> + Classes. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec resp_class(binary()) -> response_class(). +-spec resp_class(raw_command()) -> response_class(). %% -%% Given a RESP-formatted command, returns a classification which can -%% be used to interpret the response(s) from Redis, particularly for -%% pubsub commands that don't return anything but expect certain push -%% messages to indicate success. +%% Given a raw command (list of binaries), returns a classification +%% which can be used to interpret the response(s) from Redis, +%% particularly for pubsub commands that don't return anything but +%% expect certain push messages to indicate success. %% %% If the command name ends in "subscribe", returns a tuple %% {CommandName, NumChannels}. Returns 'normal' otherwise. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -resp_class(<<"*", _, "\r\n$", X, "\r\n", _/binary>>) - when X =/= $9 -> % Shorter than "subscribe" - normal; % Quick path for most commands. -resp_class(<<"*", _, "\r\n$", X, Y, "\r\n", _/binary>>) - when X > $1; Y > $2 -> % Longer than "punsubscribe" - normal; % Quick path. -resp_class(<<"*", N, "\r\n$9\r\n", _/binary>> = Subj) -> - resp_class_helper(Subj, 8, 9, N - $0); -resp_class(<<"*", N, "\r\n$1", X, "\r\n", _/binary>> = Subj) - when X >= $0, X =< $2, N > 1 -> - resp_class_helper(Subj, 9, 10 + X - $0, N - $0); -resp_class(<<"*", M, N, "\r\n$9\r\n", _/binary>> = Subj) -> - resp_class_helper(Subj, 9, 9, (M - $0) * 10 + N - $0); -resp_class(<<"*", M, N, "\r\n$1", X, "\r\n", _/binary>> = Subj) - when X >= $0, X =< $2-> - resp_class_helper(Subj, 10, 10 + X - $0, (M - $0) * 10 + N - $0); -resp_class(<<"*", L, M, N, "\r\n$9\r\n", _/binary>> = Subj) -> - resp_class_helper(Subj, 10, 9, (L - $0) * 100 + (M - $0) * 10 + N - $0); -resp_class(<<"*", L, M, N, "\r\n$1", X, "\r\n", _/binary>> = Subj) - when X >= $0, X =< $2-> - resp_class_helper(Subj, 11, 10 + X - $0, (L - $0) * 100 + (M - $0) * 10 + N - $0); -resp_class(_) -> - normal. - -%% Returns response class when we know that the command starts at Offset within -%% Subject and is of Length 9-12 chars. -resp_class_helper(Subject, Offset, Length, Argc) -> - case binary:at(Subject, Offset + Length - 2) of - B when B =:= $b; B =:= $B -> - %% The B in SUBSCRIBE is at the right position (the penultimate - %% letter). This check eliminates all regular commands of length - %% 9-12 except the ones that end with "subscribe". Now do the slow - %% case-insensitive check to be sure. - case string:lowercase(binary:part(Subject, Offset, Length)) of - <<_:(Length - 9)/binary, "subscribe">> = LowercaseCmd -> - {LowercaseCmd, Argc - 1}; +resp_class([CmdName | Args]) -> + Len = byte_size(CmdName), + if Len < 9; Len > 12 -> + %% Quick path for most commands. + %% Shorter than "subscribe" or longer than "punsubscribe". + normal; + true -> + case binary:at(CmdName, Len - 2) of + B when B =:= $b; B =:= $B -> + %% The B in SUBSCRIBE is at the right position (the + %% penultimate letter). This check eliminates all regular + %% commands of length 9-12 except the ones that end with + %% "subscribe". Now do the slow case-insensitive check to + %% be sure. + case string:lowercase(CmdName) of + <<_:(Len - 9)/binary, "subscribe">> = LowercaseCmd -> + {LowercaseCmd, length(Args)}; + _ -> + normal + end; _ -> normal - end; - _ -> - normal - end. + end + end; +resp_class(_) -> + normal. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec check_result(ok_result() | any()) -> @@ -195,23 +182,54 @@ parse_host_and_port(Bin) -> %% Add ASKING for commands that got an ASK error. The Redis result is needed %% to filter out what commands to keep. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -add_asking(_, {redis_command, single, Command}) -> - {redis_command, pipeline, [[<<"ASKING\r\n">>], Command]}; - -add_asking({ok, OldReplies}, {redis_command, pipeline, Commands}) -> - %% Extract only the commands with ASK redirection. Even if all commands go to - %% the same slot it does not have to be the same key. When using hash tags - %% for instance. - AskCommands = add_asking_pipeline(OldReplies, Commands), - %% ASK commands + ASKING - {redis_command, pipeline, AskCommands}. - -add_asking_pipeline([], _) -> +add_asking(_, {redis_command, {single, Class}, Command}) -> + AskingBin = <<"ASKING\r\n">>, + Bin = <>, + {redis_command, {pipeline, [normal, Class]}, Bin}; + +add_asking({ok, OldReplies}, {redis_command, {pipeline, Classes}, PipelineBin}) -> + %% Split the concatenated binary back into individual commands, + %% then pick only the ones with ASK redirection. + Commands = split_pipeline_bin(PipelineBin), + {AskCommands, AskClasses} = add_asking_pipeline(OldReplies, Commands, Classes), + NewBin = lists:foldl(fun(C, Acc) -> <> end, <<>>, AskCommands), + {redis_command, {pipeline, AskClasses}, NewBin}. + +add_asking_pipeline([], _, _) -> + {[], []}; +add_asking_pipeline([{error, <<"ASK ", _/binary>>} | Replies], [Command | Commands], [Class | Classes]) -> + AskingBin = <<"ASKING\r\n">>, + {RestCmds, RestClasses} = add_asking_pipeline(Replies, Commands, Classes), + {[AskingBin, Command | RestCmds], [normal, Class | RestClasses]}; +add_asking_pipeline([_ | Replies], [_ | Commands], [_ | Classes]) -> + add_asking_pipeline(Replies, Commands, Classes). + +%% Split a concatenated pipeline binary into individual RESP command binaries. +split_pipeline_bin(<<>>) -> []; -add_asking_pipeline([{error, <<"ASK ", _/binary>>} |Replies], [Command |Commands]) -> - [[<<"ASKING\r\n">>], Command | add_asking_pipeline(Replies, Commands)]; -add_asking_pipeline([_ |Replies], [_ |Commands]) -> - add_asking_pipeline(Replies, Commands). +split_pipeline_bin(Bin) -> + {Cmd, Rest} = split_one_command(Bin), + [Cmd | split_pipeline_bin(Rest)]. + +split_one_command(<<"*", Rest/binary>>) -> + {ArgcStr, Rest1} = read_until_crlf(Rest), + Argc = binary_to_integer(ArgcStr), + split_args(Argc, Rest1, <<"*", ArgcStr/binary, "\r\n">>). + +split_args(0, Rest, Acc) -> + {Acc, Rest}; +split_args(N, <<"$", Rest/binary>>, Acc) -> + {LenStr, Rest1} = read_until_crlf(Rest), + Len = binary_to_integer(LenStr), + <> = Rest1, + split_args(N - 1, Rest2, <>). + +read_until_crlf(Bin) -> + case binary:match(Bin, <<"\r\n">>) of + {Pos, 2} -> + <> = Bin, + {Str, Rest} + end. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -233,4 +251,3 @@ merge_ask_result([{error, <<"ASK ", _/binary>>} | Replies1], [_AskOk, Reply | Re [Reply | merge_ask_result(Replies1, Replies2)]; merge_ask_result([Reply | Replies1], Replies2) -> [Reply | merge_ask_result(Replies1, Replies2)]. - diff --git a/test/ered_connection_tests.erl b/test/ered_connection_tests.erl index c9ef7ec..1505c95 100644 --- a/test/ered_connection_tests.erl +++ b/test/ered_connection_tests.erl @@ -58,7 +58,7 @@ trailing_reply_test() -> ?debugFmt("~w", [Conn1]), ered_connection:command_async(Conn1, [<<"ping">>], ping1), receive sent_big_nasty -> ok end, - MalformedCommand = {redis_command, pipeline, [undefined]}, + MalformedCommand = {redis_command, {pipeline, [normal]}, undefined}, ered_connection:command_async(Conn1, MalformedCommand, no_ref), %% make sure the ping is received before the connection is shut down