@@ -158,6 +158,8 @@ class RocksDB(
private val byteArrayPair = new ByteArrayPair()
private val commitLatencyMs = new mutable.HashMap[String, Long]()
Contributor: @dylanwong250 can we please reset this commitLatencyMs map too, just like you do for loadMetrics? We currently don't reset it and might be returning metrics from a previous batch (an existing bug). Thanks.
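A runnable toy model of the requested reset (illustrative names only; the real fields live in the RocksDB class above, and the exact placement inside load() is an assumption):

```scala
import scala.collection.mutable

object ResetMetricsDemo {
  private val commitLatencyMs = mutable.HashMap[String, Long]()
  private val loadMetrics = mutable.HashMap[String, Long]()

  def commit(): Unit = commitLatencyMs += "fileSync" -> 42L

  def load(): Unit = {
    loadMetrics.clear()
    commitLatencyMs.clear() // the requested reset: drop the last batch's commit metrics
  }

  def main(args: Array[String]): Unit = {
    commit()
    load()
    assert(commitLatencyMs.isEmpty) // no stale metrics leak into the new batch
    println("commitLatencyMs reset on load")
  }
}
```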


private val loadMetrics = new mutable.HashMap[String, Long]()

private val acquireLock = new Object

@volatile private var db: NativeRocksDB = _
@@ -583,16 +585,28 @@ class RocksDB(
version: Long,
stateStoreCkptId: Option[String] = None,
readOnly: Boolean = false): RocksDB = {
val startTime = System.currentTimeMillis()

assert(version >= 0)
acquire(LoadStore)
recordedMetrics = None
// Reset the load metrics before loading
loadMetrics.clear()

logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with stateStoreCkptId: ${
MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")
if (stateStoreCkptId.isDefined || enableStateStoreCheckpointIds && version == 0) {
loadWithCheckpointId(version, stateStoreCkptId, readOnly)
} else {
loadWithoutCheckpointId(version, readOnly)
}

// Record the metrics after loading
val duration = System.currentTimeMillis() - startTime
loadMetrics ++= Map(
"load" -> duration
)

this
}

@@ -608,9 +622,13 @@
* Source.
*/
def loadFromSnapshot(snapshotVersion: Long, endVersion: Long): RocksDB = {
val startTime = System.currentTimeMillis()

assert(snapshotVersion >= 0 && endVersion >= snapshotVersion)
acquire(LoadStore)
recordedMetrics = None
loadMetrics.clear()

logInfo(
log"Loading snapshot at version ${MDC(LogKeys.VERSION_NUM, snapshotVersion)} and apply " +
log"changelog files to version ${MDC(LogKeys.VERSION_NUM, endVersion)}.")
@@ -627,6 +645,14 @@
}
// Report this snapshot version to the coordinator
reportSnapshotUploadToCoordinator(snapshotVersion)

// Record the metrics after loading
loadMetrics ++= Map(
"loadFromSnapshot" -> (System.currentTimeMillis() - startTime)
)

// Refresh the recorded metrics after loading from snapshot
recordedMetrics = Some(metrics)
Contributor: Why do we set recordedMetrics here but not in load?

Contributor Author: I missed that load can be read-only. For more context, metricsOpt is how the RocksDBMetrics is accessed. It uses recordedMetrics, which was previously only updated in commit(). This caused metrics from loadFromSnapshot to be out of date, since it is currently only used read-only, and a read-only store does not commit().

I added the metrics refresh in load().
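A toy model of the behavior described above (assumed shape, not the Spark source): recordedMetrics is only refreshed in commit(), so a read-only load keeps surfacing the previous commit's metrics:

```scala
object StaleMetricsDemo {
  private var recordedMetrics: Option[Map[String, Long]] = None

  def commit(batchId: Long): Unit =
    recordedMetrics = Some(Map("batch" -> batchId))

  def loadReadOnly(): Unit = () // never commits, so recordedMetrics is never refreshed

  def metricsOpt: Option[Map[String, Long]] = recordedMetrics

  def main(args: Array[String]): Unit = {
    commit(1)
    loadReadOnly()
    // Still reports batch 1, even though a new (read-only) load happened.
    println(metricsOpt) // Some(Map(batch -> 1))
  }
}
```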

Contributor: @dylanwong250 please add a comment to say why you added this.

Contributor Author: I added a comment in both load() and loadFromSnapshot().

When I added recordedMetrics = Some(metrics) in load(), it caused some tests to fail. This was due to cases where abort() was called and the metrics were then expected to be empty. I fixed this by clearing the metrics in rollback; I will see if that fixes the tests.

Contributor: @dylanwong250 Why do we want the metrics to be empty after rollback?

Contributor Author (Jul 30, 2025): I was running into a test case where TransformWithStateExec contains this statement:

  if (isStreaming) {
    store.commit()
  } else {
    store.abort()
  }

The abort path would be taken and then the metrics would be set. But when doing the set, I was getting NPEs because the entries did not exist in the metrics map. I think this has something to do with how metrics are initialized when isStreaming = false for TransformWithStateExec. The failing test was testTransformWithState.

Contributor Author: I discussed with Michael and we removed the refresh of the metrics in load(), since these metrics are currently only reported on commit and the additional calls could increase the latency of loading the store.

this
}

@@ -692,6 +718,8 @@
* Replay change log from the loaded version to the target version.
*/
private def replayChangelog(versionsAndUniqueIds: Array[(Long, Option[String])]): Unit = {
val startTime = System.currentTimeMillis()

assert(!versionsAndUniqueIds.isEmpty && versionsAndUniqueIds.head._1 == loadedVersion + 1,
s"Replay changelog should start from one version after loadedVersion: $loadedVersion," +
s" but it is not."
@@ -740,6 +768,12 @@
if (changelogReader != null) changelogReader.closeIfNeeded()
}
}

val duration = System.currentTimeMillis() - startTime
loadMetrics ++= Map(
"replayChangelog" -> duration,
"numReplayChangeLogFiles" -> versionsAndUniqueIds.length
)
}

/**
@@ -1066,7 +1100,14 @@
// ensure that changelog files are always written
try {
assert(changelogWriter.isDefined)
changelogWriter.foreach(_.commit())
val changeLogWriterCommitTimeMs = timeTakenMs {
changelogWriter.foreach(_.commit())
}
// Record the commit time for the changelog writer
commitLatencyMs ++= Map(
"changeLogWriterCommit" -> changeLogWriterCommitTimeMs
)

if (!isUploaded) {
snapshot.foreach(snapshotsToUploadQueue.offer)
}
@@ -1101,7 +1142,8 @@
numInternalKeysOnLoadedVersion = numInternalKeysOnWritingVersion
loadedVersion = newVersion
commitLatencyMs ++= Map(
"fileSync" -> fileSyncTimeMs
"fileSync" -> fileSyncTimeMs,
"saveZipFiles" -> fileManagerMetrics.saveZipFilesTimeMs.getOrElse(0L)
Contributor: There should be a better way to do this. Also, even if there was no zip upload in this commit, this would return the metrics from when the maintenance thread did upload. That can confuse a user or an engineer who is debugging.

Contributor Author: Changed. I added a flag to only set this metric in commit when uploadSnapshot has been called, which clears and recreates the fileManagerMetrics.
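A hedged sketch of the flag being described (the flag name and wiring are assumptions; only the idea that commit reports saveZipFiles solely when uploadSnapshot actually ran comes from this thread):

```scala
import scala.collection.mutable

object SaveZipMetricDemo {
  private val commitLatencyMs = mutable.HashMap[String, Long]()
  private var snapshotUploaded = false // assumed flag name
  private var lastSaveZipFilesTimeMs = 0L

  // Called on the snapshot-upload path; records the zip time and sets the flag.
  def uploadSnapshot(elapsedMs: Long): Unit = {
    lastSaveZipFilesTimeMs = elapsedMs
    snapshotUploaded = true
  }

  // commit() only reports the metric when an upload actually happened, so a
  // stale maintenance-thread upload time is never attributed to a later commit.
  def commit(): Unit = {
    if (snapshotUploaded) {
      commitLatencyMs += "saveZipFiles" -> lastSaveZipFilesTimeMs
      snapshotUploaded = false
    }
  }

  def main(args: Array[String]): Unit = {
    commit()
    assert(!commitLatencyMs.contains("saveZipFiles")) // no upload, no metric
    uploadSnapshot(17L)
    commit()
    assert(commitLatencyMs("saveZipFiles") == 17L)
    println(commitLatencyMs)
  }
}
```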

)
recordedMetrics = Some(metrics)
logInfo(log"Committed ${MDC(LogKeys.VERSION_NUM, newVersion)}, " +
@@ -1350,6 +1392,7 @@
totalSSTFilesBytes,
nativeOpsLatencyMicros,
commitLatencyMs,
loadMetrics,
Contributor: Is there any concern about race conditions if we're not creating a copy of the maps here? I know that this is a pre-existing pattern, but could you just verify with a quick test that if we get the metrics, reload the state store (resetting the maps), and then read the metrics, the metrics are correct?

Contributor Author: Good catch. It seems the existing commitLatencyMs has the same issue: the metrics in the maps returned from metricsOpt can change. I added a clone for these maps and added a test.
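A small runnable demo of the fix described here (illustrative names): hand out an immutable copy of the mutable map, so a later reload that clears the map cannot mutate metrics a caller already captured:

```scala
import scala.collection.mutable

object MetricsCopyDemo {
  private val commitLatencyMs = mutable.HashMap[String, Long]("fileSync" -> 12L)

  // Snapshot the mutable map into an immutable copy before exposing it.
  def metricsView: Map[String, Long] = commitLatencyMs.toMap

  def main(args: Array[String]): Unit = {
    val captured = metricsView
    commitLatencyMs.clear() // simulates reloading the store, which resets the map
    assert(captured("fileSync") == 12L) // the captured snapshot is unaffected
    println(captured)
  }
}
```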

bytesCopied = fileManagerMetrics.bytesCopied,
filesCopied = fileManagerMetrics.filesCopied,
filesReused = fileManagerMetrics.filesReused,
@@ -2027,6 +2070,7 @@ case class RocksDBMetrics(
totalSSTFilesBytes: Long,
nativeOpsHistograms: Map[String, RocksDBNativeHistogram],
lastCommitLatencyMs: Map[String, Long],
lastLoadMetrics: Map[String, Long],
filesCopied: Long,
bytesCopied: Long,
filesReused: Long,
@@ -45,6 +45,7 @@ import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
import org.apache.spark.util.Utils.timeTakenMs

/**
* Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
@@ -267,7 +268,7 @@ class RocksDBFileManager(
fileMapping: Map[String, RocksDBSnapshotFile],
columnFamilyMapping: Option[Map[String, ColumnFamilyInfo]] = None,
maxColumnFamilyId: Option[Short] = None,
checkpointUniqueId: Option[String] = None): Unit = {
checkpointUniqueId: Option[String] = None): RocksDBCheckpointMetadata = {
logFilesInDir(checkpointDir, log"Saving checkpoint files " +
log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
@@ -299,22 +300,33 @@
logDebug(log"Written metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")

if (version <= 1 && numKeys <= 0) {
// If we're writing the initial version and there's no data, we have to explicitly initialize
// the root directory. Normally saveImmutableFilesToDfs will do this initialization, but
// when there's no data that method won't write any files, and zipToDfsFile uses the
// CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories.
// Moreover, once we disable to track the number of keys, in which the numKeys is -1, we
// still need to create the initial dfs root directory anyway.
if (!rootDirChecked) {
val path = new Path(dfsRootDir)
if (!fm.exists(path)) fm.mkdirs(path)
rootDirChecked = true
val (_, zipFilesTimeMs) = timeTakenMs {
if (version <= 1 && numKeys <= 0) {
// If we're writing the initial version and there's no data, we have to initialize
// the root directory. Normally saveImmutableFilesToDfs will do this initialization, but
// when there's no data that method won't write any files, and zipToDfsFile uses the
// CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories.
// Moreover, once we disable tracking the number of keys (numKeys is -1), we
// still need to create the initial DFS root directory anyway.
if (!rootDirChecked) {
val path = new Path(dfsRootDir)
if (!fm.exists(path)) fm.mkdirs(path)
rootDirChecked = true
}
}
zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version, checkpointUniqueId))
logInfo(log"Saved checkpoint file for version ${MDC(LogKeys.VERSION_NUM, version)} " +
log"checkpointUniqueId: ${MDC(LogKeys.UUID, checkpointUniqueId.getOrElse(""))}")
}
zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version, checkpointUniqueId))
logInfo(log"Saved checkpoint file for version ${MDC(LogKeys.VERSION_NUM, version)} " +
log"checkpointUniqueId: ${MDC(LogKeys.UUID, checkpointUniqueId.getOrElse(""))}")

// populate the SaveCheckpointMetrics
saveCheckpointMetrics =
saveCheckpointMetrics.copy(
// Round up to at least 1ms so the metric is always nonzero, avoiding flaky tests
saveZipFilesTimeMs = Some(Math.max(zipFilesTimeMs, 1L))
)

metadata
}
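The hunk above relies on Utils.timeTakenMs; a minimal stand-in that mirrors its (result, elapsedMs) contract, shown only to make the timing pattern explicit:

```scala
object TimeTakenDemo {
  // Stand-in for org.apache.spark.util.Utils.timeTakenMs: run the thunk and
  // return its result paired with the wall-clock milliseconds it took.
  def timeTakenMs[T](body: => T): (T, Long) = {
    val start = System.currentTimeMillis()
    val result = body
    (result, System.currentTimeMillis() - start)
  }

  def main(args: Array[String]): Unit = {
    val (_, zipFilesTimeMs) = timeTakenMs { Thread.sleep(5) }
    println(s"zipped in ${zipFilesTimeMs}ms")
  }
}
```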

@@ -962,7 +974,9 @@ case class RocksDBFileManagerMetrics(
bytesCopied: Long,
filesReused: Long,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
zipFileBytesUncompressed: Option[Long] = None)
zipFileBytesUncompressed: Option[Long] = None,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
saveZipFilesTimeMs: Option[Long] = None)

/**
* Metrics to return when requested but no operation has been performed.
@@ -296,6 +296,9 @@ private[sql] class RocksDBStateStoreProvider
def commitLatencyMs(typ: String): Long =
rocksDBMetrics.lastCommitLatencyMs.getOrElse(typ, 0L)

def loadMetrics(typ: String): Long =
rocksDBMetrics.lastLoadMetrics.getOrElse(typ, 0L)

def nativeOpsLatencyMillis(typ: String): Long = {
rocksDBMetrics.nativeOpsMetrics.get(typ).map(_ * 1000).getOrElse(0)
}
@@ -331,6 +334,13 @@
CUSTOM_METRIC_COMMIT_COMPACT_TIME -> commitLatencyMs("compact"),
CUSTOM_METRIC_CHECKPOINT_TIME -> commitLatencyMs("checkpoint"),
CUSTOM_METRIC_FILESYNC_TIME -> commitLatencyMs("fileSync"),
CUSTOM_METRIC_CHANGE_LOG_WRITER_COMMIT_TIME -> commitLatencyMs("changeLogWriterCommit"),
CUSTOM_METRIC_SAVE_ZIP_FILES_TIME -> commitLatencyMs("saveZipFiles"),

CUSTOM_METRIC_LOAD_FROM_SNAPSHOT_TIME -> loadMetrics("loadFromSnapshot"),
CUSTOM_METRIC_LOAD_TIME -> loadMetrics("load"),
CUSTOM_METRIC_REPLAY_CHANGE_LOG -> loadMetrics("replayChangelog"),
CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES -> loadMetrics("numReplayChangeLogFiles"),
CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied,
CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied,
CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused,
@@ -927,6 +937,25 @@ object RocksDBStateStoreProvider {
"rocksdbFilesReused", "RocksDB: file manager - files reused")
val CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED = StateStoreCustomSizeMetric(
"rocksdbZipFileBytesUncompressed", "RocksDB: file manager - uncompressed zip file bytes")
val CUSTOM_METRIC_CHANGE_LOG_WRITER_COMMIT_TIME = StateStoreCustomTimingMetric(
"rocksdbChangeLogWriterCommitLatencyMs",
"RocksDB: commit - changelog checkpoint time")
val CUSTOM_METRIC_SAVE_ZIP_FILES_TIME = StateStoreCustomTimingMetric(
"rocksdbSaveZipFilesLatencyMs",
"RocksDB: commit - zip files sync to external storage time")

val CUSTOM_METRIC_LOAD_FROM_SNAPSHOT_TIME = StateStoreCustomTimingMetric(
"rocksdbLoadFromSnapshotLatencyMs",
"RocksDB: load from snapshot - time taken to load the store from snapshot")
val CUSTOM_METRIC_LOAD_TIME = StateStoreCustomTimingMetric(
"rocksdbLoadLatencyMs",
"RocksDB: load - time taken to load the store")
val CUSTOM_METRIC_REPLAY_CHANGE_LOG = StateStoreCustomTimingMetric(
"rocksdbReplayChangeLogLatencyMs",
"RocksDB: load replay change log - time taken to replay the change log")
val CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES = StateStoreCustomSizeMetric(
"rocksdbNumReplayChangelogFiles",
"RocksDB: load replay change log - number of change log files replayed")

val CUSTOM_METRIC_BLOCK_CACHE_MISS = StateStoreCustomSumMetric(
"rocksdbReadBlockCacheMissCount",
@@ -981,13 +1010,16 @@
CUSTOM_METRIC_FLUSH_TIME, CUSTOM_METRIC_COMMIT_COMPACT_TIME,
CUSTOM_METRIC_CHECKPOINT_TIME, CUSTOM_METRIC_FILESYNC_TIME,
CUSTOM_METRIC_BYTES_COPIED, CUSTOM_METRIC_FILES_COPIED, CUSTOM_METRIC_FILES_REUSED,
CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED, CUSTOM_METRIC_GET_COUNT, CUSTOM_METRIC_PUT_COUNT,
CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED, CUSTOM_METRIC_CHANGE_LOG_WRITER_COMMIT_TIME,
CUSTOM_METRIC_SAVE_ZIP_FILES_TIME, CUSTOM_METRIC_GET_COUNT, CUSTOM_METRIC_PUT_COUNT,
CUSTOM_METRIC_BLOCK_CACHE_MISS, CUSTOM_METRIC_BLOCK_CACHE_HITS, CUSTOM_METRIC_BYTES_READ,
CUSTOM_METRIC_BYTES_WRITTEN, CUSTOM_METRIC_ITERATOR_BYTES_READ, CUSTOM_METRIC_STALL_TIME,
CUSTOM_METRIC_TOTAL_COMPACT_TIME, CUSTOM_METRIC_COMPACT_READ_BYTES,
CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, CUSTOM_METRIC_FLUSH_WRITTEN_BYTES,
CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES_KEYS,
CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES)
CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES,
CUSTOM_METRIC_LOAD_FROM_SNAPSHOT_TIME, CUSTOM_METRIC_LOAD_TIME, CUSTOM_METRIC_REPLAY_CHANGE_LOG,
CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES)

val CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED = StateStoreSnapshotLastUploadInstanceMetric()

@@ -109,7 +109,10 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
"rocksdbTotalBytesReadThroughIterator", "rocksdbTotalBytesWrittenByFlush",
"rocksdbPinnedBlocksMemoryUsage", "rocksdbNumInternalColFamiliesKeys",
"rocksdbNumExternalColumnFamilies", "rocksdbNumInternalColumnFamilies",
"SnapshotLastUploaded.partition_0_default"))
"SnapshotLastUploaded.partition_0_default", "rocksdbChangeLogWriterCommitLatencyMs",
"rocksdbSaveZipFilesLatencyMs", "rocksdbLoadFromSnapshotLatencyMs",
"rocksdbLoadLatencyMs", "rocksdbReplayChangeLogLatencyMs",
"rocksdbNumReplayChangelogFiles"))
}
} finally {
query.stop()
@@ -2401,6 +2401,103 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
}

test("load metrics are populated correctly") {
withTempDir { dir =>
val remoteDir = dir.getCanonicalPath
val conf = dbConf

withDB(remoteDir, conf = conf) { db =>
db.load(0)
db.put("a", "5")
db.put("b", "5")
db.commit()

db.doMaintenance() // upload snapshot
db.rollback() // invalidate the db, so next load will reload from dfs

db.load(1)
db.put("a", "10")
db.put("b", "25")
db.commit()

val m1 = db.metricsOpt.get
assert(m1.lastLoadMetrics("load") > 0)
// since we called load, loadFromSnapshot should not be populated
assert(!m1.lastLoadMetrics.contains("loadFromSnapshot"))
}
}
}

testWithChangelogCheckpointingEnabled("load from snapshot metrics are populated correctly") {
withTempDir { dir =>
val remoteDir = dir.getCanonicalPath
// We want a snapshot for every two delta files
val conf = dbConf.copy(minDeltasForSnapshot = 1)

withDB(remoteDir, conf = conf) { db =>
db.load(0)
db.put("a", "5")
db.commit()
db.doMaintenance()

db.load(1)
db.put("b", "10")
db.commit()
db.doMaintenance()

db.loadFromSnapshot(0, 1)

val m1 = db.metricsOpt.get
assert(m1.lastLoadMetrics("loadFromSnapshot") > 0)
// since we called loadFromSnapshot, load should not be populated
assert(!m1.lastLoadMetrics.contains("load"))
assert(m1.lastLoadMetrics("replayChangelog") > 0)
assert(m1.lastLoadMetrics("numReplayChangeLogFiles") == 1)
}
}
}

testWithChangelogCheckpointingEnabled("commit metrics are populated correctly") {
withTempDir { dir =>
val remoteDir = dir.getCanonicalPath
val conf = dbConf.copy()

withDB(remoteDir, conf = conf) { db =>
db.load(0)
db.put("a", "5")
db.put("b", "5")
db.commit()
db.doMaintenance() // upload snapshot

val m1 = db.metricsOpt.get
assert(m1.lastCommitLatencyMs("fileSync") > 0)
// Since changelog checkpoint is enabled, we should populate this metric
assert(m1.lastCommitLatencyMs("changeLogWriterCommit") > 0)
}
}
}

testWithChangelogCheckpointingDisabled("commit metrics are populated correctly") {
withTempDir { dir =>
val remoteDir = dir.getCanonicalPath
val conf = dbConf.copy()

withDB(remoteDir, conf = conf) { db =>
db.load(0)
db.put("a", "5")
db.put("b", "5")
db.commit()
db.doMaintenance() // upload snapshot

val m1 = db.metricsOpt.get
assert(m1.lastCommitLatencyMs("fileSync") > 0)
// When changelog checkpoint is NOT enabled we should
// always populate this metric in the snapshot
assert(m1.lastCommitLatencyMs("saveZipFiles") > 0)
}
}
}

// Add tests to check valid and invalid values for max_open_files passed to the underlying
// RocksDB instance.
Seq("-1", "100", "1000").foreach { maxOpenFiles =>