Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
440 changes: 0 additions & 440 deletions core/src/main/scala/kafka/log/AbstractIndex.scala

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LazyIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ package kafka.log
import java.io.File
import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent.locks.ReentrantLock

import LazyIndex._
import kafka.utils.CoreUtils.inLock
import kafka.utils.threadsafe
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.AbstractIndex

/**
* A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading
Expand Down
35 changes: 17 additions & 18 deletions core/src/main/scala/kafka/log/OffsetIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import kafka.utils.CoreUtils.inLock
import kafka.utils.Logging
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.server.log.internals.{CorruptIndexException, OffsetPosition}
import org.apache.kafka.server.log.internals.{AbstractIndex, CorruptIndexException, IndexSearchType, OffsetPosition}

/**
* An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
Expand Down Expand Up @@ -60,14 +60,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
private[this] var _lastOffset = lastEntry.offset

debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, " +
s"maxIndexSize = $maxIndexSize, entries = ${_entries}, lastOffset = ${_lastOffset}, file position = ${mmap.position()}")
s"maxIndexSize = $maxIndexSize, entries = $entries, lastOffset = ${_lastOffset}, file position = ${mmap.position()}")

/**
* The last entry in the index
*/
private def lastEntry: OffsetPosition = {
inLock(lock) {
_entries match {
entries match {
case 0 => new OffsetPosition(baseOffset, 0)
case s => parseEntry(mmap, s - 1)
}
Expand All @@ -86,14 +86,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
* the pair (baseOffset, 0) is returned.
*/
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
maybeLock(lock, {() =>
val idx = mmap.duplicate
val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
if(slot == -1)
new OffsetPosition(baseOffset, 0)
else
parseEntry(idx, slot)
}
})
}

/**
Expand All @@ -102,14 +102,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
* such offset.
*/
def fetchUpperBoundOffset(fetchOffset: OffsetPosition, fetchSize: Int): Option[OffsetPosition] = {
maybeLock(lock) {
maybeLock(lock, { () =>
val idx = mmap.duplicate
val slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE)
if (slot == -1)
None
else
Some(parseEntry(idx, slot))
}
})
}

private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
Expand All @@ -126,12 +126,12 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
* @return The offset/position pair at that entry
*/
def entry(n: Int): OffsetPosition = {
maybeLock(lock) {
if (n >= _entries)
maybeLock(lock, { () =>
if (n >= entries)
throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " +
s"which has size ${_entries}.")
s"which has size $entries.")
parseEntry(mmap, n)
}
})
}

