[SPARK-52968][SS] Emit additional state store metrics #51679


Status: Open (10 commits, base: master)

dylanwong250 (Contributor) commented Jul 28, 2025

What changes were proposed in this pull request?

Add the following metrics to Structured Streaming:

State Store Commit Metrics

"rocksdbChangeLogWriterCommitLatencyMs"
"rocksdbSaveZipFilesLatencyMs"

State Store Load Metrics

"rocksdbLoadFromSnapshotLatencyMs"
"rocksdbLoadLatencyMs"
"rocksdbReplayChangeLogLatencyMs"
"rocksdbNumReplayChangelogFiles"

Why are the changes needed?

Currently, no metrics are emitted for loading the state store or replaying the changelog.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests to verify that the new metrics are populated and that query progress contains the correct metrics.

Was this patch authored or co-authored using generative AI tooling?

No

)

// Refresh the recorded metrics after loading from snapshot
recordedMetrics = Some(metrics)

Contributor

Why do we set recordedMetrics here but not in load?

Contributor Author

I missed that load() can be used read-only. For context, RocksDBMetrics is accessed through metricsOpt, which reads recordedMetrics, and recordedMetrics was previously updated only in commit(). As a result, metrics from loadFromSnapshot() were stale, because that path is currently used only for read-only access and never calls commit().

I added the metrics refresh in load().
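
The staleness described in this reply can be reproduced with a small stand-alone model. This is a sketch, not Spark's `RocksDB` class: `ToyStore`, `ToyMetrics`, and the `refreshOnLoad` switch are hypothetical names introduced for illustration, and only `recordedMetrics`, `metricsOpt`, `load()`, and `commit()` echo names used in this thread.

```scala
// Hypothetical stand-in for a metrics snapshot; not Spark's RocksDBMetrics.
final case class ToyMetrics(loadedVersion: Long, loadLatencyMs: Long)

// Toy model of a store whose metrics are exposed through a cached snapshot.
class ToyStore(refreshOnLoad: Boolean) {
  private var loadedVersion: Long = -1L
  private var lastLoadLatencyMs: Long = 0L
  private var recordedMetrics: Option[ToyMetrics] = None

  // Builds a fresh snapshot from the current internal state.
  private def metrics: ToyMetrics = ToyMetrics(loadedVersion, lastLoadLatencyMs)

  // Readers only ever see the cached snapshot.
  def metricsOpt: Option[ToyMetrics] = recordedMetrics

  def load(version: Long, latencyMs: Long): Unit = {
    loadedVersion = version
    lastLoadLatencyMs = latencyMs
    // Without this refresh, a read-only caller that never commits
    // keeps seeing whatever the last commit recorded (or nothing).
    if (refreshOnLoad) recordedMetrics = Some(metrics)
  }

  def commit(): Unit = {
    loadedVersion += 1
    recordedMetrics = Some(metrics)
  }
}
```

With `refreshOnLoad = false`, calling `load(...)` and then `metricsOpt` returns `None` until some later `commit()`. With `refreshOnLoad = true`, the snapshot reflects the load immediately.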

Contributor

@dylanwong250 Please add a comment explaining why you added this.

Contributor Author

I added a comment in both load() and loadFromSnapshot().

When I added recordedMetrics = Some(metrics) in load(), some tests failed. Those tests call abort() and then expect the metrics to be empty. To address this, I now clear the metrics in rollback; I will check whether that fixes the tests.
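
The interaction between the load-time refresh and rollback can be sketched as follows. This is a minimal illustration under stated assumptions: `ToyRollbackStore` and the `"loadedVersion"` key are hypothetical, and the real `rollback()` in Spark does more than reset one field.

```scala
// Hypothetical toy store; only recordedMetrics, metricsOpt, load(), and
// rollback() echo names from this thread.
class ToyRollbackStore {
  private var recordedMetrics: Option[Map[String, Long]] = None

  def metricsOpt: Option[Map[String, Long]] = recordedMetrics

  // Refreshes the cached metrics on load, as the change in this PR does.
  def load(version: Long): Unit = {
    recordedMetrics = Some(Map("loadedVersion" -> version))
  }

  // Clears the cached metrics so callers that expect no metrics
  // after an abort keep seeing an empty result.
  def rollback(): Unit = {
    recordedMetrics = None
  }
}
```

Without the reset in `rollback()`, a store that was loaded and then aborted would still report the metrics captured at load time.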

Contributor

@dylanwong250 Why do we want the metrics to be empty after rollback?

@@ -1350,6 +1392,7 @@ class RocksDB(
 totalSSTFilesBytes,
 nativeOpsLatencyMicros,
 commitLatencyMs,
+loadMetrics,

Contributor

Is there any concern about race conditions if we're not creating a copy of the maps here? I know this is a pre-existing pattern, but could you verify with a quick test that the metrics are still correct if we get the metrics, reload the state store (resetting the maps), and then read them?

Contributor Author

Good catch. The existing commitLatencyMs has the same issue: the maps returned through metricsOpt can change after the call. I added a clone for these maps and added a test.
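
The aliasing problem and the effect of cloning can be shown in isolation. The sketch below assumes the metrics object holds a reference to a mutable map; `Snapshot` and `CloneDemo` are hypothetical names, and only `commitLatencyMs` and the `"fileSync"` key come from the diff in this PR.

```scala
import scala.collection.mutable

// Hypothetical snapshot type; Spark's RocksDBMetrics has many more fields.
final case class Snapshot(commitLatencyMs: mutable.Map[String, Long])

object CloneDemo {
  // Live map that the store keeps mutating between operations.
  val commitLatencyMs: mutable.HashMap[String, Long] =
    mutable.HashMap("fileSync" -> 7L)

  // Shares the live map: later mutations leak into the snapshot.
  def aliasedSnapshot(): Snapshot = Snapshot(commitLatencyMs)

  // Copies the map: the snapshot is frozen at the time of the call.
  def clonedSnapshot(): Snapshot = Snapshot(commitLatencyMs.clone())

  // Simulates a reload that resets the live map.
  def reload(): Unit = commitLatencyMs.clear()
}
```

After `CloneDemo.reload()`, a snapshot taken with `aliasedSnapshot()` is empty, while one taken with `clonedSnapshot()` still holds the `"fileSync"` value it was created with. This mirrors the get, reload, read sequence the reviewer asked to test.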

)

// Refresh the recorded metrics after loading from snapshot
recordedMetrics = Some(metrics)

Contributor

Why are you doing this? This pulls the RocksDB engine metrics, right?

Contributor Author

Response in the other comment thread related to this.

saveCheckpointMetrics =
saveCheckpointMetrics.copy(
// Round up to 1ms to reassure that we've logged successfully and avoid flaky tests
saveZipFilesTimeMs = Some(Math.max(zipFilesTimeMs, 1L))

Contributor

why?
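
One plausible reading of the code comment above, offered as an assumption rather than the author's stated answer: a sub-millisecond duration truncates to 0 ms, which a test cannot distinguish from a metric that was never recorded. The sketch below shows the truncation and the effect of `Math.max(..., 1L)`; `RoundUpDemo`, `toMillis`, and `reported` are hypothetical names.

```scala
import java.util.concurrent.TimeUnit

object RoundUpDemo {
  // Converts elapsed nanoseconds to whole milliseconds, truncating toward zero.
  def toMillis(elapsedNanos: Long): Long =
    TimeUnit.NANOSECONDS.toMillis(elapsedNanos)

  // Reports at least 1 ms so that "recorded" is always distinguishable from 0.
  def reported(elapsedNanos: Long): Long =
    Math.max(toMillis(elapsedNanos), 1L)
}
```

A 0.3 ms operation gives `toMillis(300000L) == 0` but `reported(300000L) == 1`, so an assertion such as "the metric is greater than zero" no longer depends on how fast the machine is. Durations of 1 ms or more are unchanged.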

@@ -1101,7 +1144,8 @@ class RocksDB(
 numInternalKeysOnLoadedVersion = numInternalKeysOnWritingVersion
 loadedVersion = newVersion
 commitLatencyMs ++= Map(
-"fileSync" -> fileSyncTimeMs
+"fileSync" -> fileSyncTimeMs,
+"saveZipFiles" -> fileManagerMetrics.saveZipFilesTimeMs.getOrElse(0L)

Contributor

There should be a better way to do this. Also, even if no zip upload happened in this commit, this would return the metrics from the last upload done by the maintenance thread. That can confuse a user or engineer who is debugging.

Contributor Author

Changed. I added a flag so that this metric is set in commit only when uploadSnapshot has been called, which clears and recreates fileManagerMetrics.
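
The flag-based approach described in this reply could look roughly like the sketch below. It is an illustration only: `ToyCommitter` and `snapshotUploadedInThisCommit` are hypothetical names, the map is reset on every commit for simplicity, and the key is simply omitted when no upload happened (the actual change may report the metric differently).

```scala
import scala.collection.mutable

// Hypothetical model of gating a metric on a per-commit flag.
class ToyCommitter {
  val commitLatencyMs: mutable.Map[String, Long] =
    mutable.HashMap.empty[String, Long]
  private var saveZipFilesTimeMs: Option[Long] = None
  private var snapshotUploadedInThisCommit = false

  // Called only when this commit actually uploads a snapshot.
  def uploadSnapshot(zipFilesTimeMs: Long): Unit = {
    saveZipFilesTimeMs = Some(zipFilesTimeMs)
    snapshotUploadedInThisCommit = true
  }

  def commit(fileSyncTimeMs: Long): Unit = {
    commitLatencyMs.clear()
    commitLatencyMs("fileSync") = fileSyncTimeMs
    // Report the zip latency only if the upload belongs to this commit,
    // so a value left over from an earlier upload is never re-reported.
    if (snapshotUploadedInThisCommit) {
      commitLatencyMs("saveZipFiles") = saveZipFilesTimeMs.getOrElse(0L)
    }
    snapshotUploadedInThisCommit = false
  }
}
```

A commit that follows `uploadSnapshot(...)` reports `"saveZipFiles"`; the next commit without an upload does not, which avoids the confusion the reviewer pointed out.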

@@ -1312,7 +1376,8 @@ class RocksDB(
 pinnedBlocksMemUsage,
 totalSSTFilesBytes,
 nativeOpsLatencyMicros,
-commitLatencyMs,
+commitLatencyMs.clone(),

Contributor

Thanks for making this change. Can you add a quick comment explaining why we need to clone?

@liviazhu
Contributor

The change mostly looks good. Besides nits, I just had one question about rollback.
