-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-52968][SS] Emit additional state store metrics #51679
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
4018608
d9cdeaf
86dc65c
4878bf0
97bf9ac
dc51917
c4f256b
3b99287
76c2c35
5fab7d4
f9d76b4
72ba026
a91d851
8590307
913b6c1
bae64c4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -158,6 +158,8 @@ class RocksDB( | |
private val byteArrayPair = new ByteArrayPair() | ||
private val commitLatencyMs = new mutable.HashMap[String, Long]() | ||
|
||
private val loadMetrics = new mutable.HashMap[String, Long]() | ||
|
||
private val acquireLock = new Object | ||
|
||
@volatile private var db: NativeRocksDB = _ | ||
|
@@ -583,16 +585,28 @@ class RocksDB( | |
version: Long, | ||
stateStoreCkptId: Option[String] = None, | ||
readOnly: Boolean = false): RocksDB = { | ||
val startTime = System.currentTimeMillis() | ||
|
||
assert(version >= 0) | ||
acquire(LoadStore) | ||
recordedMetrics = None | ||
// Reset the load metrics before loading | ||
loadMetrics.clear() | ||
|
||
logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with stateStoreCkptId: ${ | ||
MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}") | ||
if (stateStoreCkptId.isDefined || enableStateStoreCheckpointIds && version == 0) { | ||
loadWithCheckpointId(version, stateStoreCkptId, readOnly) | ||
} else { | ||
loadWithoutCheckpointId(version, readOnly) | ||
} | ||
|
||
// Record the metrics after loading | ||
val duration = System.currentTimeMillis() - startTime | ||
loadMetrics ++= Map( | ||
"load" -> duration | ||
) | ||
|
||
this | ||
} | ||
|
||
|
@@ -608,9 +622,13 @@ class RocksDB( | |
* Source. | ||
*/ | ||
def loadFromSnapshot(snapshotVersion: Long, endVersion: Long): RocksDB = { | ||
val startTime = System.currentTimeMillis() | ||
|
||
assert(snapshotVersion >= 0 && endVersion >= snapshotVersion) | ||
acquire(LoadStore) | ||
recordedMetrics = None | ||
loadMetrics.clear() | ||
|
||
logInfo( | ||
log"Loading snapshot at version ${MDC(LogKeys.VERSION_NUM, snapshotVersion)} and apply " + | ||
log"changelog files to version ${MDC(LogKeys.VERSION_NUM, endVersion)}.") | ||
|
@@ -627,6 +645,14 @@ class RocksDB( | |
} | ||
// Report this snapshot version to the coordinator | ||
reportSnapshotUploadToCoordinator(snapshotVersion) | ||
|
||
// Record the metrics after loading | ||
loadMetrics ++= Map( | ||
"loadFromSnapshot" -> (System.currentTimeMillis() - startTime) | ||
) | ||
|
||
// Refresh the recorded metrics after loading from snapshot | ||
recordedMetrics = Some(metrics) | ||
|
||
this | ||
} | ||
|
||
|
@@ -692,6 +718,8 @@ class RocksDB( | |
* Replay change log from the loaded version to the target version. | ||
*/ | ||
private def replayChangelog(versionsAndUniqueIds: Array[(Long, Option[String])]): Unit = { | ||
val startTime = System.currentTimeMillis() | ||
|
||
assert(!versionsAndUniqueIds.isEmpty && versionsAndUniqueIds.head._1 == loadedVersion + 1, | ||
s"Replay changelog should start from one version after loadedVersion: $loadedVersion," + | ||
s" but it is not." | ||
|
@@ -740,6 +768,12 @@ class RocksDB( | |
if (changelogReader != null) changelogReader.closeIfNeeded() | ||
} | ||
} | ||
|
||
val duration = System.currentTimeMillis() - startTime | ||
loadMetrics ++= Map( | ||
"replayChangelog" -> duration, | ||
"numReplayChangeLogFiles" -> versionsAndUniqueIds.length | ||
) | ||
} | ||
|
||
/** | ||
|
@@ -1066,7 +1100,14 @@ class RocksDB( | |
// ensure that changelog files are always written | ||
try { | ||
assert(changelogWriter.isDefined) | ||
changelogWriter.foreach(_.commit()) | ||
val changeLogWriterCommitTimeMs = timeTakenMs { | ||
changelogWriter.foreach(_.commit()) | ||
} | ||
// Record the commit time for the changelog writer | ||
commitLatencyMs ++= Map( | ||
"changeLogWriterCommit" -> changeLogWriterCommitTimeMs | ||
) | ||
|
||
if (!isUploaded) { | ||
snapshot.foreach(snapshotsToUploadQueue.offer) | ||
} | ||
|
@@ -1101,7 +1142,8 @@ class RocksDB( | |
numInternalKeysOnLoadedVersion = numInternalKeysOnWritingVersion | ||
loadedVersion = newVersion | ||
commitLatencyMs ++= Map( | ||
"fileSync" -> fileSyncTimeMs | ||
"fileSync" -> fileSyncTimeMs, | ||
"saveZipFiles" -> fileManagerMetrics.saveZipFilesTimeMs.getOrElse(0L) | ||
|
||
) | ||
recordedMetrics = Some(metrics) | ||
logInfo(log"Committed ${MDC(LogKeys.VERSION_NUM, newVersion)}, " + | ||
|
@@ -1350,6 +1392,7 @@ class RocksDB( | |
totalSSTFilesBytes, | ||
nativeOpsLatencyMicros, | ||
commitLatencyMs, | ||
loadMetrics, | ||
|
||
bytesCopied = fileManagerMetrics.bytesCopied, | ||
filesCopied = fileManagerMetrics.filesCopied, | ||
filesReused = fileManagerMetrics.filesReused, | ||
|
@@ -2027,6 +2070,7 @@ case class RocksDBMetrics( | |
totalSSTFilesBytes: Long, | ||
nativeOpsHistograms: Map[String, RocksDBNativeHistogram], | ||
lastCommitLatencyMs: Map[String, Long], | ||
lastLoadMetrics: Map[String, Long], | ||
dylanwong250 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
filesCopied: Long, | ||
bytesCopied: Long, | ||
filesReused: Long, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dylanwong250 please can we reset the `commitLatencyMs` map too, just like you do for `loadMetrics`? We currently don't reset it and might be returning metrics from a previous batch (existing bug). Thanks.