diff --git a/ocp/data/intent/postgres/model.go b/ocp/data/intent/postgres/model.go index 0c1b38d..6774ef7 100644 --- a/ocp/data/intent/postgres/model.go +++ b/ocp/data/intent/postgres/model.go @@ -167,7 +167,8 @@ func fromIntentModel(obj *intentModel) *intent.Record { NativeAmount: obj.NativeAmount, UsdMarketValue: obj.UsdMarketValue, - IsSwapBuy: obj.IsSwap, + IsSwapBuy: obj.IsSwap, + IsReturned: obj.IsReturned, } if len(record.ExternalDepositMetadata.ExchangeCurrency) == 0 { diff --git a/ocp/data/swap/memory/store.go b/ocp/data/swap/memory/store.go index ac72dba..01ad9bb 100644 --- a/ocp/data/swap/memory/store.go +++ b/ocp/data/swap/memory/store.go @@ -42,6 +42,9 @@ func (s *store) Save(_ context.Context, data *swap.Record) error { data.Version++ + item.Nonce = data.Nonce + item.Blockhash = data.Blockhash + item.TransactionSignature = data.TransactionSignature item.TransactionBlob = data.TransactionBlob item.State = data.State item.Version = data.Version diff --git a/ocp/data/swap/postgres/model.go b/ocp/data/swap/postgres/model.go index b4dc127..253a133 100644 --- a/ocp/data/swap/postgres/model.go +++ b/ocp/data/swap/postgres/model.go @@ -102,7 +102,7 @@ func (m *model) dbSave(ctx context.Context, db *sqlx.DB) error { ON CONFLICT (swap_id) DO UPDATE - SET transaction_blob = $15, state = $16, version = ` + tableName + `.version + 1 + SET nonce = $11, blockhash = $12, transaction_signature = $14, transaction_blob = $15, state = $16, version = ` + tableName + `.version + 1 WHERE ` + tableName + `.swap_id = $1 AND ` + tableName + `.version = $17 RETURNING diff --git a/ocp/data/swap/tests/tests.go b/ocp/data/swap/tests/tests.go index c97c4b6..b0e0b49 100644 --- a/ocp/data/swap/tests/tests.go +++ b/ocp/data/swap/tests/tests.go @@ -207,6 +207,29 @@ func testUpdateHappyPath(t *testing.T, s swap.Store) { actual, err = s.GetById(ctx, "test_swap_id") require.NoError(t, err) assertEquivalentRecords(t, expected, actual) + + expected.Nonce = "rotated_nonce" + expected.Blockhash = "rotated_blockhash" + expected.TransactionSignature = "rotated_transaction_signature" + expected.TransactionBlob = []byte("rotated_transaction_blob") + expected.State = swap.StateCancelling + mutatedProof := expected.ProofSignature + "_should_be_ignored" + expected.ProofSignature = mutatedProof + + err = s.Save(ctx, expected) + require.NoError(t, err) + assert.EqualValues(t, 1, expected.Id) + assert.EqualValues(t, 3, expected.Version) + + actual, err = s.GetById(ctx, "test_swap_id") + require.NoError(t, err) + assert.Equal(t, "rotated_nonce", actual.Nonce) + assert.Equal(t, "rotated_blockhash", actual.Blockhash) + assert.Equal(t, "rotated_transaction_signature", actual.TransactionSignature) + assert.Equal(t, []byte("rotated_transaction_blob"), actual.TransactionBlob) + assert.Equal(t, swap.StateCancelling, actual.State) + assert.Equal(t, "test_proof_signature", actual.ProofSignature) + assert.NotEqual(t, mutatedProof, actual.ProofSignature) }) } diff --git a/ocp/worker/swap/runtime.go b/ocp/worker/swap/runtime.go index 07f00b0..b19462c 100644 --- a/ocp/worker/swap/runtime.go +++ b/ocp/worker/swap/runtime.go @@ -10,8 +10,10 @@ import ( indexerpb "github.com/code-payments/code-vm-indexer/generated/indexer/v1" ocp_data "github.com/code-payments/ocp-server/ocp/data" + "github.com/code-payments/ocp-server/ocp/data/nonce" "github.com/code-payments/ocp-server/ocp/data/swap" "github.com/code-payments/ocp-server/ocp/integration" + "github.com/code-payments/ocp-server/ocp/transaction" "github.com/code-payments/ocp-server/ocp/worker" ) @@ -21,17 +23,29 @@ type runtime struct { data ocp_data.Provider vmIndexerClient indexerpb.IndexerClient integration integration.Swap + solanaNoncePool *transaction.LocalNoncePool } -func New(log *zap.Logger, data ocp_data.Provider, vmIndexerClient indexerpb.IndexerClient, integration integration.Swap, configProvider ConfigProvider) worker.Runtime { +func New( + log *zap.Logger, + data ocp_data.Provider, + vmIndexerClient indexerpb.IndexerClient, + integration integration.Swap, + solanaNoncePool *transaction.LocalNoncePool, + configProvider ConfigProvider, +) (worker.Runtime, error) { + if err := solanaNoncePool.Validate(nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.PurposeOnDemandTransaction); err != nil { + return nil, err + } + return &runtime{ log: log, conf: configProvider(), data: data, vmIndexerClient: vmIndexerClient, integration: integration, - } - + solanaNoncePool: solanaNoncePool, + }, nil } func (p *runtime) Start(ctx context.Context, interval time.Duration) error { @@ -40,6 +54,7 @@ func (p *runtime) Start(ctx context.Context, interval time.Duration) error { swap.StateFunding, swap.StateFunded, swap.StateSubmitting, + swap.StateCancelling, } { go func(state swap.State) { diff --git a/ocp/worker/swap/util.go b/ocp/worker/swap/util.go index fcb7e13..6fcdd27 100644 --- a/ocp/worker/swap/util.go +++ b/ocp/worker/swap/util.go @@ -24,8 +24,11 @@ import ( transaction_util "github.com/code-payments/ocp-server/ocp/transaction" vm_util "github.com/code-payments/ocp-server/ocp/vm" "github.com/code-payments/ocp-server/solana" + compute_budget "github.com/code-payments/ocp-server/solana/computebudget" "github.com/code-payments/ocp-server/solana/currencycreator" + "github.com/code-payments/ocp-server/solana/system" "github.com/code-payments/ocp-server/solana/token" + "github.com/code-payments/ocp-server/solana/vm" ) func (p *runtime) validateSwapState(record *swap.Record, states ...swap.State) error { @@ -119,14 +122,19 @@ func (p *runtime) markSwapFailed(ctx context.Context, swapRecord *swap.Record) e if err != nil { return err } - err = p.validateCurrencyMetadataState(destinationCurrencyMetadataRecord, currency.MetadataStateExecutingInitialPurchase, currency.MetadataStateAvailable) + err = p.validateCurrencyMetadataState( + destinationCurrencyMetadataRecord, + currency.MetadataStateFundingAuthority, + currency.MetadataStateExecutingInitialPurchase, + currency.MetadataStateAvailable, + ) if err != nil { return err } } return p.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { - err := p.validateSwapState(swapRecord, swap.StateSubmitting) + err := p.validateSwapState(swapRecord, swap.StateSubmitting, swap.StateCancelling) if err != nil { return err } @@ -137,7 +145,8 @@ func (p *runtime) markSwapFailed(ctx context.Context, swapRecord *swap.Record) e } if swapRecord.Kind == swap.KindReserve && !common.IsCoreMint(toMint) { - if destinationCurrencyMetadataRecord.State == currency.MetadataStateExecutingInitialPurchase { + switch destinationCurrencyMetadataRecord.State { + case currency.MetadataStateFundingAuthority, currency.MetadataStateExecutingInitialPurchase: destinationCurrencyMetadataRecord.State = currency.MetadataStateAbandoning err = p.data.SaveCurrencyMetadata(ctx, destinationCurrencyMetadataRecord) if err != nil { @@ -152,7 +161,133 @@ func (p *runtime) markSwapFailed(ctx context.Context, swapRecord *swap.Record) e }) } -func (p *runtime) markSwapCancelled(ctx context.Context, swapRecord *swap.Record) error { +func (p *runtime) autoRecoverSwapFunds(ctx context.Context, record *swap.Record) error { + if err := p.ensureSwapSourceIsInitialized(ctx, record, false); err != nil { + return errors.Wrap(err, "error ensuring swap source is initialized") + } + + selectedNonce, err := p.solanaNoncePool.GetNonce(ctx) + if err != nil { + return errors.Wrap(err, "error allocating nonce for cancel swap") + } + defer selectedNonce.ReleaseIfNotReserved(ctx) + + cancelTxn, err := p.buildCancelSwapTransaction(ctx, record, selectedNonce) + if err != nil { + return errors.Wrap(err, "error building cancel swap transaction") + } + + return p.markSwapCancelling( + ctx, + record, + selectedNonce, + cancelTxn, + ) +} + +func (p *runtime) buildCancelSwapTransaction( + ctx context.Context, + record *swap.Record, + selectedNonce *transaction_util.Nonce, +) (*solana.Transaction, error) { + owner, err := common.NewAccountFromPublicKeyString(record.Owner) + if err != nil { + return nil, errors.Wrap(err, "error parsing owner") + } + + fromMint, err := common.NewAccountFromPublicKeyString(record.FromMint) + if err != nil { + return nil, errors.Wrap(err, "error parsing from mint") + } + + sourceVmConfig, err := common.GetVmConfigForMint(ctx, p.data, fromMint) + if err != nil { + return nil, errors.Wrap(err, "error getting vm config for source mint") + } + + sourceTimelockAccounts, err := owner.GetTimelockAccounts(sourceVmConfig) + if err != nil { + return nil, errors.Wrap(err, "error getting source timelock accounts") + } + + memoryAccount, memoryIndex, err := vm_util.GetVirtualTimelockAccountLocationInMemory(ctx, p.data, sourceTimelockAccounts.Vault, false) + if err != nil { + return nil, errors.Wrap(err, "error getting source vta memory location") + } + + subsidizer := common.GetSubsidizer() + + cancelInstr := vm.NewCancelSwapInstruction( + &vm.CancelSwapInstructionAccounts{ + VmAuthority: sourceVmConfig.Authority.PublicKey().ToBytes(), + Vm: sourceVmConfig.Vm.PublicKey().ToBytes(), + VmMemory: memoryAccount.PublicKey().ToBytes(), + Swapper: owner.PublicKey().ToBytes(), + SwapPda: sourceTimelockAccounts.VmSwapAccounts.Pda.PublicKey().ToBytes(), + SwapAta: sourceTimelockAccounts.VmSwapAccounts.Ata.PublicKey().ToBytes(), + VmOmnibus: sourceVmConfig.Omnibus.PublicKey().ToBytes(), + }, + &vm.CancelSwapInstructionArgs{ + AccountIndex: memoryIndex, + Amount: record.SwapAmount + record.FeeAmount, + Bump: sourceTimelockAccounts.VmSwapAccounts.PdaBump, + }, + ) + + instructions := []solana.Instruction{ + system.AdvanceNonce(selectedNonce.Account.PublicKey().ToBytes(), subsidizer.PublicKey().ToBytes()), + compute_budget.SetComputeUnitLimit(75_000), + compute_budget.SetComputeUnitPrice(10_000), + cancelInstr, + } + + txn := solana.NewLegacyTransaction(subsidizer.PublicKey().ToBytes(), instructions...) + txn.SetBlockhash(selectedNonce.Blockhash) + + if err := txn.Sign( + subsidizer.PrivateKey().ToBytes(), + sourceVmConfig.Authority.PrivateKey().ToBytes(), + ); err != nil { + return nil, errors.Wrap(err, "error signing cancel swap transaction") + } + + return &txn, nil +} + +func (p *runtime) markSwapCancelling( + ctx context.Context, + swapRecord *swap.Record, + cancelNonce *transaction_util.Nonce, + cancelTxn *solana.Transaction, +) error { + return p.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { + err := p.validateSwapState(swapRecord, swap.StateSubmitting) + if err != nil { + return err + } + + err = p.markNonceReleasedDueToSubmittedTransaction(ctx, swapRecord) + if err != nil { + return err + } + + cancelTransactionSignature := base58.Encode(cancelTxn.Signature()) + + err = cancelNonce.MarkReservedWithSignature(ctx, cancelTransactionSignature) + if err != nil { + return err + } + + swapRecord.Nonce = cancelNonce.Account.PublicKey().ToBase58() + swapRecord.Blockhash = base58.Encode(cancelNonce.Blockhash[:]) + swapRecord.TransactionSignature = cancelTransactionSignature + swapRecord.TransactionBlob = cancelTxn.Marshal() + swapRecord.State = swap.StateCancelling + return p.data.SaveSwap(ctx, swapRecord) + }) +} + +func (p *runtime) markSwapCancelled(ctx context.Context, swapRecord *swap.Record, cancellationTxn *solana.ConfirmedTransaction) error { toMint, err := common.NewAccountFromPublicKeyString(swapRecord.ToMint) if err != nil { return err @@ -164,14 +299,28 @@ func (p *runtime) markSwapCancelled(ctx context.Context, swapRecord *swap.Record if err != nil { return err } - err = p.validateCurrencyMetadataState(destinationCurrencyMetadataRecord, currency.MetadataStateFundingAuthority, currency.MetadataStateAvailable) + err = p.validateCurrencyMetadataState( + destinationCurrencyMetadataRecord, + currency.MetadataStateFundingAuthority, + currency.MetadataStateExecutingInitialPurchase, + currency.MetadataStateAvailable, + ) + if err != nil { + return err + } + } + + var refundIntentRecord *intent.Record + var refundDepositRecord *deposit.Record + if swapRecord.State == swap.StateCancelling { + refundIntentRecord, refundDepositRecord, err = p.buildRefundRecordsForCancelledSwap(ctx, swapRecord, cancellationTxn) if err != nil { return err } } return p.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error { - err := p.validateSwapState(swapRecord, swap.StateCreated, swap.StateFunding, swap.StateFunded) + err := p.validateSwapState(swapRecord, swap.StateCreated, swap.StateFunding, swap.StateFunded, swap.StateCancelling) if err != nil { return err } @@ -182,10 +331,26 @@ func (p *runtime) markSwapCancelled(ctx context.Context, swapRecord *swap.Record if err != nil { return err } + case swap.StateCancelling: + err = p.markNonceReleasedDueToSubmittedTransaction(ctx, swapRecord) + if err != nil { + return err + } + + err = p.data.SaveIntent(ctx, refundIntentRecord) + if err != nil { + return err + } + + err = p.data.SaveExternalDeposit(ctx, refundDepositRecord) + if err != nil { + return err + } } if swapRecord.Kind == swap.KindReserve && !common.IsCoreMint(toMint) { - if destinationCurrencyMetadataRecord.State == currency.MetadataStateFundingAuthority { + switch destinationCurrencyMetadataRecord.State { + case currency.MetadataStateFundingAuthority, currency.MetadataStateExecutingInitialPurchase: destinationCurrencyMetadataRecord.State = currency.MetadataStateAbandoning err = p.data.SaveCurrencyMetadata(ctx, destinationCurrencyMetadataRecord) if err != nil { @@ -200,6 +365,101 @@ func (p *runtime) markSwapCancelled(ctx context.Context, swapRecord *swap.Record }) } +func (p *runtime) buildRefundRecordsForCancelledSwap(ctx context.Context, swapRecord *swap.Record, cancellationTxn *solana.ConfirmedTransaction) (*intent.Record, *deposit.Record, error) { + if cancellationTxn == nil { + return nil, nil, errors.New("cancellation transaction is required for refund records") + } + + owner, err := common.NewAccountFromPublicKeyString(swapRecord.Owner) + if err != nil { + return nil, nil, err + } + + fromMint, err := common.NewAccountFromPublicKeyString(swapRecord.FromMint) + if err != nil { + return nil, nil, err + } + + sourceVmConfig, err := common.GetVmConfigForMint(ctx, p.data, fromMint) + if err != nil { + return nil, nil, err + } + + ownerSourceTimelockVault, err := owner.ToTimelockVault(sourceVmConfig) + if err != nil { + return nil, nil, err + } + + quantity := swapRecord.SwapAmount + swapRecord.FeeAmount + + var exchangeCurrency currency_lib.Code + var nativeAmount, usdMarketValue float64 + var isReturned bool + switch swapRecord.FundingSource { + case swap.FundingSourceSubmitIntent: + fundingIntentRecord, err := p.data.GetIntent(ctx, swapRecord.FundingId) + if err != nil { + return nil, nil, err + } + if fundingIntentRecord.IntentType != intent.SendPublicPayment { + return nil, nil, errors.New("unexpected intent type") + } + exchangeCurrency = fundingIntentRecord.SendPublicPaymentMetadata.ExchangeCurrency + nativeAmount = fundingIntentRecord.SendPublicPaymentMetadata.NativeAmount + usdMarketValue = fundingIntentRecord.SendPublicPaymentMetadata.UsdMarketValue + isReturned = true + case swap.FundingSourceExternalWallet: + if !common.IsCoreMint(fromMint) { + return nil, nil, errors.New("unexpected source mint") + } + exchangeCurrency = currency_lib.USD + usdMarketValue, err = currency_util.CalculateUsdMarketValueFromTokenAmount(ctx, p.data, common.CoreMintAccount, quantity, time.Now()) + if err != nil { + return nil, nil, err + } + nativeAmount = usdMarketValue + default: + return nil, nil, errors.New("unsupported funding source") + } + + exchangeRate := currency_util.CalculateExchangeRate(fromMint, quantity, nativeAmount) + + intentRecord := &intent.Record{ + IntentId: getSwapDepositIntentID(swapRecord.TransactionSignature, ownerSourceTimelockVault), + IntentType: intent.ExternalDeposit, + + MintAccount: fromMint.PublicKey().ToBase58(), + + InitiatorOwnerAccount: owner.PublicKey().ToBase58(), + + ExternalDepositMetadata: &intent.ExternalDepositMetadata{ + DestinationTokenAccount: ownerSourceTimelockVault.PublicKey().ToBase58(), + Quantity: quantity, + ExchangeCurrency: exchangeCurrency, + ExchangeRate: exchangeRate, + NativeAmount: nativeAmount, + UsdMarketValue: usdMarketValue, + IsReturned: isReturned, + }, + + State: intent.StateConfirmed, + CreatedAt: time.Now(), + } + + depositRecord := &deposit.Record{ + Signature: swapRecord.TransactionSignature, + Destination: ownerSourceTimelockVault.PublicKey().ToBase58(), + Amount: quantity, + + Slot: cancellationTxn.Slot, + ConfirmationState: transaction.ConfirmationFinalized, + + CreatedAt: time.Now(), + } + + return intentRecord, depositRecord, nil +} + func (p *runtime) submitTransaction(ctx context.Context, record *swap.Record) error { err := p.validateSwapState(record, swap.StateSubmitting, swap.StateCancelling) if err != nil { @@ -710,6 +970,30 @@ func (p *runtime) ensureSwapDestinationIsInitialized(ctx context.Context, record return vm_util.EnsureVirtualTimelockAccountIsInitialized(ctx, p.data, destinationTimelockVault, true) } +func (p *runtime) ensureSwapSourceIsInitialized(ctx context.Context, record *swap.Record, waitForInitialization bool) error { + owner, err := common.NewAccountFromPublicKeyString(record.Owner) + if err != nil { + return err + } + + fromMint, err := common.NewAccountFromPublicKeyString(record.FromMint) + if err != nil { + return err + } + + sourceVmConfig, err := common.GetVmConfigForMint(ctx, p.data, fromMint) + if err != nil { + return err + } + + sourceTimelockVault, err := owner.ToTimelockVault(sourceVmConfig) + if err != nil { + return err + } + + return vm_util.EnsureVirtualTimelockAccountIsInitialized(ctx, p.data, sourceTimelockVault, waitForInitialization) +} + func (p *runtime) validateDestinationCurrencyReadyForSwap(ctx context.Context, swapRecord *swap.Record) (bool, error) { if swapRecord.Kind != swap.KindReserve { return true, nil diff --git a/ocp/worker/swap/worker.go b/ocp/worker/swap/worker.go index 31d6cab..ca47758 100644 --- a/ocp/worker/swap/worker.go +++ b/ocp/worker/swap/worker.go @@ -90,6 +90,8 @@ func (p *runtime) handle(ctx context.Context, record *swap.Record) error { err = p.handleStateFunded(ctx, record) case swap.StateSubmitting: err = p.handleStateSubmitting(ctx, record) + case swap.StateCancelling: + err = p.handleStateCancelling(ctx, record) } if err != nil { log.With(zap.Error(err)).Warn("failure processing swap") @@ -106,7 +108,7 @@ func (p *runtime) handleStateCreated(ctx context.Context, record *swap.Record) e // Cancel the swap if the client hasn't submitted the intent to fund the swap // within a reasonable amount of time if time.Since(record.CreatedAt) > p.conf.clientTimeoutToFund.Get(ctx) { - return p.markSwapCancelled(ctx, record) + return p.markSwapCancelled(ctx, record, nil) } return nil @@ -129,8 +131,7 @@ func (p *runtime) handleStateFunding(ctx context.Context, record *swap.Record) e case intent.StateConfirmed: return p.markSwapFunded(ctx, record) case intent.StateFailed: - // todo: Recovery flow to put back source funds into the source VM - return errors.New("funding intent failed") + return p.markSwapCancelled(ctx, record, nil) default: return nil } @@ -144,7 +145,7 @@ func (p *runtime) handleStateFunding(ctx context.Context, record *swap.Record) e if finalizedTxn != nil { if finalizedTxn.Err != nil || finalizedTxn.Meta.Err != nil { - return p.markSwapCancelled(ctx, record) + return p.markSwapCancelled(ctx, record, nil) } return p.markSwapFunded(ctx, record) } @@ -152,7 +153,7 @@ func (p *runtime) handleStateFunding(ctx context.Context, record *swap.Record) e // Cancel the swap if the external wallet funding transaction hasn't been // finalized within a reasonable amount of time if time.Since(record.CreatedAt) > p.conf.externalWalletFinalizationTimeout.Get(ctx) { - return p.markSwapCancelled(ctx, record) + return p.markSwapCancelled(ctx, record, nil) } return nil @@ -185,7 +186,7 @@ func (p *runtime) handleStateFunded(ctx context.Context, record *swap.Record) er if !isValid { // todo: Return funds if the amount was wrong - return p.markSwapCancelled(ctx, record) + return p.markSwapCancelled(ctx, record, nil) } err = p.ensureSwapDestinationIsInitialized(ctx, record) @@ -220,8 +221,7 @@ func (p *runtime) handleStateSubmitting(ctx context.Context, record *swap.Record if finalizedTxn != nil { if finalizedTxn.Err != nil || finalizedTxn.Meta.Err != nil { - // todo: Recovery flow to put back source funds into the source VM - return p.markSwapFailed(ctx, record) + return p.autoRecoverSwapFunds(ctx, record) } else { tokenBalances, err := p.data.GetBlockchainTransactionTokenBalances(ctx, record.TransactionSignature) if err != nil { @@ -255,3 +255,27 @@ func (p *runtime) handleStateSubmitting(ctx context.Context, record *swap.Record return p.submitTransaction(ctx, record) } + +func (p *runtime) handleStateCancelling(ctx context.Context, record *swap.Record) error { + if err := p.validateSwapState(record, swap.StateCancelling); err != nil { + return err + } + + finalizedTxn, err := p.data.GetBlockchainTransaction(ctx, record.TransactionSignature, solana.CommitmentFinalized) + if err != nil && err != solana.ErrSignatureNotFound { + return errors.Wrap(err, "error getting finalized cancel transaction") + } + + if finalizedTxn != nil { + if finalizedTxn.Err != nil || finalizedTxn.Meta.Err != nil { + return p.markSwapFailed(ctx, record) + } + return p.markSwapCancelled(ctx, record, finalizedTxn) + } + + if err := p.ensureSwapSourceIsInitialized(ctx, record, true); err != nil { + return errors.Wrap(err, "error ensuring swap source is initialized") + } + + return p.submitTransaction(ctx, record) +}