From aaa63ce42abf8e7ab9ba223ac7f8d96c07e717aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 18 Mar 2026 14:59:13 +0100 Subject: [PATCH 1/2] Use binary append instead of iolist_to_binary in command_to_bin Build the RESP binary by appending directly instead of constructing an iolist and converting it. This avoids the intermediate iolist allocation and is faster for the full data path (cross-process copy + TCP send) which is the common case. Benchmark results (cross-process copy + TCP send, OTP 28): iolist_to_binary iolist bin-append small (37B) 2105 ns/op 1128 888 pipeline (1.1KB) 2492 ns/op 2930 1226 --- src/ered_command.erl | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/ered_command.erl b/src/ered_command.erl index 0f21df6..c175cda 100644 --- a/src/ered_command.erl +++ b/src/ered_command.erl @@ -66,13 +66,12 @@ convert_to(RawCommand) -> {redis_command, single, Command}. command_to_bin(RawCommand) -> - Len = integer_to_list(length(RawCommand)), - Elements = [[$$, integer_to_list(size(Bin)), $\r, $\n, Bin, $\r, $\n] || Bin <- RawCommand], - %% Maybe this could be kept as an iolist? - %% TODO profile this. - %% Since this is copied around a bit between processes it might be cheaper to keep it as a binary - %% since then it will be heap allocated if big. Just pure speculation.. - iolist_to_binary([$*, Len, $\r, $\n, Elements]). + Len = integer_to_binary(length(RawCommand)), + Header = <<"*", Len/binary, "\r\n">>, + lists:foldl(fun(Bin, Acc) -> + Size = integer_to_binary(byte_size(Bin)), + <> + end, Header, RawCommand). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec get_data(redis_command()) -> binary() | [binary()]. From d92d9ddc35bb6acaaae8c291a0d90682f1ae49c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 18 Mar 2026 17:59:51 +0100 Subject: [PATCH 2/2] Flatten pipeline to single binary with precomputed response class Change redis_command() representation from a list of per-command binaries to a single concatenated binary. Response classes are precomputed at convert_to time and stored in the tuple, making get_data and get_response_class simple field accesses. New type: {redis_command, {single, response_class()}, binary()} {redis_command, {pipeline, [response_class()]}, binary()} Also simplify resp_class to work on raw commands instead of RESP-formatted binaries, and preserve the original response class in add_asking (needed for sharded pub/sub commands like SSUBSCRIBE that can receive ASK redirects). Benchmark results (convert_to + cross-process copy + TCP send, OTP 28): before after single small 2053 ns/op 2372 pipeline 10 3584 ns/op 2712 large real 3823 ns/op 3050 --- src/ered_command.erl | 176 ++++++++++++++++++--------------- test/ered_connection_tests.erl | 2 +- 2 files changed, 98 insertions(+), 80 deletions(-) diff --git a/src/ered_command.erl b/src/ered_command.erl index c175cda..54e6446 100644 --- a/src/ered_command.erl +++ b/src/ered_command.erl @@ -26,8 +26,8 @@ %% async. It could be handled by wrapping the callback fun in another %% fun that unpacks it. Not sure what is worse.. -type redis_command() :: - {redis_command, single, binary()} | - {redis_command, pipeline, [binary()]}. + {redis_command, {single, response_class()}, binary()} | + {redis_command, {pipeline, [response_class()]}, binary()}. -type raw_command() :: [binary()]. @@ -58,23 +58,28 @@ convert_to([]) -> error({badarg, []}); convert_to(RawCommands = [E|_]) when is_list(E) -> - Commands = [command_to_bin(RawCommand) || RawCommand <- RawCommands], - {redis_command, pipeline, Commands}; + {Bin, Classes} = + lists:foldl(fun(RawCommand, {BinAcc, ClassAcc}) -> + command_to_bin(RawCommand, BinAcc, ClassAcc) + end, {<<>>, []}, RawCommands), + {redis_command, {pipeline, lists:reverse(Classes)}, Bin}; convert_to(RawCommand) -> - Command = command_to_bin(RawCommand), - {redis_command, single, Command}. + {Command, [Class]} = command_to_bin(RawCommand, <<>>, []), + {redis_command, {single, Class}, Command}. -command_to_bin(RawCommand) -> +command_to_bin(RawCommand, BinAcc, ClassAcc) -> Len = integer_to_binary(length(RawCommand)), - Header = <<"*", Len/binary, "\r\n">>, - lists:foldl(fun(Bin, Acc) -> - Size = integer_to_binary(byte_size(Bin)), - <> - end, Header, RawCommand). + Header = <>, + Bin = lists:foldl(fun(Arg, Acc) -> + Size = integer_to_binary(byte_size(Arg)), + <> + end, Header, RawCommand), + Class = resp_class(RawCommand), + {Bin, [Class | ClassAcc]}. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec get_data(redis_command()) -> binary() | [binary()]. +-spec get_data(redis_command()) -> binary(). %% %% Returns the command binary data to send to the socket. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -89,64 +94,47 @@ get_data({redis_command, _, Data}) -> %% mapping responses to the commands. Special handling is needed for %% some pubsub commands. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -get_response_class({redis_command, single, Data}) -> - resp_class(Data); -get_response_class({redis_command, pipeline, Data}) -> - lists:map(fun resp_class/1, Data). +get_response_class({redis_command, {single, Class}, _}) -> Class; +get_response_class({redis_command, {pipeline, Classes}, _}) -> + Classes. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec resp_class(binary()) -> response_class(). +-spec resp_class(raw_command()) -> response_class(). %% -%% Given a RESP-formatted command, returns a classification which can -%% be used to interpret the response(s) from Redis, particularly for -%% pubsub commands that don't return anything but expect certain push -%% messages to indicate success. +%% Given a raw command (list of binaries), returns a classification +%% which can be used to interpret the response(s) from Redis, +%% particularly for pubsub commands that don't return anything but +%% expect certain push messages to indicate success. %% %% If the command name ends in "subscribe", returns a tuple %% {CommandName, NumChannels}. Returns 'normal' otherwise. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -resp_class(<<"*", _, "\r\n$", X, "\r\n", _/binary>>) - when X =/= $9 -> % Shorter than "subscribe" - normal; % Quick path for most commands. -resp_class(<<"*", _, "\r\n$", X, Y, "\r\n", _/binary>>) - when X > $1; Y > $2 -> % Longer than "punsubscribe" - normal; % Quick path. -resp_class(<<"*", N, "\r\n$9\r\n", _/binary>> = Subj) -> - resp_class_helper(Subj, 8, 9, N - $0); -resp_class(<<"*", N, "\r\n$1", X, "\r\n", _/binary>> = Subj) - when X >= $0, X =< $2, N > 1 -> - resp_class_helper(Subj, 9, 10 + X - $0, N - $0); -resp_class(<<"*", M, N, "\r\n$9\r\n", _/binary>> = Subj) -> - resp_class_helper(Subj, 9, 9, (M - $0) * 10 + N - $0); -resp_class(<<"*", M, N, "\r\n$1", X, "\r\n", _/binary>> = Subj) - when X >= $0, X =< $2-> - resp_class_helper(Subj, 10, 10 + X - $0, (M - $0) * 10 + N - $0); -resp_class(<<"*", L, M, N, "\r\n$9\r\n", _/binary>> = Subj) -> - resp_class_helper(Subj, 10, 9, (L - $0) * 100 + (M - $0) * 10 + N - $0); -resp_class(<<"*", L, M, N, "\r\n$1", X, "\r\n", _/binary>> = Subj) - when X >= $0, X =< $2-> - resp_class_helper(Subj, 11, 10 + X - $0, (L - $0) * 100 + (M - $0) * 10 + N - $0); -resp_class(_) -> - normal. - -%% Returns response class when we know that the command starts at Offset within -%% Subject and is of Length 9-12 chars. -resp_class_helper(Subject, Offset, Length, Argc) -> - case binary:at(Subject, Offset + Length - 2) of - B when B =:= $b; B =:= $B -> - %% The B in SUBSCRIBE is at the right position (the penultimate - %% letter). This check eliminates all regular commands of length - %% 9-12 except the ones that end with "subscribe". Now do the slow - %% case-insensitive check to be sure. - case string:lowercase(binary:part(Subject, Offset, Length)) of - <<_:(Length - 9)/binary, "subscribe">> = LowercaseCmd -> - {LowercaseCmd, Argc - 1}; +resp_class([CmdName | Args]) -> + Len = byte_size(CmdName), + if Len < 9; Len > 12 -> + %% Quick path for most commands. + %% Shorter than "subscribe" or longer than "punsubscribe". + normal; + true -> + case binary:at(CmdName, Len - 2) of + B when B =:= $b; B =:= $B -> + %% The B in SUBSCRIBE is at the right position (the + %% penultimate letter). This check eliminates all regular + %% commands of length 9-12 except the ones that end with + %% "subscribe". Now do the slow case-insensitive check to + %% be sure. + case string:lowercase(CmdName) of + <<_:(Len - 9)/binary, "subscribe">> = LowercaseCmd -> + {LowercaseCmd, length(Args)}; + _ -> + normal + end; _ -> normal - end; - _ -> - normal - end. + end + end; +resp_class(_) -> + normal. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec check_result(ok_result() | any()) -> @@ -194,23 +182,54 @@ parse_host_and_port(Bin) -> %% Add ASKING for commands that got an ASK error. The Redis result is needed %% to filter out what commands to keep. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -add_asking(_, {redis_command, single, Command}) -> - {redis_command, pipeline, [[<<"ASKING\r\n">>], Command]}; - -add_asking({ok, OldReplies}, {redis_command, pipeline, Commands}) -> - %% Extract only the commands with ASK redirection. Even if all commands go to - %% the same slot it does not have to be the same key. When using hash tags - %% for instance. - AskCommands = add_asking_pipeline(OldReplies, Commands), - %% ASK commands + ASKING - {redis_command, pipeline, AskCommands}. - -add_asking_pipeline([], _) -> +add_asking(_, {redis_command, {single, Class}, Command}) -> + AskingBin = <<"ASKING\r\n">>, + Bin = <>, + {redis_command, {pipeline, [normal, Class]}, Bin}; + +add_asking({ok, OldReplies}, {redis_command, {pipeline, Classes}, PipelineBin}) -> + %% Split the concatenated binary back into individual commands, + %% then pick only the ones with ASK redirection. + Commands = split_pipeline_bin(PipelineBin), + {AskCommands, AskClasses} = add_asking_pipeline(OldReplies, Commands, Classes), + NewBin = lists:foldl(fun(C, Acc) -> <> end, <<>>, AskCommands), + {redis_command, {pipeline, AskClasses}, NewBin}. + +add_asking_pipeline([], _, _) -> + {[], []}; +add_asking_pipeline([{error, <<"ASK ", _/binary>>} | Replies], [Command | Commands], [Class | Classes]) -> + AskingBin = <<"ASKING\r\n">>, + {RestCmds, RestClasses} = add_asking_pipeline(Replies, Commands, Classes), + {[AskingBin, Command | RestCmds], [normal, Class | RestClasses]}; +add_asking_pipeline([_ | Replies], [_ | Commands], [_ | Classes]) -> + add_asking_pipeline(Replies, Commands, Classes). + +%% Split a concatenated pipeline binary into individual RESP command binaries. +split_pipeline_bin(<<>>) -> []; -add_asking_pipeline([{error, <<"ASK ", _/binary>>} |Replies], [Command |Commands]) -> - [[<<"ASKING\r\n">>], Command | add_asking_pipeline(Replies, Commands)]; -add_asking_pipeline([_ |Replies], [_ |Commands]) -> - add_asking_pipeline(Replies, Commands). +split_pipeline_bin(Bin) -> + {Cmd, Rest} = split_one_command(Bin), + [Cmd | split_pipeline_bin(Rest)]. + +split_one_command(<<"*", Rest/binary>>) -> + {ArgcStr, Rest1} = read_until_crlf(Rest), + Argc = binary_to_integer(ArgcStr), + split_args(Argc, Rest1, <<"*", ArgcStr/binary, "\r\n">>). + +split_args(0, Rest, Acc) -> + {Acc, Rest}; +split_args(N, <<"$", Rest/binary>>, Acc) -> + {LenStr, Rest1} = read_until_crlf(Rest), + Len = binary_to_integer(LenStr), + <> = Rest1, + split_args(N - 1, Rest2, <>). + +read_until_crlf(Bin) -> + case binary:match(Bin, <<"\r\n">>) of + {Pos, 2} -> + <> = Bin, + {Str, Rest} + end. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -232,4 +251,3 @@ merge_ask_result([{error, <<"ASK ", _/binary>>} | Replies1], [_AskOk, Reply | Re [Reply | merge_ask_result(Replies1, Replies2)]; merge_ask_result([Reply | Replies1], Replies2) -> [Reply | merge_ask_result(Replies1, Replies2)]. - diff --git a/test/ered_connection_tests.erl b/test/ered_connection_tests.erl index c9ef7ec..1505c95 100644 --- a/test/ered_connection_tests.erl +++ b/test/ered_connection_tests.erl @@ -58,7 +58,7 @@ trailing_reply_test() -> ?debugFmt("~w", [Conn1]), ered_connection:command_async(Conn1, [<<"ping">>], ping1), receive sent_big_nasty -> ok end, - MalformedCommand = {redis_command, pipeline, [undefined]}, + MalformedCommand = {redis_command, {pipeline, [normal]}, undefined}, ered_connection:command_async(Conn1, MalformedCommand, no_ref), %% make sure the ping is received before the connection is shut down