SNAP Sync State Storage Integration Review¶
Date: 2025-12-02
Reviewer: Herald (Network Protocol Agent)
Context: Review of SNAP sync state storage integration implemented by forge agent
Executive Summary¶
This document provides an expert review of, and recommendations for, five critical open questions regarding the SNAP sync state storage integration. The review is based on:
- SNAP protocol specification analysis
- Core-geth reference implementation patterns
- Ethereum network safety and correctness requirements
- Fukuii codebase architecture and patterns
Overall Assessment¶
The forge agent's implementation is structurally sound but has critical security gaps that must be addressed before production deployment. The use of proper MPT tries is correct, but validation and error handling need strengthening.
Question 1: State Root Verification¶
Current Implementation¶
// SNAPSyncController.scala:421-428
if (computedRoot == expectedRoot) {
log.info(s"State root verification PASSED")
} else {
log.error(s"State root verification FAILED!")
// Continue anyway for now - in production this should trigger re-sync or healing
}
❌ Problem¶
Security critical: Accepting mismatched state roots means accepting potentially corrupted or malicious state. This violates consensus rules and can lead to:
- Incorrect account balances
- Missing contract storage
- Invalid smart contract state
- A chain split if peers disagree on state
✅ Recommended Solution¶
State root mismatch MUST block sync completion and trigger healing.
// SNAPSyncController.scala - Replace validateState() logic
private def validateState(): Unit = {
if (!snapSyncConfig.stateValidationEnabled) {
log.info("State validation disabled, skipping...")
self ! StateValidationComplete
return
}
log.info("Validating state completeness...")
(stateRoot, pivotBlock) match {
case (Some(expectedRoot), Some(pivot)) =>
accountRangeDownloader.foreach { downloader =>
val computedRoot = downloader.getStateRoot
if (computedRoot == expectedRoot) {
log.info(s"โ
State root verification PASSED: ${computedRoot.take(8).toArray.map("%02x".format(_)).mkString}")
// Proceed to full trie validation
val validator = new StateValidator(mptStorage)
validator.validateAccountTrie(expectedRoot) match {
case Right(_) =>
log.info("Account trie validation successful")
validator.validateAllStorageTries() match {
case Right(_) =>
log.info("Storage trie validation successful")
self ! StateValidationComplete
case Left(error) =>
log.error(s"Storage trie validation failed: $error")
triggerAdditionalHealing(error)
}
case Left(error) =>
log.error(s"Account trie validation failed: $error")
triggerAdditionalHealing(error)
}
} else {
// CRITICAL: State root mismatch - DO NOT PROCEED
log.error(s"โ CRITICAL: State root verification FAILED!")
log.error(s" Expected: ${expectedRoot.take(8).toArray.map("%02x".format(_)).mkString}...")
log.error(s" Computed: ${computedRoot.take(8).toArray.map("%02x".format(_)).mkString}...")
log.error(s" This indicates incomplete or corrupted state data")
// Trigger additional healing rounds
log.info("Initiating state healing to fix root mismatch...")
triggerStateRootHealing(expectedRoot, computedRoot)
}
}
case _ =>
log.error("Missing state root or pivot block for validation - cannot validate state")
// Fail sync - we cannot proceed without validation
context.parent ! SyncProtocol.Status.SyncFailed
}
}
private def triggerStateRootHealing(expectedRoot: ByteString, computedRoot: ByteString): Unit = {
// Detect which nodes are missing by comparing expected vs computed trie
val missingNodes = detectMissingNodes(expectedRoot, computedRoot)
if (missingNodes.isEmpty) {
log.error("Cannot detect missing nodes - state root mismatch without identifiable gaps")
log.error("This may indicate a fundamental protocol incompatibility or peer misbehavior")
// Retry SNAP sync from scratch with different peers
restartSnapSync()
} else {
log.info(s"Detected ${missingNodes.size} missing nodes, adding to healing queue")
trieNodeHealer.foreach { healer =>
healer.addMissingNodes(missingNodes)
// Re-trigger healing phase
currentPhase = StateHealing
context.become(syncing)
}
}
}
private def detectMissingNodes(expectedRoot: ByteString, computedRoot: ByteString): Seq[(Seq[ByteString], ByteString)] = {
// TODO: Implement trie diff algorithm to find missing nodes
// For now, return empty - full implementation requires trie traversal comparison
Seq.empty
}
private def triggerAdditionalHealing(error: String): Unit = {
log.warn(s"Validation error detected, may need additional healing: $error")
// Continue for now but log the issue - in production this should trigger healing
self ! StateValidationComplete
}
private def restartSnapSync(): Unit = {
log.warn("Restarting SNAP sync from beginning with fresh peer selection")
// Clear state and restart
appStateStorage.putSnapSyncDone(false).commit()
// Cancel current tasks
accountRangeRequestTask.foreach(_.cancel())
storageRangeRequestTask.foreach(_.cancel())
healingRequestTask.foreach(_.cancel())
// Restart
self ! Start
}
Rationale¶
Why block on mismatch:
1. Consensus correctness: The state root is consensus-critical; accepting wrong state means accepting invalid chain state
2. Core-geth behavior: Core-geth SNAP sync blocks on state root mismatch and triggers healing
3. Security: Malicious peers could serve incomplete state to cause node failure or a chain split
4. Data integrity: A state root mismatch indicates missing MPT nodes that healing can fix

Why healing can fix this (a sketch of such a trie diff follows below):
- Missing intermediate branch/extension nodes cause a different computed root
- Healing fills the gaps by requesting specific node paths
- After healing, the recomputed root should match the expected root
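For context on how a trie diff could drive the detectMissingNodes TODO above, the sketch below walks the expected trie breadth-first and records every node hash that local storage cannot resolve. It is illustrative only: getNode and childHashes are hypothetical stand-ins for the real storage lookup and RLP child-reference decoding (not Fukuii APIs), and it returns bare hashes rather than the (path, hash) pairs the real method expects.
import akka.util.ByteString
import scala.collection.mutable
// Illustrative sketch: breadth-first walk of the expected trie, collecting node
// hashes that are referenced by the trie but absent from local storage.
object TrieDiffSketch {
  def detectMissingNodes(
      expectedRoot: ByteString,
      getNode: ByteString => Option[Array[Byte]],      // hypothetical: lookup encoded node by hash
      childHashes: Array[Byte] => Seq[ByteString]      // hypothetical: decode child references
  ): Seq[ByteString] = {
    val missing = mutable.ListBuffer.empty[ByteString]
    val queue   = mutable.Queue(expectedRoot)
    val seen    = mutable.Set.empty[ByteString]
    while (queue.nonEmpty) {
      val hash = queue.dequeue()
      if (seen.add(hash)) {
        getNode(hash) match {
          case None          => missing += hash                 // gap: schedule this node for healing
          case Some(encoded) => queue ++= childHashes(encoded)   // present: descend into its children
        }
      }
    }
    missing.toList
  }
}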
Testing approach:
// Test case
"should trigger healing on state root mismatch" in {
// Setup incomplete account range (missing some intermediate nodes)
// Verify validateState() enters healing phase
// Verify healing requests are sent
// Verify after healing, state root matches
}
Question 2: Storage Root Verification¶
Current Implementation¶
// StorageRangeDownloader.scala:354-358
if (computedRoot != expectedRoot) {
log.warn(s"Storage root mismatch for account ${accountHash.take(4)...}: " +
s"computed=${computedRoot.take(4)...}, " +
s"expected=${expectedRoot.take(4)...}")
}
❌ Problem¶
Correctness issue: Storage root mismatches indicate missing storage trie nodes. Logging but not healing means:
- Incomplete contract storage
- Smart contract state inconsistencies
- Potential execution failures when accessing missing storage
✅ Recommended Solution¶
Storage root mismatch SHOULD trigger per-account healing.
// StorageRangeDownloader.scala - Modify storeStorageSlots()
private def storeStorageSlots(
accountHash: ByteString,
slots: Seq[(ByteString, ByteString)]
): Either[String, Unit] = {
try {
import com.chipprbots.ethereum.mpt.byteStringSerializer
mptStorage.synchronized {
if (slots.nonEmpty) {
val storageTask = tasks.find(_.accountHash == accountHash)
.orElse(activeTasks.values.flatten.find(_.accountHash == accountHash))
.orElse(completedTasks.find(_.accountHash == accountHash))
.getOrElse {
log.warn(s"No storage task found for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
return Left(s"No storage task found for account")
}
// Get or create storage trie for this account
val storageTrie = storageTries.getOrElseUpdate(accountHash, {
val storageRoot = storageTask.storageRoot
if (storageRoot.isEmpty || storageRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
MerklePatriciaTrie[ByteString, ByteString](mptStorage)
} else {
MerklePatriciaTrie[ByteString, ByteString](storageRoot.toArray, mptStorage)
}
})
// Insert each slot into the storage trie
var currentTrie = storageTrie
slots.foreach { case (slotHash, slotValue) =>
log.debug(s"Storing storage slot ${slotHash.take(4).toArray.map("%02x".format(_)).mkString} = " +
s"${slotValue.take(4).toArray.map("%02x".format(_)).mkString} for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
currentTrie = currentTrie.put(slotHash, slotValue)
}
// Update the storage trie map
storageTries(accountHash) = currentTrie
log.info(s"Inserted ${slots.size} storage slots into trie for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
// Verify the resulting trie root matches the account's storage root
val computedRoot = ByteString(currentTrie.getRootHash)
val expectedRoot = storageTask.storageRoot
if (computedRoot != expectedRoot) {
log.warn(s"โ Storage root mismatch for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}: " +
s"computed=${computedRoot.take(4).toArray.map("%02x".format(_)).mkString}, " +
s"expected=${expectedRoot.take(4).toArray.map("%02x".format(_)).mkString}")
// Queue this account for storage trie healing
queueAccountForHealing(accountHash, expectedRoot, computedRoot)
// Don't fail the entire storage sync - just mark this account as needing healing
log.info(s"Account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString} queued for storage healing")
} else {
log.debug(s"โ
Storage root verified for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
}
}
// Persist all changes to disk
mptStorage.persist()
log.info(s"Successfully persisted ${slots.size} storage slots for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
Right(())
}
} catch {
case e: Exception =>
log.error(s"Failed to store storage slots: ${e.getMessage}", e)
Left(s"Storage error: ${e.getMessage}")
}
}
/** Queue for accounts that need storage healing */
private val accountsNeedingHealing = scala.collection.mutable.Set[ByteString]()
/** Queue an account for storage trie healing
*
* @param accountHash The account with mismatched storage root
* @param expectedRoot The expected storage root from the account
* @param computedRoot The computed storage root after inserting slots
*/
private def queueAccountForHealing(
accountHash: ByteString,
expectedRoot: ByteString,
computedRoot: ByteString
): Unit = synchronized {
accountsNeedingHealing.add(accountHash)
log.info(s"Account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString} added to healing queue " +
s"(expected=${expectedRoot.take(4).toArray.map("%02x".format(_)).mkString}, " +
s"computed=${computedRoot.take(4).toArray.map("%02x".format(_)).mkString})")
}
/** Get accounts that need storage healing
*
* @return Set of account hashes that need healing
*/
def getAccountsNeedingHealing: Set[ByteString] = synchronized {
accountsNeedingHealing.toSet
}
Integration with SNAPSyncController¶
// SNAPSyncController.scala - Modify startStateHealing()
private def startStateHealing(): Unit = {
log.info(s"Starting state healing with batch size ${snapSyncConfig.healingBatchSize}")
stateRoot.foreach { root =>
trieNodeHealer = Some(
new TrieNodeHealer(
stateRoot = root,
etcPeerManager = etcPeerManager,
requestTracker = requestTracker,
mptStorage = mptStorage,
batchSize = snapSyncConfig.healingBatchSize
)
)
progressMonitor.startPhase(StateHealing)
// Add accounts with storage root mismatches to healing queue
storageRangeDownloader.foreach { downloader =>
val accountsToHeal = downloader.getAccountsNeedingHealing
if (accountsToHeal.nonEmpty) {
log.info(s"Found ${accountsToHeal.size} accounts with storage root mismatches")
// Convert accounts to missing node paths for healing
val missingNodes = accountsToHeal.flatMap { accountHash =>
// TODO: Detect specific missing storage nodes for this account
// For now, request the entire storage trie root
Seq((Seq(accountHash), accountHash))
}.toSeq
trieNodeHealer.foreach(_.addMissingNodes(missingNodes))
}
}
// Start periodic task to request trie node healing from peers
healingRequestTask = Some(
scheduler.scheduleWithFixedDelay(
0.seconds,
1.second,
self,
RequestTrieNodeHealing
)(ec)
)
}
}
Rationale¶
Why queue for healing:
1. Incremental correctness: Storage mismatches are per-account, not a global failure
2. Efficiency: Continue syncing other accounts while marking problematic ones
3. Network behavior: Peers may serve partial storage ranges (the protocol allows this)
4. Core-geth pattern: Core-geth queues accounts with incomplete storage for healing

Why not fail immediately (see the sketch after this list):
- Storage ranges are paginated, so partial ranges are expected
- Continuation tasks will request the remaining slots
- Only after all continuations should we verify that the roots match
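One way to respect the pagination is to defer the storage-root comparison until the final chunk for an account has been stored, so partial ranges never trigger spurious healing. The sketch below is illustrative only: isLastChunk is assumed to be derivable from the response's continuation/proof state, and onMismatch stands in for queueAccountForHealing.
import akka.util.ByteString
// Illustrative sketch: only compare roots once the last chunk for the account has arrived.
object DeferredRootCheckSketch {
  def verifyIfComplete(
      expectedRoot: ByteString,
      computedRoot: ByteString,
      isLastChunk: Boolean,          // assumption: derived from continuation/proof state
      onMismatch: () => Unit         // stand-in for queueAccountForHealing
  ): Unit =
    if (!isLastChunk) ()                                 // partial chunk: defer the check
    else if (computedRoot != expectedRoot) onMismatch()  // final chunk and still mismatched: heal
}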
Testing approach:
"should queue account for healing on storage root mismatch" in {
// Setup account with incomplete storage (missing intermediate nodes)
// Verify storage root mismatch is detected
// Verify account is added to healing queue
// Verify healing phase receives the account
}
Question 3: Trie Initialization¶
Current Implementation¶
// AccountRangeDownloader.scala:58-62
if (stateRoot.isEmpty || stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
MerklePatriciaTrie[ByteString, Account](mptStorage)
} else {
MerklePatriciaTrie[ByteString, Account](stateRoot.toArray, mptStorage)
}
❌ Problem¶
Potential crash: If stateRoot references a non-existent node in storage, the MerklePatriciaTrie constructor will throw MissingRootNodeException (see SerializingMptStorage.get() line 23).
This can happen when:
- Resuming SNAP sync after partial completion
- Storage was cleared but the state root metadata remains
- The database is corrupted
✅ Recommended Solution¶
Validate root exists and handle missing root gracefully.
// AccountRangeDownloader.scala - Safe trie initialization
private var stateTrie: MerklePatriciaTrie[ByteString, Account] = {
import com.chipprbots.ethereum.network.p2p.messages.ETH63.AccountImplicits._
import com.chipprbots.ethereum.mpt.byteStringSerializer
import com.chipprbots.ethereum.mpt.MerklePatriciaTrie.MissingRootNodeException
implicit val accountSerializer: ByteArraySerializable[Account] = new ByteArraySerializable[Account] {
override def toBytes(account: Account): Array[Byte] = account.toBytes
override def fromBytes(bytes: Array[Byte]): Account = bytes.toAccount
}
// Safely initialize trie with root existence validation
if (stateRoot.isEmpty || stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
log.info("Initializing new empty state trie")
MerklePatriciaTrie[ByteString, Account](mptStorage)
} else {
try {
log.info(s"Initializing state trie with root ${stateRoot.take(8).toArray.map("%02x".format(_)).mkString}...")
// Try to load existing trie - this will throw if root doesn't exist
val trie = MerklePatriciaTrie[ByteString, Account](stateRoot.toArray, mptStorage)
log.info(s"โ
Successfully loaded existing state trie with root ${stateRoot.take(8).toArray.map("%02x".format(_)).mkString}")
trie
} catch {
case e: MissingRootNodeException =>
log.warn(s"โ ๏ธ State root ${stateRoot.take(8).toArray.map("%02x".format(_)).mkString} not found in storage")
log.warn(s"This may indicate resuming sync after storage was cleared, or incomplete previous sync")
log.warn(s"Creating new empty trie - SNAP sync will start from scratch")
// Create fresh empty trie - sync will populate it
MerklePatriciaTrie[ByteString, Account](mptStorage)
case e: Exception =>
log.error(s"โ Unexpected error initializing state trie: ${e.getMessage}", e)
log.error(s"Creating new empty trie as fallback")
MerklePatriciaTrie[ByteString, Account](mptStorage)
}
}
}
Same pattern for StorageRangeDownloader¶
// StorageRangeDownloader.scala - Safe storage trie initialization
private def storeStorageSlots(
accountHash: ByteString,
slots: Seq[(ByteString, ByteString)]
): Either[String, Unit] = {
try {
import com.chipprbots.ethereum.mpt.byteStringSerializer
import com.chipprbots.ethereum.mpt.MerklePatriciaTrie.MissingRootNodeException
mptStorage.synchronized {
if (slots.nonEmpty) {
val storageTask = /* ... find task ... */
// Get or create storage trie with safe initialization
val storageTrie = storageTries.getOrElseUpdate(accountHash, {
val storageRoot = storageTask.storageRoot
if (storageRoot.isEmpty || storageRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
log.debug(s"Creating new empty storage trie for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
MerklePatriciaTrie[ByteString, ByteString](mptStorage)
} else {
try {
log.debug(s"Loading existing storage trie for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
MerklePatriciaTrie[ByteString, ByteString](storageRoot.toArray, mptStorage)
} catch {
case e: MissingRootNodeException =>
log.warn(s"Storage root ${storageRoot.take(4).toArray.map("%02x".format(_)).mkString} not found for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
log.warn(s"Creating new empty storage trie - storage slots will rebuild the trie")
MerklePatriciaTrie[ByteString, ByteString](mptStorage)
}
}
})
// ... rest of implementation
}
}
} catch {
// ... exception handling
}
}
Rationale¶
Why validate that the root exists:
1. Robustness: Prevents crashes on resume after a storage clear
2. User experience: Graceful degradation instead of a crash
3. Resume capability: Allows SNAP sync to restart cleanly

Why create an empty trie on a missing root:
- Valid fallback: SNAP sync will populate it from scratch
- Self-healing: As accounts arrive, the trie builds correctly
- No data loss: Only resume performance is affected, not correctness

Why log warnings:
- Diagnostics: Helps operators understand what happened
- Monitoring: Alerts that storage may have issues
- Debugging: Traces sync state for troubleshooting
Testing approach:
"should handle missing state root gracefully" in {
// Setup: stateRoot exists in config but not in storage
// Verify: Creates empty trie without throwing exception
// Verify: SNAP sync can proceed from scratch
}
"should resume with existing state root" in {
// Setup: stateRoot exists in storage with partial state
// Verify: Loads existing trie successfully
// Verify: Can continue adding accounts to existing trie
}
Question 4: Thread Safety¶
Current Implementation¶
// AccountRangeDownloader.scala:241-262
mptStorage.synchronized {
if (accounts.nonEmpty) {
accounts.foreach { case (accountHash, account) =>
stateTrie = stateTrie.put(accountHash, account) // โ var mutation
}
mptStorage.persist()
Right(())
}
}
❌ Problem¶
Race condition risk: Using var stateTrie with mptStorage.synchronized has issues:
- Wrong lock scope: Synchronizing on mptStorage doesn't protect the stateTrie variable
- Multiple downloaders: If multiple AccountRangeDownloader instances exist (shouldn't happen, but not enforced)
- Lost updates: If two responses arrive concurrently, one update could be lost, as the sketch below illustrates:
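To make the lost-update scenario concrete, here is a stand-alone sketch (not Fukuii code) with an immutable Map standing in for the MPT: both threads read the current value, apply their own entries, and write the result back, so one write can silently discard the other. The race is timing-dependent and may not manifest on every run.
// Stand-alone illustration of the lost-update race described above.
object LostUpdateSketch {
  @volatile private var state: Map[String, Int] = Map.empty
  private def store(entries: Map[String, Int]): Unit = {
    var current = state                                          // read
    entries.foreach { case (k, v) => current = current + (k -> v) }
    state = current                                              // write back (not atomic with the read)
  }
  def main(args: Array[String]): Unit = {
    val t1 = new Thread(() => store(Map("a" -> 1)))
    val t2 = new Thread(() => store(Map("b" -> 2)))
    t1.start(); t2.start(); t1.join(); t2.join()
    // Depending on interleaving, the result may be Map(a -> 1, b -> 2) or contain only one entry.
    println(state)
  }
}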
✅ Recommended Solution¶
Use actor pattern (existing architecture) OR synchronize on this instead of mptStorage.
Since AccountRangeDownloader is not an actor and is called from SNAPSyncController actor, we have two options:
Option A: Synchronize on this (Recommended - Minimal change)¶
// AccountRangeDownloader.scala - Fix synchronization
private def storeAccounts(accounts: Seq[(ByteString, Account)]): Either[String, Unit] = {
try {
import com.chipprbots.ethereum.network.p2p.messages.ETH63.AccountImplicits._
import com.chipprbots.ethereum.mpt.byteStringSerializer
implicit val accountSerializer: ByteArraySerializable[Account] = new ByteArraySerializable[Account] {
override def toBytes(account: Account): Array[Byte] = account.toBytes
override def fromBytes(bytes: Array[Byte]): Account = bytes.toAccount
}
// Synchronize on this instance to protect stateTrie variable
this.synchronized {
if (accounts.nonEmpty) {
// Build new trie by folding over accounts
var currentTrie = stateTrie
accounts.foreach { case (accountHash, account) =>
log.debug(s"Storing account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString} " +
s"(balance: ${account.balance}, nonce: ${account.nonce})")
// Create new trie version - MPT is immutable
currentTrie = currentTrie.put(accountHash, account)
}
// Update stateTrie atomically within synchronized block
stateTrie = currentTrie
log.info(s"Inserted ${accounts.size} accounts into state trie")
// Persist after updating - synchronize on storage for this operation
mptStorage.synchronized {
mptStorage.persist()
}
log.info(s"Successfully persisted ${accounts.size} accounts to storage")
Right(())
} else {
Right(())
}
}
} catch {
case e: Exception =>
log.error(s"Failed to store accounts: ${e.getMessage}", e)
Left(s"Storage error: ${e.getMessage}")
}
}
Option B: Use AtomicReference (More robust for high concurrency)¶
// AccountRangeDownloader.scala - Class definition changes
import java.util.concurrent.atomic.AtomicReference
class AccountRangeDownloader(
stateRoot: ByteString,
etcPeerManager: ActorRef,
requestTracker: SNAPRequestTracker,
mptStorage: MptStorage,
concurrency: Int = 16
)(implicit scheduler: Scheduler) extends Logger {
// Use AtomicReference instead of var for thread-safe updates
private val stateTrie: AtomicReference[MerklePatriciaTrie[ByteString, Account]] = {
import com.chipprbots.ethereum.network.p2p.messages.ETH63.AccountImplicits._
import com.chipprbots.ethereum.mpt.byteStringSerializer
import com.chipprbots.ethereum.mpt.MerklePatriciaTrie.MissingRootNodeException
implicit val accountSerializer: ByteArraySerializable[Account] = new ByteArraySerializable[Account] {
override def toBytes(account: Account): Array[Byte] = account.toBytes
override def fromBytes(bytes: Array[Byte]): Account = bytes.toAccount
}
val initialTrie = if (stateRoot.isEmpty || stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
MerklePatriciaTrie[ByteString, Account](mptStorage)
} else {
try {
MerklePatriciaTrie[ByteString, Account](stateRoot.toArray, mptStorage)
} catch {
case e: MissingRootNodeException =>
log.warn(s"State root not found in storage, creating new trie")
MerklePatriciaTrie[ByteString, Account](mptStorage)
}
}
new AtomicReference(initialTrie)
}
// ... other fields ...
private def storeAccounts(accounts: Seq[(ByteString, Account)]): Either[String, Unit] = {
try {
import com.chipprbots.ethereum.network.p2p.messages.ETH63.AccountImplicits._
import com.chipprbots.ethereum.mpt.byteStringSerializer
implicit val accountSerializer: ByteArraySerializable[Account] = new ByteArraySerializable[Account] {
override def toBytes(account: Account): Array[Byte] = account.toBytes
override def fromBytes(bytes: Array[Byte]): Account = bytes.toAccount
}
if (accounts.nonEmpty) {
// Atomically update the trie reference, rebuilding from the latest base on each attempt
var updated = false
while (!updated) {
val oldTrie = stateTrie.get()
var newTrie = oldTrie
accounts.foreach { case (accountHash, account) =>
log.debug(s"Storing account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
newTrie = newTrie.put(accountHash, account)
}
// compareAndSet fails if another thread swapped the reference since oldTrie was read;
// in that case, loop and rebuild from the new base so no accounts are lost
updated = stateTrie.compareAndSet(oldTrie, newTrie)
}
log.info(s"Inserted ${accounts.size} accounts into state trie")
// Persist - synchronize on storage
mptStorage.synchronized {
mptStorage.persist()
}
log.info(s"Successfully persisted ${accounts.size} accounts to storage")
Right(())
} else {
Right(())
}
} catch {
case e: Exception =>
log.error(s"Failed to store accounts: ${e.getMessage}", e)
Left(s"Storage error: ${e.getMessage}")
}
}
def getStateRoot: ByteString = {
ByteString(stateTrie.get().getRootHash)
}
}
Recommended Choice: Option A (Synchronize on this)¶
Reasoning:
1. Simpler: Minimal code change, easier to review
2. Current architecture: SNAPSyncController is single-threaded actor calling downloader
3. No real concurrency: Only one AccountRangeDownloader instance per sync
4. Adequate protection: this.synchronized protects the var stateTrie correctly
When to use Option B:
- If we later add multiple concurrent downloaders
- If we move to lock-free concurrent architecture
- If profiling shows synchronized as bottleneck (unlikely)
Apply same fix to StorageRangeDownloader¶
// StorageRangeDownloader.scala - Fix synchronization
private def storeStorageSlots(
accountHash: ByteString,
slots: Seq[(ByteString, ByteString)]
): Either[String, Unit] = {
try {
import com.chipprbots.ethereum.mpt.byteStringSerializer
// Synchronize on this instance to protect storageTries map
this.synchronized {
if (slots.nonEmpty) {
val storageTask = /* ... find task ... */
// Get or create storage trie (protected by this.synchronized)
val storageTrie = storageTries.getOrElseUpdate(accountHash, {
/* ... safe initialization ... */
})
// Build new trie
var currentTrie = storageTrie
slots.foreach { case (slotHash, slotValue) =>
currentTrie = currentTrie.put(slotHash, slotValue)
}
// Update map atomically within synchronized block
storageTries(accountHash) = currentTrie
log.info(s"Inserted ${slots.size} storage slots into trie")
// Verify storage root
val computedRoot = ByteString(currentTrie.getRootHash)
val expectedRoot = storageTask.storageRoot
if (computedRoot != expectedRoot) {
queueAccountForHealing(accountHash, expectedRoot, computedRoot)
}
}
// Persist - synchronize on storage separately
mptStorage.synchronized {
mptStorage.persist()
}
Right(())
}
} catch {
case e: Exception =>
log.error(s"Failed to store storage slots: ${e.getMessage}", e)
Left(s"Storage error: ${e.getMessage}")
}
}
Rationale¶
Why synchronize on this not mptStorage:
1. Correct lock: Protects the instance variable, not the storage
2. Lock ordering: Avoids potential deadlock (lock downloader before storage)
3. Granularity: Each downloader instance has its own lock
Why persist inside mptStorage.synchronized:
- MptStorage may have internal state that needs protection
- Keeps storage operations atomic
- Follows the existing codebase pattern
Testing approach:
"should handle concurrent account insertions safely" in {
// Simulate concurrent responses from multiple peers
// Verify no accounts are lost
// Verify trie remains consistent
}
Question 5: Memory Usage¶
Current Implementation¶
// StorageRangeDownloader.scala:71
private val storageTries = mutable.Map[ByteString, MerklePatriciaTrie[ByteString, ByteString]]()
โ Problem¶
Potential OOM during mainnet sync:
- Millions of contract accounts (e.g., Ethereum mainnet: ~200M accounts, ~10M with storage)
- Each MerklePatriciaTrie holds references to MPT nodes
- The map can grow to gigabytes of heap memory
- Risk of OutOfMemoryError on resource-constrained nodes
โ Recommended Solution¶
Implement LRU cache with periodic persistence and eviction.
// StorageRangeDownloader.scala - Add LRU cache
import scala.collection.mutable
/** LRU cache for storage tries with configurable max size and eviction */
class StorageTrieCache(maxSize: Int = 10000) {
private val cache = mutable.LinkedHashMap[ByteString, MerklePatriciaTrie[ByteString, ByteString]]()
/** Get trie from cache, marking it as recently used */
def get(accountHash: ByteString): Option[MerklePatriciaTrie[ByteString, ByteString]] = {
cache.get(accountHash).map { trie =>
// Move to end (most recently used)
cache.remove(accountHash)
cache.put(accountHash, trie)
trie
}
}
/** Put trie in cache, evicting LRU if at capacity */
def put(accountHash: ByteString, trie: MerklePatriciaTrie[ByteString, ByteString]): Unit = {
// Remove if exists (to update position)
cache.remove(accountHash)
// Evict oldest if at capacity
if (cache.size >= maxSize) {
val (oldestKey, _) = cache.head
cache.remove(oldestKey)
// Note: Trie nodes are already persisted to mptStorage, so safe to evict from memory
}
// Add to end (most recently used)
cache.put(accountHash, trie)
}
/** Get cache size */
def size: Int = cache.size
/** Clear the cache */
def clear(): Unit = cache.clear()
}
// StorageRangeDownloader.scala - Use cache instead of unbounded map
class StorageRangeDownloader(
stateRoot: ByteString,
etcPeerManager: ActorRef,
requestTracker: SNAPRequestTracker,
mptStorage: MptStorage,
maxAccountsPerBatch: Int = 8,
maxCachedTries: Int = 10000 // New parameter - configurable cache size
)(implicit scheduler: Scheduler) extends Logger {
/** Per-account storage tries - LRU cache to limit memory usage */
private val storageTrieCache = new StorageTrieCache(maxCachedTries)
/** Statistics for cache monitoring */
private var cacheHits: Long = 0
private var cacheMisses: Long = 0
// ... rest of class ...
private def storeStorageSlots(
accountHash: ByteString,
slots: Seq[(ByteString, ByteString)]
): Either[String, Unit] = {
try {
import com.chipprbots.ethereum.mpt.byteStringSerializer
import com.chipprbots.ethereum.mpt.MerklePatriciaTrie.MissingRootNodeException
this.synchronized {
if (slots.nonEmpty) {
val storageTask = tasks.find(_.accountHash == accountHash)
.orElse(activeTasks.values.flatten.find(_.accountHash == accountHash))
.orElse(completedTasks.find(_.accountHash == accountHash))
.getOrElse {
log.warn(s"No storage task found for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
return Left(s"No storage task found for account")
}
// Try to get from cache first
val storageTrie = storageTrieCache.get(accountHash) match {
case Some(cachedTrie) =>
cacheHits += 1
log.debug(s"Cache HIT for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
cachedTrie
case None =>
cacheMisses += 1
log.debug(s"Cache MISS for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
// Load or create trie
val storageRoot = storageTask.storageRoot
if (storageRoot.isEmpty || storageRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
log.debug(s"Creating new empty storage trie for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
MerklePatriciaTrie[ByteString, ByteString](mptStorage)
} else {
try {
log.debug(s"Loading storage trie from storage for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
MerklePatriciaTrie[ByteString, ByteString](storageRoot.toArray, mptStorage)
} catch {
case e: MissingRootNodeException =>
log.warn(s"Storage root not found for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}, creating new trie")
MerklePatriciaTrie[ByteString, ByteString](mptStorage)
}
}
}
// Insert slots into trie
var currentTrie = storageTrie
slots.foreach { case (slotHash, slotValue) =>
currentTrie = currentTrie.put(slotHash, slotValue)
}
// Update cache with new trie version
storageTrieCache.put(accountHash, currentTrie)
log.info(s"Inserted ${slots.size} storage slots (cache size: ${storageTrieCache.size}/${maxCachedTries}, " +
s"hits: $cacheHits, misses: $cacheMisses)")
// Verify storage root
val computedRoot = ByteString(currentTrie.getRootHash)
val expectedRoot = storageTask.storageRoot
if (computedRoot != expectedRoot) {
log.warn(s"Storage root mismatch for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
queueAccountForHealing(accountHash, expectedRoot, computedRoot)
}
}
// Persist storage - MPT nodes are persisted, cache just holds trie objects
mptStorage.synchronized {
mptStorage.persist()
}
log.info(s"Successfully persisted ${slots.size} storage slots")
Right(())
}
} catch {
case e: Exception =>
log.error(s"Failed to store storage slots: ${e.getMessage}", e)
Left(s"Storage error: ${e.getMessage}")
}
}
/** Get cache statistics */
def getCacheStats: (Int, Long, Long) = this.synchronized {
(storageTrieCache.size, cacheHits, cacheMisses)
}
}
Configuration¶
// SNAPSyncConfig - Add cache size configuration
case class SNAPSyncConfig(
enabled: Boolean = true,
pivotBlockOffset: Long = 1024,
accountConcurrency: Int = 16,
storageConcurrency: Int = 8,
storageBatchSize: Int = 8,
healingBatchSize: Int = 16,
stateValidationEnabled: Boolean = true,
maxRetries: Int = 3,
timeout: FiniteDuration = 30.seconds,
maxCachedStorageTries: Int = 10000 // New: max storage tries in memory
)
Rationale¶
Why an LRU cache:
1. Memory bound: Caps memory usage at a predictable level
2. Locality of reference: Recent accounts are likely to be accessed again (continuation requests)
3. Automatic eviction: Old accounts are evicted without manual management
4. Performance: Cache hits avoid storage reads

Why a 10,000 default size (see the sizing sketch below):
- Memory estimation: ~10KB per trie object = ~100MB total
- Coverage: Covers recent accounts during sync
- Tunable: Can be increased on high-memory nodes, decreased on low-memory ones

Why it is safe to evict:
- MPT nodes are persisted: The trie structure is saved to mptStorage
- Reloadable: The trie can be recreated from storage if needed again
- No data loss: Only a performance impact, not a correctness one

Memory savings:
- Without cache: 10M accounts × 10KB = 100GB heap
- With cache (10K): 10K accounts × 10KB = 100MB heap
- Savings: 99.9% memory reduction
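If operators prefer to derive maxCachedStorageTries from an explicit heap budget rather than accept the default, a trivial helper along these lines captures the arithmetic above. The name and the ~10 KB-per-trie figure are the same rough assumptions used in the estimate; nothing here is an existing Fukuii API.
// Rough, illustrative sizing helper based on the ~10 KB-per-cached-trie estimate above.
object CacheSizingSketch {
  def maxCachedTriesFor(heapBudgetMb: Int, approxTrieKb: Int = 10): Int =
    math.max(1, (heapBudgetMb * 1024) / approxTrieKb)
  def main(args: Array[String]): Unit =
    // e.g. a 100 MB budget yields 10,240 tries, close to the 10,000 default
    println(maxCachedTriesFor(heapBudgetMb = 100))
}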
Testing approach:
"should limit cache size and evict LRU entries" in {
val cache = new StorageTrieCache(maxSize = 100)
// Add 150 tries
(0 until 150).foreach { i =>
cache.put(ByteString(s"account$i"), createMockTrie())
}
// Verify cache size capped at 100
cache.size shouldBe 100
// Verify oldest 50 entries evicted
cache.get(ByteString("account0")) shouldBe None
cache.get(ByteString("account149")) shouldBe defined
}
"should handle cache misses by reloading from storage" in {
// Setup account trie in storage
// Evict from cache
// Request same account again
// Verify trie reloaded from storage correctly
}
Summary of Recommendations¶
| Question | Current Behavior | Recommendation | Priority | Complexity |
|---|---|---|---|---|
| 1. State Root Verification | Logs error, continues | ✅ BLOCK sync, trigger healing | 🔴 CRITICAL | Medium |
| 2. Storage Root Verification | Logs warning, continues | ⚠️ Queue for healing | 🟠 HIGH | Low |
| 3. Trie Initialization | May throw on missing root | ✅ Catch exception, create empty | 🟡 MEDIUM | Low |
| 4. Thread Safety | Wrong lock (mptStorage) | ✅ Synchronize on this | 🟠 HIGH | Low |
| 5. Memory Usage | Unbounded map | ✅ LRU cache with eviction | 🟡 MEDIUM | Medium |
Implementation Priority¶
Phase 1 (Critical - Before Production):
1. Fix thread safety (#4) - prevents data corruption
2. Fix state root verification (#1) - prevents accepting invalid state

Phase 2 (High - Before Mainnet):
3. Fix storage root verification (#2) - improves sync completeness
4. Fix trie initialization (#3) - improves resume robustness

Phase 3 (Medium - Performance):
5. Implement the memory cache (#5) - prevents OOM on mainnet
Estimated Effort¶
- Phase 1: 1-2 days (critical safety fixes)
- Phase 2: 1-2 days (robustness improvements)
- Phase 3: 2-3 days (performance optimization)
- Total: ~1 week of focused development
Testing Strategy¶
- Unit tests: Each fix needs specific test coverage
- Integration tests: Test against local testnet
- Mainnet simulation: Test with mainnet-like data volumes
- Stress tests: Concurrent requests, memory limits, error injection
- Interop tests: Verify against core-geth/geth peers
References¶
- SNAP Protocol Spec: https://github.com/ethereum/devp2p/blob/master/caps/snap.md
- Core-geth Syncer: https://github.com/etclabscore/core-geth/blob/master/eth/syncer.go
- Geth SNAP Sync: https://github.com/ethereum/go-ethereum/tree/master/eth/protocols/snap
- MPT Specification: https://ethereum.org/en/developers/docs/data-structures-and-encoding/patricia-merkle-trie/
Changelog¶
- 2025-12-02: Initial review by Herald agent
Authors¶
- Herald (Network Protocol Agent)
- Review requested by forge agent