[SPARK-52968][SS] Emit additional state store metrics #51679
base: master
Conversation
      )

      // Refresh the recorded metrics after loading from snapshot
      recordedMetrics = Some(metrics)
Why do we set recordedMetrics here but not in load?
I missed that load can be read-only. For more context, metricsOpt is how the RocksDBMetrics is accessed. It reads recordedMetrics, which was previously only updated in commit(). As a result, metrics from loadFromSnapshot were stale, because loadFromSnapshot is currently only used read-only and never calls commit().
I added the metrics refresh in load().
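A minimal sketch of the issue, using a toy model rather than the real RocksDB.scala (the class, field names, and the 42L load time below are illustrative assumptions):

```scala
// Toy model of the behavior described above: before the fix, only commit()
// updated recordedMetrics, so a read-only load left metricsOpt stale or empty.
case class Metrics(loadTimeMs: Long)

class StoreModel {
  private var recordedMetrics: Option[Metrics] = None

  def load(readOnly: Boolean): Unit = {
    val metrics = Metrics(loadTimeMs = 42L)
    recordedMetrics = Some(metrics) // the refresh this PR adds to load()
    if (!readOnly) {
      // a write path would eventually call commit(), which also records metrics
    }
  }

  def metricsOpt: Option[Metrics] = recordedMetrics
}
```

With the refresh in load(), metricsOpt is populated even when the caller never commits.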
@dylanwong250 please add a comment explaining why you added this
I added a comment in both load() and loadFromSnapshot().
When I added recordedMetrics = Some(metrics) in load(), some tests failed: in cases where abort() was called, the metrics were expected to be empty afterwards. I fixed this by clearing the metrics in rollback(). I will see if that fixes the tests.
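A hedged sketch of the rollback fix described here, again with toy names rather than the actual class:

```scala
// Toy model: rollback() clears the recorded metrics so an aborted
// load/write does not report stale values.
case class Metrics(loadTimeMs: Long)

class StoreModel {
  private var recordedMetrics: Option[Metrics] = None

  def load(): Unit = recordedMetrics = Some(Metrics(loadTimeMs = 42L))

  // Clearing here is what made the failing tests pass: after abort(),
  // callers expect metricsOpt to be empty.
  def rollback(): Unit = recordedMetrics = None

  def metricsOpt: Option[Metrics] = recordedMetrics
}
```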
@dylanwong250 Why do we want the metrics to be empty after rollback?
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala (outdated; conversation resolved)
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala (outdated; conversation resolved)
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala (outdated; conversation resolved)
@@ -1350,6 +1392,7 @@ class RocksDB(
       totalSSTFilesBytes,
       nativeOpsLatencyMicros,
       commitLatencyMs,
+      loadMetrics,
Is there any concern about race conditions if we're not creating a copy of the maps here? I know this is a pre-existing pattern, but could you verify with a quick test that if we get the metrics, reload the state store (resetting the maps), and then read the metrics, the metrics are still correct?
Good catch. The existing commitLatencyMs has the same issue: the maps returned from metricsOpt can change after the call. I added a clone for these maps and added a test.
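The race and the defensive copy can be sketched in a few self-contained lines (the map contents are illustrative, not the real fields):

```scala
import scala.collection.mutable

// A live mutable map handed to callers changes under them when the state
// store reloads and resets the map; clone() at read time snapshots it.
val commitLatencyMs = mutable.HashMap("fileSync" -> 10L)

val live = commitLatencyMs             // pre-existing pattern: shared reference
val snapshot = commitLatencyMs.clone() // the fix: defensive copy

commitLatencyMs.clear()                // simulates reloading the state store

// `live` now sees the cleared map, while `snapshot` keeps the recorded value.
```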
      )

      // Refresh the recorded metrics after loading from snapshot
      recordedMetrics = Some(metrics)
Why are you doing this? This pulls the RocksDB engine metrics, right?
See my response in the other comment thread related to this.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala (outdated; conversation resolved)
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala (outdated; conversation resolved)
      saveCheckpointMetrics =
        saveCheckpointMetrics.copy(
          // Round up to 1ms to reassure that we've logged successfully and avoid flaky tests
          saveZipFilesTimeMs = Some(Math.max(zipFilesTimeMs, 1L))
why?
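The intent appears to be what the diff comment states; a minimal sketch with illustrative values:

```scala
// Sub-millisecond zip uploads truncate to 0 ms, which is indistinguishable
// from "never ran" for a test asserting the metric is positive. Rounding up
// to at least 1 ms keeps the metric observably nonzero without materially
// distorting longer durations.
val zipFilesTimeMs = 0L                     // measured duration, truncated to 0
val reported = Math.max(zipFilesTimeMs, 1L) // value stored in the metric
```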
...rc/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala (outdated; conversation resolved)
...rc/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala (outdated; conversation resolved)
...rc/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala (outdated; conversation resolved)
@@ -1101,7 +1144,8 @@ class RocksDB(
       numInternalKeysOnLoadedVersion = numInternalKeysOnWritingVersion
       loadedVersion = newVersion
       commitLatencyMs ++= Map(
-        "fileSync" -> fileSyncTimeMs
+        "fileSync" -> fileSyncTimeMs,
+        "saveZipFiles" -> fileManagerMetrics.saveZipFilesTimeMs.getOrElse(0L)
There should be a better way to do this. Also, even if there was no zip upload in this commit, this would return the metrics from when the maintenance thread last uploaded. That could confuse a user or an engineer who is debugging.
Changed. I added a flag so this metric is only set in commit() when uploadSnapshot has been called, since uploadSnapshot clears and recreates fileManagerMetrics.
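A hedged sketch of the flag-based fix described here (toy class; the names are assumptions, not the actual RocksDB fields):

```scala
class CommitModel {
  // Set only when this task itself uploads a snapshot; reset every commit.
  private var snapshotUploadedInThisCommit = false
  private var saveZipFilesTimeMs: Option[Long] = None

  def uploadSnapshot(zipMs: Long): Unit = {
    saveZipFilesTimeMs = Some(zipMs) // models fileManagerMetrics being recreated
    snapshotUploadedInThisCommit = true
  }

  // Without the flag, a stale value left over from a maintenance-thread
  // upload would leak into a commit that uploaded nothing.
  def commit(): Map[String, Long] = {
    val metrics =
      if (snapshotUploadedInThisCommit)
        Map("saveZipFiles" -> saveZipFilesTimeMs.getOrElse(0L))
      else Map.empty[String, Long]
    snapshotUploadedInThisCommit = false
    metrics
  }
}
```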
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala (outdated; conversation resolved)
@@ -1312,7 +1376,8 @@ class RocksDB(
       pinnedBlocksMemUsage,
       totalSSTFilesBytes,
       nativeOpsLatencyMicros,
-      commitLatencyMs,
+      commitLatencyMs.clone(),
Thanks for making this change, can you just add a quick comment explaining why we need to clone?
Change mostly looks good. Besides nits, I just had one question about rollback.
What changes were proposed in this pull request?
Add additional metrics in structured streaming:
- State Store Commit Metrics
- State Store Load Metrics
Why are the changes needed?
Currently there are no metrics emitted related to loading the state store and replaying the change log.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit tests to verify that the new metrics are populated and that query progress contains the correct metrics.
Was this patch authored or co-authored using generative AI tooling?
No