
Commit

[SPARK-48687][SS] Add change to perform state schema validation and update on driver in planning phase for stateful queries

### What changes were proposed in this pull request?
Add a change to perform state schema validation and update on the driver, during the planning phase, for stateful queries.

### Why are the changes needed?
This avoids having to perform partition-specific schema checks on the executors. It also makes it easier to introduce schema versioning in the future.
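At a high level, this is the pattern the diff below introduces (a condensed, illustrative sketch assembled from the changes to FlatMapGroupsWithStateExec and IncrementalExecution below, not a standalone snippet):

```
// Each stateful operator exposes a driver-side hook that knows its own key/value
// state schema and delegates to the checker, which reads or writes the schema file
// kept under the operator's state checkpoint location.
override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
  StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
    groupingAttributes.toStructType, stateManager.stateSchema, session.sqlContext.sessionState)
}

// IncrementalExecution applies this rule on the driver while planning the first batch of
// a run, so validation no longer depends on loading a specific partition's store on an executor.
object StateSchemaValidationRule extends SparkPlanPartialRule {
  override val rule: PartialFunction[SparkPlan, SparkPlan] = {
    case statefulOp: StatefulOperator if isFirstBatch =>
      statefulOp.validateAndMaybeEvolveStateSchema(hadoopConf)
      statefulOp
  }
}
```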

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests and ran existing tests

```
[info] Run completed in 10 seconds, 500 milliseconds.
[info] Total number of tests run: 29
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 29, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 35 s, completed Jun 21, 2024, 3:58:12 PM
```
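For reference, output in this form comes from an sbt suite run along the lines of the following (the exact suite name here is an assumption for illustration, not taken from this PR):

```
build/sbt "sql/testOnly org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityCheckerSuite"
```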

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47035 from anishshri-db/task/schema-changes-on-driver.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
anishshri-db authored and HeartSaVioR committed Jun 26, 2024
1 parent a474b88 commit a50b30d
Showing 98 changed files with 514 additions and 136 deletions.
FlatMapGroupsWithStateExec.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.streaming

import java.util.concurrent.TimeUnit.NANOSECONDS

import org.apache.hadoop.conf.Configuration

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -187,6 +189,11 @@ trait FlatMapGroupsWithStateExecBase
})
}

override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
groupingAttributes.toStructType, stateManager.stateSchema, session.sqlContext.sessionState)
}

override protected def doExecute(): RDD[InternalRow] = {
stateManager // force lazy init at driver
metrics // force lazy init at driver
IncrementalExecution.scala
@@ -198,6 +198,16 @@ class IncrementalExecution(
}
}

// Planning rule used to record the state schema for the first run and validate state schema
// changes across query runs.
object StateSchemaValidationRule extends SparkPlanPartialRule {
override val rule: PartialFunction[SparkPlan, SparkPlan] = {
case statefulOp: StatefulOperator if isFirstBatch =>
statefulOp.validateAndMaybeEvolveStateSchema(hadoopConf)
statefulOp
}
}

object StateOpIdRule extends SparkPlanPartialRule {
override val rule: PartialFunction[SparkPlan, SparkPlan] = {
case StateStoreSaveExec(keys, None, None, None, None, stateFormatVersion,
@@ -471,9 +481,12 @@ class IncrementalExecution(
if (isFirstBatch && currentBatchId != 0) {
checkOperatorValidWithMetadata(planWithStateOpId)
}
-// The rule doesn't change the plan but cause the side effect that metadata is written
-// in the checkpoint directory of stateful operator.

// The two rules below don't change the plan but can cause the side effect that
// metadata/schema is written in the checkpoint directory of stateful operator.
planWithStateOpId transform StateSchemaValidationRule.rule
planWithStateOpId transform WriteStatefulOperatorMetadataRule.rule

simulateWatermarkPropagation(planWithStateOpId)
planWithStateOpId transform WatermarkPropagationRule.rule
}
StreamingSymmetricHashJoinExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming

import java.util.concurrent.TimeUnit.NANOSECONDS

import org.apache.hadoop.conf.Configuration

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, JoinedRow, Literal, Predicate, UnsafeProjection, UnsafeRow}
@@ -31,6 +33,7 @@ import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.KeyToValuePair
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}


@@ -243,6 +246,23 @@ case class StreamingSymmetricHashJoinExec(
watermarkUsedForStateCleanup && watermarkHasChanged
}

override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
var result: Map[String, (StructType, StructType)] = Map.empty
// get state schema for state stores on left side of the join
result ++= SymmetricHashJoinStateManager.getSchemaForStateStores(LeftSide,
left.output, leftKeys, stateFormatVersion)

// get state schema for state stores on right side of the join
result ++= SymmetricHashJoinStateManager.getSchemaForStateStores(RightSide,
right.output, rightKeys, stateFormatVersion)

// validate and maybe evolve schema for all state stores across both sides of the join
result.foreach { case (stateStoreName, (keySchema, valueSchema)) =>
StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
keySchema, valueSchema, session.sessionState, storeName = stateStoreName)
}
}

protected override def doExecute(): RDD[InternalRow] = {
val stateStoreCoord = session.sessionState.streamingQueryManager.stateStoreCoordinator
val stateStoreNames = SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
TransformWithStateExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming
import java.util.UUID
import java.util.concurrent.TimeUnit.NANOSECONDS

import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -338,6 +340,13 @@ case class TransformWithStateExec(
)
}

override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
// TODO: transformWithState is special because we don't have the schema of the state directly
// within the passed args. We need to gather this after running the init function
// within the stateful processor on the driver. This also requires a schema format change
// when recording this information persistently.
}

override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver

StateSchemaCompatibilityChecker.scala
@@ -17,12 +17,17 @@

package org.apache.spark.sql.execution.streaming.state

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.internal.{Logging, LogKeys, MDC}
-import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, StatefulOperatorStateInfo}
import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter}
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.types.{DataType, StructType}

class StateSchemaCompatibilityChecker(
@@ -37,30 +42,30 @@ class StateSchemaCompatibilityChecker(

fm.mkdirs(schemaFileLocation.getParent)

-def check(keySchema: StructType, valueSchema: StructType): Unit = {
-check(keySchema, valueSchema, ignoreValueSchema = false)
-}
-
-def check(keySchema: StructType, valueSchema: StructType, ignoreValueSchema: Boolean): Unit = {
-if (fm.exists(schemaFileLocation)) {
-logDebug(s"Schema file for provider $providerId exists. Comparing with provided schema.")
-val (storedKeySchema, storedValueSchema) = readSchemaFile()
-if (storedKeySchema.equals(keySchema) &&
-(ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
-// schema is exactly same
-} else if (!schemasCompatible(storedKeySchema, keySchema)) {
-throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
-keySchema.toString)
-} else if (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema)) {
-throw StateStoreErrors.stateStoreValueSchemaNotCompatible(storedValueSchema.toString,
-valueSchema.toString)
-} else {
-logInfo("Detected schema change which is compatible. Allowing to put rows.")
-}
-} else {
-// schema doesn't exist, create one now
-logDebug(s"Schema file for provider $providerId doesn't exist. Creating one.")
-createSchemaFile(keySchema, valueSchema)
-}
-}
/**
* Function to check if new state store schema is compatible with the existing schema.
* @param oldSchema - old state schema
* @param newSchema - new state schema
* @param ignoreValueSchema - whether to ignore value schema or not
*/
private def check(
oldSchema: (StructType, StructType),
newSchema: (StructType, StructType),
ignoreValueSchema: Boolean) : Unit = {
val (storedKeySchema, storedValueSchema) = oldSchema
val (keySchema, valueSchema) = newSchema

if (storedKeySchema.equals(keySchema) &&
(ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
// schema is exactly same
} else if (!schemasCompatible(storedKeySchema, keySchema)) {
throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
keySchema.toString)
} else if (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema)) {
throw StateStoreErrors.stateStoreValueSchemaNotCompatible(storedValueSchema.toString,
valueSchema.toString)
} else {
logInfo("Detected schema change which is compatible. Allowing to put rows.")
}
}
@@ -82,7 +87,20 @@
}
}

-def createSchemaFile(keySchema: StructType, valueSchema: StructType): Unit = {
/**
* Function to read and return the existing key and value schema from the schema file, if it
* exists
* @return - Option of (keySchema, valueSchema) if the schema file exists, None otherwise
*/
private def getExistingKeyAndValueSchema(): Option[(StructType, StructType)] = {
if (fm.exists(schemaFileLocation)) {
Some(readSchemaFile())
} else {
None
}
}

private def createSchemaFile(keySchema: StructType, valueSchema: StructType): Unit = {
createSchemaFile(keySchema, valueSchema, schemaWriter)
}

@@ -103,10 +121,85 @@
}
}

def validateAndMaybeEvolveStateSchema(
newKeySchema: StructType,
newValueSchema: StructType,
ignoreValueSchema: Boolean): Unit = {
val existingSchema = getExistingKeyAndValueSchema()
if (existingSchema.isEmpty) {
// write the schema file if it doesn't exist
createSchemaFile(newKeySchema, newValueSchema)
} else {
// validate if the new schema is compatible with the existing schema
check(existingSchema.get, (newKeySchema, newValueSchema), ignoreValueSchema)
}
}

private def schemaFile(storeCpLocation: Path): Path =
new Path(new Path(storeCpLocation, "_metadata"), "schema")
}

object StateSchemaCompatibilityChecker {
val VERSION = 2

private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
if (!UnsafeRowUtils.isBinaryStable(schema)) {
throw new SparkUnsupportedOperationException(
errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
messageParameters = Map("schema" -> schema.json)
)
}
}

/**
* Function to validate the schema of the state store and maybe evolve it if needed.
* We also verify for binary inequality columns in the schema and disallow them. We then perform
* key and value schema validation. Depending on the passed configs, a warning might be logged
* or an exception might be thrown if the schema is not compatible.
*
* @param stateInfo - StatefulOperatorStateInfo containing the state store information
* @param hadoopConf - Hadoop configuration
* @param newKeySchema - New key schema
* @param newValueSchema - New value schema
* @param sessionState - session state used to retrieve session config
* @param extraOptions - any extra options to be passed for StateStoreConf creation
* @param storeName - optional state store name
*/
def validateAndMaybeEvolveStateSchema(
stateInfo: StatefulOperatorStateInfo,
hadoopConf: Configuration,
newKeySchema: StructType,
newValueSchema: StructType,
sessionState: SessionState,
extraOptions: Map[String, String] = Map.empty,
storeName: String = StateStoreId.DEFAULT_STORE_NAME): Unit = {
// SPARK-47776: collation introduces the concept of binary (in)equality, which means
// in some collation we no longer be able to just compare the binary format of two
// UnsafeRows to determine equality. For example, 'aaa' and 'AAA' can be "semantically"
// same in case insensitive collation.
// State store is basically key-value storage, and the most provider implementations
// rely on the fact that all the columns in the key schema support binary equality.
// We need to disallow using binary inequality column in the key schema, before we
// could support this in majority of state store providers (or high-level of state
// store.)
disallowBinaryInequalityColumn(newKeySchema)

val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
val providerId = StateStoreProviderId(StateStoreId(stateInfo.checkpointLocation,
stateInfo.operatorId, 0, storeName), stateInfo.queryRunId)
val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf)
// regardless of configuration, we check compatibility to at least write schema file
// if necessary
// if the format validation for value schema is disabled, we also disable the schema
// compatibility checker for value schema as well.
val result = Try(
checker.validateAndMaybeEvolveStateSchema(newKeySchema, newValueSchema,
ignoreValueSchema = !storeConf.formatValidationCheckValue)
).toEither.fold(Some(_), _ => None)

// if schema validation is enabled and an exception is thrown, we re-throw it and fail the query
if (storeConf.stateSchemaCheckEnabled && result.isDefined) {
throw result.get
}
}
}
StateStore.scala
@@ -23,13 +23,12 @@ import java.util.concurrent.atomic.AtomicReference
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
-import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

-import org.apache.spark.{SparkContext, SparkEnv, SparkException, SparkUnsupportedOperationException}
import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
@@ -524,9 +523,6 @@ object StateStore extends Logging {
@GuardedBy("loadedProviders")
private val loadedProviders = new mutable.HashMap[StateStoreProviderId, StateStoreProvider]()

@GuardedBy("loadedProviders")
private val schemaValidated = new mutable.HashMap[StateStoreProviderId, Option[Throwable]]()

private val maintenanceThreadPoolLock = new Object

// Shared exception between threads in thread pool that the scheduling thread
@@ -649,15 +645,6 @@ object StateStore extends Logging {
storeProvider.getStore(version)
}

-private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
-if (!UnsafeRowUtils.isBinaryStable(schema)) {
-throw new SparkUnsupportedOperationException(
-errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
-messageParameters = Map("schema" -> schema.json)
-)
-}
-}

private def getStateStoreProvider(
storeProviderId: StateStoreProviderId,
keySchema: StructType,
@@ -670,40 +657,6 @@ object StateStore extends Logging {
loadedProviders.synchronized {
startMaintenanceIfNeeded(storeConf)

-if (storeProviderId.storeId.partitionId == PARTITION_ID_TO_CHECK_SCHEMA) {
-val result = schemaValidated.getOrElseUpdate(storeProviderId, {
-// SPARK-47776: collation introduces the concept of binary (in)equality, which means
-// in some collation we no longer be able to just compare the binary format of two
-// UnsafeRows to determine equality. For example, 'aaa' and 'AAA' can be "semantically"
-// same in case insensitive collation.
-// State store is basically key-value storage, and the most provider implementations
-// rely on the fact that all the columns in the key schema support binary equality.
-// We need to disallow using binary inequality column in the key schema, before we
-// could support this in majority of state store providers (or high-level of state
-// store.)
-disallowBinaryInequalityColumn(keySchema)
-
-val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf)
-// regardless of configuration, we check compatibility to at least write schema file
-// if necessary
-// if the format validation for value schema is disabled, we also disable the schema
-// compatibility checker for value schema as well.
-val ret = Try(
-checker.check(keySchema, valueSchema,
-ignoreValueSchema = !storeConf.formatValidationCheckValue)
-).toEither.fold(Some(_), _ => None)
-if (storeConf.stateSchemaCheckEnabled) {
-ret
-} else {
-None
-}
-})
-
-if (result.isDefined) {
-throw result.get
-}
-}

// SPARK-42567 - Track load time for state store provider and log warning if takes longer
// than 2s.
val (provider, loadTimeMs) = Utils.timeTakenMs {
