Skip to content
Prev Previous commit
Next Next commit
Merge branch 'master' into SPARK-52968
  • Loading branch information
Dylan Wong committed Jul 29, 2025
commit dc519172e204bf4c5cfa75c5d848122525150b4a
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ class RocksDB(

private val loadMetrics = new mutable.HashMap[String, Long]()

private val acquireLock = new Object

@volatile private var db: NativeRocksDB = _
@volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
Expand Down Expand Up @@ -1221,25 +1219,20 @@ class RocksDB(
* Drop uncommitted changes, and roll back to previous version.
*/
def rollback(): Unit = {
acquire(RollbackStore)
try {
numKeysOnWritingVersion = numKeysOnLoadedVersion
numInternalKeysOnWritingVersion = numInternalKeysOnLoadedVersion
loadedVersion = -1L
lastCommitBasedStateStoreCkptId = None
lastCommittedStateStoreCkptId = None
loadedStateStoreCkptId = None
sessionStateStoreCkptId = None
lineageManager.clear()
changelogWriter.foreach(_.abort())
// Make sure changelogWriter gets recreated next time.
changelogWriter = None
// If we roll back we clear the recorded metrics
recordedMetrics = None
logInfo(log"Rolled back to ${MDC(LogKeys.VERSION_NUM, loadedVersion)}")
} finally {
release(RollbackStore)
}
numKeysOnWritingVersion = numKeysOnLoadedVersion
numInternalKeysOnWritingVersion = numInternalKeysOnLoadedVersion
loadedVersion = -1L
lastCommitBasedStateStoreCkptId = None
lastCommittedStateStoreCkptId = None
loadedStateStoreCkptId = None
sessionStateStoreCkptId = None
lineageManager.clear()
changelogWriter.foreach(_.abort())
// Make sure changelogWriter gets recreated next time.
changelogWriter = None
// If we roll back we clear the recorded metrics
recordedMetrics = None
logInfo(log"Rolled back to ${MDC(LogKeys.VERSION_NUM, loadedVersion)}")
}

def doMaintenance(): Unit = {
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.