Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1b51ded
feat: optimize address batch pipeline
sergeytimoshin Mar 14, 2026
208ade3
format
sergeytimoshin Mar 14, 2026
57e4d49
feat: stabilize address batch pipeline
sergeytimoshin Mar 14, 2026
a8f93dc
feat: batch cold account loads in light client
sergeytimoshin Mar 14, 2026
5f63379
fix: harden load batching and mixed decompression
sergeytimoshin Mar 15, 2026
c43ad66
Fix prover startup and decompression load flow
sergeytimoshin Mar 16, 2026
f8562bd
cleanup: harden prover startup polling
sergeytimoshin Mar 16, 2026
9b588cf
format
sergeytimoshin Mar 16, 2026
4f60065
format
sergeytimoshin Mar 16, 2026
ae33276
cleanup
sergeytimoshin Mar 24, 2026
09894c9
cleanup
sergeytimoshin Mar 27, 2026
97310df
refactor: simplify batch data length validation and remove redundant …
sergeytimoshin Mar 27, 2026
275ec04
feat: optimize address batch pipeline
sergeytimoshin Mar 14, 2026
2c72d84
format
sergeytimoshin Mar 14, 2026
f7eec43
feat: stabilize address batch pipeline
sergeytimoshin Mar 14, 2026
4535b49
feat: batch cold account loads in light client
sergeytimoshin Mar 14, 2026
3966959
fix: harden load batching and mixed decompression
sergeytimoshin Mar 15, 2026
6548173
fix: harden test indexer proof parsing
sergeytimoshin Mar 16, 2026
98ed449
fix: harden runtime safety edge cases
sergeytimoshin Mar 17, 2026
338b4ac
fix: harden runtime safety fallout
sergeytimoshin Mar 17, 2026
62dc396
format
sergeytimoshin Mar 17, 2026
ffda165
format
sergeytimoshin Mar 17, 2026
bd856c3
cleanup
sergeytimoshin Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions forester/src/forester_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,20 +670,22 @@ fn parse_tree_status(
let fullness = next_index as f64 / capacity as f64 * 100.0;

let (queue_len, queue_cap) = queue_account
.map(|acc| {
unsafe { parse_hash_set_from_bytes::<QueueAccount>(&acc.data) }
.ok()
.map(|hs| {
.map(
|acc| match unsafe { parse_hash_set_from_bytes::<QueueAccount>(&acc.data) } {
Ok(hs) => {
let len = hs
.iter()
.filter(|(_, cell)| cell.sequence_number.is_none())
.count() as u64;
let cap = hs.get_capacity() as u64;
(len, cap)
})
.unwrap_or((0, 0))
})
.map(|(l, c)| (Some(l), Some(c)))
(Some(len), Some(cap))
}
Err(error) => {
warn!(?error, "Failed to parse StateV1 queue hash set");
(None, None)
}
},
)
.unwrap_or((None, None));

(
Expand Down Expand Up @@ -725,20 +727,22 @@ fn parse_tree_status(
let fullness = next_index as f64 / capacity as f64 * 100.0;

let (queue_len, queue_cap) = queue_account
.map(|acc| {
unsafe { parse_hash_set_from_bytes::<QueueAccount>(&acc.data) }
.ok()
.map(|hs| {
.map(
|acc| match unsafe { parse_hash_set_from_bytes::<QueueAccount>(&acc.data) } {
Ok(hs) => {
let len = hs
.iter()
.filter(|(_, cell)| cell.sequence_number.is_none())
.count() as u64;
let cap = hs.get_capacity() as u64;
(len, cap)
})
.unwrap_or((0, 0))
})
.map(|(l, c)| (Some(l), Some(c)))
(Some(len), Some(cap))
}
Err(error) => {
warn!(?error, "Failed to parse AddressV1 queue hash set");
(None, None)
}
},
)
.unwrap_or((None, None));

(
Expand Down
77 changes: 73 additions & 4 deletions forester/src/processor/v2/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,16 +493,77 @@ impl StreamingAddressQueue {
hashchain_idx: usize,
) -> crate::Result<Option<AddressBatchSnapshot<HEIGHT>>> {
let available = self.wait_for_batch(end);
if start >= available {
if available < end || start >= end {
return Ok(None);
}
let actual_end = end.min(available);
let data = lock_recover(&self.data, "streaming_address_queue.data");

let min_len = [
data.addresses.len(),
data.low_element_values.len(),
data.low_element_next_values.len(),
data.low_element_indices.len(),
data.low_element_next_indices.len(),
]
.into_iter()
.min()
.unwrap_or(0);
if min_len < actual_end {
return Err(anyhow!(
"incomplete batch data: min field length {} < required end {}",
min_len,
actual_end
));
}

let addresses = data.addresses[start..actual_end].to_vec();
if addresses.is_empty() {
return Err(anyhow!("Empty batch at start={}", start));
return Ok(None);
}
let expected_len = addresses.len();
let Some(low_element_values) = data
.low_element_values
.get(start..end)
.map(|slice| slice.to_vec())
else {
return Ok(None);
};
let Some(low_element_next_values) = data
.low_element_next_values
.get(start..end)
.map(|slice| slice.to_vec())
else {
return Ok(None);
};
let Some(low_element_indices) = data
.low_element_indices
.get(start..end)
.map(|slice| slice.to_vec())
else {
return Ok(None);
};
let Some(low_element_next_indices) = data
.low_element_next_indices
.get(start..end)
.map(|slice| slice.to_vec())
else {
return Ok(None);
};
if [
low_element_values.len(),
low_element_next_values.len(),
low_element_indices.len(),
low_element_next_indices.len(),
]
.iter()
.any(|&len| len != expected_len)
{
return Ok(None);
}
let low_element_proofs = match data.reconstruct_proofs::<HEIGHT>(start..end) {
Ok(proofs) if proofs.len() == expected_len => proofs,
Ok(_) | Err(_) => return Ok(None),
};

let leaves_hashchain = match data.leaves_hash_chains.get(hashchain_idx).copied() {
Some(hashchain) => hashchain,
Expand All @@ -528,7 +589,11 @@ impl StreamingAddressQueue {
low_element_next_values: data.low_element_next_values[start..actual_end].to_vec(),
low_element_indices: data.low_element_indices[start..actual_end].to_vec(),
low_element_next_indices: data.low_element_next_indices[start..actual_end].to_vec(),
low_element_proofs: data.reconstruct_proofs::<HEIGHT>(start..actual_end)?,
low_element_proofs: data
.reconstruct_proofs::<HEIGHT>(start..actual_end)
.map_err(|error| {
anyhow!("incomplete batch data: failed to reconstruct proofs: {error}")
})?,
addresses,
leaves_hashchain,
}))
Expand Down Expand Up @@ -566,6 +631,10 @@ impl StreamingAddressQueue {
lock_recover(&self.data, "streaming_address_queue.data").start_index
}

/// Returns the tree's next insertion index as cached in the shared queue state.
/// Acquires the `data` mutex via `lock_recover` (which presumably recovers from
/// lock poisoning — TODO confirm against `lock_recover`'s definition) and reads
/// the `tree_next_insertion_index` field.
pub fn tree_next_insertion_index(&self) -> u64 {
    lock_recover(&self.data, "streaming_address_queue.data").tree_next_insertion_index
}

pub fn subtrees(&self) -> Vec<[u8; 32]> {
lock_recover(&self.data, "streaming_address_queue.data")
.subtrees
Expand Down
6 changes: 3 additions & 3 deletions forester/src/processor/v2/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ where
}

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}
Comment on lines 134 to 137
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Extract worker-pool bootstrap into a single helper.

The same fallible initialization block now exists in three paths. A small ensure_worker_pool() would keep future startup logging/retry behavior consistent and avoid one path drifting the next time worker bootstrap changes.

Also applies to: 534-537, 563-566

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@forester/src/processor/v2/processor.rs` around lines 134 - 137, Create a
single helper method (e.g., ensure_worker_pool) on the processor that
encapsulates the fallible initialization currently duplicated: check
self.worker_pool.is_none(), call
spawn_proof_workers(&self.context.prover_config)?, and set self.worker_pool =
Some(WorkerPool { job_tx }); then replace the three duplicated blocks with calls
to ensure_worker_pool(). Reference the existing symbols spawn_proof_workers,
WorkerPool, self.worker_pool, and self.context.prover_config so the helper
performs the same behavior and returns the same error propagation as the
original code.


Expand Down Expand Up @@ -532,7 +532,7 @@ where
((queue_size / self.zkp_batch_size) as usize).min(self.context.max_batches_per_tree);

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}

Expand Down Expand Up @@ -561,7 +561,7 @@ where
let max_batches = max_batches.min(self.context.max_batches_per_tree);

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}

Expand Down
20 changes: 11 additions & 9 deletions forester/src/processor/v2/proof_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,27 +132,27 @@ struct ProofClients {
}

impl ProofClients {
fn new(config: &ProverConfig) -> Self {
Self {
fn new(config: &ProverConfig) -> crate::Result<Self> {
Ok(Self {
append_client: ProofClient::with_config(
config.append_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
)?,
nullify_client: ProofClient::with_config(
config.update_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
)?,
address_append_client: ProofClient::with_config(
config.address_append_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
}
)?,
})
}

fn get_client(&self, input: &ProofInput) -> &ProofClient {
Expand All @@ -164,11 +164,13 @@ impl ProofClients {
}
}

pub fn spawn_proof_workers(config: &ProverConfig) -> async_channel::Sender<ProofJob> {
pub fn spawn_proof_workers(
config: &ProverConfig,
) -> crate::Result<async_channel::Sender<ProofJob>> {
let (job_tx, job_rx) = async_channel::bounded::<ProofJob>(256);
let clients = Arc::new(ProofClients::new(config));
let clients = Arc::new(ProofClients::new(config)?);
tokio::spawn(async move { run_proof_pipeline(job_rx, clients).await });
job_tx
Ok(job_tx)
}

async fn run_proof_pipeline(
Expand Down
2 changes: 1 addition & 1 deletion forester/src/processor/v2/strategy/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl<R: Rpc> TreeStrategy<R> for AddressTreeStrategy {
}

let initial_root = streaming_queue.initial_root();
let start_index = streaming_queue.start_index();
let start_index = streaming_queue.tree_next_insertion_index();

let subtrees_arr: [[u8; 32]; DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as usize] =
subtrees.try_into().map_err(|v: Vec<[u8; 32]>| {
Expand Down
80 changes: 27 additions & 53 deletions program-tests/utils/src/actions/legacy/instructions/transfer2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,23 @@ pub async fn create_generic_transfer2_instruction<R: Rpc + Indexer>(
payer: Pubkey,
should_filter_zero_outputs: bool,
) -> Result<Instruction, TokenSdkError> {
// // Get a single shared output queue for ALL compress/compress-and-close operations
// // This prevents reordering issues caused by the sort_by_key at the end
// let shared_output_queue = rpc
// .get_random_state_tree_info()
// .unwrap()
// .get_output_pubkey()
// .unwrap();
// Transfer2 supports a single output queue per instruction. Legacy helpers accept
// per-action queues, but normalize them down to one shared queue for the IX.
let mut explicit_output_queue = None;
for action in &actions {
let candidate = match action {
Transfer2InstructionType::Compress(input) => Some(input.output_queue),
Transfer2InstructionType::CompressAndClose(input) => Some(input.output_queue),
Transfer2InstructionType::Decompress(_)
| Transfer2InstructionType::Transfer(_)
| Transfer2InstructionType::Approve(_) => None,
};
if let Some(candidate) = candidate {
if explicit_output_queue.is_none() {
explicit_output_queue = Some(candidate);
}
}
}

let mut hashes = Vec::new();
actions.iter().for_each(|account| match account {
Expand Down Expand Up @@ -210,24 +220,16 @@ pub async fn create_generic_transfer2_instruction<R: Rpc + Indexer>(
.value;

let mut packed_tree_accounts = PackedAccounts::default();
// tree infos must be packed before packing the token input accounts
let packed_tree_infos = rpc_proof_result.pack_tree_infos(&mut packed_tree_accounts);
// Pack only input state tree infos. Grouped transfer2 proofs can span multiple output trees.
let packed_tree_infos = rpc_proof_result.pack_state_tree_infos(&mut packed_tree_accounts);

// We use a single shared output queue for all compress/compress-and-close operations to avoid ordering failures.
let shared_output_queue = if packed_tree_infos.address_trees.is_empty() {
let shared_output_queue = rpc
.get_random_state_tree_info()
let shared_output_queue = explicit_output_queue.unwrap_or_else(|| {
rpc.get_random_state_tree_info()
.unwrap()
.get_output_pubkey()
.unwrap();
packed_tree_accounts.insert_or_get(shared_output_queue)
} else {
packed_tree_infos
.state_trees
.as_ref()
.unwrap()
.output_tree_index
};
});
let shared_output_queue = packed_tree_accounts.insert_or_get(shared_output_queue);

let mut inputs_offset = 0;
let mut in_lamports = Vec::new();
Expand All @@ -242,14 +244,7 @@ pub async fn create_generic_transfer2_instruction<R: Rpc + Indexer>(
if let Some(ref input_token_account) = input.compressed_token_account {
let token_data = input_token_account
.iter()
.zip(
packed_tree_infos
.state_trees
.as_ref()
.unwrap()
.packed_tree_infos[inputs_offset..]
.iter(),
)
.zip(packed_tree_infos[inputs_offset..].iter())
.map(|(account, rpc_account)| {
if input.to != account.token.owner {
return Err(TokenSdkError::InvalidCompressInputOwner);
Expand Down Expand Up @@ -391,14 +386,7 @@ pub async fn create_generic_transfer2_instruction<R: Rpc + Indexer>(
let token_data = input
.compressed_token_account
.iter()
.zip(
packed_tree_infos
.state_trees
.as_ref()
.unwrap()
.packed_tree_infos[inputs_offset..]
.iter(),
)
.zip(packed_tree_infos[inputs_offset..].iter())
.map(|(account, rpc_account)| {
pack_input_token_account(
account,
Expand Down Expand Up @@ -460,14 +448,7 @@ pub async fn create_generic_transfer2_instruction<R: Rpc + Indexer>(
let token_data = input
.compressed_token_account
.iter()
.zip(
packed_tree_infos
.state_trees
.as_ref()
.unwrap()
.packed_tree_infos[inputs_offset..]
.iter(),
)
.zip(packed_tree_infos[inputs_offset..].iter())
.map(|(account, rpc_account)| {
pack_input_token_account(
account,
Expand Down Expand Up @@ -542,14 +523,7 @@ pub async fn create_generic_transfer2_instruction<R: Rpc + Indexer>(
let token_data = input
.compressed_token_account
.iter()
.zip(
packed_tree_infos
.state_trees
.as_ref()
.unwrap()
.packed_tree_infos[inputs_offset..]
.iter(),
)
.zip(packed_tree_infos[inputs_offset..].iter())
.map(|(account, rpc_account)| {
pack_input_token_account(
account,
Expand Down
Loading
Loading