Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions crates/apollo_gateway/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl<
metric_counters.count_private_transaction_received();
}
}
let is_p2p = p2p_message_metadata.is_some();

if let RpcTransaction::Declare(ref declare_tx) = tx {
if let Err(e) = self.check_declare_permissions(declare_tx) {
Expand Down Expand Up @@ -243,7 +244,7 @@ impl<
.inspect_err(|e| metric_counters.record_add_tx_failure(e))?;

let proof_archive_handle = self
.store_proof_and_spawn_archiving(proof_data, internal_tx.tx_hash)
.store_proof_and_spawn_archiving(proof_data, internal_tx.tx_hash, is_p2p)
.await
.inspect_err(|e| metric_counters.record_add_tx_failure(e))?;

Expand Down Expand Up @@ -278,28 +279,34 @@ impl<
&self,
proof_data: Option<(ProofFacts, Proof)>,
tx_hash: TransactionHash,
is_p2p: bool,
) -> GatewayResult<ProofArchiveHandle> {
let Some((proof_facts, proof)) = proof_data else {
return Ok(None);
};

// TODO(Einat): Avoid archiving in P2P gateways.
// Spawn the GCS archive write before the proof-manager store so the two run in parallel
// — the proof-manager store is the dominant latency and there's no point serializing them.
let proof_archive_writer = self.proof_archive_writer.clone();
let archive_proof_facts = proof_facts.clone();
let archive_proof = proof.clone();
let handle = tokio::spawn(async move {
let proof_facts_hash = archive_proof_facts.hash();
let proof_archive_writer_start = Instant::now();
let result = proof_archive_writer.set_proof(archive_proof_facts, archive_proof).await;
let proof_archive_writer_duration = proof_archive_writer_start.elapsed();
info!(
"Proof archive writer took: {proof_archive_writer_duration:?} for tx hash: \
{tx_hash:?}"
);
(proof_facts_hash, result)
});
let archive_handle = if is_p2p {
// Skip the GCS archive write for transactions received via P2P to avoid double writes.
None
} else {
let proof_archive_writer = self.proof_archive_writer.clone();
let archive_proof_facts = proof_facts.clone();
let archive_proof = proof.clone();
Some(tokio::spawn(async move {
let proof_facts_hash = archive_proof_facts.hash();
let proof_archive_writer_start = Instant::now();
let result =
proof_archive_writer.set_proof(archive_proof_facts, archive_proof).await;
let proof_archive_writer_duration = proof_archive_writer_start.elapsed();
info!(
"Proof archive writer took: {proof_archive_writer_duration:?} for tx hash: \
{tx_hash:?}"
);
(proof_facts_hash, result)
}))
};

// Proof is verified during conversion to internal tx. It is stored here, after
// validation, to avoid storing proofs for rejected transactions.
Expand All @@ -316,15 +323,17 @@ impl<
}
Err(e) => {
// Tx will be rejected; abort the in-flight GCS write so it doesn't leak.
handle.abort();
if let Some(handle) = &archive_handle {
handle.abort();
}
return Err(StarknetError::internal_with_logging(
&format!("Failed to set proof in proof manager. tx_hash: {tx_hash:?}"),
e,
));
}
}

Ok(Some((handle, tx_hash)))
Ok(archive_handle.map(|handle| (handle, tx_hash)))
}

