10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/log/LocalLog.scala

@@ -30,7 +30,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeEx
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.server.log.internals.OffsetPosition
+import org.apache.kafka.server.log.internals.{AbortedTxn, OffsetPosition}

 import scala.jdk.CollectionConverters._
 import scala.collection.{Seq, immutable}
@@ -448,7 +448,7 @@ class LocalLog(@volatile private var _dir: File,
       }

       val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
-      def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
+      def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
       collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)

       FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
@@ -459,13 +459,13 @@ class LocalLog(@volatile private var _dir: File,

   private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
                                          startingSegment: LogSegment,
-                                         accumulator: List[AbortedTxn] => Unit): Unit = {
+                                         accumulator: Seq[AbortedTxn] => Unit): Unit = {
     val higherSegments = segments.higherSegments(startingSegment.baseOffset).iterator
     var segmentEntryOpt = Option(startingSegment)
     while (segmentEntryOpt.isDefined) {
       val segment = segmentEntryOpt.get
       val searchResult = segment.collectAbortedTxns(startOffset, upperBoundOffset)
-      accumulator(searchResult.abortedTransactions)
+      accumulator(searchResult.abortedTransactions.asScala)
       if (searchResult.isComplete)
         return
       segmentEntryOpt = nextOption(higherSegments)
@@ -475,7 +475,7 @@ class LocalLog(@volatile private var _dir: File,
   private[log] def collectAbortedTransactions(logStartOffset: Long, baseOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
     val segmentEntry = segments.floorSegment(baseOffset)
     val allAbortedTxns = ListBuffer.empty[AbortedTxn]
-    def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
+    def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
     segmentEntry.foreach(segment => collectAbortedTransactions(logStartOffset, upperBoundOffset, segment, accumulator))
     allAbortedTxns.toList
   }
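The type changes in LocalLog.scala above all follow from one interop fact: TransactionIndex now lives on the Java side, so the search result exposes its aborted transactions as a java.util.List rather than a Scala List, and .asScala bridges it at the call site. A minimal sketch of that bridge, assuming the java.util.List return type implied by the diff; forwardAbortedTxns is a hypothetical helper, not code from this PR:

```scala
import java.util.{List => JList}
import scala.collection.Seq // the general Seq, mirroring LocalLog's own import
import scala.jdk.CollectionConverters._
import org.apache.kafka.server.log.internals.AbortedTxn

object AbortedTxnBridge {
  // Hypothetical helper: the Java index hands back a java.util.List, and
  // asScala wraps it in a mutable Buffer view without copying. A Buffer is a
  // scala.collection.Seq but not an immutable.Seq, which is why the
  // accumulator parameter widens from List[AbortedTxn] to Seq[AbortedTxn].
  def forwardAbortedTxns(abortedTxns: JList[AbortedTxn],
                         accumulator: Seq[AbortedTxn] => Unit): Unit =
    accumulator(abortedTxns.asScala)
}
```

Widening the callback to scala.collection.Seq lets both the wrapped Java list and a plain Scala List flow through the same accumulator.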
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/LogCleaner.scala

@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.server.log.internals.{AbortedTxn, TransactionIndex}

 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
@@ -1123,7 +1124,7 @@ private[log] class CleanedTransactionMetadata {
   private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
   // Minheap of aborted transactions sorted by the transaction first offset
   private val abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
-    override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
+    override def compare(x: AbortedTxn, y: AbortedTxn): Int = java.lang.Long.compare(x.firstOffset, y.firstOffset)
   }.reverse)

   // Output cleaned index to write retained aborted transactions
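The comparator rewrite above is plausibly another interop consequence: firstOffset is now a Java accessor returning a primitive long, and java.lang.Long.compare works on those primitives directly rather than going through Scala's implicit enrichment of Long. A self-contained sketch of the same min-heap construction, assuming only the AbortedTxn.firstOffset accessor shown in the diff:

```scala
import scala.collection.mutable
import org.apache.kafka.server.log.internals.AbortedTxn

object AbortedTxnHeap {
  // mutable.PriorityQueue dequeues the *largest* element first, so the
  // ordering is reversed to obtain a min-heap keyed on the transaction's
  // first offset, which is what the cleaner pops from.
  val abortedTransactions: mutable.PriorityQueue[AbortedTxn] =
    mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
      override def compare(x: AbortedTxn, y: AbortedTxn): Int =
        java.lang.Long.compare(x.firstOffset, y.firstOffset)
    }.reverse)
}
```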
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogSegment.scala

@@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
-import org.apache.kafka.server.log.internals.{OffsetPosition, TimestampOffset}
+import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn, OffsetPosition, TimestampOffset, TransactionIndex, TxnIndexSearchResult}

 import scala.jdk.CollectionConverters._
 import scala.math._
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/log/ProducerStateManager.scala

@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C, Time, Utils}
+import org.apache.kafka.server.log.internals.CompletedTxn

 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
@@ -318,7 +319,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     // without any associated data will not have any impact on the last stable offset
     // and would not need to be reflected in the transaction index.
     val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
-      CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
+      new CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
     }

     updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
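The added `new` above is the visible cost of the migration: CompletedTxn used to be a Scala case class (its definition is deleted from UnifiedLog.scala below), so call sites relied on the synthesized companion apply, while a plain Java class must be instantiated explicitly. A minimal sketch with made-up values, matching the constructor order shown in the diff:

```scala
import org.apache.kafka.server.log.internals.CompletedTxn

object CompletedTxnExample {
  // All values are illustrative; only the parameter order comes from the
  // call site above: producerId, firstOffset, lastOffset, isAborted.
  val txn = new CompletedTxn(
    42L,   // producerId
    100L,  // firstOffset (inclusive)
    110L,  // lastOffset: the offset of the COMMIT/ABORT control record
    true   // isAborted
  )
}
```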
264 changes: 0 additions & 264 deletions core/src/main/scala/kafka/log/TransactionIndex.scala

This file was deleted.
21 changes: 1 addition & 20 deletions core/src/main/scala/kafka/log/UnifiedLog.scala

@@ -44,6 +44,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
+import org.apache.kafka.server.log.internals.{AbortedTxn, CompletedTxn}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig

 import scala.annotation.nowarn
@@ -159,26 +160,6 @@ case class LogReadInfo(fetchedData: FetchDataInfo,
                        logEndOffset: Long,
                        lastStableOffset: Long)

-/**
- * A class used to hold useful metadata about a completed transaction. This is used to build
- * the transaction index after appending to the log.
- *
- * @param producerId The ID of the producer
- * @param firstOffset The first offset (inclusive) of the transaction
- * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the
- *                   COMMIT/ABORT control record which indicates the transaction's completion.
- * @param isAborted Whether or not the transaction was aborted
- */
-case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) {
-  override def toString: String = {
-    "CompletedTxn(" +
-      s"producerId=$producerId, " +
-      s"firstOffset=$firstOffset, " +
-      s"lastOffset=$lastOffset, " +
-      s"isAborted=$isAborted)"
-  }
-}
-
 /**
  * A class used to hold params required to decide to rotate a log segment or not.
  */
@@ -22,7 +22,7 @@ import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.log.internals.OffsetPosition
+import org.apache.kafka.server.log.internals.{OffsetPosition, TransactionIndex}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}