KAFKA-14487: Move LogManager to storage module #21834
mimaison wants to merge 2 commits into apache:trunk
Conversation
Quite a few asScala calls here, but changing to Java collections would cause too many changes.
Force-pushed from 0760a05 to e608eeb
invocation.callRealMethod().asInstanceOf[Try[File]]
invocation.callRealMethod()
}
}.when(logManager).createLogDirectory(any(), any())
The Scala version of createLogDirectory() used to return Try[File], so we could throw anything here to get it to fail. The Java version of this method just throws KafkaStorageException, so I updated the logic here to match.
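A minimal sketch of the behavioral difference, using a local stand-in exception class and a hypothetical method shape (these are not the real Kafka signatures):

```java
import java.io.File;

class CreateLogDirSketch {
    // Local stand-in for org.apache.kafka.common.KafkaStorageException.
    static class KafkaStorageException extends RuntimeException {
        KafkaStorageException(String message) { super(message); }
    }

    // Hypothetical shape of the migrated method: instead of returning
    // Scala's Try[File], it returns the File on success and signals
    // failure by throwing KafkaStorageException.
    static File createLogDirectory(File parent, String name) {
        File dir = new File(parent, name);
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new KafkaStorageException("Failed to create log directory " + dir);
        }
        return dir;
    }
}
```

With this shape, a test can only provoke the failure path by making the directory creation itself fail (or by stubbing the method to throw KafkaStorageException), rather than by returning an arbitrary failed Try.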
}
}
} catch (IOException e) {
  throw new UncheckedIOException(e);
The Scala logic did not explicitly handle IOException. I decided to catch it and rethrow it as unchecked here to keep the call to schedule() clean in startupWithConfigOverrides(). It should produce the same effect, since KafkaScheduler.schedule() catches Throwable.
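A sketch of the pattern (the interface and names below are illustrative stand-ins, not the real LogManager code): a Runnable cannot declare checked exceptions, so the IOException is wrapped, and a scheduler that catches Throwable still sees it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

class CheckpointTaskSketch {
    // Illustrative stand-in for an I/O-throwing checkpoint write.
    interface IOAction {
        void run() throws IOException;
    }

    // Runnable.run() cannot declare checked exceptions, so the IOException
    // is rethrown as UncheckedIOException; a scheduler that catches
    // Throwable (as KafkaScheduler does) handles it either way.
    static Runnable asTask(IOAction action) {
        return () -> {
            try {
                action.run();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }
}
```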
private final LinkedBlockingQueue<Map.Entry<UnifiedLog, Long>> logsToBeDeleted = new LinkedBlockingQueue<>();

// Map of stray partition to stray log. This holds all stray logs detected on the broker.
private final ConcurrentHashMap<TopicPartition, UnifiedLog> strayLogs = new ConcurrentHashMap<>();
We only ever write to this field; this was already the case in the Scala version, so I kept it as is. We should probably get rid of it if we don't find a usage.
clolov left a comment:
Made an initial pass, thanks for starting this 😊!
if (liveLogDirs.isEmpty()) {
  LOG.error("Shutdown broker because none of the specified log dirs from {} can be created or validated", dirsToString(dirs));
  Exit.halt(1);
}
The log level here used to be FATAL instead of ERROR in Scala-world:
if (liveLogDirs.isEmpty) {
fatal(s"Shutdown broker because none of the specified log dirs from ${dirs.mkString(", ")} can be created or validated")
Exit.halt(1)
}
Maybe we would like to keep it the same?
I had a look at our code base and did not find a place where we log a line with the FATAL marker in Java, so I assumed we should just stick to the default levels.
} catch (KafkaStorageException kse) {
  LOG.error("Disk error while writing recovery offsets checkpoint in directory {}: {}", logDir, kse.getMessage());
}
In Scala-world we used to handle an IOException here:
} catch {
case e: KafkaStorageException =>
error(s"Disk error while writing recovery offsets checkpoint in directory $logDir: ${e.getMessage}")
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(logDir.getAbsolutePath,
s"Disk error while writing recovery offsets checkpoint in directory $logDir: ${e.getMessage}", e)
Is not handling it an oversight, or is it on purpose in Java-world?
This must have been dead code, because nothing in that block throws IOException. It's a checked exception, so in Java you have to handle it explicitly, and you can see that checkpointRecoveryOffsetsInDir() does not throw it. In fact, catching a checked exception that the try block cannot throw is a compile error in Java.
 * relevant log was being loaded.
 */
public void finishedInitializingLog(TopicPartition topicPartition, Optional<UnifiedLog> maybeLog) {
  boolean removedValue = partitionsInitializing.remove(topicPartition);
I think remove() returns Boolean, which has the potential of being null, and the unboxing to a primitive can cause a NullPointerException. I think we were protected against this issue in Scala-world.
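To illustrate the hazard (the map type below is a placeholder, not the real partitionsInitializing field): Map.remove() returns the boxed value, which is null when the key is absent, and auto-unboxing null into a primitive throws NullPointerException. A null check before unboxing avoids it:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class UnboxingSketch {
    // Null-safe way to read the removed flag: check for null before
    // relying on auto-unboxing.
    static boolean removedValue(Map<String, Boolean> map, String key) {
        Boolean removed = map.remove(key); // null if the key was absent
        return removed != null && removed;
    }
}
```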
LOG.error("Exception while deleting {} in dir {}.", removedLog, removedLog.parentDir(), kse);
}
}
nextDelayMs = nextDeleteDelayMs(nextDelayMs);
In Scala-world fileDeleteDelayMs was always used. I need to spend a bit more time thinking about whether swapping this to nextDelayMs changes the timings, but was this change intentional or accidental?
Definitely accidental. I agree it should be nextDelayMs = nextDeleteDelayMs(fileDeleteDelayMs); to match the Scala logic.
Keep the Scala factory for now as it uses KafkaConfig
I kept the Scala factory (the LogManager object) for now as it uses KafkaConfig, but this PR migrates all the logic to Java. I tried to modify LogManagerTest as little as possible. I've run the 4 JMH benchmarks this touches and saw no performance changes.
Some of these JMH benchmarks are currently broken; I opened #21839 to fix them. Update: the PR has been merged and the benchmarks are fixed.