Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LazyIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import LazyIndex._
import kafka.utils.CoreUtils.inLock
import kafka.utils.threadsafe
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.AbstractIndex
import org.apache.kafka.server.log.internals.{AbstractIndex, OffsetIndex}

/**
* A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LocalLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ class LocalLog(@volatile private var _dir: File,
val fetchSize = fetchInfo.records.sizeInBytes
val startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse {
val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).orElse {
segments.higherSegment(segment.baseOffset).map(_.baseOffset).getOrElse(logEndOffset)
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/log/LogSegment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetPosition, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetIndex, OffsetPosition, TimestampOffset, TransactionIndex, TxnIndexSearchResult}

import java.util.Optional
import scala.jdk.CollectionConverters._
import scala.math._

Expand Down Expand Up @@ -321,7 +322,7 @@ class LogSegment private[log] (val log: FileRecords,
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}

def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Option[Long] =
def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Optional[Long] =
offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)

/**
Expand Down
207 changes: 0 additions & 207 deletions core/src/main/scala/kafka/log/OffsetIndex.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.{OffsetPosition, TransactionIndex}
import org.apache.kafka.server.log.internals.{OffsetIndex, OffsetPosition, TransactionIndex}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}

Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/tools/DumpLogSegments.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.server.log.internals.TransactionIndex
import org.apache.kafka.server.log.internals.{OffsetIndex, TransactionIndex}
import org.apache.kafka.snapshot.Snapshots

import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -129,7 +129,7 @@ object DumpLogSegments {
val startOffset = file.getName.split("\\.")(0).toLong
val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.LogFileSuffix)
val fileRecords = FileRecords.open(logFile, false)
val index = new OffsetIndex(file, baseOffset = startOffset, writable = false)
val index = new OffsetIndex(file, startOffset, -1, false)

if (index.entries == 0) {
println(s"$file is empty.")
Expand Down Expand Up @@ -171,7 +171,7 @@ object DumpLogSegments {
val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.LogFileSuffix)
val fileRecords = FileRecords.open(logFile, false)
val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.IndexFileSuffix)
val index = new OffsetIndex(indexFile, baseOffset = startOffset, writable = false)
val index = new OffsetIndex(indexFile, startOffset, -1, false)
val timeIndex = new TimeIndex(file, baseOffset = startOffset, writable = false)

try {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.log.internals.AbortedTxn
import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetIndex}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
Expand Down
34 changes: 17 additions & 17 deletions core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import java.io._
import java.nio.file.Files
import org.junit.jupiter.api.Assertions._

import java.util.{Arrays, Collections}
import java.util.{Arrays, Collections, Optional}
import org.junit.jupiter.api._

import scala.collection._
import scala.util.Random
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.server.log.internals.OffsetPosition
import org.apache.kafka.server.log.internals.{OffsetIndex, OffsetPosition}

import scala.annotation.nowarn

Expand All @@ -40,7 +40,7 @@ class OffsetIndexTest {

@BeforeEach
def setup(): Unit = {
this.idx = new OffsetIndex(nonExistentTempFile(), baseOffset, maxIndexSize = 30 * 8)
this.idx = new OffsetIndex(nonExistentTempFile(), baseOffset, 30 * 8)
}

@AfterEach
Expand Down Expand Up @@ -127,19 +127,19 @@ class OffsetIndexTest {
val third = new OffsetPosition(baseOffset + 2, 23)
val fourth = new OffsetPosition(baseOffset + 3, 37)

assertEquals(None, idx.fetchUpperBoundOffset(first, 5))
assertEquals(Optional.empty, idx.fetchUpperBoundOffset(first, 5))

for (offsetPosition <- Seq(first, second, third, fourth))
idx.append(offsetPosition.offset, offsetPosition.position)

assertEquals(Some(second), idx.fetchUpperBoundOffset(first, 5))
assertEquals(Some(second), idx.fetchUpperBoundOffset(first, 10))
assertEquals(Some(third), idx.fetchUpperBoundOffset(first, 23))
assertEquals(Some(third), idx.fetchUpperBoundOffset(first, 22))
assertEquals(Some(fourth), idx.fetchUpperBoundOffset(second, 24))
assertEquals(None, idx.fetchUpperBoundOffset(fourth, 1))
assertEquals(None, idx.fetchUpperBoundOffset(first, 200))
assertEquals(None, idx.fetchUpperBoundOffset(second, 200))
assertEquals(Optional.of(second), idx.fetchUpperBoundOffset(first, 5))
assertEquals(Optional.of(second), idx.fetchUpperBoundOffset(first, 10))
assertEquals(Optional.of(third), idx.fetchUpperBoundOffset(first, 23))
assertEquals(Optional.of(third), idx.fetchUpperBoundOffset(first, 22))
assertEquals(Optional.of(fourth), idx.fetchUpperBoundOffset(second, 24))
assertEquals(Optional.empty, idx.fetchUpperBoundOffset(fourth, 1))
assertEquals(Optional.empty, idx.fetchUpperBoundOffset(first, 200))
assertEquals(Optional.empty, idx.fetchUpperBoundOffset(second, 200))
}

@Test
Expand All @@ -149,7 +149,7 @@ class OffsetIndexTest {
idx.append(first.offset, first.position)
idx.append(sec.offset, sec.position)
idx.close()
val idxRo = new OffsetIndex(idx.file, baseOffset = idx.baseOffset)
val idxRo = new OffsetIndex(idx.file, idx.baseOffset)
assertEquals(first, idxRo.lookup(first.offset))
assertEquals(sec, idxRo.lookup(sec.offset))
assertEquals(sec.offset, idxRo.lastOffset)
Expand All @@ -159,8 +159,8 @@ class OffsetIndexTest {

@Test
def truncate(): Unit = {
val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
idx.truncate()
val idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 8)
idx.truncate()
for(i <- 1 until 10)
idx.append(i, i)

Expand Down Expand Up @@ -200,7 +200,7 @@ class OffsetIndexTest {

@Test
def forceUnmapTest(): Unit = {
val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
val idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 8)
idx.forceUnmap()
// mmap should be null after unmap causing lookup to throw a NPE
assertThrows(classOf[NullPointerException], () => idx.lookup(1))
Expand All @@ -210,7 +210,7 @@ class OffsetIndexTest {
def testSanityLastOffsetEqualToBaseOffset(): Unit = {
// Test index sanity for the case where the last offset appended to the index is equal to the base offset
val baseOffset = 20L
val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = baseOffset, maxIndexSize = 10 * 8)
val idx = new OffsetIndex(nonExistentTempFile(), baseOffset, 10 * 8)
idx.append(baseOffset, 0)
idx.sanityCheck()
}
Expand Down
Loading