KAFKA-14487: Move LogManager to storage module #21834

Open
mimaison wants to merge 2 commits into apache:trunk from mimaison:kafka-14487-2

@mimaison (Member) commented Mar 20, 2026

I kept the Scala factory (the LogManager object) for now because it uses
KafkaConfig, but this PR migrates all the logic to Java. I tried to
modify LogManagerTest as little as possible.

I've run the 4 JMH benchmarks this touches and saw no performance
changes:

PR:
Benchmark                                  (partitionCount)  Mode  Cnt       Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    3    4929.594 ±   527.256  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    3   28161.077 ±  1448.314  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    3   49003.192 ±  2650.147  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    3  420319.519 ± 20527.880  ns/op

trunk:
Benchmark                                  (partitionCount)  Mode  Cnt       Score      Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    5    4923.481 ±  207.098  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    5   34052.929 ±  533.669  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    5   64309.265 ±  786.801  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    5  449232.172 ± 3014.692  ns/op

PR:
Benchmark                                        Mode  Cnt    Score    Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt    5  850.541 ± 68.376  ns/op

trunk:
Benchmark                                        Mode  Cnt    Score    Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt    5  901.512 ± 70.109  ns/op

PR:
Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score   Error   Units
CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    9  0.148 ± 0.015  ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    9  0.110 ± 0.003  ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    9  0.101 ± 0.003  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    9  0.152 ± 0.008  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    9  0.105 ± 0.005  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    9  0.104 ± 0.005  ops/ms

trunk:
Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score   Error   Units
CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    9  0.152 ± 0.004  ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    9  0.111 ± 0.002  ops/ms
CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    9  0.100 ± 0.001  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    9  0.155 ± 0.012  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    9  0.109 ± 0.004  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    9  0.105 ± 0.009  ops/ms

PR:
Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt    Score    Error  Units
PartitionCreationBench.makeFollower               20          false  avgt    9   22.895 ±  4.214  ms/op
PartitionCreationBench.makeFollower               20           true  avgt    9  113.134 ± 14.783  ms/op

trunk:
Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt    Score   Error  Units
PartitionCreationBench.makeFollower               20          false  avgt    9   23.214 ± 5.620  ms/op
PartitionCreationBench.makeFollower               20           true  avgt    9  112.148 ± 9.270  ms/op

Some of these JMH benchmarks are currently broken; I opened
#21839 to fix them. Update: that PR has
been merged and the benchmarks are fixed.

github-actions bot added the core, tools, performance, storage, and clients labels on Mar 20, 2026
@mimaison (Member, Author) commented:

There are quite a few asScala calls here, but changing to Java collections would cause too many changes.

mimaison force-pushed the kafka-14487-2 branch 2 times, most recently from 0760a05 to e608eeb, on March 20, 2026 09:55

invocation.callRealMethod().asInstanceOf[Try[File]]
invocation.callRealMethod()
}
}.when(logManager).createLogDirectory(any(), any())
@mimaison (Member, Author) commented:

The Scala version of createLogDirectory() used to return Try[File], so we could throw anything here to get it to fail.

The Java version of this method just throws KafkaStorageException, so I updated the logic here to match.

}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
@mimaison (Member, Author) commented:

The Scala logic did not explicitly handle IOException. I decided to catch it and rethrow it as unchecked here to keep the call to schedule() clean in startupWithConfigOverrides(). It should have the same effect, since KafkaScheduler.schedule() catches Throwable.
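
For readers less familiar with the pattern, here is a minimal, self-contained sketch of wrapping a checked IOException so a task can be passed around as a plain Runnable. All names in it (UncheckedIoSketch, cleanupLogs, cleanupTask, runGuarded) are hypothetical stand-ins, not code from this PR, and runGuarded only imitates a scheduler that catches Throwable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

class UncheckedIoSketch {
    // Hypothetical stand-in for disk work that may fail with a checked exception.
    static void cleanupLogs(boolean failOnDisk) throws IOException {
        if (failOnDisk) {
            throw new IOException("simulated disk failure");
        }
    }

    // Runnable.run() cannot declare IOException, so the checked exception is
    // rethrown as UncheckedIOException with the original kept as the cause.
    static Runnable cleanupTask(boolean failOnDisk) {
        return () -> {
            try {
                cleanupLogs(failOnDisk);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    // Hypothetical scheduler-like wrapper: it catches Throwable, so checked and
    // unchecked failures end up handled the same way. Returns null on success.
    static Throwable runGuarded(Runnable task) {
        try {
            task.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        Throwable caught = runGuarded(cleanupTask(true));
        System.out.println(caught.getClass().getSimpleName()
            + " caused by " + caught.getCause().getMessage());
    }
}
```

The caller of the scheduling method stays free of try/catch blocks, and the original IOException remains reachable through getCause() for logging.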

private final LinkedBlockingQueue<Map.Entry<UnifiedLog, Long>> logsToBeDeleted = new LinkedBlockingQueue<>();

// Map of stray partition to stray log. This holds all stray logs detected on the broker.
private final ConcurrentHashMap<TopicPartition, UnifiedLog> strayLogs = new ConcurrentHashMap<>();
@mimaison (Member, Author) commented:

We only ever write to this field. This was already the case in the Scala version. I kept it as is. We should probably get rid of it if we don't find a usage.

@clolov (Contributor) left a comment:

Made an initial pass, thanks for starting this 😊!

Comment on lines +385 to +388
    if (liveLogDirs.isEmpty()) {
        LOG.error("Shutdown broker because none of the specified log dirs from {} can be created or validated", dirsToString(dirs));
        Exit.halt(1);
    }
@clolov (Contributor) commented:

The log level here used to be FATAL rather than ERROR in Scala-world:

    if (liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because none of the specified log dirs from ${dirs.mkString(", ")} can be created or validated")
      Exit.halt(1)
    }

Maybe we would like to keep it the same?

@mimaison (Member, Author) commented:

I had a look at our code base and did not find a place where we log a line with the FATAL marker in Java, so I assumed we just stick to the default levels.

Comment on lines +1081 to +1083
    } catch (KafkaStorageException kse) {
        LOG.error("Disk error while writing recovery offsets checkpoint in directory {}: {}", logDir, kse.getMessage());
    }
@clolov (Contributor) commented:

In Scala-world we used to handle an IOException here:

    } catch {
      case e: KafkaStorageException =>
        error(s"Disk error while writing recovery offsets checkpoint in directory $logDir: ${e.getMessage}")
      case e: IOException =>
        logDirFailureChannel.maybeAddOfflineLogDir(logDir.getAbsolutePath,
          s"Disk error while writing recovery offsets checkpoint in directory $logDir: ${e.getMessage}", e)

Is not handling it in Java-world an oversight, or is it on purpose?

@mimaison (Member, Author) commented:

This must have been dead code because nothing in that block throws IOException. It's a checked exception, so in Java you have to handle it explicitly, and you can see that checkpointRecoveryOffsetsInDir() does not throw it.
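
As a small illustration of that argument, the sketch below shows why such a catch clause cannot survive a port to Java. StorageException and checkpoint() are hypothetical stand-ins (an unchecked exception and a method with no throws clause), not the PR's actual classes.

```java
class DeadCatchSketch {
    // Hypothetical unchecked exception standing in for a storage failure.
    static class StorageException extends RuntimeException {
        StorageException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in: declares no checked exceptions at all.
    static void checkpoint(boolean fail) {
        if (fail) {
            throw new StorageException("disk error");
        }
    }

    static String writeCheckpoint(boolean fail) {
        try {
            checkpoint(fail);
            return "ok";
        } catch (StorageException e) {
            return "storage error: " + e.getMessage();
        }
        // Adding `catch (java.io.IOException e) { ... }` to this try would be
        // rejected by javac, because IOException is a checked exception that
        // nothing in the try body can throw. Scala does not check exceptions,
        // so an equivalent `case e: IOException` compiles there and can sit
        // unnoticed as dead code.
    }

    public static void main(String[] args) {
        System.out.println(writeCheckpoint(false));
        System.out.println(writeCheckpoint(true));
    }
}
```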

* relevant log was being loaded.
*/
public void finishedInitializingLog(TopicPartition topicPartition, Optional<UnifiedLog> maybeLog) {
boolean removedValue = partitionsInitializing.remove(topicPartition);
@clolov (Contributor) commented:

I think remove returns Boolean which has the potential of being null and the unboxing to a primitive can cause a NullPointerException. I think we were protected against this issue in Scala-world.
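
To make the concern concrete, here is a minimal sketch. It assumes the field is a ConcurrentHashMap with Boolean values; the declaration of partitionsInitializing is not shown in this thread, and if it were a Set, remove() would return a primitive boolean and the issue would not arise. The class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

class UnboxingSketch {
    // Assumed shape of the field: a map from partition name to a Boolean flag.
    static final ConcurrentHashMap<String, Boolean> initializing = new ConcurrentHashMap<>();

    // Map.remove(key) returns null when the key is absent, and unboxing that
    // null into a primitive boolean throws NullPointerException.
    static boolean removeUnsafe(String partition) {
        boolean removed = initializing.remove(partition);
        return removed;
    }

    // Null-safe variant: an absent key is treated the same as `false`.
    static boolean removeSafe(String partition) {
        return Boolean.TRUE.equals(initializing.remove(partition));
    }

    public static void main(String[] args) {
        initializing.put("topic-0", true);
        System.out.println(removeSafe("topic-0")); // key present with value true
        System.out.println(removeSafe("topic-0")); // key already removed
        try {
            removeUnsafe("topic-1");               // key never added
        } catch (NullPointerException e) {
            System.out.println("NPE on unboxing a missing entry");
        }
    }
}
```

Comparing against Boolean.TRUE is one option; keeping the result as a boxed Boolean and checking it for null explicitly would work as well.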

LOG.error("Exception while deleting {} in dir {}.", removedLog, removedLog.parentDir(), kse);
}
}
nextDelayMs = nextDeleteDelayMs(nextDelayMs);
@clolov (Contributor) commented:

In Scala-world fileDeleteDelayMs was always used. I need to spend a bit more time to think whether swapping this to nextDelayMs changes the timings, but was this change intentional or accidental?

@mimaison (Member, Author) commented:

Definitely accidental. I agree it should be nextDelayMs = nextDeleteDelayMs(fileDeleteDelayMs); to match the Scala logic.
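
A rough sketch of why the argument matters. The body of nextDeleteDelayMs() is not shown in this thread, so the version below is an assumption: it returns the head entry's scheduled time plus the base delay minus the current time when a log is queued, and the base delay otherwise. The extra parameters and the class name are hypothetical, added only to keep the example self-contained.

```java
class DeleteDelaySketch {
    // Assumed helper shape: headScheduledMs is null when the queue is empty.
    static long nextDeleteDelayMs(Long headScheduledMs, long nowMs, long baseDelayMs) {
        if (headScheduledMs != null) {
            return headScheduledMs + baseDelayMs - nowMs;
        }
        return baseDelayMs;
    }

    public static void main(String[] args) {
        long fileDeleteDelayMs = 60_000L;
        long nowMs = 10_000L;

        // First pass: the head of the queue was scheduled at t = 0.
        long nextDelayMs = nextDeleteDelayMs(0L, nowMs, fileDeleteDelayMs);

        // Second pass: the new head was scheduled at t = 5_000.
        // Feeding the previous result back in shrinks the base delay...
        long fedBack = nextDeleteDelayMs(5_000L, nowMs, nextDelayMs);
        // ...while passing the configured delay keeps the full wait.
        long configured = nextDeleteDelayMs(5_000L, nowMs, fileDeleteDelayMs);

        System.out.println(nextDelayMs + " " + fedBack + " " + configured);
    }
}
```

Under these assumptions, feeding the previous delay back in would schedule the second deletion 10 seconds earlier than the configured file delete delay allows, which is the kind of timing drift the question above is about.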
