diff --git a/kafka/init.lua b/kafka/init.lua index 6a4d83f..92d80c8 100644 --- a/kafka/init.lua +++ b/kafka/init.lua @@ -29,13 +29,13 @@ function Consumer.create(config) new:_poll_msg() end) new._poll_msg_fiber:set_joinable(true) - new._poll_msg_fiber:name('kafka_msg_poller') + new._poll_msg_fiber:name('kafka_consumer:msg_poller') if config.log_callback ~= nil then new._poll_logs_fiber = fiber.new(function() new:_poll_logs() end) - new._poll_logs_fiber:name('kafka_logs_poller') + new._poll_logs_fiber:name('kafka_consumer:logs_poller') new._poll_logs_fiber:set_joinable(true) end @@ -43,7 +43,7 @@ function Consumer.create(config) new._poll_stats_fiber = fiber.new(function() new:_poll_stats() end) - new._poll_stats_fiber:name('kafka_stats_poller') + new._poll_stats_fiber:name('kafka_consumer:stats_poller') new._poll_stats_fiber:set_joinable(true) end @@ -51,7 +51,7 @@ function Consumer.create(config) new._poll_errors_fiber = fiber.new(function() new:_poll_errors() end) - new._poll_errors_fiber:name('kafka_error_poller') + new._poll_errors_fiber:name('kafka_consumer:error_poller') new._poll_errors_fiber:set_joinable(true) end @@ -59,7 +59,7 @@ function Consumer.create(config) new._poll_rebalances_fiber = fiber.new(function() new:_poll_rebalances() end) - new._poll_rebalances_fiber:name('kafka_rebalances_poller') + new._poll_rebalances_fiber:name('kafka_consumer:rebalances_poller') new._poll_rebalances_fiber:set_joinable(true) end @@ -82,8 +82,6 @@ function Consumer:_poll_msg() end end -jit.off(Consumer._poll_msg) - function Consumer:_poll_logs() local count, err while true do @@ -101,8 +99,6 @@ function Consumer:_poll_logs() end end -jit.off(Consumer._poll_logs) - function Consumer:_poll_stats() local count, err while true do @@ -120,8 +116,6 @@ function Consumer:_poll_stats() end end -jit.off(Consumer._poll_stats) - function Consumer:_poll_errors() local count, err while true do @@ -139,8 +133,6 @@ function Consumer:_poll_errors() end end -jit.off(Consumer._poll_errors) - function Consumer:_poll_rebalances() local count, err while true do @@ -158,8 +150,6 @@ function Consumer:_poll_rebalances() end end -jit.off(Consumer._poll_rebalances) - function Consumer:close() if self._consumer == nil then return true @@ -167,31 +157,29 @@ function Consumer:close() local ok, err = self._consumer:close() - if self._poll_msg_fiber ~= nil and self._poll_msg_fiber:status() ~= 'dead' then - self._poll_msg_fiber:cancel() - end - if self._output_ch ~= nil and not self._output_ch:is_closed() then - self._output_ch:close() - end - self._poll_msg_fiber:join() + local fibers = { + self._poll_msg_fiber, + self._poll_logs_fiber, + self._poll_stats_fiber, + self._poll_errors_fiber, + self._poll_rebalances_fiber, + } - if self._poll_logs_fiber ~= nil and self._poll_logs_fiber:status() ~= 'dead' then - self._poll_logs_fiber:cancel() - end - if self._poll_stats_fiber ~= nil and self._poll_stats_fiber:status() ~= 'dead' then - self._poll_stats_fiber:cancel() - end - if self._poll_errors_fiber ~= nil and self._poll_errors_fiber:status() ~= 'dead' then - self._poll_errors_fiber:cancel() + for _, f in ipairs(fibers) do + if f ~= nil and f:status() ~= 'dead' then + f:cancel() + end end - if self._poll_rebalances_fiber ~= nil and self._poll_rebalances_fiber:status() ~= 'dead' then - self._poll_rebalances_fiber:cancel() + + for _, f in ipairs(fibers) do + if f ~= nil then + f:join() + end end - self._poll_errors_fiber:join() - self._poll_stats_fiber:join() - self._poll_logs_fiber:join() - self._poll_rebalances_fiber:join() + if self._output_ch ~= nil and not self._output_ch:is_closed() then + self._output_ch:close() + end self._consumer:destroy() self._consumer = nil @@ -301,13 +289,13 @@ function Producer.create(config) new:_msg_delivery_poll() end) new._msg_delivery_poll_fiber:set_joinable(true) - new._msg_delivery_poll_fiber:name('kafka_delivery_poller') + new._msg_delivery_poll_fiber:name('kafka_producer:delivery_poller') if config.log_callback ~= nil then new._poll_logs_fiber = fiber.new(function() new:_poll_logs() end) - new._poll_logs_fiber:name('kafka_logs_poller') + new._poll_logs_fiber:name('kafka_producer:logs_poller') new._poll_logs_fiber:set_joinable(true) end @@ -315,7 +303,7 @@ function Producer.create(config) new._poll_stats_fiber = fiber.new(function() new:_poll_stats() end) - new._poll_stats_fiber:name('kafka_stats_poller') + new._poll_stats_fiber:name('kafka_producer:stats_poller') new._poll_stats_fiber:set_joinable(true) end @@ -323,7 +311,7 @@ function Producer.create(config) new._poll_errors_fiber = fiber.new(function() new:_poll_errors() end) - new._poll_errors_fiber:name('kafka_error_poller') + new._poll_errors_fiber:name('kafka_producer:error_poller') new._poll_errors_fiber:set_joinable(true) end @@ -349,8 +337,6 @@ function Producer:_msg_delivery_poll() end end -jit.off(Producer._msg_delivery_poll) - function Producer:_poll_logs() local count, err while true do @@ -368,8 +354,6 @@ function Producer:_poll_logs() end end -jit.off(Producer._poll_logs) - function Producer:_poll_stats() local count, err while true do @@ -387,8 +371,6 @@ function Producer:_poll_stats() end end -jit.off(Producer._poll_stats) - function Producer:_poll_errors() local count, err while true do @@ -406,8 +388,6 @@ function Producer:_poll_errors() end end -jit.off(Producer._poll_errors) - function Producer:produce_async(msg) local err = self._producer:produce(msg) return err @@ -476,23 +456,24 @@ function Producer:close() local ok, err = self._producer:close() - if self._msg_delivery_poll_fiber ~= nil and self._msg_delivery_poll_fiber:status() ~= 'dead' then - self._msg_delivery_poll_fiber:cancel() - end - if self._poll_logs_fiber ~= nil and self._poll_logs_fiber:status() ~= 'dead' then - self._poll_logs_fiber:cancel() - end - if self._poll_stats_fiber ~= nil and self._poll_stats_fiber:status() ~= 'dead' then - self._poll_stats_fiber:cancel() - end - if self._poll_errors_fiber ~= nil and self._poll_errors_fiber:status() ~= 'dead' then - self._poll_errors_fiber:cancel() + local fibers = { + self._msg_delivery_poll_fiber, + self._poll_logs_fiber, + self._poll_stats_fiber, + self._poll_errors_fiber, + } + + for _, f in ipairs(fibers) do + if f ~= nil and f:status() ~= 'dead' then + f:cancel() + end end - self._msg_delivery_poll_fiber:join() - self._poll_logs_fiber:join() - self._poll_stats_fiber:join() - self._poll_errors_fiber:join() + for _, f in ipairs(fibers) do + if f ~= nil then + f:join() + end + end self._producer:destroy() self._producer = nil