Skip to content

Commit

Permalink
KAFKA-15626: Replace verification guard object with a specific type (a…
Browse files Browse the repository at this point in the history
…pache#14568)

I've added a new class with an incrementing atomic long to represent the verification guard. Upon creation of a verification guard, we increment this value and assign it to the guard.

The expected behavior is the same as with the object guard, but with better debuggability (via the string value) and better type safety. (I found a type-safety issue in the current code while implementing this.)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Artem Livshits <alivshits@confluent.io>
  • Loading branch information
jolshan authored Oct 20, 2023
1 parent eed5e68 commit e8c8969
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 75 deletions.
9 changes: 5 additions & 4 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup

import scala.collection.{Map, Seq}
Expand Down Expand Up @@ -581,8 +581,9 @@ class Partition(val topicPartition: TopicPartition,
}
}

// Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = {
// Returns a VerificationGuard if we need to verify. This starts or continues the verification process. Otherwise return the
// sentinel VerificationGuard.
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = {
leaderLogIfLocal match {
case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch)
case None => throw new NotLeaderOrFollowerException();
Expand Down Expand Up @@ -1301,7 +1302,7 @@ class Partition(val topicPartition: TopicPartition,
}

def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {
requestLocal: RequestLocal, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
Expand Down
45 changes: 23 additions & 22 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}

import java.io.{File, IOException}
import java.nio.file.Files
Expand Down Expand Up @@ -599,31 +599,32 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}

