
SNAP Sync State Storage Integration Review

Date: 2025-12-02
Reviewer: Herald (Network Protocol Agent)
Context: Review of SNAP sync state storage integration implemented by forge agent

Executive Summary

This document provides expert review and recommendations for 5 critical open questions regarding the SNAP sync state storage integration. The review is based on:

  • SNAP protocol specification analysis
  • Core-geth reference implementation patterns
  • Ethereum network safety and correctness requirements
  • Fukuii codebase architecture and patterns

Overall Assessment

The forge agent's implementation is structurally sound but has critical security gaps that must be addressed before production deployment. The use of proper MPT tries is correct, but validation and error handling need strengthening.


Question 1: State Root Verification

Current Implementation

// SNAPSyncController.scala:421-428
if (computedRoot == expectedRoot) {
  log.info(s"State root verification PASSED")
} else {
  log.error(s"State root verification FAILED!")
  // Continue anyway for now - in production this should trigger re-sync or healing
}

โŒ Problem

Security Critical: Accepting mismatched state roots means accepting potentially corrupted or malicious state. This violates consensus rules and can lead to:

  • Incorrect account balances
  • Missing contract storage
  • Invalid smart contract state
  • A chain split if peers disagree on state

State root mismatch MUST block sync completion and trigger healing.

// SNAPSyncController.scala - Replace validateState() logic
private def validateState(): Unit = {
  if (!snapSyncConfig.stateValidationEnabled) {
    log.info("State validation disabled, skipping...")
    self ! StateValidationComplete
    return
  }

  log.info("Validating state completeness...")

  (stateRoot, pivotBlock) match {
    case (Some(expectedRoot), Some(pivot)) =>
      accountRangeDownloader.foreach { downloader =>
        val computedRoot = downloader.getStateRoot

        if (computedRoot == expectedRoot) {
          log.info(s"โœ… State root verification PASSED: ${computedRoot.take(8).toArray.map("%02x".format(_)).mkString}")

          // Proceed to full trie validation
          val validator = new StateValidator(mptStorage)
          validator.validateAccountTrie(expectedRoot) match {
            case Right(_) =>
              log.info("Account trie validation successful")
              validator.validateAllStorageTries() match {
                case Right(_) =>
                  log.info("Storage trie validation successful")
                  self ! StateValidationComplete
                case Left(error) =>
                  log.error(s"Storage trie validation failed: $error")
                  triggerAdditionalHealing(error)
              }
            case Left(error) =>
              log.error(s"Account trie validation failed: $error")
              triggerAdditionalHealing(error)
          }
        } else {
          // CRITICAL: State root mismatch - DO NOT PROCEED
          log.error(s"โŒ CRITICAL: State root verification FAILED!")
          log.error(s"  Expected: ${expectedRoot.take(8).toArray.map("%02x".format(_)).mkString}...")
          log.error(s"  Computed: ${computedRoot.take(8).toArray.map("%02x".format(_)).mkString}...")
          log.error(s"  This indicates incomplete or corrupted state data")

          // Trigger additional healing rounds
          log.info("Initiating state healing to fix root mismatch...")
          triggerStateRootHealing(expectedRoot, computedRoot)
        }
      }

    case _ =>
      log.error("Missing state root or pivot block for validation - cannot validate state")
      // Fail sync - we cannot proceed without validation
      context.parent ! SyncProtocol.Status.SyncFailed
  }
}

private def triggerStateRootHealing(expectedRoot: ByteString, computedRoot: ByteString): Unit = {
  // Detect which nodes are missing by comparing expected vs computed trie
  val missingNodes = detectMissingNodes(expectedRoot, computedRoot)

  if (missingNodes.isEmpty) {
    log.error("Cannot detect missing nodes - state root mismatch without identifiable gaps")
    log.error("This may indicate a fundamental protocol incompatibility or peer misbehavior")
    // Retry SNAP sync from scratch with different peers
    restartSnapSync()
  } else {
    log.info(s"Detected ${missingNodes.size} missing nodes, adding to healing queue")
    trieNodeHealer.foreach { healer =>
      healer.addMissingNodes(missingNodes)
      // Re-trigger healing phase
      currentPhase = StateHealing
      context.become(syncing)
    }
  }
}

private def detectMissingNodes(expectedRoot: ByteString, computedRoot: ByteString): Seq[(Seq[ByteString], ByteString)] = {
  // TODO: Implement trie diff algorithm to find missing nodes
  // For now, return empty - full implementation requires trie traversal comparison
  Seq.empty
}

private def triggerAdditionalHealing(error: String): Unit = {
  log.warn(s"Validation error detected, may need additional healing: $error")
  // Continue for now but log the issue - in production this should trigger healing
  self ! StateValidationComplete
}

private def restartSnapSync(): Unit = {
  log.warn("Restarting SNAP sync from beginning with fresh peer selection")
  // Clear state and restart
  appStateStorage.putSnapSyncDone(false).commit()
  // Cancel current tasks
  accountRangeRequestTask.foreach(_.cancel())
  storageRangeRequestTask.foreach(_.cancel())
  healingRequestTask.foreach(_.cancel())
  // Restart
  self ! Start
}

Rationale

Why block on mismatch:

  1. Consensus correctness: The state root is consensus-critical; accepting a wrong state root means accepting invalid chain state
  2. Core-geth behavior: Core-geth SNAP sync blocks on a state root mismatch and triggers healing
  3. Security: Malicious peers could serve incomplete state to cause node failure or a chain split
  4. Data integrity: A state root mismatch indicates missing MPT nodes that healing can fix

Why healing can fix this:

  • Missing intermediate branch/extension nodes cause a different computed root
  • Healing fills the gaps by requesting specific node paths (see the sketch below)
  • After healing, the recomputed root should match the expected root
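
To make healing actionable, the detectMissingNodes stub above needs a trie walk that identifies which nodes are absent locally. The sketch below illustrates that idea only and is not the codebase's implementation: NodeStore and childRefs are hypothetical stand-ins for MptStorage access and the MPT node decoder, and it returns a flat list of hashes rather than the (path, hash) pairs the healer expects.

import akka.util.ByteString // assumption: the codebase's ByteString is akka.util.ByteString
import scala.collection.mutable

// Hypothetical minimal view of node storage; the real code would go through MptStorage.
trait NodeStore {
  def getNode(hash: ByteString): Option[Array[Byte]]
}

/** Breadth-first walk from the expected root, collecting hashes of nodes missing locally.
  * `childRefs` decodes a raw node into the hashes it references (branch/extension children)
  * and stands in for the codebase's RLP node decoder.
  */
def collectMissingNodes(
    expectedRoot: ByteString,
    store: NodeStore,
    childRefs: Array[Byte] => Seq[ByteString]
): Seq[ByteString] = {
  val missing = mutable.ArrayBuffer.empty[ByteString]
  val queue = mutable.Queue(expectedRoot)
  val seen = mutable.Set.empty[ByteString]

  while (queue.nonEmpty) {
    val hash = queue.dequeue()
    if (seen.add(hash)) {
      store.getNode(hash) match {
        case None      => missing += hash                               // absent locally: request during healing
        case Some(raw) => childRefs(raw).foreach(h => queue.enqueue(h)) // present: descend into children
      }
    }
  }
  missing.toSeq
}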

Testing approach:

// Test case
"should trigger healing on state root mismatch" in {
  // Setup incomplete account range (missing some intermediate nodes)
  // Verify validateState() enters healing phase
  // Verify healing requests are sent
  // Verify after healing, state root matches
}


Question 2: Storage Root Verification

Current Implementation

// StorageRangeDownloader.scala:354-358
if (computedRoot != expectedRoot) {
  log.warn(s"Storage root mismatch for account ${accountHash.take(4)...}: " +
    s"computed=${computedRoot.take(4)...}, " +
    s"expected=${expectedRoot.take(4)...}")
}

โŒ Problem

Correctness Issue: Storage root mismatches indicate missing storage trie nodes. Logging without healing means:

  • Incomplete contract storage
  • Smart contract state inconsistencies
  • Potential execution failures when accessing missing storage

Storage root mismatch SHOULD trigger per-account healing.

// StorageRangeDownloader.scala - Modify storeStorageSlots()
private def storeStorageSlots(
    accountHash: ByteString,
    slots: Seq[(ByteString, ByteString)]
): Either[String, Unit] = {
  try {
    import com.chipprbots.ethereum.mpt.byteStringSerializer

    mptStorage.synchronized {
      if (slots.nonEmpty) {
        val storageTask = tasks.find(_.accountHash == accountHash)
          .orElse(activeTasks.values.flatten.find(_.accountHash == accountHash))
          .orElse(completedTasks.find(_.accountHash == accountHash))
          .getOrElse {
            log.warn(s"No storage task found for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
            return Left(s"No storage task found for account")
          }

        // Get or create storage trie for this account
        val storageTrie = storageTries.getOrElseUpdate(accountHash, {
          val storageRoot = storageTask.storageRoot
          if (storageRoot.isEmpty || storageRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
            MerklePatriciaTrie[ByteString, ByteString](mptStorage)
          } else {
            MerklePatriciaTrie[ByteString, ByteString](storageRoot.toArray, mptStorage)
          }
        })

        // Insert each slot into the storage trie
        var currentTrie = storageTrie
        slots.foreach { case (slotHash, slotValue) =>
          log.debug(s"Storing storage slot ${slotHash.take(4).toArray.map("%02x".format(_)).mkString} = " +
            s"${slotValue.take(4).toArray.map("%02x".format(_)).mkString} for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
          currentTrie = currentTrie.put(slotHash, slotValue)
        }

        // Update the storage trie map
        storageTries(accountHash) = currentTrie

        log.info(s"Inserted ${slots.size} storage slots into trie for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")

        // Verify the resulting trie root matches the account's storage root
        val computedRoot = ByteString(currentTrie.getRootHash)
        val expectedRoot = storageTask.storageRoot

        if (computedRoot != expectedRoot) {
          log.warn(s"โŒ Storage root mismatch for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}: " +
            s"computed=${computedRoot.take(4).toArray.map("%02x".format(_)).mkString}, " +
            s"expected=${expectedRoot.take(4).toArray.map("%02x".format(_)).mkString}")

          // Queue this account for storage trie healing
          queueAccountForHealing(accountHash, expectedRoot, computedRoot)

          // Don't fail the entire storage sync - just mark this account as needing healing
          log.info(s"Account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString} queued for storage healing")
        } else {
          log.debug(s"โœ… Storage root verified for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
        }
      }

      // Persist all changes to disk
      mptStorage.persist()

      log.info(s"Successfully persisted ${slots.size} storage slots for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
      Right(())
    }
  } catch {
    case e: Exception =>
      log.error(s"Failed to store storage slots: ${e.getMessage}", e)
      Left(s"Storage error: ${e.getMessage}")
  }
}

/** Queue for accounts that need storage healing */
private val accountsNeedingHealing = scala.collection.mutable.Set[ByteString]()

/** Queue an account for storage trie healing
  *
  * @param accountHash The account with mismatched storage root
  * @param expectedRoot The expected storage root from the account
  * @param computedRoot The computed storage root after inserting slots
  */
private def queueAccountForHealing(
    accountHash: ByteString,
    expectedRoot: ByteString,
    computedRoot: ByteString
): Unit = synchronized {
  accountsNeedingHealing.add(accountHash)
  log.info(s"Account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString} added to healing queue " +
    s"(expected=${expectedRoot.take(4).toArray.map("%02x".format(_)).mkString}, " +
    s"computed=${computedRoot.take(4).toArray.map("%02x".format(_)).mkString})")
}

/** Get accounts that need storage healing
  *
  * @return Set of account hashes that need healing
  */
def getAccountsNeedingHealing: Set[ByteString] = synchronized {
  accountsNeedingHealing.toSet
}

Integration with SNAPSyncController

// SNAPSyncController.scala - Modify startStateHealing()
private def startStateHealing(): Unit = {
  log.info(s"Starting state healing with batch size ${snapSyncConfig.healingBatchSize}")

  stateRoot.foreach { root =>
    trieNodeHealer = Some(
      new TrieNodeHealer(
        stateRoot = root,
        etcPeerManager = etcPeerManager,
        requestTracker = requestTracker,
        mptStorage = mptStorage,
        batchSize = snapSyncConfig.healingBatchSize
      )
    )

    progressMonitor.startPhase(StateHealing)

    // Add accounts with storage root mismatches to healing queue
    storageRangeDownloader.foreach { downloader =>
      val accountsToHeal = downloader.getAccountsNeedingHealing
      if (accountsToHeal.nonEmpty) {
        log.info(s"Found ${accountsToHeal.size} accounts with storage root mismatches")
        // Convert accounts to missing node paths for healing
        val missingNodes = accountsToHeal.flatMap { accountHash =>
          // TODO: Detect specific missing storage nodes for this account
          // For now, request the entire storage trie root
          Seq((Seq(accountHash), accountHash))
        }.toSeq

        trieNodeHealer.foreach(_.addMissingNodes(missingNodes))
      }
    }

    // Start periodic task to request trie node healing from peers
    healingRequestTask = Some(
      scheduler.scheduleWithFixedDelay(
        0.seconds,
        1.second,
        self,
        RequestTrieNodeHealing
      )(ec)
    )
  }
}

Rationale

Why queue for healing:

  1. Incremental correctness: Storage mismatches are per-account, not a global failure
  2. Efficiency: Continue syncing other accounts while marking problematic ones
  3. Network behavior: Peers may serve partial storage ranges (the protocol allows this)
  4. Core-geth pattern: Core-geth queues accounts with incomplete storage for healing

Why not fail immediately:

  • Storage ranges are paginated, so partial ranges are expected
  • Continuation tasks will request the remaining slots
  • Only after all continuations complete should we verify the root match (see the sketch below)
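
One way to implement "verify only after all continuations" is to track outstanding continuation chunks per account and compare roots only when the count drops to zero. The sketch below is a minimal illustration under that assumption; the class name and its wiring into storeStorageSlots are hypothetical, not existing code.

import akka.util.ByteString // assumption: akka.util.ByteString, as elsewhere in the sync code
import scala.collection.mutable

/** Hypothetical per-account bookkeeping for deferring storage-root verification until the
  * last continuation chunk for that account has been stored.
  */
final class ContinuationTracker {
  private val pending = mutable.Map.empty[ByteString, Int].withDefaultValue(0)

  /** Record that another storage-range chunk was requested for this account. */
  def chunkRequested(accountHash: ByteString): Unit = pending(accountHash) += 1

  /** Record a stored chunk; returns true when no chunks remain, i.e. the root can be verified. */
  def chunkCompleted(accountHash: ByteString): Boolean = {
    val remaining = pending(accountHash) - 1
    if (remaining <= 0) { pending.remove(accountHash); true }
    else { pending(accountHash) = remaining; false }
  }
}

// Sketch of use inside storeStorageSlots (names from the proposal above):
//   if (continuations.chunkCompleted(accountHash) && computedRoot != expectedRoot)
//     queueAccountForHealing(accountHash, expectedRoot, computedRoot)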

Testing approach:

"should queue account for healing on storage root mismatch" in {
  // Setup account with incomplete storage (missing intermediate nodes)
  // Verify storage root mismatch is detected
  // Verify account is added to healing queue
  // Verify healing phase receives the account
}


Question 3: Trie Initialization

Current Implementation

// AccountRangeDownloader.scala:58-62
if (stateRoot.isEmpty || stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
  MerklePatriciaTrie[ByteString, Account](mptStorage)
} else {
  MerklePatriciaTrie[ByteString, Account](stateRoot.toArray, mptStorage)
}

โŒ Problem

Potential crash: If stateRoot references a non-existent node in storage, the MerklePatriciaTrie constructor will throw MissingRootNodeException (see SerializingMptStorage.get() line 23).

This can happen when:

  • Resuming SNAP sync after partial completion
  • Storage was cleared but the state root metadata remains
  • The database is corrupted

Validate root exists and handle missing root gracefully.

// AccountRangeDownloader.scala - Safe trie initialization
private var stateTrie: MerklePatriciaTrie[ByteString, Account] = {
  import com.chipprbots.ethereum.network.p2p.messages.ETH63.AccountImplicits._
  import com.chipprbots.ethereum.mpt.byteStringSerializer
  import com.chipprbots.ethereum.mpt.MerklePatriciaTrie.MissingRootNodeException

  implicit val accountSerializer: ByteArraySerializable[Account] = new ByteArraySerializable[Account] {
    override def toBytes(account: Account): Array[Byte] = account.toBytes
    override def fromBytes(bytes: Array[Byte]): Account = bytes.toAccount
  }

  // Safely initialize trie with root existence validation
  if (stateRoot.isEmpty || stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
    log.info("Initializing new empty state trie")
    MerklePatriciaTrie[ByteString, Account](mptStorage)
  } else {
    try {
      log.info(s"Initializing state trie with root ${stateRoot.take(8).toArray.map("%02x".format(_)).mkString}...")

      // Try to load existing trie - this will throw if root doesn't exist
      val trie = MerklePatriciaTrie[ByteString, Account](stateRoot.toArray, mptStorage)

      log.info(s"โœ… Successfully loaded existing state trie with root ${stateRoot.take(8).toArray.map("%02x".format(_)).mkString}")
      trie

    } catch {
      case e: MissingRootNodeException =>
        log.warn(s"โš ๏ธ  State root ${stateRoot.take(8).toArray.map("%02x".format(_)).mkString} not found in storage")
        log.warn(s"This may indicate resuming sync after storage was cleared, or incomplete previous sync")
        log.warn(s"Creating new empty trie - SNAP sync will start from scratch")

        // Create fresh empty trie - sync will populate it
        MerklePatriciaTrie[ByteString, Account](mptStorage)

      case e: Exception =>
        log.error(s"โŒ Unexpected error initializing state trie: ${e.getMessage}", e)
        log.error(s"Creating new empty trie as fallback")
        MerklePatriciaTrie[ByteString, Account](mptStorage)
    }
  }
}

Same pattern for StorageRangeDownloader

// StorageRangeDownloader.scala - Safe storage trie initialization
private def storeStorageSlots(
    accountHash: ByteString,
    slots: Seq[(ByteString, ByteString)]
): Either[String, Unit] = {
  try {
    import com.chipprbots.ethereum.mpt.byteStringSerializer
    import com.chipprbots.ethereum.mpt.MerklePatriciaTrie.MissingRootNodeException

    mptStorage.synchronized {
      if (slots.nonEmpty) {
        val storageTask = /* ... find task ... */

        // Get or create storage trie with safe initialization
        val storageTrie = storageTries.getOrElseUpdate(accountHash, {
          val storageRoot = storageTask.storageRoot

          if (storageRoot.isEmpty || storageRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
            log.debug(s"Creating new empty storage trie for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
            MerklePatriciaTrie[ByteString, ByteString](mptStorage)
          } else {
            try {
              log.debug(s"Loading existing storage trie for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
              MerklePatriciaTrie[ByteString, ByteString](storageRoot.toArray, mptStorage)
            } catch {
              case e: MissingRootNodeException =>
                log.warn(s"Storage root ${storageRoot.take(4).toArray.map("%02x".format(_)).mkString} not found for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
                log.warn(s"Creating new empty storage trie - storage slots will rebuild the trie")
                MerklePatriciaTrie[ByteString, ByteString](mptStorage)
            }
          }
        })

        // ... rest of implementation
      }
    }
  } catch {
    // ... exception handling
  }
}

Rationale

Why validate that the root exists:

  1. Robustness: Prevents crashes when resuming after a storage clear
  2. User experience: Graceful degradation instead of a crash
  3. Resume capability: Allows SNAP sync to restart cleanly

Why create an empty trie on a missing root:

  • Valid fallback: SNAP sync will populate it from scratch
  • Self-healing: As accounts arrive, the trie builds correctly
  • No data loss: Only resume performance is affected, not correctness

Why log warnings:

  • Diagnostics: Helps operators understand what happened
  • Monitoring: Alerts that storage may have issues
  • Debugging: Traces sync state for troubleshooting
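
Since the account and storage downloaders need the same fallback, the try/catch could be factored into one helper. The sketch below keeps the loader and factory as thunks so it stays independent of the MerklePatriciaTrie constructor signatures; the helper name, and the choice to rethrow unexpected errors rather than silently falling back (as the catch-all above does), are suggestions, not existing code.

import scala.util.{Failure, Success, Try}

/** Load a trie from an existing root, falling back to a fresh empty trie when the root node is
  * missing from storage. `isMissingRoot` would match the codebase's MissingRootNodeException.
  */
def loadTrieOrEmpty[T](
    loadExisting: () => T,
    createEmpty: () => T,
    isMissingRoot: Throwable => Boolean
): T =
  Try(loadExisting()) match {
    case Success(trie)                  => trie
    case Failure(e) if isMissingRoot(e) => createEmpty() // SNAP sync will repopulate it from scratch
    case Failure(e)                     => throw e       // unexpected failure: surface it instead of masking it
  }

// Hypothetical use in AccountRangeDownloader (constructors as shown above):
//   loadTrieOrEmpty(
//     () => MerklePatriciaTrie[ByteString, Account](stateRoot.toArray, mptStorage),
//     () => MerklePatriciaTrie[ByteString, Account](mptStorage),
//     _.isInstanceOf[MissingRootNodeException]
//   )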

Testing approach:

"should handle missing state root gracefully" in {
  // Setup: stateRoot exists in config but not in storage
  // Verify: Creates empty trie without throwing exception
  // Verify: SNAP sync can proceed from scratch
}

"should resume with existing state root" in {
  // Setup: stateRoot exists in storage with partial state
  // Verify: Loads existing trie successfully
  // Verify: Can continue adding accounts to existing trie
}


Question 4: Thread Safety

Current Implementation

// AccountRangeDownloader.scala:241-262
mptStorage.synchronized {
  if (accounts.nonEmpty) {
    accounts.foreach { case (accountHash, account) =>
      stateTrie = stateTrie.put(accountHash, account)  // ❌ var mutation
    }
    mptStorage.persist()
    Right(())
  }
}

โŒ Problem

Race condition risk: Using var stateTrie with mptStorage.synchronized has issues:

  1. Wrong lock scope: Synchronizing on mptStorage doesn't protect stateTrie variable
  2. Multiple downloaders: If multiple AccountRangeDownloader instances exist (shouldn't happen but not enforced)
  3. Lost updates: If two responses arrive concurrently, one update could be lost:
    Thread 1: reads stateTrie (version A)
    Thread 2: reads stateTrie (version A)
    Thread 1: computes newTrie (version B) from A
    Thread 2: computes newTrie (version C) from A  // ❌ doesn't see B!
    Thread 1: stateTrie = B
    Thread 2: stateTrie = C  // ❌ Lost thread 1's accounts!
    

Use the actor pattern (the existing architecture) OR synchronize on this instead of mptStorage.

Since AccountRangeDownloader is not an actor and is called from the SNAPSyncController actor, we have two options.

Option A: Synchronize on this (simplest fit for the current single-actor architecture)

// AccountRangeDownloader.scala - Fix synchronization
private def storeAccounts(accounts: Seq[(ByteString, Account)]): Either[String, Unit] = {
  try {
    import com.chipprbots.ethereum.network.p2p.messages.ETH63.AccountImplicits._
    import com.chipprbots.ethereum.mpt.byteStringSerializer

    implicit val accountSerializer: ByteArraySerializable[Account] = new ByteArraySerializable[Account] {
      override def toBytes(account: Account): Array[Byte] = account.toBytes
      override def fromBytes(bytes: Array[Byte]): Account = bytes.toAccount
    }

    // Synchronize on this instance to protect stateTrie variable
    this.synchronized {
      if (accounts.nonEmpty) {
        // Build new trie by folding over accounts
        var currentTrie = stateTrie
        accounts.foreach { case (accountHash, account) =>
          log.debug(s"Storing account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString} " +
            s"(balance: ${account.balance}, nonce: ${account.nonce})")

          // Create new trie version - MPT is immutable
          currentTrie = currentTrie.put(accountHash, account)
        }

        // Update stateTrie atomically within synchronized block
        stateTrie = currentTrie

        log.info(s"Inserted ${accounts.size} accounts into state trie")

        // Persist after updating - synchronize on storage for this operation
        mptStorage.synchronized {
          mptStorage.persist()
        }

        log.info(s"Successfully persisted ${accounts.size} accounts to storage")
        Right(())
      } else {
        Right(())
      }
    }
  } catch {
    case e: Exception =>
      log.error(s"Failed to store accounts: ${e.getMessage}", e)
      Left(s"Storage error: ${e.getMessage}")
  }
}

Option B: Use AtomicReference (More robust for high concurrency)

// AccountRangeDownloader.scala - Class definition changes
import java.util.concurrent.atomic.AtomicReference

class AccountRangeDownloader(
    stateRoot: ByteString,
    etcPeerManager: ActorRef,
    requestTracker: SNAPRequestTracker,
    mptStorage: MptStorage,
    concurrency: Int = 16
)(implicit scheduler: Scheduler) extends Logger {

  // Use AtomicReference instead of var for thread-safe updates
  private val stateTrie: AtomicReference[MerklePatriciaTrie[ByteString, Account]] = {
    import com.chipprbots.ethereum.network.p2p.messages.ETH63.AccountImplicits._
    import com.chipprbots.ethereum.mpt.byteStringSerializer
    import com.chipprbots.ethereum.mpt.MerklePatriciaTrie.MissingRootNodeException

    implicit val accountSerializer: ByteArraySerializable[Account] = new ByteArraySerializable[Account] {
      override def toBytes(account: Account): Array[Byte] = account.toBytes
      override def fromBytes(bytes: Array[Byte]): Account = bytes.toAccount
    }

    val initialTrie = if (stateRoot.isEmpty || stateRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
      MerklePatriciaTrie[ByteString, Account](mptStorage)
    } else {
      try {
        MerklePatriciaTrie[ByteString, Account](stateRoot.toArray, mptStorage)
      } catch {
        case e: MissingRootNodeException =>
          log.warn(s"State root not found in storage, creating new trie")
          MerklePatriciaTrie[ByteString, Account](mptStorage)
      }
    }

    new AtomicReference(initialTrie)
  }

  // ... other fields ...

  private def storeAccounts(accounts: Seq[(ByteString, Account)]): Either[String, Unit] = {
    try {
      import com.chipprbots.ethereum.network.p2p.messages.ETH63.AccountImplicits._
      import com.chipprbots.ethereum.mpt.byteStringSerializer

      implicit val accountSerializer: ByteArraySerializable[Account] = new ByteArraySerializable[Account] {
        override def toBytes(account: Account): Array[Byte] = account.toBytes
        override def fromBytes(bytes: Array[Byte]): Account = bytes.toAccount
      }

      if (accounts.nonEmpty) {
        // Atomically update the trie reference; rebuild and retry if a concurrent update happened
        var updated = false
        while (!updated) {
          val oldTrie = stateTrie.get()

          // Build a new trie version from the current base - MPT is immutable
          var newTrie = oldTrie
          accounts.foreach { case (accountHash, account) =>
            log.debug(s"Storing account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
            newTrie = newTrie.put(accountHash, account)
          }

          // compareAndSet succeeds only if no other thread replaced the trie since oldTrie was read;
          // on failure the loop rebuilds from the fresh base and retries
          updated = stateTrie.compareAndSet(oldTrie, newTrie)
        }

        log.info(s"Inserted ${accounts.size} accounts into state trie")

        // Persist - synchronize on storage
        mptStorage.synchronized {
          mptStorage.persist()
        }

        log.info(s"Successfully persisted ${accounts.size} accounts to storage")
        Right(())
      } else {
        Right(())
      }
    } catch {
      case e: Exception =>
        log.error(s"Failed to store accounts: ${e.getMessage}", e)
        Left(s"Storage error: ${e.getMessage}")
    }
  }

  def getStateRoot: ByteString = {
    ByteString(stateTrie.get().getRootHash)
  }
}

Reasoning for recommending Option A:

  1. Simpler: Minimal code change, easier to review
  2. Current architecture: SNAPSyncController is a single-threaded actor calling the downloader
  3. No real concurrency: Only one AccountRangeDownloader instance exists per sync
  4. Adequate protection: this.synchronized protects the var stateTrie correctly

When to use Option B:

  • If we later add multiple concurrent downloaders
  • If we move to a lock-free concurrent architecture
  • If profiling shows synchronized to be a bottleneck (unlikely)
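
If Option B is adopted later, the hand-written retry loop can also be expressed with AtomicReference.updateAndGet (JDK 8+), which re-runs the update function on contention. A generic sketch; the put parameter stands in for MerklePatriciaTrie.put and the helper name is illustrative.

import java.util.concurrent.atomic.AtomicReference

/** Fold a batch of insertions into an atomically held immutable value.
  * updateAndGet retries the fold automatically if another thread swapped the reference.
  */
def putAll[K, V, T](ref: AtomicReference[T], entries: Seq[(K, V)])(put: (T, K, V) => T): T =
  ref.updateAndGet { current =>
    entries.foldLeft(current) { case (acc, (k, v)) => put(acc, k, v) }
  }

// Hypothetical use: putAll(stateTrie, accounts)((trie, hash, account) => trie.put(hash, account))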

Apply same fix to StorageRangeDownloader

// StorageRangeDownloader.scala - Fix synchronization
private def storeStorageSlots(
    accountHash: ByteString,
    slots: Seq[(ByteString, ByteString)]
): Either[String, Unit] = {
  try {
    import com.chipprbots.ethereum.mpt.byteStringSerializer

    // Synchronize on this instance to protect storageTries map
    this.synchronized {
      if (slots.nonEmpty) {
        val storageTask = /* ... find task ... */

        // Get or create storage trie (protected by this.synchronized)
        val storageTrie = storageTries.getOrElseUpdate(accountHash, {
          /* ... safe initialization ... */
        })

        // Build new trie
        var currentTrie = storageTrie
        slots.foreach { case (slotHash, slotValue) =>
          currentTrie = currentTrie.put(slotHash, slotValue)
        }

        // Update map atomically within synchronized block
        storageTries(accountHash) = currentTrie

        log.info(s"Inserted ${slots.size} storage slots into trie")

        // Verify storage root
        val computedRoot = ByteString(currentTrie.getRootHash)
        val expectedRoot = storageTask.storageRoot

        if (computedRoot != expectedRoot) {
          queueAccountForHealing(accountHash, expectedRoot, computedRoot)
        }
      }

      // Persist - synchronize on storage separately
      mptStorage.synchronized {
        mptStorage.persist()
      }

      Right(())
    }
  } catch {
    case e: Exception =>
      log.error(s"Failed to store storage slots: ${e.getMessage}", e)
      Left(s"Storage error: ${e.getMessage}")
  }
}

Rationale

Why synchronize on this and not on mptStorage:

  1. Correct lock: Protects the instance variable, not the storage
  2. Lock ordering: Avoids a potential deadlock (lock the downloader before the storage; illustrated below)
  3. Granularity: Each downloader instance has its own lock

Why persist inside mptStorage.synchronized:

  • MptStorage may have internal state that needs protection
  • Keeps storage operations atomic
  • Follows the existing codebase pattern
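
The lock-ordering rule in point 2 can be stated as: take the downloader's monitor first, take the storage monitor second, and release it before leaving the outer block. A toy illustration with stand-in classes (not the real downloaders):

// Illustrative stand-ins only; the real classes are AccountRangeDownloader / MptStorage.
class FakeStorage {
  def persist(): Unit = ()
}

class FakeDownloader(storage: FakeStorage) {
  private var batchesStored: Int = 0

  def store(batch: Seq[Int]): Unit = this.synchronized { // 1) downloader lock first
    batchesStored += batch.size
    storage.synchronized {                               // 2) storage lock second...
      storage.persist()
    }                                                    // ...and released before the outer lock
  }
}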

Testing approach:

"should handle concurrent account insertions safely" in {
  // Simulate concurrent responses from multiple peers
  // Verify no accounts are lost
  // Verify trie remains consistent
}


Question 5: Memory Usage

Current Implementation

// StorageRangeDownloader.scala:71
private val storageTries = mutable.Map[ByteString, MerklePatriciaTrie[ByteString, ByteString]]()

โŒ Problem

Potential OOM: During a mainnet sync:

  • There are millions of contract accounts (e.g., Ethereum mainnet: ~200M accounts, ~10M with storage)
  • Each MerklePatriciaTrie holds references to MPT nodes
  • The map can grow to gigabytes of heap memory
  • Risk of OutOfMemoryError on resource-constrained nodes

Implement LRU cache with periodic persistence and eviction.

// StorageRangeDownloader.scala - Add LRU cache
import scala.collection.mutable

/** LRU cache for storage tries with configurable max size and eviction */
class StorageTrieCache(maxSize: Int = 10000) {
  private val cache = mutable.LinkedHashMap[ByteString, MerklePatriciaTrie[ByteString, ByteString]]()

  /** Get trie from cache, marking it as recently used */
  def get(accountHash: ByteString): Option[MerklePatriciaTrie[ByteString, ByteString]] = {
    cache.get(accountHash).map { trie =>
      // Move to end (most recently used)
      cache.remove(accountHash)
      cache.put(accountHash, trie)
      trie
    }
  }

  /** Put trie in cache, evicting LRU if at capacity */
  def put(accountHash: ByteString, trie: MerklePatriciaTrie[ByteString, ByteString]): Unit = {
    // Remove if exists (to update position)
    cache.remove(accountHash)

    // Evict oldest if at capacity
    if (cache.size >= maxSize) {
      val (oldestKey, _) = cache.head
      cache.remove(oldestKey)
      // Note: Trie nodes are already persisted to mptStorage, so safe to evict from memory
    }

    // Add to end (most recently used)
    cache.put(accountHash, trie)
  }

  /** Get cache size */
  def size: Int = cache.size

  /** Clear the cache */
  def clear(): Unit = cache.clear()
}
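
A smaller alternative to the hand-rolled LinkedHashMap bookkeeping is java.util.LinkedHashMap in access-order mode, which moves entries on access and evicts through removeEldestEntry. The sketch below is generic and not tied to the codebase's types; like StorageTrieCache it is not thread-safe on its own, so the Question 4 synchronization still applies.

import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

/** LRU cache backed by java.util.LinkedHashMap in access-order mode.
  * removeEldestEntry is consulted after each put and evicts the least recently used entry
  * once the cache grows past maxSize.
  */
final class AccessOrderLruCache[K, V](maxSize: Int) {
  private val underlying = new JLinkedHashMap[K, V](16, 0.75f, /*accessOrder=*/ true) {
    override def removeEldestEntry(eldest: Entry[K, V]): Boolean = size() > maxSize
  }
  def get(key: K): Option[V] = Option(underlying.get(key))
  def put(key: K, value: V): Unit = underlying.put(key, value)
  def size: Int = underlying.size()
}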

// StorageRangeDownloader.scala - Use cache instead of unbounded map
class StorageRangeDownloader(
    stateRoot: ByteString,
    etcPeerManager: ActorRef,
    requestTracker: SNAPRequestTracker,
    mptStorage: MptStorage,
    maxAccountsPerBatch: Int = 8,
    maxCachedTries: Int = 10000  // New parameter - configurable cache size
)(implicit scheduler: Scheduler) extends Logger {

  /** Per-account storage tries - LRU cache to limit memory usage */
  private val storageTrieCache = new StorageTrieCache(maxCachedTries)

  /** Statistics for cache monitoring */
  private var cacheHits: Long = 0
  private var cacheMisses: Long = 0
  private var triesEvicted: Long = 0

  // ... rest of class ...

  private def storeStorageSlots(
      accountHash: ByteString,
      slots: Seq[(ByteString, ByteString)]
  ): Either[String, Unit] = {
    try {
      import com.chipprbots.ethereum.mpt.byteStringSerializer
      import com.chipprbots.ethereum.mpt.MerklePatriciaTrie.MissingRootNodeException

      this.synchronized {
        if (slots.nonEmpty) {
          val storageTask = tasks.find(_.accountHash == accountHash)
            .orElse(activeTasks.values.flatten.find(_.accountHash == accountHash))
            .orElse(completedTasks.find(_.accountHash == accountHash))
            .getOrElse {
              log.warn(s"No storage task found for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
              return Left(s"No storage task found for account")
            }

          // Try to get from cache first
          val storageTrie = storageTrieCache.get(accountHash) match {
            case Some(cachedTrie) =>
              cacheHits += 1
              log.debug(s"Cache HIT for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
              cachedTrie

            case None =>
              cacheMisses += 1
              log.debug(s"Cache MISS for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")

              // Load or create trie
              val storageRoot = storageTask.storageRoot
              if (storageRoot.isEmpty || storageRoot == ByteString(MerklePatriciaTrie.EmptyRootHash)) {
                log.debug(s"Creating new empty storage trie for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
                MerklePatriciaTrie[ByteString, ByteString](mptStorage)
              } else {
                try {
                  log.debug(s"Loading storage trie from storage for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
                  MerklePatriciaTrie[ByteString, ByteString](storageRoot.toArray, mptStorage)
                } catch {
                  case e: MissingRootNodeException =>
                    log.warn(s"Storage root not found for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}, creating new trie")
                    MerklePatriciaTrie[ByteString, ByteString](mptStorage)
                }
              }
          }

          // Insert slots into trie
          var currentTrie = storageTrie
          slots.foreach { case (slotHash, slotValue) =>
            currentTrie = currentTrie.put(slotHash, slotValue)
          }

          // Update cache with new trie version
          storageTrieCache.put(accountHash, currentTrie)

          log.info(s"Inserted ${slots.size} storage slots (cache size: ${storageTrieCache.size}/${maxCachedTries}, " +
            s"hits: $cacheHits, misses: $cacheMisses)")

          // Verify storage root
          val computedRoot = ByteString(currentTrie.getRootHash)
          val expectedRoot = storageTask.storageRoot

          if (computedRoot != expectedRoot) {
            log.warn(s"Storage root mismatch for account ${accountHash.take(4).toArray.map("%02x".format(_)).mkString}")
            queueAccountForHealing(accountHash, expectedRoot, computedRoot)
          }
        }

        // Persist storage - MPT nodes are persisted, cache just holds trie objects
        mptStorage.synchronized {
          mptStorage.persist()
        }

        log.info(s"Successfully persisted ${slots.size} storage slots")
        Right(())
      }
    } catch {
      case e: Exception =>
        log.error(s"Failed to store storage slots: ${e.getMessage}", e)
        Left(s"Storage error: ${e.getMessage}")
    }
  }

  /** Get cache statistics */
  def getCacheStats: (Int, Long, Long) = this.synchronized {
    (storageTrieCache.size, cacheHits, cacheMisses)
  }
}

Configuration

// SNAPSyncConfig - Add cache size configuration
case class SNAPSyncConfig(
    enabled: Boolean = true,
    pivotBlockOffset: Long = 1024,
    accountConcurrency: Int = 16,
    storageConcurrency: Int = 8,
    storageBatchSize: Int = 8,
    healingBatchSize: Int = 16,
    stateValidationEnabled: Boolean = true,
    maxRetries: Int = 3,
    timeout: FiniteDuration = 30.seconds,
    maxCachedStorageTries: Int = 10000  // New: max storage tries in memory
)

Rationale

Why an LRU cache:

  1. Memory bound: Caps memory usage at a predictable level
  2. Locality of reference: Recently seen accounts are likely to be accessed again (continuation requests)
  3. Automatic eviction: Old accounts are evicted without manual management
  4. Performance: Cache hits avoid storage reads

Why a 10,000 default size:

  • Memory estimation: ~10KB per trie object = ~100MB total
  • Coverage: Covers recent accounts during sync
  • Tunable: Can be raised on high-memory nodes and lowered on low-memory nodes

Why it is safe to evict:

  • MPT nodes persisted: The trie structure is already saved to mptStorage
  • Reloadable: The trie can be recreated from storage if needed again
  • No data loss: Only a performance impact, not a correctness issue

Memory savings (see the estimate helper below):

  • Without a cache: 10M accounts × 10KB = 100GB heap
  • With a 10K-entry cache: 10K accounts × 10KB = 100MB heap
  • Savings: 99.9% memory reduction
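
The sizing arithmetic above can be captured as a small helper for operators tuning maxCachedStorageTries; the ~10KB-per-trie figure is this review's estimate, not a measured number.

/** Rough heap estimate for the storage-trie cache: entries * average in-memory trie size. */
def estimatedCacheHeapMiB(maxCachedTries: Int, avgTrieBytes: Long = 10 * 1024L): Double =
  (maxCachedTries * avgTrieBytes).toDouble / (1024 * 1024)

// With the 10KB estimate:
//   estimatedCacheHeapMiB(10000)    ≈ 98 MiB    (the ~100MB figure above)
//   estimatedCacheHeapMiB(10000000) ≈ 97656 MiB (~100GB if every trie stayed resident)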

Testing approach:

"should limit cache size and evict LRU entries" in {
  val cache = new StorageTrieCache(maxSize = 100)

  // Add 150 tries
  (0 until 150).foreach { i =>
    cache.put(ByteString(s"account$i"), createMockTrie())
  }

  // Verify cache size capped at 100
  cache.size shouldBe 100

  // Verify oldest 50 entries evicted
  cache.get(ByteString("account0")) shouldBe None
  cache.get(ByteString("account149")) shouldBe defined
}

"should handle cache misses by reloading from storage" in {
  // Setup account trie in storage
  // Evict from cache
  // Request same account again
  // Verify trie reloaded from storage correctly
}


Summary of Recommendations

Question | Current Behavior | Recommendation | Priority | Complexity
--- | --- | --- | --- | ---
1. State Root Verification | Logs error, continues | ❌ BLOCK sync, trigger healing | 🔴 CRITICAL | Medium
2. Storage Root Verification | Logs warning, continues | ⚠️ Queue for healing | 🟠 HIGH | Low
3. Trie Initialization | May throw on missing root | ✅ Catch exception, create empty | 🟡 MEDIUM | Low
4. Thread Safety | Wrong lock (mptStorage) | ✅ Synchronize on this | 🟠 HIGH | Low
5. Memory Usage | Unbounded map | ✅ LRU cache with eviction | 🟡 MEDIUM | Medium

Implementation Priority

Phase 1 (Critical - Before Production):

  1. Fix thread safety (#4) - Prevents data corruption
  2. Fix state root verification (#1) - Prevents accepting invalid state

Phase 2 (High - Before Mainnet):

  3. Fix storage root verification (#2) - Improves sync completeness
  4. Fix trie initialization (#3) - Improves resume robustness

Phase 3 (Medium - Performance):

  5. Implement memory cache (#5) - Prevents OOM on mainnet

Estimated Effort

  • Phase 1: 1-2 days (critical safety fixes)
  • Phase 2: 1-2 days (robustness improvements)
  • Phase 3: 2-3 days (performance optimization)
  • Total: ~1 week of focused development

Testing Strategy

  1. Unit tests: Each fix needs specific test coverage
  2. Integration tests: Test against local testnet
  3. Mainnet simulation: Test with mainnet-like data volumes
  4. Stress tests: Concurrent requests, memory limits, error injection
  5. Interop tests: Verify against core-geth/geth peers

References

  • SNAP Protocol Spec: https://github.com/ethereum/devp2p/blob/master/caps/snap.md
  • Core-geth Syncer: https://github.com/etclabscore/core-geth/blob/master/eth/syncer.go
  • Geth SNAP Sync: https://github.com/ethereum/go-ethereum/tree/master/eth/protocols/snap
  • MPT Specification: https://ethereum.org/en/developers/docs/data-structures-and-encoding/patricia-merkle-trie/

Changelog

  • 2025-12-02: Initial review by Herald agent

Authors

  • Herald (Network Protocol Agent)
  • Review requested by forge agent