ekaf_lib:common_sync/4 uses ekaf:pick(Topic) -> pg2:get_closest_pid(Topic) which does not take custom strategies into account. This makes ekaf:produce_sync* functions work differently for a binary key vs a {binkey, binval} tuple:
158> ekaf:produce_sync_batched(<<"hai">>,<<"value">>).
{buffered,0,1}
159> ekaf:produce_sync_batched(<<"hai">>,{<<>>,<<"value">>}).
>>>>>>>>>>>>>>>>>>> {<<"hai">>,
{<<>>,<<"value">>},
[<0.4644.1>,<0.5064.1>,<0.8221.1>]}
[{buffered,0,1}]
The async variants already work as expected in all cases. The partitioner might want to do it's own default thing for all ekaf:produce_* calls.
Reproduce by setting ekaf_callback_custom_partition_picker to:
partition_picker(Topic, Data, State) ->
io:format(">>>>>>>>>>>>>>>>>>> ~p~n",[{Topic, Data, State}]),
{partition, 0}
.
ekaf_lib:common_sync/4usesekaf:pick(Topic)->pg2:get_closest_pid(Topic)which does not takecustomstrategies into account. This makesekaf:produce_sync*functions work differently for a binary key vs a{binkey, binval}tuple:The async variants already work as expected in all cases. The partitioner might want to do it's own default thing for all
ekaf:produce_*calls.Reproduce by setting
ekaf_callback_custom_partition_pickerto: