diff --git a/crates/apollo_gateway/src/gateway.rs b/crates/apollo_gateway/src/gateway.rs index f53f4323cfc..0ee7e537dda 100644 --- a/crates/apollo_gateway/src/gateway.rs +++ b/crates/apollo_gateway/src/gateway.rs @@ -216,6 +216,7 @@ impl< metric_counters.count_private_transaction_received(); } } + let is_p2p = p2p_message_metadata.is_some(); if let RpcTransaction::Declare(ref declare_tx) = tx { if let Err(e) = self.check_declare_permissions(declare_tx) { @@ -243,7 +244,7 @@ impl< .inspect_err(|e| metric_counters.record_add_tx_failure(e))?; let proof_archive_handle = self - .store_proof_and_spawn_archiving(proof_data, internal_tx.tx_hash) + .store_proof_and_spawn_archiving(proof_data, internal_tx.tx_hash, is_p2p) .await .inspect_err(|e| metric_counters.record_add_tx_failure(e))?; @@ -278,28 +279,34 @@ impl< &self, proof_data: Option<(ProofFacts, Proof)>, tx_hash: TransactionHash, + is_p2p: bool, ) -> GatewayResult { let Some((proof_facts, proof)) = proof_data else { return Ok(None); }; - // TODO(Einat): Avoid archiving in P2P gateways. // Spawn the GCS archive write before the proof-manager store so the two run in parallel // — the proof-manager store is the dominant latency and there's no point serializing them. - let proof_archive_writer = self.proof_archive_writer.clone(); - let archive_proof_facts = proof_facts.clone(); - let archive_proof = proof.clone(); - let handle = tokio::spawn(async move { - let proof_facts_hash = archive_proof_facts.hash(); - let proof_archive_writer_start = Instant::now(); - let result = proof_archive_writer.set_proof(archive_proof_facts, archive_proof).await; - let proof_archive_writer_duration = proof_archive_writer_start.elapsed(); - info!( - "Proof archive writer took: {proof_archive_writer_duration:?} for tx hash: \ - {tx_hash:?}" - ); - (proof_facts_hash, result) - }); + let archive_handle = if is_p2p { + // Skip the GCS archive write for transactions received via P2P to avoid double writes. + None + } else { + let proof_archive_writer = self.proof_archive_writer.clone(); + let archive_proof_facts = proof_facts.clone(); + let archive_proof = proof.clone(); + Some(tokio::spawn(async move { + let proof_facts_hash = archive_proof_facts.hash(); + let proof_archive_writer_start = Instant::now(); + let result = + proof_archive_writer.set_proof(archive_proof_facts, archive_proof).await; + let proof_archive_writer_duration = proof_archive_writer_start.elapsed(); + info!( + "Proof archive writer took: {proof_archive_writer_duration:?} for tx hash: \ + {tx_hash:?}" + ); + (proof_facts_hash, result) + })) + }; // Proof is verified during conversion to internal tx. It is stored here, after // validation, to avoid storing proofs for rejected transactions. @@ -316,7 +323,9 @@ impl< } Err(e) => { // Tx will be rejected; abort the in-flight GCS write so it doesn't leak. - handle.abort(); + if let Some(handle) = &archive_handle { + handle.abort(); + } return Err(StarknetError::internal_with_logging( &format!("Failed to set proof in proof manager. tx_hash: {tx_hash:?}"), e, @@ -324,7 +333,7 @@ impl< } } - Ok(Some((handle, tx_hash))) + Ok(archive_handle.map(|handle| (handle, tx_hash))) } /// Awaits the spawned GCS archive task with a hard timeout. diff --git a/crates/apollo_gateway/src/gateway_test.rs b/crates/apollo_gateway/src/gateway_test.rs index 0ddaec4f3f6..38c7cd20411 100644 --- a/crates/apollo_gateway/src/gateway_test.rs +++ b/crates/apollo_gateway/src/gateway_test.rs @@ -382,6 +382,7 @@ async fn setup_mock_state( tx_args: &impl TestingTxArgs, expected_mempool_result: Result<(), MempoolClientError>, expected_proof_archive_result: Result<(), ProofArchiveError>, + p2p_message_metadata: Option, ) { let input_tx = tx_args.get_rpc_tx(); let expected_internal_tx = tx_args.get_internal_tx(); @@ -403,17 +404,20 @@ async fn setup_mock_state( state_reader.storage_view.extend(block_hash_state_maps.storage); // If the transaction has proof facts, expect set_proof to be called on the proof manager and - // the proof archive writer. + // the proof archive writer (non-P2P transactions only). + let is_p2p = p2p_message_metadata.is_some(); let proof_archive_will_succeed = expected_proof_archive_result.is_ok(); if let RpcTransaction::Invoke(RpcInvokeTransaction::V3(ref invoke_tx)) = input_tx { if !invoke_tx.proof_facts.is_empty() { mock_dependencies .expect_store_proof(invoke_tx.proof_facts.clone(), invoke_tx.proof.clone()); - mock_dependencies.expect_set_proof( - invoke_tx.proof_facts.clone(), - invoke_tx.proof.clone(), - expected_proof_archive_result, - ); + if !is_p2p { + mock_dependencies.expect_set_proof( + invoke_tx.proof_facts.clone(), + invoke_tx.proof.clone(), + expected_proof_archive_result, + ); + } } } @@ -422,11 +426,13 @@ async fn setup_mock_state( account_state: AccountState { address, nonce: *input_tx.nonce() }, }; let validation_args = ValidationArgs::from(&mempool_add_tx_args); - if proof_archive_will_succeed { + // On the P2P path no GCS write is attempted, so the archive result is irrelevant to whether + // the tx is forwarded to the mempool. + if is_p2p || proof_archive_will_succeed { mock_dependencies.expect_add_tx( AddTransactionArgsWrapper { args: mempool_add_tx_args, - p2p_message_metadata: p2p_message_metadata(), + p2p_message_metadata: p2p_message_metadata.clone(), }, expected_mempool_result, ); @@ -447,15 +453,16 @@ struct AddTxResults { async fn run_add_tx_and_extract_metrics( mock_dependencies: MockDependencies, tx_args: &impl TestingTxArgs, + p2p_message_metadata: Option, ) -> AddTxResults { let recorder = PrometheusBuilder::new().build_recorder(); let _recorder_guard = metrics::set_default_local_recorder(&recorder); let input_tx = tx_args.get_rpc_tx(); let gateway = mock_dependencies.gateway(); - let result = gateway.add_tx(input_tx.clone(), p2p_message_metadata()).await; + let result = gateway.add_tx(input_tx.clone(), p2p_message_metadata.clone()).await; - let metric_handle_for_queries = GatewayMetricHandle::new(&input_tx, &p2p_message_metadata()); + let metric_handle_for_queries = GatewayMetricHandle::new(&input_tx, &p2p_message_metadata); let metrics = recorder.handle().render(); AddTxResults { result, metric_handle_for_queries, metrics } @@ -491,10 +498,17 @@ async fn test_add_tx_negative( #[case] expected_mempool_result: Result<(), MempoolClientError>, #[case] expected_error_code: StarknetErrorCode, ) { - setup_mock_state(&mut mock_dependencies, &tx_args, expected_mempool_result, Ok(())).await; + setup_mock_state( + &mut mock_dependencies, + &tx_args, + expected_mempool_result, + Ok(()), + p2p_message_metadata(), + ) + .await; let AddTxResults { result, metric_handle_for_queries, metrics } = - run_add_tx_and_extract_metrics(mock_dependencies, &tx_args).await; + run_add_tx_and_extract_metrics(mock_dependencies, &tx_args, p2p_message_metadata()).await; assert_eq!( metric_handle_for_queries.get_metric_value(GATEWAY_TRANSACTIONS_RECEIVED, &metrics), @@ -518,11 +532,21 @@ async fn test_add_tx_positive( declare_args() )] tx_args: impl TestingTxArgs, + #[values(None, p2p_message_metadata())] p2p_message_metadata: Option< + BroadcastedMessageMetadata, + >, ) { - setup_mock_state(&mut mock_dependencies, &tx_args, Ok(()), Ok(())).await; + setup_mock_state( + &mut mock_dependencies, + &tx_args, + Ok(()), + Ok(()), + p2p_message_metadata.clone(), + ) + .await; let AddTxResults { result, metric_handle_for_queries, metrics } = - run_add_tx_and_extract_metrics(mock_dependencies, &tx_args).await; + run_add_tx_and_extract_metrics(mock_dependencies, &tx_args, p2p_message_metadata).await; assert_eq!( metric_handle_for_queries.get_metric_value(GATEWAY_TRANSACTIONS_RECEIVED, &metrics), @@ -541,10 +565,18 @@ async fn test_add_tx_fails_when_proof_archive_write_fails(mut mock_dependencies: let tx_args = invoke_args_with_client_side_proving(); let expected_proof_archive_result: Result<(), ProofArchiveError> = Err(ProofArchiveError::WriteError("failed upload".to_string())); - setup_mock_state(&mut mock_dependencies, &tx_args, Ok(()), expected_proof_archive_result).await; + let p2p_message_metadata: Option = None; + setup_mock_state( + &mut mock_dependencies, + &tx_args, + Ok(()), + expected_proof_archive_result, + p2p_message_metadata, + ) + .await; let AddTxResults { result, metrics, .. } = - run_add_tx_and_extract_metrics(mock_dependencies, &tx_args).await; + run_add_tx_and_extract_metrics(mock_dependencies, &tx_args, None).await; assert!(result.unwrap_err().is_internal()); assert_eq!(GATEWAY_PROOF_ARCHIVE_WRITE_FAILURE.parse_numeric_metric::(&metrics), Some(1),); } @@ -553,9 +585,9 @@ async fn test_add_tx_fails_when_proof_archive_write_fails(mut mock_dependencies: #[tokio::test] async fn test_private_transaction_counter(mut mock_dependencies: MockDependencies) { let private_tx_args = invoke_args_with_client_side_proving(); - setup_mock_state(&mut mock_dependencies, &private_tx_args, Ok(()), Ok(())).await; + setup_mock_state(&mut mock_dependencies, &private_tx_args, Ok(()), Ok(()), None).await; let AddTxResults { metrics, .. } = - run_add_tx_and_extract_metrics(mock_dependencies, &private_tx_args).await; + run_add_tx_and_extract_metrics(mock_dependencies, &private_tx_args, None).await; assert_eq!( GATEWAY_PRIVATE_TRANSACTIONS_RECEIVED.parse_numeric_metric::(&metrics), Some(1) @@ -568,9 +600,9 @@ async fn test_public_transaction_does_not_increment_private_counter( mut mock_dependencies: MockDependencies, ) { let public_tx_args = invoke_args(); - setup_mock_state(&mut mock_dependencies, &public_tx_args, Ok(()), Ok(())).await; + setup_mock_state(&mut mock_dependencies, &public_tx_args, Ok(()), Ok(()), None).await; let AddTxResults { metrics, .. } = - run_add_tx_and_extract_metrics(mock_dependencies, &public_tx_args).await; + run_add_tx_and_extract_metrics(mock_dependencies, &public_tx_args, None).await; assert_eq!( GATEWAY_PRIVATE_TRANSACTIONS_RECEIVED.parse_numeric_metric::(&metrics), Some(0) @@ -604,7 +636,7 @@ async fn test_add_tx_fails_when_proof_verification_fails(mut mock_dependencies: // Run add_tx and verify it fails. let AddTxResults { result, metric_handle_for_queries, metrics } = - run_add_tx_and_extract_metrics(mock_dependencies, &tx_args).await; + run_add_tx_and_extract_metrics(mock_dependencies, &tx_args, p2p_message_metadata()).await; // Assert the transaction was received but failed. assert_eq!(