/// Awaits the spawned GCS archive task with a hard timeout.
Expand Down
74 changes: 53 additions & 21 deletions crates/apollo_gateway/src/gateway_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ async fn setup_mock_state(
tx_args: &impl TestingTxArgs,
expected_mempool_result: Result<(), MempoolClientError>,
expected_proof_archive_result: Result<(), ProofArchiveError>,
p2p_message_metadata: Option<BroadcastedMessageMetadata>,
) {
let input_tx = tx_args.get_rpc_tx();
let expected_internal_tx = tx_args.get_internal_tx();
Expand All @@ -403,17 +404,20 @@ async fn setup_mock_state(
state_reader.storage_view.extend(block_hash_state_maps.storage);

// If the transaction has proof facts, expect set_proof to be called on the proof manager and
// the proof archive writer.
// the proof archive writer (non-P2P transactions only).
let is_p2p = p2p_message_metadata.is_some();
let proof_archive_will_succeed = expected_proof_archive_result.is_ok();
if let RpcTransaction::Invoke(RpcInvokeTransaction::V3(ref invoke_tx)) = input_tx {
if !invoke_tx.proof_facts.is_empty() {
mock_dependencies
.expect_store_proof(invoke_tx.proof_facts.clone(), invoke_tx.proof.clone());
mock_dependencies.expect_set_proof(
invoke_tx.proof_facts.clone(),
invoke_tx.proof.clone(),
expected_proof_archive_result,
);
if !is_p2p {
mock_dependencies.expect_set_proof(
invoke_tx.proof_facts.clone(),
invoke_tx.proof.clone(),
expected_proof_archive_result,
);
}
}
}

Expand All @@ -422,11 +426,13 @@ async fn setup_mock_state(
account_state: AccountState { address, nonce: *input_tx.nonce() },
};
let validation_args = ValidationArgs::from(&mempool_add_tx_args);
if proof_archive_will_succeed {
// On the P2P path no GCS write is attempted, so the archive result is irrelevant to whether
// the tx is forwarded to the mempool.
if is_p2p || proof_archive_will_succeed {
mock_dependencies.expect_add_tx(
AddTransactionArgsWrapper {
args: mempool_add_tx_args,
p2p_message_metadata: p2p_message_metadata(),
p2p_message_metadata: p2p_message_metadata.clone(),
},
expected_mempool_result,
);
Expand All @@ -447,15 +453,16 @@ struct AddTxResults {
async fn run_add_tx_and_extract_metrics(
mock_dependencies: MockDependencies,
tx_args: &impl TestingTxArgs,
p2p_message_metadata: Option<BroadcastedMessageMetadata>,
) -> AddTxResults {
let recorder = PrometheusBuilder::new().build_recorder();
let _recorder_guard = metrics::set_default_local_recorder(&recorder);

let input_tx = tx_args.get_rpc_tx();
let gateway = mock_dependencies.gateway();
let result = gateway.add_tx(input_tx.clone(), p2p_message_metadata()).await;
let result = gateway.add_tx(input_tx.clone(), p2p_message_metadata.clone()).await;

let metric_handle_for_queries = GatewayMetricHandle::new(&input_tx, &p2p_message_metadata());
let metric_handle_for_queries = GatewayMetricHandle::new(&input_tx, &p2p_message_metadata);
let metrics = recorder.handle().render();

AddTxResults { result, metric_handle_for_queries, metrics }
Expand Down Expand Up @@ -491,10 +498,17 @@ async fn test_add_tx_negative(
#[case] expected_mempool_result: Result<(), MempoolClientError>,
#[case] expected_error_code: StarknetErrorCode,
) {
setup_mock_state(&mut mock_dependencies, &tx_args, expected_mempool_result, Ok(())).await;
setup_mock_state(
&mut mock_dependencies,
&tx_args,
expected_mempool_result,
Ok(()),
p2p_message_metadata(),
)
.await;

let AddTxResults { result, metric_handle_for_queries, metrics } =
run_add_tx_and_extract_metrics(mock_dependencies, &tx_args).await;
run_add_tx_and_extract_metrics(mock_dependencies, &tx_args, p2p_message_metadata()).await;

assert_eq!(
metric_handle_for_queries.get_metric_value(GATEWAY_TRANSACTIONS_RECEIVED, &metrics),
Expand All @@ -518,11 +532,21 @@ async fn test_add_tx_positive(
declare_args()
)]
tx_args: impl TestingTxArgs,
#[values(None, p2p_message_metadata())] p2p_message_metadata: Option<
BroadcastedMessageMetadata,
>,
) {
setup_mock_state(&mut mock_dependencies, &tx_args, Ok(()), Ok(())).await;
setup_mock_state(
&mut mock_dependencies,
&tx_args,
Ok(()),
Ok(()),
p2p_message_metadata.clone(),
)
.await;

let AddTxResults { result, metric_handle_for_queries, metrics } =
run_add_tx_and_extract_metrics(mock_dependencies, &tx_args).await;
run_add_tx_and_extract_metrics(mock_dependencies, &tx_args, p2p_message_metadata).await;

assert_eq!(
metric_handle_for_queries.get_metric_value(GATEWAY_TRANSACTIONS_RECEIVED, &metrics),
Expand All @@ -541,10 +565,18 @@ async fn test_add_tx_fails_when_proof_archive_write_fails(mut mock_dependencies:
let tx_args = invoke_args_with_client_side_proving();
let expected_proof_archive_result: Result<(), ProofArchiveError> =
Err(ProofArchiveError::WriteError("failed upload".to_string()));
setup_mock_state(&mut mock_dependencies, &tx_args, Ok(()), expected_proof_archive_result).await;
let p2p_message_metadata: Option<BroadcastedMessageMetadata> = None;
setup_mock_state(
&mut mock_dependencies,
&tx_args,
Ok(()),
expected_proof_archive_result,
p2p_message_metadata,
)
.await;

let AddTxResults { result, metrics, .. } =
run_add_tx_and_extract_metrics(mock_dependencies, &tx_args).await;
run_add_tx_and_extract_metrics(mock_dependencies, &tx_args, None).await;
assert!(result.unwrap_err().is_internal());
assert_eq!(GATEWAY_PROOF_ARCHIVE_WRITE_FAILURE.parse_numeric_metric::<u64>(&metrics), Some(1),);
}
Expand All @@ -553,9 +585,9 @@ async fn test_add_tx_fails_when_proof_archive_write_fails(mut mock_dependencies:
#[tokio::test]
async fn test_private_transaction_counter(mut mock_dependencies: MockDependencies) {
let private_tx_args = invoke_args_with_client_side_proving();
setup_mock_state(&mut mock_dependencies, &private_tx_args, Ok(()), Ok(())).await;
setup_mock_state(&mut mock_dependencies, &private_tx_args, Ok(()), Ok(()), None).await;
let AddTxResults { metrics, .. } =
run_add_tx_and_extract_metrics(mock_dependencies, &private_tx_args).await;
run_add_tx_and_extract_metrics(mock_dependencies, &private_tx_args, None).await;
assert_eq!(
GATEWAY_PRIVATE_TRANSACTIONS_RECEIVED.parse_numeric_metric::<u64>(&metrics),
Some(1)
Expand All @@ -568,9 +600,9 @@ async fn test_public_transaction_does_not_increment_private_counter(
mut mock_dependencies: MockDependencies,
) {
let public_tx_args = invoke_args();
setup_mock_state(&mut mock_dependencies, &public_tx_args, Ok(()), Ok(())).await;
setup_mock_state(&mut mock_dependencies, &public_tx_args, Ok(()), Ok(()), None).await;
let AddTxResults { metrics, .. } =
run_add_tx_and_extract_metrics(mock_dependencies, &public_tx_args).await;
run_add_tx_and_extract_metrics(mock_dependencies, &public_tx_args, None).await;
assert_eq!(
GATEWAY_PRIVATE_TRANSACTIONS_RECEIVED.parse_numeric_metric::<u64>(&metrics),
Some(0)
Expand Down Expand Up @@ -604,7 +636,7 @@ async fn test_add_tx_fails_when_proof_verification_fails(mut mock_dependencies:

// Run add_tx and verify it fails.
let AddTxResults { result, metric_handle_for_queries, metrics } =
run_add_tx_and_extract_metrics(mock_dependencies, &tx_args).await;
run_add_tx_and_extract_metrics(mock_dependencies, &tx_args, p2p_message_metadata()).await;

// Assert the transaction was received but failed.
assert_eq!(
Expand Down
Loading