/**
* Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
* Creation starts the verification process. Otherwise return null.
* Maybe create and return the VerificationGuard for the given producer ID if the transaction is not yet ongoing.
* Creation starts the verification process. Otherwise return the sentinel VerificationGuard.
*/
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = lock synchronized {
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = lock synchronized {
if (hasOngoingTransaction(producerId))
null
VerificationGuard.SENTINEL
else
maybeCreateVerificationGuard(producerId, sequence, epoch)
}

/**
* Maybe create the VerificationStateEntry for the given producer ID -- always return the verification guard
* Maybe create the VerificationStateEntry for the given producer ID -- always return the VerificationGuard
*/
def maybeCreateVerificationGuard(producerId: Long,
sequence: Int,
epoch: Short): Object = lock synchronized {
epoch: Short): VerificationGuard = lock synchronized {
producerStateManager.maybeCreateVerificationStateEntry(producerId, sequence, epoch).verificationGuard
}

/**
* If an VerificationStateEntry is present for the given producer ID, return its verification guard, otherwise, return null.
* If a VerificationStateEntry is present for the given producer ID, return its VerificationGuard, otherwise, return the
* sentinel VerificationGuard.
*/
def verificationGuard(producerId: Long): Object = lock synchronized {
def verificationGuard(producerId: Long): VerificationGuard = lock synchronized {
val entry = producerStateManager.verificationStateEntry(producerId)
if (entry != null) entry.verificationGuard else null
if (entry != null) entry.verificationGuard else VerificationGuard.SENTINEL
}

/**
Expand Down Expand Up @@ -715,7 +716,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
origin: AppendOrigin = AppendOrigin.CLIENT,
interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
requestLocal: RequestLocal = RequestLocal.NoCaching,
verificationGuard: Object = null): LogAppendInfo = {
verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {
val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER
append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)
}
Expand All @@ -734,7 +735,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validateAndAssignOffsets = false,
leaderEpoch = -1,
requestLocal = None,
verificationGuard = null,
verificationGuard = VerificationGuard.SENTINEL,
// disable to check the validation of record size since the record is already accepted by leader.
ignoreRecordSize = true)
}
Expand Down Expand Up @@ -763,7 +764,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
requestLocal: Option[RequestLocal],
verificationGuard: Object,
verificationGuard: VerificationGuard,
ignoreRecordSize: Boolean): LogAppendInfo = {
// We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
// This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
Expand Down Expand Up @@ -1024,7 +1025,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
records: MemoryRecords,
origin: AppendOrigin,
requestVerificationGuard: Object):
requestVerificationGuard: VerificationGuard):
(mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
Expand All @@ -1049,17 +1050,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
// There are two phases -- the first append to the log and subsequent appends.
//
// 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
// given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
// 1. First append: Verification starts with creating a VerificationGuard, sending a verification request to the transaction coordinator, and
// given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique VerificationGuard for the transaction
// to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
// have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
// start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
// start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique VerificationGuard, this sequence would not
// result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
//
// 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
// ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
// requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
}
Expand All @@ -1080,9 +1081,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
(updatedProducers, completedTxns.toList, None)
}

private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: Object): Boolean = {
private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: VerificationGuard): Boolean = {
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && !batch.isControlBatch &&
(requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null)
!verificationGuard(batch.producerId).verify(requestVerificationGuard)
}

/**
Expand Down Expand Up @@ -1991,7 +1992,7 @@ object UnifiedLog extends Logging {
val producerId = batch.producerId
val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
val completedTxn = appendInfo.append(batch, firstOffsetMetadata.asJava).asScala
// Whether we wrote a control marker or a data batch, we can remove verification guard since either the transaction is complete or we have a first offset.
// Whether we wrote a control marker or a data batch, we can remove the VerificationGuard since either the transaction is complete or we have a first offset.
if (batch.isTransactional)
producerStateManager.clearVerificationStateEntry(producerId)
completedTxn
Expand Down
15 changes: 8 additions & 7 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard}

import java.io.File
import java.nio.file.{Files, Paths}
Expand Down Expand Up @@ -735,7 +735,7 @@ class ReplicaManager(val config: KafkaConfig,
if (isValidRequiredAcks(requiredAcks)) {
val sTime = time.milliseconds

val verificationGuards: mutable.Map[TopicPartition, Object] = mutable.Map[TopicPartition, Object]()
val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = mutable.Map[TopicPartition, VerificationGuard]()
val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition, errorsPerPartition) =
if (transactionalId == null || !config.transactionPartitionVerificationEnable)
(entriesPerPartition, Map.empty[TopicPartition, MemoryRecords], Map.empty[TopicPartition, Errors])
Expand Down Expand Up @@ -864,7 +864,7 @@ class ReplicaManager(val config: KafkaConfig,
}
}

private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, Object],
private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, VerificationGuard],
entriesPerPartition: Map[TopicPartition, MemoryRecords],
verifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
Expand All @@ -877,10 +877,10 @@ class ReplicaManager(val config: KafkaConfig,
transactionalBatches.foreach(batch => transactionalProducerIds.add(batch.producerId))

if (transactionalBatches.nonEmpty) {
// We return verification guard if the partition needs to be verified. If no state is present, no need to verify.
// We return a VerificationGuard if the partition needs to be verified. If no state is present, no need to verify.
val firstBatch = records.firstBatch
val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch)
if (verificationGuard != null) {
if (verificationGuard != VerificationGuard.SENTINEL) {
verificationGuards.put(topicPartition, verificationGuard)
unverifiedEntries.put(topicPartition, records)
} else
Expand Down Expand Up @@ -1156,7 +1156,7 @@ class ReplicaManager(val config: KafkaConfig,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
requiredAcks: Short,
requestLocal: RequestLocal,
verificationGuards: Map[TopicPartition, Object]): Map[TopicPartition, LogAppendResult] = {
verificationGuards: Map[TopicPartition, VerificationGuard]): Map[TopicPartition, LogAppendResult] = {
val traceEnabled = isTraceEnabled
def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L)
Expand All @@ -1183,7 +1183,8 @@ class ReplicaManager(val config: KafkaConfig,
} else {
try {
val partition = getPartitionOrException(topicPartition)
val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal, verificationGuards.getOrElse(topicPartition, null))
val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal,
verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
val numAppendedMessages = info.numMessages

// update stats for successfully appended bytes and messages as bytesInRate and messageInRate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
Expand Down Expand Up @@ -450,7 +450,7 @@ class PartitionLockTest extends Logging {
keepPartitionMetadataFile = true) {

override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin,
interBrokerProtocolVersion: MetadataVersion, requestLocal: RequestLocal, verificationGuard: Object): LogAppendInfo = {
interBrokerProtocolVersion: MetadataVersion, requestLocal: RequestLocal, verificationGuard: VerificationGuard): LogAppendInfo = {
val appendInfo = super.appendAsLeader(records, leaderEpoch, origin, interBrokerProtocolVersion, requestLocal, verificationGuard)
appendSemaphore.acquire()
appendInfo
Expand Down
Loading

0 comments on commit e8c8969

Please sign in to comment.