/**
Expand All @@ -141,14 +141,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
*/
def append(offset: Long, position: Int): Unit = {
inLock(lock) {
require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
if (_entries == 0 || offset > _lastOffset) {
require(!isFull, "Attempt to append to a full index (size = " + entries + ").")
if (entries == 0 || offset > _lastOffset) {
trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
mmap.putInt(relativeOffset(offset))
mmap.putInt(position)
_entries += 1
incrementEntries()
_lastOffset = offset
require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
require(entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
} else {
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
Expand Down Expand Up @@ -184,16 +184,15 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
*/
private def truncateToEntries(entries: Int): Unit = {
inLock(lock) {
_entries = entries
mmap.position(_entries * entrySize)
super.truncateToEntries0(entries)
_lastOffset = lastEntry.offset
debug(s"Truncated index ${file.getAbsolutePath} to $entries entries;" +
s" position is now ${mmap.position()} and last offset is now ${_lastOffset}")
}
}

override def sanityCheck(): Unit = {
if (_entries != 0 && _lastOffset < baseOffset)
if (entries != 0 && _lastOffset < baseOffset)
throw new CorruptIndexException(s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size " +
s"but the last offset is ${_lastOffset} which is less than the base offset $baseOffset.")
if (length % entrySize != 0)
Expand Down
39 changes: 19 additions & 20 deletions core/src/main/scala/kafka/log/TimeIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kafka.utils.CoreUtils.inLock
import kafka.utils.Logging
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.server.log.internals.{CorruptIndexException, TimestampOffset}
import org.apache.kafka.server.log.internals.{AbstractIndex, CorruptIndexException, IndexSearchType, TimestampOffset}

/**
* An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
Expand Down Expand Up @@ -59,7 +59,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
override def entrySize = 12

debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, maxIndexSize = $maxIndexSize," +
s" entries = ${_entries}, lastOffset = ${_lastEntry}, file position = ${mmap.position()}")
s" entries = $entries, lastOffset = ${_lastEntry}, file position = ${mmap.position()}")

// We override the full check to reserve the last time index entry slot for the on roll call.
override def isFull: Boolean = entries >= maxEntries - 1
Expand All @@ -75,7 +75,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
*/
private def lastEntryFromIndexFile: TimestampOffset = {
inLock(lock) {
_entries match {
entries match {
case 0 => new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
case s => parseEntry(mmap, s - 1)
}
Expand All @@ -88,12 +88,12 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
* @return The timestamp/offset pair at that entry
*/
def entry(n: Int): TimestampOffset = {
maybeLock(lock) {
if(n >= _entries)
maybeLock(lock, { () =>
if(n >= entries)
throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from time index ${file.getAbsolutePath} " +
s"which has size ${_entries}.")
s"which has size $entries.")
parseEntry(mmap, n)
}
})
}

override def parseEntry(buffer: ByteBuffer, n: Int): TimestampOffset = {
Expand All @@ -113,18 +113,18 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
inLock(lock) {
if (!skipFullCheck)
require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
require(!isFull, "Attempt to append to a full time index (size = " + entries + ").")
// We do not throw exception when the offset equals to the offset of last entry. That means we are trying
// to insert the same time index entry as the last entry.
// If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion
// because that could happen in the following two scenarios:
// 1. A log segment is closed.
// 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
if (_entries != 0 && offset < lastEntry.offset)
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
if (entries != 0 && offset < lastEntry.offset)
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot $entries no larger than" +
s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
if (_entries != 0 && timestamp < lastEntry.timestamp)
throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
if (entries != 0 && timestamp < lastEntry.timestamp)
throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot $entries no larger" +
s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
// We only append to the time index when the timestamp is greater than the last inserted timestamp.
// If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
Expand All @@ -133,9 +133,9 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
mmap.putLong(timestamp)
mmap.putInt(relativeOffset(offset))
_entries += 1
incrementEntries()
_lastEntry = new TimestampOffset(timestamp, offset)
require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
require(entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
}
}
}
Expand All @@ -149,14 +149,14 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
* @return The time index entry found.
*/
def lookup(targetTimestamp: Long): TimestampOffset = {
maybeLock(lock) {
maybeLock(lock, {() =>
val idx = mmap.duplicate
val slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
if (slot == -1)
new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
else
parseEntry(idx, slot)
}
})
}

override def truncate(): Unit = truncateToEntries(0)
Expand Down Expand Up @@ -201,8 +201,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
*/
private def truncateToEntries(entries: Int): Unit = {
inLock(lock) {
_entries = entries
mmap.position(_entries * entrySize)
super.truncateToEntries0(entries)
_lastEntry = lastEntryFromIndexFile
debug(s"Truncated index ${file.getAbsolutePath} to $entries entries; position is now ${mmap.position()} and last entry is now ${_lastEntry}")
}
Expand All @@ -211,11 +210,11 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
override def sanityCheck(): Unit = {
val lastTimestamp = lastEntry.timestamp
val lastOffset = lastEntry.offset
if (_entries != 0 && lastTimestamp < timestamp(mmap, 0))
if (entries != 0 && lastTimestamp < timestamp(mmap, 0))
throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " +
s"non-zero size but the last timestamp is $lastTimestamp which is less than the first timestamp " +
s"${timestamp(mmap, 0)}")
if (_entries != 0 && lastOffset < baseOffset)
if (entries != 0 && lastOffset < baseOffset)
throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " +
s"non-zero size but the last offset is $lastOffset which is less than the first offset $baseOffset")
if (length % entrySize != 0)
Expand Down
7 changes: 7 additions & 0 deletions gradle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>

<Match>
<!-- False positive - the volatile read is guarded by a lock. -->
<Class name="org.apache.kafka.server.log.internals.AbstractIndex"/>
<Method name="incrementEntries"/>
<Bug pattern="VO_VOLATILE_INCREMENT"/>
</Match>

<Match>
<!-- Suppress a spurious warning about a missing default case. -->
<Or>
Expand Down
Loading