Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 44 additions & 63 deletions kafka/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,37 @@ function Consumer.create(config)
new:_poll_msg()
end)
new._poll_msg_fiber:set_joinable(true)
new._poll_msg_fiber:name('kafka_msg_poller')
new._poll_msg_fiber:name('kafka_consumer:msg_poller')

if config.log_callback ~= nil then
new._poll_logs_fiber = fiber.new(function()
new:_poll_logs()
end)
new._poll_logs_fiber:name('kafka_logs_poller')
new._poll_logs_fiber:name('kafka_consumer:logs_poller')
new._poll_logs_fiber:set_joinable(true)
end

if config.stats_callback ~= nil then
new._poll_stats_fiber = fiber.new(function()
new:_poll_stats()
end)
new._poll_stats_fiber:name('kafka_stats_poller')
new._poll_stats_fiber:name('kafka_consumer:stats_poller')
new._poll_stats_fiber:set_joinable(true)
end

if config.error_callback ~= nil then
new._poll_errors_fiber = fiber.new(function()
new:_poll_errors()
end)
new._poll_errors_fiber:name('kafka_error_poller')
new._poll_errors_fiber:name('kafka_consumer:error_poller')
new._poll_errors_fiber:set_joinable(true)
end

if config.rebalance_callback ~= nil then
new._poll_rebalances_fiber = fiber.new(function()
new:_poll_rebalances()
end)
new._poll_rebalances_fiber:name('kafka_rebalances_poller')
new._poll_rebalances_fiber:name('kafka_consumer:rebalances_poller')
new._poll_rebalances_fiber:set_joinable(true)
end

Expand All @@ -82,8 +82,6 @@ function Consumer:_poll_msg()
end
end

jit.off(Consumer._poll_msg)

function Consumer:_poll_logs()
local count, err
while true do
Expand All @@ -101,8 +99,6 @@ function Consumer:_poll_logs()
end
end

jit.off(Consumer._poll_logs)

function Consumer:_poll_stats()
local count, err
while true do
Expand All @@ -120,8 +116,6 @@ function Consumer:_poll_stats()
end
end

jit.off(Consumer._poll_stats)

function Consumer:_poll_errors()
local count, err
while true do
Expand All @@ -139,8 +133,6 @@ function Consumer:_poll_errors()
end
end

jit.off(Consumer._poll_errors)

function Consumer:_poll_rebalances()
local count, err
while true do
Expand All @@ -158,40 +150,36 @@ function Consumer:_poll_rebalances()
end
end

jit.off(Consumer._poll_rebalances)

function Consumer:close()
if self._consumer == nil then
return true
end

local ok, err = self._consumer:close()

if self._poll_msg_fiber ~= nil and self._poll_msg_fiber:status() ~= 'dead' then
self._poll_msg_fiber:cancel()
end
if self._output_ch ~= nil and not self._output_ch:is_closed() then
self._output_ch:close()
end
self._poll_msg_fiber:join()
local fibers = {
self._poll_msg_fiber,
self._poll_logs_fiber,
self._poll_stats_fiber,
self._poll_errors_fiber,
self._poll_rebalances_fiber,
}

if self._poll_logs_fiber ~= nil and self._poll_logs_fiber:status() ~= 'dead' then
self._poll_logs_fiber:cancel()
end
if self._poll_stats_fiber ~= nil and self._poll_stats_fiber:status() ~= 'dead' then
self._poll_stats_fiber:cancel()
end
if self._poll_errors_fiber ~= nil and self._poll_errors_fiber:status() ~= 'dead' then
self._poll_errors_fiber:cancel()
for _, f in ipairs(fibers) do
if f ~= nil and f:status() ~= 'dead' then
f:cancel()
end
end
if self._poll_rebalances_fiber ~= nil and self._poll_rebalances_fiber:status() ~= 'dead' then
self._poll_rebalances_fiber:cancel()

for _, f in ipairs(fibers) do
if f ~= nil then
f:join()
end
end

self._poll_errors_fiber:join()
self._poll_stats_fiber:join()
self._poll_logs_fiber:join()
self._poll_rebalances_fiber:join()
if self._output_ch ~= nil and not self._output_ch:is_closed() then
self._output_ch:close()
end

self._consumer:destroy()
self._consumer = nil
Expand Down Expand Up @@ -301,29 +289,29 @@ function Producer.create(config)
new:_msg_delivery_poll()
end)
new._msg_delivery_poll_fiber:set_joinable(true)
new._msg_delivery_poll_fiber:name('kafka_delivery_poller')
new._msg_delivery_poll_fiber:name('kafka_producer:delivery_poller')

if config.log_callback ~= nil then
new._poll_logs_fiber = fiber.new(function()
new:_poll_logs()
end)
new._poll_logs_fiber:name('kafka_logs_poller')
new._poll_logs_fiber:name('kafka_producer:logs_poller')
new._poll_logs_fiber:set_joinable(true)
end

if config.stats_callback ~= nil then
new._poll_stats_fiber = fiber.new(function()
new:_poll_stats()
end)
new._poll_stats_fiber:name('kafka_stats_poller')
new._poll_stats_fiber:name('kafka_producer:stats_poller')
new._poll_stats_fiber:set_joinable(true)
end

if config.error_callback ~= nil then
new._poll_errors_fiber = fiber.new(function()
new:_poll_errors()
end)
new._poll_errors_fiber:name('kafka_error_poller')
new._poll_errors_fiber:name('kafka_producer:error_poller')
new._poll_errors_fiber:set_joinable(true)
end

Expand All @@ -349,8 +337,6 @@ function Producer:_msg_delivery_poll()
end
end

jit.off(Producer._msg_delivery_poll)

function Producer:_poll_logs()
local count, err
while true do
Expand All @@ -368,8 +354,6 @@ function Producer:_poll_logs()
end
end

jit.off(Producer._poll_logs)

function Producer:_poll_stats()
local count, err
while true do
Expand All @@ -387,8 +371,6 @@ function Producer:_poll_stats()
end
end

jit.off(Producer._poll_stats)

function Producer:_poll_errors()
local count, err
while true do
Expand All @@ -406,8 +388,6 @@ function Producer:_poll_errors()
end
end

jit.off(Producer._poll_errors)

function Producer:produce_async(msg)
local err = self._producer:produce(msg)
return err
Expand Down Expand Up @@ -476,23 +456,24 @@ function Producer:close()

local ok, err = self._producer:close()

if self._msg_delivery_poll_fiber ~= nil and self._msg_delivery_poll_fiber:status() ~= 'dead' then
self._msg_delivery_poll_fiber:cancel()
end
if self._poll_logs_fiber ~= nil and self._poll_logs_fiber:status() ~= 'dead' then
self._poll_logs_fiber:cancel()
end
if self._poll_stats_fiber ~= nil and self._poll_stats_fiber:status() ~= 'dead' then
self._poll_stats_fiber:cancel()
end
if self._poll_errors_fiber ~= nil and self._poll_errors_fiber:status() ~= 'dead' then
self._poll_errors_fiber:cancel()
local fibers = {
self._msg_delivery_poll_fiber,
self._poll_logs_fiber,
self._poll_stats_fiber,
self._poll_errors_fiber,
}

for _, f in ipairs(fibers) do
if f ~= nil and f:status() ~= 'dead' then
f:cancel()
end
end

self._msg_delivery_poll_fiber:join()
self._poll_logs_fiber:join()
self._poll_stats_fiber:join()
self._poll_errors_fiber:join()
for _, f in ipairs(fibers) do
if f ~= nil then
f:join()
end
end

self._producer:destroy()
self._producer = nil
Expand Down