Skip to content

Commit a7ee556

Browse files
feat: batch cold account loads in light client (#2341)
* feat: optimize address batch pipeline * format * feat: stabilize address batch pipeline * feat: batch cold account loads in light client * fix: harden load batching and mixed decompression * Fix prover startup and decompression load flow * cleanup: harden prover startup polling * format * format * cleanup * cleanup * refactor: simplify batch data length validation and remove redundant proof height checks * refactor: remove unused output_queue_index parameter from into_in_token_data methods
1 parent 4b7d2df commit a7ee556

32 files changed

Lines changed: 3152 additions & 1272 deletions

File tree

forester/src/processor/v2/helpers.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -493,12 +493,30 @@ impl StreamingAddressQueue {
493493
hashchain_idx: usize,
494494
) -> crate::Result<Option<AddressBatchSnapshot<HEIGHT>>> {
495495
let available = self.wait_for_batch(end);
496-
if start >= available {
496+
if available < end || start >= end {
497497
return Ok(None);
498498
}
499-
let actual_end = end.min(available);
499+
let actual_end = end;
500500
let data = lock_recover(&self.data, "streaming_address_queue.data");
501501

502+
let min_len = [
503+
data.addresses.len(),
504+
data.low_element_values.len(),
505+
data.low_element_next_values.len(),
506+
data.low_element_indices.len(),
507+
data.low_element_next_indices.len(),
508+
]
509+
.into_iter()
510+
.min()
511+
.unwrap_or(0);
512+
if min_len < actual_end {
513+
return Err(anyhow!(
514+
"incomplete batch data: min field length {} < required end {}",
515+
min_len,
516+
actual_end
517+
));
518+
}
519+
502520
let addresses = data.addresses[start..actual_end].to_vec();
503521
if addresses.is_empty() {
504522
return Err(anyhow!("Empty batch at start={}", start));
@@ -528,7 +546,11 @@ impl StreamingAddressQueue {
528546
low_element_next_values: data.low_element_next_values[start..actual_end].to_vec(),
529547
low_element_indices: data.low_element_indices[start..actual_end].to_vec(),
530548
low_element_next_indices: data.low_element_next_indices[start..actual_end].to_vec(),
531-
low_element_proofs: data.reconstruct_proofs::<HEIGHT>(start..actual_end)?,
549+
low_element_proofs: data
550+
.reconstruct_proofs::<HEIGHT>(start..actual_end)
551+
.map_err(|error| {
552+
anyhow!("incomplete batch data: failed to reconstruct proofs: {error}")
553+
})?,
532554
addresses,
533555
leaves_hashchain,
534556
}))
@@ -566,6 +588,10 @@ impl StreamingAddressQueue {
566588
lock_recover(&self.data, "streaming_address_queue.data").start_index
567589
}
568590

591+
pub fn tree_next_insertion_index(&self) -> u64 {
592+
lock_recover(&self.data, "streaming_address_queue.data").tree_next_insertion_index
593+
}
594+
569595
pub fn subtrees(&self) -> Vec<[u8; 32]> {
570596
lock_recover(&self.data, "streaming_address_queue.data")
571597
.subtrees

forester/src/processor/v2/strategy/address.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ impl<R: Rpc> TreeStrategy<R> for AddressTreeStrategy {
167167
}
168168

169169
let initial_root = streaming_queue.initial_root();
170-
let start_index = streaming_queue.start_index();
170+
let start_index = streaming_queue.tree_next_insertion_index();
171171

172172
let subtrees_arr: [[u8; 32]; DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as usize] =
173173
subtrees.try_into().map_err(|v: Vec<[u8; 32]>| {

program-tests/utils/src/e2e_test_env.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ where
764764
// // local_leaves_hash_chain is only used for a test assertion.
765765
// let local_nullifier_hash_chain = create_hash_chain_from_array(&addresses);
766766
// assert_eq!(leaves_hash_chain, local_nullifier_hash_chain);
767-
let start_index = address_queue.start_index as usize;
767+
let start_index = address_queue.tree_next_insertion_index as usize;
768768
assert!(
769769
start_index >= 2,
770770
"start index should be greater than 2 else tree is not inited"

program-tests/utils/src/mock_batched_forester.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ impl<const HEIGHT: usize> MockBatchedForester<HEIGHT> {
132132

133133
assert_eq!(computed_new_root, self.merkle_tree.root());
134134

135-
let proof_result = match ProofClient::local()
135+
let proof_client = ProofClient::local();
136+
let proof_result = match proof_client
136137
.generate_batch_append_proof(circuit_inputs)
137138
.await
138139
{
@@ -207,9 +208,8 @@ impl<const HEIGHT: usize> MockBatchedForester<HEIGHT> {
207208
batch_size,
208209
&[],
209210
)?;
210-
let proof_result = ProofClient::local()
211-
.generate_batch_update_proof(inputs)
212-
.await?;
211+
let proof_client = ProofClient::local();
212+
let proof_result = proof_client.generate_batch_update_proof(inputs).await?;
213213
let new_root = self.merkle_tree.root();
214214
let proof = CompressedProof {
215215
a: proof_result.0.proof.a,
@@ -318,7 +318,8 @@ impl<const HEIGHT: usize> MockBatchedAddressForester<HEIGHT> {
318318
)));
319319
}
320320
};
321-
let proof_result = match ProofClient::local()
321+
let proof_client = ProofClient::local();
322+
let proof_result = match proof_client
322323
.generate_batch_address_append_proof(inputs)
323324
.await
324325
{

program-tests/utils/src/test_batch_forester.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,7 @@ pub async fn create_batch_update_address_tree_instruction_data_with_proof<R: Rpc
670670
// // local_leaves_hash_chain is only used for a test assertion.
671671
// let local_nullifier_hash_chain = create_hash_chain_from_slice(addresses.as_slice()).unwrap();
672672
// assert_eq!(leaves_hash_chain, local_nullifier_hash_chain);
673-
let start_index = address_queue.start_index as usize;
673+
let start_index = address_queue.tree_next_insertion_index as usize;
674674
assert!(
675675
start_index >= 1,
676676
"start index should be greater than 2 else tree is not inited"

prover/client/src/errors.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ pub enum ProverClientError {
3939

4040
#[error("Integer conversion failed: {0}")]
4141
IntegerConversion(String),
42-
4342
#[error("Hashchain mismatch: computed {computed:?} != expected {expected:?} (batch_size={batch_size}, next_index={next_index})")]
4443
HashchainMismatch {
4544
computed: [u8; 32],

prover/client/src/helpers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ pub fn bigint_to_u8_32(n: &BigInt) -> Result<[u8; 32], Box<dyn std::error::Error
4949
pub fn compute_root_from_merkle_proof<const HEIGHT: usize>(
5050
leaf: [u8; 32],
5151
path_elements: &[[u8; 32]; HEIGHT],
52-
path_index: u32,
52+
path_index: usize,
5353
) -> Result<([u8; 32], ChangelogEntry<HEIGHT>), ProverClientError> {
54-
let mut changelog_entry = ChangelogEntry::default_with_index(path_index as usize);
54+
let mut changelog_entry = ChangelogEntry::default_with_index(path_index);
5555

5656
let mut current_hash = leaf;
5757
let mut current_index = path_index;

prover/client/src/proof_client.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tokio::time::sleep;
66
use tracing::{debug, error, info, trace, warn};
77

88
use crate::{
9-
constants::PROVE_PATH,
9+
constants::{PROVE_PATH, SERVER_ADDRESS},
1010
errors::ProverClientError,
1111
proof::{
1212
compress_proof, deserialize_gnark_proof_json, proof_from_json_struct, ProofCompressed,
@@ -17,14 +17,13 @@ use crate::{
1717
batch_append::{BatchAppendInputsJson, BatchAppendsCircuitInputs},
1818
batch_update::{update_inputs_string, BatchUpdateCircuitInputs},
1919
},
20+
prover::build_http_client,
2021
};
2122

2223
const MAX_RETRIES: u32 = 10;
2324
const BASE_RETRY_DELAY_SECS: u64 = 1;
2425
const DEFAULT_POLLING_INTERVAL_MS: u64 = 100;
2526
const DEFAULT_MAX_WAIT_TIME_SECS: u64 = 600;
26-
const DEFAULT_LOCAL_SERVER: &str = "http://localhost:3001";
27-
2827
const INITIAL_POLL_DELAY_SMALL_CIRCUIT_MS: u64 = 200;
2928
const INITIAL_POLL_DELAY_LARGE_CIRCUIT_MS: u64 = 200;
3029

@@ -70,8 +69,8 @@ pub struct ProofClient {
7069
impl ProofClient {
7170
pub fn local() -> Self {
7271
Self {
73-
client: Client::new(),
74-
server_address: DEFAULT_LOCAL_SERVER.to_string(),
72+
client: build_http_client(),
73+
server_address: SERVER_ADDRESS.to_string(),
7574
polling_interval: Duration::from_millis(DEFAULT_POLLING_INTERVAL_MS),
7675
max_wait_time: Duration::from_secs(DEFAULT_MAX_WAIT_TIME_SECS),
7776
api_key: None,
@@ -93,7 +92,7 @@ impl ProofClient {
9392
};
9493

9594
Self {
96-
client: Client::new(),
95+
client: build_http_client(),
9796
server_address,
9897
polling_interval,
9998
max_wait_time,
@@ -111,7 +110,7 @@ impl ProofClient {
111110
initial_poll_delay: Duration,
112111
) -> Self {
113112
Self {
114-
client: Client::new(),
113+
client: build_http_client(),
115114
server_address,
116115
polling_interval,
117116
max_wait_time,

prover/client/src/proof_types/batch_address_append/proof_inputs.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,12 @@ pub fn get_batch_address_append_circuit_inputs<const HEIGHT: usize>(
221221
}
222222
let new_element_values = &new_element_values[..zkp_batch_size];
223223
let mut new_root = [0u8; 32];
224-
let mut low_element_circuit_merkle_proofs = Vec::with_capacity(new_element_values.len());
225-
let mut new_element_circuit_merkle_proofs = Vec::with_capacity(new_element_values.len());
226-
let mut patched_low_element_next_values = Vec::with_capacity(new_element_values.len());
227-
let mut patched_low_element_next_indices = Vec::with_capacity(new_element_values.len());
228-
let mut patched_low_element_values = Vec::with_capacity(new_element_values.len());
229-
let mut patched_low_element_indices = Vec::with_capacity(new_element_values.len());
224+
let mut low_element_circuit_merkle_proofs = Vec::with_capacity(zkp_batch_size);
225+
let mut new_element_circuit_merkle_proofs = Vec::with_capacity(zkp_batch_size);
226+
let mut patched_low_element_next_values = Vec::with_capacity(zkp_batch_size);
227+
let mut patched_low_element_next_indices = Vec::with_capacity(zkp_batch_size);
228+
let mut patched_low_element_values = Vec::with_capacity(zkp_batch_size);
229+
let mut patched_low_element_indices = Vec::with_capacity(zkp_batch_size);
230230

231231
let computed_hashchain = create_hash_chain_from_slice(new_element_values).map_err(|e| {
232232
ProverClientError::GenericError(format!("Failed to compute hashchain: {}", e))
@@ -261,7 +261,7 @@ pub fn get_batch_address_append_circuit_inputs<const HEIGHT: usize>(
261261
let is_first_batch = indexed_changelog.is_empty();
262262
let mut expected_root_for_low = current_root;
263263

264-
for i in 0..new_element_values.len() {
264+
for i in 0..zkp_batch_size {
265265
let mut changelog_index = 0;
266266
let low_element_index = low_element_indices[i].try_into().map_err(|_| {
267267
ProverClientError::IntegerConversion(format!(
@@ -342,7 +342,7 @@ pub fn get_batch_address_append_circuit_inputs<const HEIGHT: usize>(
342342
let (computed_root, _) = compute_root_from_merkle_proof::<HEIGHT>(
343343
old_low_leaf_hash,
344344
&merkle_proof,
345-
low_element.index as u32,
345+
low_element.index,
346346
)?;
347347
if computed_root != expected_root_for_low {
348348
let low_value_bytes = bigint_to_be_bytes_array::<32>(&low_element.value)
@@ -383,7 +383,7 @@ pub fn get_batch_address_append_circuit_inputs<const HEIGHT: usize>(
383383
compute_root_from_merkle_proof::<HEIGHT>(
384384
new_low_leaf_hash,
385385
&merkle_proof,
386-
new_low_element.index as u32,
386+
new_low_element.index,
387387
)?;
388388

389389
patcher.push_changelog_entry::<HEIGHT>(changelog, changelog_entry);
@@ -424,7 +424,7 @@ pub fn get_batch_address_append_circuit_inputs<const HEIGHT: usize>(
424424
let (updated_root, changelog_entry) = compute_root_from_merkle_proof(
425425
new_element_leaf_hash,
426426
&merkle_proof_array,
427-
current_index as u32,
427+
current_index,
428428
)?;
429429

430430
if i == 0 && changelog.len() == 1 {
@@ -451,7 +451,7 @@ pub fn get_batch_address_append_circuit_inputs<const HEIGHT: usize>(
451451
let (root_with_zero, _) = compute_root_from_merkle_proof::<HEIGHT>(
452452
zero_hash,
453453
&merkle_proof_array,
454-
current_index as u32,
454+
current_index,
455455
)?;
456456
if root_with_zero != intermediate_root {
457457
tracing::error!(

prover/client/src/proof_types/batch_append/proof_inputs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ pub fn get_batch_append_inputs<const HEIGHT: usize>(
190190
let (updated_root, changelog_entry) = compute_root_from_merkle_proof(
191191
final_leaf,
192192
&merkle_proof_array,
193-
start_index + i as u32,
193+
start_index as usize + i,
194194
)?;
195195
new_root = updated_root;
196196
changelog.push(changelog_entry);

0 commit comments

Comments
 (0)