From dd0ab04ba40b727d1f7a0d1b0b7e2fefe5720188 Mon Sep 17 00:00:00 2001 From: Marcelo Politzer <251334+mpolitzer@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:48:55 -0300 Subject: [PATCH 1/3] chore(contracts): bump contracts to version 2.2.0 and dave to 2.1.1 --- Makefile | 8 +++---- pkg/contracts/iconsensus/iconsensus.go | 33 +++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 8f0c7b633..715f41320 100644 --- a/Makefile +++ b/Makefile @@ -18,14 +18,14 @@ TARGET_OS?=$(shell uname) export TARGET_OS ROLLUPS_NODE_VERSION := 2.0.0-alpha.9 -ROLLUPS_CONTRACTS_VERSION := 2.1.1 +ROLLUPS_CONTRACTS_VERSION := 2.2.0 ROLLUPS_CONTRACTS_URL:=https://github.com/cartesi/rollups-contracts/releases/download/ ROLLUPS_CONTRACTS_ARTIFACT:=rollups-contracts-$(ROLLUPS_CONTRACTS_VERSION)-artifacts.tar.gz -ROLLUPS_CONTRACTS_SHA256:=2e7a105d656de2adafad6439a5ff00f35b997aaf27972bd1becc33dea8817861 -ROLLUPS_PRT_CONTRACTS_VERSION := 2.1.0 +ROLLUPS_CONTRACTS_SHA256:=31c20a8c50f794185957ebd6e554fc99c8e01f0fdf9a80628d031fb0edc7091d +ROLLUPS_PRT_CONTRACTS_VERSION := 2.1.1 ROLLUPS_PRT_CONTRACTS_URL:=https://github.com/cartesi/dave/releases/download/ ROLLUPS_PRT_CONTRACTS_ARTIFACT:=cartesi-rollups-prt-$(ROLLUPS_PRT_CONTRACTS_VERSION)-contract-artifacts.tar.gz -ROLLUPS_PRT_CONTRACTS_SHA256:=f1d918676a06c1cbc4be429a5e1750de1fb873e1c20f4923d1313767d1ab7312 +ROLLUPS_PRT_CONTRACTS_SHA256:=830815bcd858a67b73738c6747030960d88ed1e2e0b123086f2112b1ff47f7c9 IMAGE_TAG ?= devel diff --git a/pkg/contracts/iconsensus/iconsensus.go b/pkg/contracts/iconsensus/iconsensus.go index 3b92ebe5b..d0071cdd2 100644 --- a/pkg/contracts/iconsensus/iconsensus.go +++ b/pkg/contracts/iconsensus/iconsensus.go @@ -31,7 +31,7 @@ var ( // IConsensusMetaData contains all meta data concerning the IConsensus contract. var IConsensusMetaData = &bind.MetaData{ - ABI: "[{\"type\":\"function\",\"name\":\"getEpochLength\",\"inputs\":[],\"outputs\":[{\"name\":\"\",\"type\":\"uint256\",\"internalType\":\"uint256\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"getNumberOfAcceptedClaims\",\"inputs\":[],\"outputs\":[{\"name\":\"\",\"type\":\"uint256\",\"internalType\":\"uint256\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"isOutputsMerkleRootValid\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"internalType\":\"bytes32\"}],\"outputs\":[{\"name\":\"\",\"type\":\"bool\",\"internalType\":\"bool\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"submitClaim\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"internalType\":\"bytes32\"}],\"outputs\":[],\"stateMutability\":\"nonpayable\"},{\"type\":\"function\",\"name\":\"supportsInterface\",\"inputs\":[{\"name\":\"interfaceId\",\"type\":\"bytes4\",\"internalType\":\"bytes4\"}],\"outputs\":[{\"name\":\"\",\"type\":\"bool\",\"internalType\":\"bool\"}],\"stateMutability\":\"view\"},{\"type\":\"event\",\"name\":\"ClaimAccepted\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"indexed\":false,\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"indexed\":false,\"internalType\":\"bytes32\"}],\"anonymous\":false},{\"type\":\"event\",\"name\":\"ClaimSubmitted\",\"inputs\":[{\"name\":\"submitter\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"appContract\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"indexed\":false,\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"indexed\":false,\"internalType\":\"bytes32\"}],\"anonymous\":false},{\"type\":\"error\",\"name\":\"NotEpochFinalBlock\",\"inputs\":[{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"epochLength\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]},{\"type\":\"error\",\"name\":\"NotFirstClaim\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]},{\"type\":\"error\",\"name\":\"NotPastBlock\",\"inputs\":[{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"currentBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]}]", + ABI: "[{\"type\":\"function\",\"name\":\"getEpochLength\",\"inputs\":[],\"outputs\":[{\"name\":\"\",\"type\":\"uint256\",\"internalType\":\"uint256\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"getNumberOfAcceptedClaims\",\"inputs\":[],\"outputs\":[{\"name\":\"\",\"type\":\"uint256\",\"internalType\":\"uint256\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"getNumberOfSubmittedClaims\",\"inputs\":[],\"outputs\":[{\"name\":\"\",\"type\":\"uint256\",\"internalType\":\"uint256\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"isOutputsMerkleRootValid\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"internalType\":\"bytes32\"}],\"outputs\":[{\"name\":\"\",\"type\":\"bool\",\"internalType\":\"bool\"}],\"stateMutability\":\"view\"},{\"type\":\"function\",\"name\":\"submitClaim\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"internalType\":\"bytes32\"}],\"outputs\":[],\"stateMutability\":\"nonpayable\"},{\"type\":\"function\",\"name\":\"supportsInterface\",\"inputs\":[{\"name\":\"interfaceId\",\"type\":\"bytes4\",\"internalType\":\"bytes4\"}],\"outputs\":[{\"name\":\"\",\"type\":\"bool\",\"internalType\":\"bool\"}],\"stateMutability\":\"view\"},{\"type\":\"event\",\"name\":\"ClaimAccepted\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"indexed\":false,\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"indexed\":false,\"internalType\":\"bytes32\"}],\"anonymous\":false},{\"type\":\"event\",\"name\":\"ClaimSubmitted\",\"inputs\":[{\"name\":\"submitter\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"appContract\",\"type\":\"address\",\"indexed\":true,\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"indexed\":false,\"internalType\":\"uint256\"},{\"name\":\"outputsMerkleRoot\",\"type\":\"bytes32\",\"indexed\":false,\"internalType\":\"bytes32\"}],\"anonymous\":false},{\"type\":\"error\",\"name\":\"NotEpochFinalBlock\",\"inputs\":[{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"epochLength\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]},{\"type\":\"error\",\"name\":\"NotFirstClaim\",\"inputs\":[{\"name\":\"appContract\",\"type\":\"address\",\"internalType\":\"address\"},{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]},{\"type\":\"error\",\"name\":\"NotPastBlock\",\"inputs\":[{\"name\":\"lastProcessedBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"},{\"name\":\"currentBlockNumber\",\"type\":\"uint256\",\"internalType\":\"uint256\"}]}]", } // IConsensusABI is the input ABI used to generate the binding from. @@ -242,6 +242,37 @@ func (_IConsensus *IConsensusCallerSession) GetNumberOfAcceptedClaims() (*big.In return _IConsensus.Contract.GetNumberOfAcceptedClaims(&_IConsensus.CallOpts) } +// GetNumberOfSubmittedClaims is a free data retrieval call binding the contract method 0xee5e0faa. +// +// Solidity: function getNumberOfSubmittedClaims() view returns(uint256) +func (_IConsensus *IConsensusCaller) GetNumberOfSubmittedClaims(opts *bind.CallOpts) (*big.Int, error) { + var out []interface{} + err := _IConsensus.contract.Call(opts, &out, "getNumberOfSubmittedClaims") + + if err != nil { + return *new(*big.Int), err + } + + out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int) + + return out0, err + +} + +// GetNumberOfSubmittedClaims is a free data retrieval call binding the contract method 0xee5e0faa. +// +// Solidity: function getNumberOfSubmittedClaims() view returns(uint256) +func (_IConsensus *IConsensusSession) GetNumberOfSubmittedClaims() (*big.Int, error) { + return _IConsensus.Contract.GetNumberOfSubmittedClaims(&_IConsensus.CallOpts) +} + +// GetNumberOfSubmittedClaims is a free data retrieval call binding the contract method 0xee5e0faa. +// +// Solidity: function getNumberOfSubmittedClaims() view returns(uint256) +func (_IConsensus *IConsensusCallerSession) GetNumberOfSubmittedClaims() (*big.Int, error) { + return _IConsensus.Contract.GetNumberOfSubmittedClaims(&_IConsensus.CallOpts) +} + // IsOutputsMerkleRootValid is a free data retrieval call binding the contract method 0xe5cc8664. // // Solidity: function isOutputsMerkleRootValid(address appContract, bytes32 outputsMerkleRoot) view returns(bool) From 210463cf7cd25220af60dcb57cc9bcbb37a672f5 Mon Sep 17 00:00:00 2001 From: Marcelo Politzer <251334+mpolitzer@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:00:50 -0300 Subject: [PATCH 2/3] refactor(claimer): replace ChunkedFilterLogs with FindTransitions binary search --- internal/claimer/blockchain.go | 246 ++++++++++++++----------------- internal/claimer/claimer.go | 133 +++++++++++------ internal/claimer/claimer_test.go | 24 ++- internal/claimer/service.go | 39 ++--- 4 files changed, 239 insertions(+), 203 deletions(-) diff --git a/internal/claimer/blockchain.go b/internal/claimer/blockchain.go index e148460dd..4618f80b0 100644 --- a/internal/claimer/blockchain.go +++ b/internal/claimer/blockchain.go @@ -6,7 +6,6 @@ package claimer import ( "context" "fmt" - "iter" "log/slog" "math/big" @@ -15,8 +14,6 @@ import ( "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/ethutil" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -61,7 +58,7 @@ type iclaimerBlockchain interface { error, ) - getBlockNumber(ctx context.Context) (*big.Int, error) + getDefaultBlockNumber(ctx context.Context) (*big.Int, error) getConsensusAddress( ctx context.Context, @@ -77,24 +74,27 @@ type claimerBlockchain struct { defaultBlock config.DefaultBlock } -func (self *claimerBlockchain) submitClaimToBlockchain( +func (cb *claimerBlockchain) submitClaimToBlockchain( ic *iconsensus.IConsensus, application *model.Application, epoch *model.Epoch, ) (common.Hash, error) { txHash := common.Hash{} + if cb.txOpts == nil { + return txHash, fmt.Errorf("txOpts is required for claim submission") + } lastBlockNumber := new(big.Int).SetUint64(epoch.LastBlock) - tx, err := ic.SubmitClaim(self.txOpts, application.IApplicationAddress, + tx, err := ic.SubmitClaim(cb.txOpts, application.IApplicationAddress, lastBlockNumber, *epoch.OutputsMerkleRoot) if err != nil { - self.logger.Error("submitClaimToBlockchain:failed", + cb.logger.Error("submitClaimToBlockchain:failed", "appContractAddress", application.IApplicationAddress, "claimHash", *epoch.OutputsMerkleRoot, "last_block", epoch.LastBlock, "error", err) } else { txHash = tx.Hash() - self.logger.Debug("submitClaimToBlockchain:success", + cb.logger.Debug("submitClaimToBlockchain:success", "appContractAddress", application.IApplicationAddress, "claimHash", *epoch.OutputsMerkleRoot, "last_block", epoch.LastBlock, @@ -103,25 +103,54 @@ func (self *claimerBlockchain) submitClaimToBlockchain( return txHash, err } -func unwrapClaimSubmitted( - ic *iconsensus.IConsensus, - pull func() (log *types.Log, err error, ok bool), +type EventIterator interface { + Next() bool + Close() error + Error() error +} + +func newOracle( + nr func(*bind.CallOpts) (*big.Int, error), ) ( - *iconsensus.IConsensusClaimSubmitted, - bool, - error, + func(ctx context.Context, block uint64) (*big.Int, error), +) { + return func(ctx context.Context, block uint64) (*big.Int, error) { + return nr(&bind.CallOpts{ + Context: ctx, + BlockNumber: new(big.Int).SetUint64(block), + }) + } +} + +func newOnHit[IT EventIterator]( + ctx context.Context, + address common.Address, + filter func (*bind.FilterOpts, []common.Address, []common.Address) (IT, error), + onEvent func(IT), +) ( + func(block uint64) error, ) { - log, err, ok := pull() - if !ok || err != nil { - return nil, false, err + return func(block uint64) error { + filterOpts := &bind.FilterOpts{ + Context: ctx, + Start: block, + End: &block, + } + it, err := filter(filterOpts, nil, []common.Address{address}) + if err != nil { + return err + } + defer it.Close() + for it.Next() { + onEvent(it) + } + return it.Error() } - ev, err := ic.ParseClaimSubmitted(*log) - return ev, true, err } // scan the event stream for a claimSubmitted event that matches claim. // return this event and its successor -func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( +func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc( ctx context.Context, application *model.Application, epoch *model.Epoch, @@ -132,78 +161,42 @@ func (self *claimerBlockchain) findClaimSubmittedEventAndSucc( *iconsensus.IConsensusClaimSubmitted, error, ) { - ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client) + ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client) if err != nil { return nil, nil, nil, err } - - // filter must match: - // - `ClaimSubmitted` events - // - submitter == nil (any) - // - appContract == claim.IApplicationAddress - c, err := iconsensus.IConsensusMetaData.GetAbi() - topics, err := abi.MakeTopics( - []any{c.Events[model.MonitoredEvent_ClaimSubmitted.String()].ID}, - nil, - []any{application.IApplicationAddress}, + oracle := newOracle(ic.GetNumberOfSubmittedClaims) + events := []*iconsensus.IConsensusClaimSubmitted{} + onHit := newOnHit(ctx, application.IApplicationAddress, ic.FilterClaimSubmitted, + func(it *iconsensus.IConsensusClaimSubmittedIterator) { + event := it.Event + if (len(events) == 0) || claimSubmittedEventMatches(application, epoch, event) { + events = append(events, event) + } + }, ) - if err != nil { - return nil, nil, nil, err - } - it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(epoch.LastBlock), - ToBlock: endBlock, - Addresses: []common.Address{application.IConsensusAddress}, - Topics: topics, - }) + numSubmittedClaims, err := oracle(ctx, epoch.LastBlock) if err != nil { return nil, nil, nil, err } - - // pull events instead of iterating - next, stop := iter.Pull2(it) - defer stop() - for { - event, ok, err := unwrapClaimSubmitted(ic, next) - if !ok || err != nil { - return ic, event, nil, err - } - lastBlock := event.LastProcessedBlockNumber.Uint64() - - if claimSubmittedEventMatches(application, epoch, event) { - // found the event, does it has a successor? try to fetch it - succ, ok, err := unwrapClaimSubmitted(ic, next) - if !ok || err != nil { - return ic, event, nil, err - } - return ic, event, succ, err - } else if lastBlock > epoch.LastBlock { - err = fmt.Errorf("No matching claim, searched up to %v", event) - return nil, nil, nil, err - } + _, err = ethutil.FindTransitions(ctx, epoch.LastBlock, endBlock.Uint64(), numSubmittedClaims, oracle, onHit) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to walk ClaimSubmitted transitions: %w", err) } -} -func unwrapClaimAccepted( - ic *iconsensus.IConsensus, - pull func() (log *types.Log, err error, ok bool), -) ( - *iconsensus.IConsensusClaimAccepted, - bool, - error, -) { - log, err, ok := pull() - if !ok || err != nil { - return nil, false, err + if len(events) == 0 { + return ic, nil, nil, nil + } else if len(events) == 1 { + return ic, events[0], nil, nil + } else { + return ic, events[0], events[1], nil } - ev, err := ic.ParseClaimAccepted(*log) - return ev, true, err } // scan the event stream for a claimAccepted event that matches claim. // return this event and its successor -func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( +func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc( ctx context.Context, application *model.Application, epoch *model.Epoch, @@ -214,91 +207,78 @@ func (self *claimerBlockchain) findClaimAcceptedEventAndSucc( *iconsensus.IConsensusClaimAccepted, error, ) { - ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, self.client) + ic, err := iconsensus.NewIConsensus(application.IConsensusAddress, cb.client) if err != nil { return nil, nil, nil, err } - // filter must match: - // - `ClaimAccepted` events - // - appContract == claim.IApplicationAddress - c, err := iconsensus.IConsensusMetaData.GetAbi() - topics, err := abi.MakeTopics( - []any{c.Events[model.MonitoredEvent_ClaimAccepted.String()].ID}, - []any{application.IApplicationAddress}, + oracle := newOracle(ic.GetNumberOfAcceptedClaims) + events := []*iconsensus.IConsensusClaimAccepted{} + filter := func( + opts *bind.FilterOpts, + _ []common.Address, + appContract []common.Address, + ) (*iconsensus.IConsensusClaimAcceptedIterator, error) { + return ic.FilterClaimAccepted(opts, appContract) + } + onHit := newOnHit(ctx, application.IApplicationAddress, filter, + func(it *iconsensus.IConsensusClaimAcceptedIterator) { + event := it.Event + if (len(events) == 0) || claimAcceptedEventMatches(application, epoch, event) { + events = append(events, event) + } + }, ) + + numAcceptedClaims, err := oracle(ctx, epoch.LastBlock) if err != nil { return nil, nil, nil, err } - - it, err := self.filter.ChunkedFilterLogs(ctx, self.client, ethereum.FilterQuery{ - FromBlock: new(big.Int).SetUint64(epoch.LastBlock), - ToBlock: endBlock, - Addresses: []common.Address{application.IConsensusAddress}, - Topics: topics, - }) + _, err = ethutil.FindTransitions(ctx, epoch.LastBlock, endBlock.Uint64(), numAcceptedClaims, oracle, onHit) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, fmt.Errorf("failed to walk ClaimAccepted transitions: %w", err) } - // pull events instead of iterating - next, stop := iter.Pull2(it) - defer stop() - for { - event, ok, err := unwrapClaimAccepted(ic, next) - if !ok || err != nil { - return ic, event, nil, err - } - lastBlock := event.LastProcessedBlockNumber.Uint64() - - if claimAcceptedEventMatches(application, epoch, event) { - // found the event, does it has a successor? try to fetch it - succ, ok, err := unwrapClaimAccepted(ic, next) - if !ok || err != nil { - return ic, event, nil, err - } - return ic, event, succ, err - } else if lastBlock > epoch.LastBlock { - err = fmt.Errorf("No matching claim, searched up to %v", event) - return nil, nil, nil, err - } + if len(events) == 0 { + return ic, nil, nil, nil + } else if len(events) == 1 { + return ic, events[0], nil, nil + } else { + return ic, events[0], events[1], nil } } -func (self *claimerBlockchain) getConsensusAddress( +func (cb *claimerBlockchain) getConsensusAddress( ctx context.Context, app *model.Application, ) (common.Address, error) { - return ethutil.GetConsensus(ctx, self.client, app.IApplicationAddress) + return ethutil.GetConsensus(ctx, cb.client, app.IApplicationAddress) } -/* poll a transaction hash for its submission status and receipt */ -func (self *claimerBlockchain) pollTransaction( +// poll a transaction for its receipt +func (cb *claimerBlockchain) pollTransaction( ctx context.Context, txHash common.Hash, - endBlock *big.Int, + endBlockNumber *big.Int, ) (bool, *types.Receipt, error) { - _, isPending, err := self.client.TransactionByHash(ctx, txHash) - if err != nil || isPending { - return false, nil, err - } - - receipt, err := self.client.TransactionReceipt(ctx, txHash) + receipt, err := cb.client.TransactionReceipt(ctx, txHash) if err != nil { return false, nil, err } - if receipt.BlockNumber.Cmp(endBlock) >= 0 { - return false, receipt, err + // receipt must be committed before use. Return false until it is. + if receipt.BlockNumber.Cmp(endBlockNumber) >= 0 { + return false, nil, nil } - return receipt.Status == 1, receipt, err + return receipt.Status == 1, receipt, nil } -/* Retrieve the block number of "DefaultBlock" */ -func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, error) { +/* Retrieve the block number for the configured commitment level in ethereum terms, + * that is: `latest`, `safe`, `finalized`, etc. Which may be many blocks behind. */ +func (cb *claimerBlockchain) getDefaultBlockNumber(ctx context.Context) (*big.Int, error) { var nr int64 - switch self.defaultBlock { + switch cb.defaultBlock { case model.DefaultBlock_Pending: nr = rpc.PendingBlockNumber.Int64() case model.DefaultBlock_Latest: @@ -308,10 +288,10 @@ func (self *claimerBlockchain) getBlockNumber(ctx context.Context) (*big.Int, er case model.DefaultBlock_Safe: nr = rpc.SafeBlockNumber.Int64() default: - return nil, fmt.Errorf("default block '%v' not supported", self.defaultBlock) + return nil, fmt.Errorf("default block '%v' not supported", cb.defaultBlock) } - hdr, err := self.client.HeaderByNumber(ctx, big.NewInt(nr)) + hdr, err := cb.client.HeaderByNumber(ctx, big.NewInt(nr)) if err != nil { return nil, err } diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go index 5e9d3f3db..748660a28 100644 --- a/internal/claimer/claimer.go +++ b/internal/claimer/claimer.go @@ -51,9 +51,9 @@ import ( ) var ( - ErrClaimMismatch = fmt.Errorf("Claim and antecessor mismatch") - ErrEventMismatch = fmt.Errorf("Computed Claim mismatches ClaimSubmitted event") - ErrMissingEvent = fmt.Errorf("Accepted claim has no matching blockchain event") + ErrClaimMismatch = fmt.Errorf("constraints failed for epoch claim and its successor.") + ErrEventMismatch = fmt.Errorf("epoch claim does not match its corresponding event.") + ErrMissingEvent = fmt.Errorf("epoch claim does not have a corresponding event.") ) type iclaimerRepository interface { @@ -97,7 +97,7 @@ type iclaimerRepository interface { LoadNodeConfigRaw(ctx context.Context, key string) (rawJSON []byte, createdAt, updatedAt time.Time, err error) } -/* transition claims from computed to submitted */ +// transition epoch claims from computed to submitted. func (s *Service) submitClaimsAndUpdateDatabase( acceptedOrSubmittedEpochs map[int64]*model.Epoch, computedEpochs map[int64]*model.Epoch, @@ -107,7 +107,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( errs := []error{} var err error - // check claims in flight + // check claims in flight. NOTE: map mutation + iteration is safe in Go for key, txHash := range s.claimsInFlight { ready, receipt, err := s.blockchain.pollTransaction(s.Context, txHash, endBlock) if err != nil { @@ -128,12 +128,22 @@ func (s *Service) submitClaimsAndUpdateDatabase( computedEpoch.Index, receipt.TxHash, ) + + // NOTE: there is no point in trying the other claims on a database error + // so we just return and try again on the next tick if err != nil { errs = append(errs, err) return errs } + + // we expect apps[key] to always exist, + // but guard its use behind this `if` to ensure there is no panic if we are wrong. + appAddress := common.Address{} + if app, ok := apps[key]; ok { + appAddress = app.IApplicationAddress + } s.Logger.Info("Claim submitted", - "app", apps[key].IApplicationAddress, + "app", appAddress, "receipt_block_number", receipt.BlockNumber, "claim_hash", fmt.Sprintf("%x", computedEpoch.OutputsMerkleRoot), "last_block", computedEpoch.LastBlock, @@ -146,7 +156,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( delete(s.claimsInFlight, key) } - // check computed epochs + // check computed epochs. NOTE: map mutation + iteration is safe in Go for key, currEpoch := range computedEpochs { var ic *iconsensus.IConsensus var prevClaimSubmissionEvent *iconsensus.IConsensusClaimSubmitted @@ -163,7 +173,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err := s.checkConsensusForAddressChange(app); err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue } if previousEpochExists { err := checkEpochSequenceConstraint(prevEpoch, currEpoch) @@ -180,7 +190,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue } // the previous epoch must have a matching claim submission event. @@ -190,7 +200,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue } if prevClaimSubmissionEvent == nil { err = s.setApplicationInoperable( @@ -203,7 +213,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue } if !claimSubmittedEventMatches(app, prevEpoch, prevClaimSubmissionEvent) { s.Logger.Error("event mismatch", @@ -218,11 +228,11 @@ func (s *Service) submitClaimsAndUpdateDatabase( currEpoch.Index, prevEpoch.Index, prevEpoch.VirtualIndex, - prevClaimSubmissionEvent, + prevClaimSubmissionEvent.Raw.TxHash, ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue } } else { // first claim @@ -231,7 +241,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue } } @@ -250,7 +260,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue } s.Logger.Debug("Updating claim status to submitted", "app", app.IApplicationAddress, @@ -267,7 +277,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue } delete(s.claimsInFlight, key) s.Logger.Info("Claim previously submitted", @@ -283,7 +293,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( "claim_hash", fmt.Sprintf("%x", prevEpoch.OutputsMerkleRoot), "last_block", prevEpoch.LastBlock, ) - goto nextApp + continue } s.Logger.Debug("Submitting claim to blockchain", "app", app.IApplicationAddress, @@ -294,7 +304,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( if err != nil { delete(computedEpochs, key) errs = append(errs, err) - goto nextApp + continue } s.claimsInFlight[key] = txHash } else { @@ -305,12 +315,11 @@ func (s *Service) submitClaimsAndUpdateDatabase( ) } - nextApp: } return errs } -/* transition claims from submitted to accepted */ +// transition claims from submitted to accepted func (s *Service) acceptClaimsAndUpdateDatabase( acceptedEpochs map[int64]*model.Epoch, submittedEpochs map[int64]*model.Epoch, @@ -320,7 +329,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( errs := []error{} var err error - // check submitted claims + // check submitted epochs. NOTE: map mutation + iteration is safe in Go for key, submittedEpoch := range submittedEpochs { var prevEvent *iconsensus.IConsensusClaimAccepted var currEvent *iconsensus.IConsensusClaimAccepted @@ -331,50 +340,64 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err := s.checkConsensusForAddressChange(app); err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue } if prevExists { err := checkEpochSequenceConstraint(acceptedEpoch, submittedEpoch) if err != nil { - s.Logger.Error("Database mismatch on epochs.", + s.Logger.Error("epoch sequence check failed.", "app", app.IApplicationAddress, "previous_epoch_index", acceptedEpoch.Index, "current_epoch_index", submittedEpoch.Index, "err", err, ) + err = s.setApplicationInoperable( + s.Context, + app, + "%v. application: %v", + err, + app.IApplicationAddress, + ) delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue } - // if prevClaimRow exists, there must be a matching event + // if an epoch was accepted, there must exist a matching event for it _, prevEvent, currEvent, err = s.blockchain.findClaimAcceptedEventAndSucc(s.Context, app, acceptedEpoch, endBlock) if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue } if prevEvent == nil { - s.Logger.Error("Missing event", + s.Logger.Error("accepted epoch has no matching event", "app", app.IApplicationAddress, "claim", acceptedEpoch, "err", ErrMissingEvent, ) delete(submittedEpochs, key) errs = append(errs, ErrMissingEvent) - goto nextApp + continue } if !claimAcceptedEventMatches(app, acceptedEpoch, prevEvent) { - s.Logger.Error("Event mismatch", + s.Logger.Error("accepted epoch does not match event", "app", app.IApplicationAddress, "claim", acceptedEpoch, "event", prevEvent, "err", ErrEventMismatch, ) + err = s.setApplicationInoperable( + s.Context, + app, + "epoch: %v does not match event with tx_hash: %v", + acceptedEpoch.Index, + prevEvent.Raw.TxHash, + ) delete(submittedEpochs, key) - errs = append(errs, ErrEventMismatch) - goto nextApp + errs = append(errs, err) + continue } } else { // first claim @@ -383,7 +406,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue } } @@ -399,9 +422,16 @@ func (s *Service) acceptClaimsAndUpdateDatabase( "event", currEvent, "err", ErrEventMismatch, ) + err := s.setApplicationInoperable( + s.Context, + app, + "event mismatch for epoch %v, event tx_hash: %v", + acceptedEpoch.Index, + prevEvent.Raw.TxHash, + ) delete(submittedEpochs, key) - errs = append(errs, ErrEventMismatch) - goto nextApp + errs = append(errs, err) + continue } s.Logger.Debug("Updating claim status to accepted", "app", app.IApplicationAddress, @@ -413,7 +443,7 @@ func (s *Service) acceptClaimsAndUpdateDatabase( if err != nil { delete(submittedEpochs, key) errs = append(errs, err) - goto nextApp + continue } s.Logger.Info("Claim accepted", "app", currEvent.AppContract, @@ -423,7 +453,6 @@ func (s *Service) acceptClaimsAndUpdateDatabase( "tx", txHash, ) } - nextApp: } return errs } @@ -456,17 +485,25 @@ func (s *Service) checkConsensusForAddressChange( return nil } -func checkEpochConstraint(c *model.Epoch) error { - if c.FirstBlock > c.LastBlock { - return fmt.Errorf("unexpected epoch state. first_block: %v > last_block: %v", c.FirstBlock, c.LastBlock) +func checkEpochConstraint(epoch *model.Epoch) error { + if epoch.FirstBlock > epoch.LastBlock { + return fmt.Errorf("unexpected epoch state. first_block: %v > last_block: %v", + epoch.FirstBlock, epoch.LastBlock) } - if c.Status == model.EpochStatus_ClaimSubmitted { - if c.OutputsMerkleRoot == nil { - return fmt.Errorf("unexpected epoch state. missing claim_hash.") + + mustHaveOutputsMerkleRoot := epoch.Status == model.EpochStatus_ClaimSubmitted || + epoch.Status == model.EpochStatus_ClaimAccepted || + epoch.Status == model.EpochStatus_ClaimComputed + if mustHaveOutputsMerkleRoot { + if epoch.OutputsMerkleRoot == nil { + return fmt.Errorf("unexpected epoch state. missing outputs_merkle_root.") } } - if c.Status == model.EpochStatus_ClaimAccepted || c.Status == model.EpochStatus_ClaimSubmitted { - if c.ClaimTransactionHash == nil { + + mustHaveClaimTransactionHash := epoch.Status == model.EpochStatus_ClaimSubmitted || + epoch.Status == model.EpochStatus_ClaimAccepted + if mustHaveClaimTransactionHash { + if epoch.ClaimTransactionHash == nil { return fmt.Errorf("unexpected epoch state. missing claim_transaction_hash.") } } @@ -498,13 +535,21 @@ func checkEpochSequenceConstraint(prevEpoch *model.Epoch, currEpoch *model.Epoch } func claimSubmittedEventMatches(application *model.Application, epoch *model.Epoch, event *iconsensus.IConsensusClaimSubmitted) bool { + if application == nil || epoch == nil || event == nil { + return false + } return application.IApplicationAddress == event.AppContract && + epoch.OutputsMerkleRoot != nil && *epoch.OutputsMerkleRoot == event.OutputsMerkleRoot && epoch.LastBlock == event.LastProcessedBlockNumber.Uint64() } func claimAcceptedEventMatches(application *model.Application, epoch *model.Epoch, event *iconsensus.IConsensusClaimAccepted) bool { + if application == nil || epoch == nil || event == nil { + return false + } return application.IApplicationAddress == event.AppContract && + epoch.OutputsMerkleRoot != nil && *epoch.OutputsMerkleRoot == event.OutputsMerkleRoot && epoch.LastBlock == event.LastProcessedBlockNumber.Uint64() } diff --git a/internal/claimer/claimer_test.go b/internal/claimer/claimer_test.go index 620650711..c782ccef0 100644 --- a/internal/claimer/claimer_test.go +++ b/internal/claimer/claimer_test.go @@ -158,7 +158,7 @@ func (m *claimerBlockchainMock) pollTransaction( args.Get(1).(*types.Receipt), args.Error(2) } -func (m *claimerBlockchainMock) getBlockNumber(ctx context.Context) (*big.Int, error) { +func (m *claimerBlockchainMock) getDefaultBlockNumber(ctx context.Context) (*big.Int, error) { args := m.Called(ctx) return args.Get(0).(*big.Int), args.Error(1) @@ -570,7 +570,7 @@ func TestSubmitClaimWithAntecessorMismatch(t *testing.T) { b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil). Once() - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil). Once() @@ -595,7 +595,7 @@ func TestSubmitClaimWithEventMismatch(t *testing.T) { Return(app.IConsensusAddress, nil).Once() b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). Return(&iconsensus.IConsensus{}, prevEvent, wrongEvent, nil) - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil) errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) @@ -614,7 +614,7 @@ func TestSubmitClaimWithAntecessorOutOfOrder(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil) errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), big.NewInt(0)) @@ -637,7 +637,7 @@ func TestErrSubmittedMissingEvent(t *testing.T) { Return(app.IConsensusAddress, nil).Once() b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil) errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) @@ -658,7 +658,7 @@ func TestConsensusAddressChangedOnSubmittedClaims(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(wrongConsensusAddress, nil). Once() - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil). Once() @@ -735,6 +735,9 @@ func TestAcceptClaimWithAntecessorMismatch(t *testing.T) { Return(app.IConsensusAddress, nil).Once() b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, endBlock). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil) + r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything). + Return(nil). + Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 1, len(errs)) @@ -758,6 +761,9 @@ func TestAcceptClaimWithEventMismatch(t *testing.T) { Return(app.IConsensusAddress, nil).Once() b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, endBlock). Return(&iconsensus.IConsensus{}, prevEvent, wrongEvent, nil) + r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything). + Return(nil). + Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 1, len(errs)) @@ -775,6 +781,10 @@ func TestAcceptClaimWithAntecessorOutOfOrder(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() + r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything). + Return(nil). + Once() + errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(wrongEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), big.NewInt(0)) assert.Equal(t, 1, len(errs)) @@ -840,7 +850,7 @@ func TestConsensusAddressChangedOnAcceptedClaims(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(wrongConsensusAddress, nil). Once() - r.On("UpdateApplicationState", nil, int64(0), model.ApplicationState_Inoperable, mock.Anything). + r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). Return(nil). Once() diff --git a/internal/claimer/service.go b/internal/claimer/service.go index 0efa3c6f4..ded670c87 100644 --- a/internal/claimer/service.go +++ b/internal/claimer/service.go @@ -8,13 +8,11 @@ import ( "errors" "fmt" "log/slog" - "math/big" "github.com/cartesi/rollups-node/internal/config" "github.com/cartesi/rollups-node/internal/config/auth" - . "github.com/cartesi/rollups-node/internal/model" + "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/internal/repository" - "github.com/cartesi/rollups-node/pkg/ethutil" "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -34,22 +32,31 @@ type CreateInfo struct { type Service struct { service.Service - repository iclaimerRepository - blockchain iclaimerBlockchain - claimsInFlight map[int64]common.Hash // application.ID -> txHash + repository iclaimerRepository + blockchain iclaimerBlockchain + + // submitted claims waiting for confirmation from the blockchain. + // only accessed from tick, so no need for a lock + // contains: application ID -> transaction hash, with a maximum of one + // key per application due to the epoch advancement logic. + claimsInFlight map[int64]common.Hash submissionEnabled bool } const ClaimerConfigKey = "claimer" type PersistentConfig struct { - DefaultBlock DefaultBlock + DefaultBlock model.DefaultBlock ClaimSubmissionEnabled bool ChainID uint64 } func Create(ctx context.Context, c *CreateInfo) (*Service, error) { var err error + + if c == nil { + return nil, errors.New("invalid CreateInfo is nil") + } if err = ctx.Err(); err != nil { return nil, err // This returns context.Canceled or context.DeadlineExceeded. } @@ -102,12 +109,6 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) { client: c.EthConn, txOpts: txOpts, defaultBlock: c.Config.BlockchainDefaultBlock, - - filter: ethutil.Filter{ - MinChunkSize: ethutil.DefaultMinChunkSize, - MaxChunkSize: new(big.Int).SetUint64(c.Config.BlockchainMaxBlockRange), - Logger: s.Logger, - }, } return s, nil @@ -129,6 +130,7 @@ func (s *Service) Stop(bool) []error { return nil } +// NOTE: tick is not re-entrant! func (s *Service) Tick() []error { errs := []error{} @@ -157,16 +159,15 @@ func (s *Service) Tick() []error { return nil } - // we have claims to check - // get the latest/safe/finalized, etc. block - endBlock, err := s.blockchain.getBlockNumber(s.Context) + // we have claims to check. Get the latest/safe/finalized, etc. block + defaultBlockNumber, err := s.blockchain.getDefaultBlockNumber(s.Context) if err != nil { errs = append(errs, err) return errs } - errs = append(errs, s.submitClaimsAndUpdateDatabase(acceptedOrSubmittedEpochs, computedEpochs, computedApps, endBlock)...) - errs = append(errs, s.acceptClaimsAndUpdateDatabase(acceptedEpochs, submittedEpochs, submittedApps, endBlock)...) + errs = append(errs, s.submitClaimsAndUpdateDatabase(acceptedOrSubmittedEpochs, computedEpochs, computedApps, defaultBlockNumber)...) + errs = append(errs, s.acceptClaimsAndUpdateDatabase(acceptedEpochs, submittedEpochs, submittedApps, defaultBlockNumber)...) return errs } @@ -178,7 +179,7 @@ func setupPersistentConfig( ) (*PersistentConfig, error) { config, err := repository.LoadNodeConfig[PersistentConfig](ctx, repo, ClaimerConfigKey) if config == nil && errors.Is(err, repository.ErrNotFound) { - nc := NodeConfig[PersistentConfig]{ + nc := model.NodeConfig[PersistentConfig]{ Key: ClaimerConfigKey, Value: PersistentConfig{ DefaultBlock: c.BlockchainDefaultBlock, From a5f508a8a1cd0d0740d040d9f4702533f141e963 Mon Sep 17 00:00:00 2001 From: Marcelo Politzer <251334+mpolitzer@users.noreply.github.com> Date: Wed, 11 Mar 2026 14:16:09 -0300 Subject: [PATCH 3/3] feat: add last block checked for submitted and accepted claims --- internal/claimer/blockchain.go | 41 +- internal/claimer/claimer.go | 469 +++++++++++------- internal/claimer/claimer_test.go | 112 +++-- internal/model/models.go | 110 ++-- internal/repository/postgres/application.go | 20 +- internal/repository/postgres/claimer.go | 4 + .../db/rollupsdb/public/table/application.go | 132 ++--- .../000001_create_initial_schema.up.sql | 2 + .../repotest/application_test_cases.go | 25 +- 9 files changed, 546 insertions(+), 369 deletions(-) diff --git a/internal/claimer/blockchain.go b/internal/claimer/blockchain.go index 4618f80b0..bd9f1309b 100644 --- a/internal/claimer/blockchain.go +++ b/internal/claimer/blockchain.go @@ -26,7 +26,8 @@ type iclaimerBlockchain interface { ctx context.Context, application *model.Application, epoch *model.Epoch, - endBlock *big.Int, + fromBlock uint64, + toBlock uint64, ) ( *iconsensus.IConsensus, *iconsensus.IConsensusClaimSubmitted, @@ -50,7 +51,8 @@ type iclaimerBlockchain interface { ctx context.Context, application *model.Application, epoch *model.Epoch, - endBlock *big.Int, + fromBlock uint64, + toBlock uint64, ) ( *iconsensus.IConsensus, *iconsensus.IConsensusClaimAccepted, @@ -70,7 +72,6 @@ type claimerBlockchain struct { client *ethclient.Client txOpts *bind.TransactOpts logger *slog.Logger - filter ethutil.Filter defaultBlock config.DefaultBlock } @@ -103,7 +104,7 @@ func (cb *claimerBlockchain) submitClaimToBlockchain( return txHash, err } -type EventIterator interface { +type eventIterator interface { Next() bool Close() error Error() error @@ -111,9 +112,7 @@ type EventIterator interface { func newOracle( nr func(*bind.CallOpts) (*big.Int, error), -) ( - func(ctx context.Context, block uint64) (*big.Int, error), -) { +) func(ctx context.Context, block uint64) (*big.Int, error) { return func(ctx context.Context, block uint64) (*big.Int, error) { return nr(&bind.CallOpts{ Context: ctx, @@ -122,14 +121,12 @@ func newOracle( } } -func newOnHit[IT EventIterator]( +func newOnHit[IT eventIterator]( ctx context.Context, address common.Address, - filter func (*bind.FilterOpts, []common.Address, []common.Address) (IT, error), + filter func(*bind.FilterOpts, []common.Address, []common.Address) (IT, error), onEvent func(IT), -) ( - func(block uint64) error, -) { +) func(block uint64) error { return func(block uint64) error { filterOpts := &bind.FilterOpts{ Context: ctx, @@ -154,7 +151,8 @@ func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc( ctx context.Context, application *model.Application, epoch *model.Epoch, - endBlock *big.Int, + fromBlock uint64, + toBlock uint64, ) ( *iconsensus.IConsensus, *iconsensus.IConsensusClaimSubmitted, @@ -170,7 +168,7 @@ func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc( onHit := newOnHit(ctx, application.IApplicationAddress, ic.FilterClaimSubmitted, func(it *iconsensus.IConsensusClaimSubmittedIterator) { event := it.Event - if (len(events) == 0) || claimSubmittedEventMatches(application, epoch, event) { + if (len(events) > 0) || claimSubmittedEventMatches(application, epoch, event) { events = append(events, event) } }, @@ -180,7 +178,7 @@ func (cb *claimerBlockchain) findClaimSubmittedEventAndSucc( if err != nil { return nil, nil, nil, err } - _, err = ethutil.FindTransitions(ctx, epoch.LastBlock, endBlock.Uint64(), numSubmittedClaims, oracle, onHit) + _, err = ethutil.FindTransitions(ctx, fromBlock, toBlock, numSubmittedClaims, oracle, onHit) if err != nil { return nil, nil, nil, fmt.Errorf("failed to walk ClaimSubmitted transitions: %w", err) } @@ -200,7 +198,8 @@ func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc( ctx context.Context, application *model.Application, epoch *model.Epoch, - endBlock *big.Int, + fromBlock uint64, + toBlock uint64, ) ( *iconsensus.IConsensus, *iconsensus.IConsensusClaimAccepted, @@ -224,7 +223,7 @@ func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc( onHit := newOnHit(ctx, application.IApplicationAddress, filter, func(it *iconsensus.IConsensusClaimAcceptedIterator) { event := it.Event - if (len(events) == 0) || claimAcceptedEventMatches(application, epoch, event) { + if (len(events) > 0) || claimAcceptedEventMatches(application, epoch, event) { events = append(events, event) } }, @@ -234,7 +233,7 @@ func (cb *claimerBlockchain) findClaimAcceptedEventAndSucc( if err != nil { return nil, nil, nil, err } - _, err = ethutil.FindTransitions(ctx, epoch.LastBlock, endBlock.Uint64(), numAcceptedClaims, oracle, onHit) + _, err = ethutil.FindTransitions(ctx, fromBlock, toBlock, numAcceptedClaims, oracle, onHit) if err != nil { return nil, nil, nil, fmt.Errorf("failed to walk ClaimAccepted transitions: %w", err) } @@ -267,11 +266,7 @@ func (cb *claimerBlockchain) pollTransaction( } // receipt must be committed before use. Return false until it is. - if receipt.BlockNumber.Cmp(endBlockNumber) >= 0 { - return false, nil, nil - } - - return receipt.Status == 1, receipt, nil + return receipt.BlockNumber.Cmp(endBlockNumber) <= 0, receipt, nil } /* Retrieve the block number for the configured commitment level in ethereum terms, diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go index 748660a28..129075ef8 100644 --- a/internal/claimer/claimer.go +++ b/internal/claimer/claimer.go @@ -93,20 +93,25 @@ type iclaimerRepository interface { reason *string, ) error + UpdateEventLastCheckBlock( + ctx context.Context, + appIDs []int64, + event model.MonitoredEvent, + blockNumber uint64, + ) error + SaveNodeConfigRaw(ctx context.Context, key string, rawJSON []byte) error LoadNodeConfigRaw(ctx context.Context, key string) (rawJSON []byte, createdAt, updatedAt time.Time, err error) } -// transition epoch claims from computed to submitted. -func (s *Service) submitClaimsAndUpdateDatabase( - acceptedOrSubmittedEpochs map[int64]*model.Epoch, +// claims in flight are those that have been submitted but are waiting for a +// transaction confirmation. When confirmed, we update their status on the +// database. The epoch is now "submitted" and no longer "computed". +func (s *Service) checkClaimsInFlight( computedEpochs map[int64]*model.Epoch, apps map[int64]*model.Application, endBlock *big.Int, -) []error { - errs := []error{} - var err error - +) error { // check claims in flight. NOTE: map mutation + iteration is safe in Go for key, txHash := range s.claimsInFlight { ready, receipt, err := s.blockchain.pollTransaction(s.Context, txHash, endBlock) @@ -121,6 +126,14 @@ func (s *Service) submitClaimsAndUpdateDatabase( if !ready { continue } + if receipt.Status == 0 { + s.Logger.Warn("Claim submission reverted, retrying.", + "txHash", txHash, + "err", err, + ) + delete(s.claimsInFlight, key) + continue + } if computedEpoch, ok := computedEpochs[key]; ok { err = s.repository.UpdateEpochWithSubmittedClaim( s.Context, @@ -129,15 +142,14 @@ func (s *Service) submitClaimsAndUpdateDatabase( receipt.TxHash, ) - // NOTE: there is no point in trying the other claims on a database error - // so we just return and try again on the next tick + // NOTE: there is no point in trying the other applications on a database error + // so we just return and try again later (next tick) if err != nil { - errs = append(errs, err) - return errs + return err } // we expect apps[key] to always exist, - // but guard its use behind this `if` to ensure there is no panic if we are wrong. + // but guard its use behind `if` to ensure there is no panic if we are wrong. appAddress := common.Address{} if app, ok := apps[key]; ok { appAddress = app.IApplicationAddress @@ -148,26 +160,102 @@ func (s *Service) submitClaimsAndUpdateDatabase( "claim_hash", fmt.Sprintf("%x", computedEpoch.OutputsMerkleRoot), "last_block", computedEpoch.LastBlock, "tx", txHash) + + // epoch is no longer "computed" and is now "submitted". + // Processing will happen on the next tick iteration. delete(computedEpochs, key) } else { - s.Logger.Warn("expected claim in flight to be in currClaims.", + s.Logger.Warn("unexpected, claim in flight is not a computed epoch.", + "id", key, "tx", receipt.TxHash) } delete(s.claimsInFlight, key) } + return nil +} + +func (s *Service) findClaimSubmittedEventAndSucc( + ctx context.Context, + app *model.Application, + prevEpoch *model.Epoch, + currEpoch *model.Epoch, + fromBlock uint64, + toBlock uint64, +) ( + *iconsensus.IConsensus, + *iconsensus.IConsensusClaimSubmitted, + *iconsensus.IConsensusClaimSubmitted, + error, +) { + err := checkEpochSequenceConstraint(prevEpoch, currEpoch) + if err != nil { + err = s.setApplicationInoperable( + s.Context, + app, + "%v. epoch: %v (%v).", + err, + prevEpoch.Index, + prevEpoch.VirtualIndex, + ) + return nil, nil, nil, err + } + + ic, prevClaimSubmissionEvent, currClaimSubmissionEvent, err := + s.blockchain.findClaimSubmittedEventAndSucc(ctx, app, prevEpoch, fromBlock, toBlock) + if err != nil { + return nil, nil, nil, err + } + + if prevClaimSubmissionEvent == nil { + err = s.setApplicationInoperable( + s.Context, + app, + "application has an invalid epoch: %v (%v). No claim submission event to match.", + prevEpoch.Index, + prevEpoch.VirtualIndex, + ) + return nil, nil, nil, err + } + if !claimSubmittedEventMatches(app, prevEpoch, prevClaimSubmissionEvent) { + err = s.setApplicationInoperable( + s.Context, + app, + "application has an invalid epoch: %v (%v), missing claim submitted event (%v).", + prevEpoch.Index, + prevEpoch.VirtualIndex, + prevClaimSubmissionEvent.Raw.TxHash, + ) + return nil, nil, nil, err + } + return ic, prevClaimSubmissionEvent, currClaimSubmissionEvent, nil +} + +// transition epoch claims from computed to submitted. +func (s *Service) submitClaimsAndUpdateDatabase( + acceptedOrSubmittedEpochs map[int64]*model.Epoch, + computedEpochs map[int64]*model.Epoch, + apps map[int64]*model.Application, + defaultBlockNumber *big.Int, +) []error { + err := s.checkClaimsInFlight(computedEpochs, apps, defaultBlockNumber) + if err != nil { + return []error{err} + } + + errs := []error{} // check computed epochs. NOTE: map mutation + iteration is safe in Go for key, currEpoch := range computedEpochs { var ic *iconsensus.IConsensus - var prevClaimSubmissionEvent *iconsensus.IConsensusClaimSubmitted - var currClaimSubmissionEvent *iconsensus.IConsensusClaimSubmitted + var prevEvent *iconsensus.IConsensusClaimSubmitted + var currEvent *iconsensus.IConsensusClaimSubmitted if _, isClaimInFlight := s.claimsInFlight[key]; isClaimInFlight { continue } app := apps[key] // guaranteed to exist because of the query and database constraints - prevEpoch, previousEpochExists := acceptedOrSubmittedEpochs[key] + prevEpoch, prevEpochExists := acceptedOrSubmittedEpochs[key] // check address for changes if err := s.checkConsensusForAddressChange(app); err != nil { @@ -175,88 +263,49 @@ func (s *Service) submitClaimsAndUpdateDatabase( errs = append(errs, err) continue } - if previousEpochExists { - err := checkEpochSequenceConstraint(prevEpoch, currEpoch) - if err != nil { - err = s.setApplicationInoperable( - s.Context, - app, - "database mismatch on epochs. application: %v, epochs: %v (%v), %v (%v).", - app.IApplicationAddress, - prevEpoch.Index, - prevEpoch.VirtualIndex, - currEpoch.Index, - currEpoch.VirtualIndex, - ) - delete(computedEpochs, key) - errs = append(errs, err) - continue - } + if prevEpochExists { + fromBlock := prevEpoch.LastBlock + 1 + ic, prevEvent, currEvent, err = s.findClaimSubmittedEventAndSucc(s.Context, app, prevEpoch, currEpoch, fromBlock, defaultBlockNumber.Uint64()) + } else { + fromBlock := max(app.LastSubmittedClaimCheckBlock, currEpoch.LastBlock) + 1 + ic, currEvent, _, err = s.blockchain.findClaimSubmittedEventAndSucc(s.Context, app, currEpoch, fromBlock, defaultBlockNumber.Uint64()) + } + if err != nil { + delete(computedEpochs, key) + errs = append(errs, err) + continue + } - // the previous epoch must have a matching claim submission event. - // current epoch may or may not be present - ic, prevClaimSubmissionEvent, currClaimSubmissionEvent, err = - s.blockchain.findClaimSubmittedEventAndSucc(s.Context, app, prevEpoch, endBlock) - if err != nil { - delete(computedEpochs, key) - errs = append(errs, err) - continue - } - if prevClaimSubmissionEvent == nil { - err = s.setApplicationInoperable( - s.Context, - app, - "epoch has no matching event. application: %v, epoch: %v (%v).", - app.IApplicationAddress, - prevEpoch.Index, - prevEpoch.VirtualIndex, - ) - delete(computedEpochs, key) - errs = append(errs, err) - continue + if currEvent != nil { + checkFrom := uint64(0) + if currEvent.Raw.BlockNumber > 0 { + checkFrom = currEvent.Raw.BlockNumber - 1 } - if !claimSubmittedEventMatches(app, prevEpoch, prevClaimSubmissionEvent) { - s.Logger.Error("event mismatch", - "claim", prevEpoch, - "event", prevClaimSubmissionEvent, - "err", ErrEventMismatch, - ) - err = s.setApplicationInoperable( - s.Context, - app, - "epoch has an invalid event: %v, epoch: %v (%v). event: %v", - currEpoch.Index, - prevEpoch.Index, - prevEpoch.VirtualIndex, - prevClaimSubmissionEvent.Raw.TxHash, - ) - delete(computedEpochs, key) - errs = append(errs, err) - continue + if prevEvent != nil { + checkFrom = prevEvent.Raw.BlockNumber } - } else { - // first claim - ic, currClaimSubmissionEvent, _, err = - s.blockchain.findClaimSubmittedEventAndSucc(s.Context, app, currEpoch, endBlock) + err = s.repository.UpdateEventLastCheckBlock( + s.Context, + []int64{app.ID}, + model.MonitoredEvent_ClaimSubmitted, + checkFrom, + ) + // NOTE: there is no point in trying the other applications on a database error + // so we just return and try again later (next tick) if err != nil { - delete(computedEpochs, key) - errs = append(errs, err) - continue + return append(errs, err) } - } - - if currClaimSubmissionEvent != nil { s.Logger.Debug("Found ClaimSubmitted Event", - "app", currClaimSubmissionEvent.AppContract, - "claim_hash", fmt.Sprintf("%x", currClaimSubmissionEvent.OutputsMerkleRoot), - "last_block", currClaimSubmissionEvent.LastProcessedBlockNumber.Uint64(), + "app", currEvent.AppContract, + "claim_hash", fmt.Sprintf("%x", currEvent.OutputsMerkleRoot), + "last_block", currEvent.LastProcessedBlockNumber.Uint64(), ) - if !claimSubmittedEventMatches(app, currEpoch, currClaimSubmissionEvent) { + if !claimSubmittedEventMatches(app, currEpoch, currEvent) { err = s.setApplicationInoperable( s.Context, app, "computed claim does not match event. computed_claim=%v, current_event=%v", - currEpoch, currClaimSubmissionEvent, + currEpoch, currEvent, ) delete(computedEpochs, key) errs = append(errs, err) @@ -267,7 +316,7 @@ func (s *Service) submitClaimsAndUpdateDatabase( "claim_hash", fmt.Sprintf("%x", currEpoch.OutputsMerkleRoot), "last_block", currEpoch.LastBlock, ) - txHash := currClaimSubmissionEvent.Raw.TxHash + txHash := currEvent.Raw.TxHash err = s.repository.UpdateEpochWithSubmittedClaim( s.Context, currEpoch.ApplicationID, @@ -282,143 +331,180 @@ func (s *Service) submitClaimsAndUpdateDatabase( delete(s.claimsInFlight, key) s.Logger.Info("Claim previously submitted", "app", app.IApplicationAddress, - "event_block_number", currClaimSubmissionEvent.Raw.BlockNumber, + "event_block_number", currEvent.Raw.BlockNumber, "claim_hash", fmt.Sprintf("%x", currEpoch.OutputsMerkleRoot), "last_block", currEpoch.LastBlock, ) - } else if s.submissionEnabled { - if prevEpoch != nil && prevEpoch.Status != model.EpochStatus_ClaimAccepted { - s.Logger.Debug("Waiting previous claim to be accepted before submitting new one. Previous:", - "app", app.IApplicationAddress, - "claim_hash", fmt.Sprintf("%x", prevEpoch.OutputsMerkleRoot), - "last_block", prevEpoch.LastBlock, - ) - continue - } - s.Logger.Debug("Submitting claim to blockchain", - "app", app.IApplicationAddress, - "claim_hash", fmt.Sprintf("%x", currEpoch.OutputsMerkleRoot), - "last_block", currEpoch.LastBlock, + } else { + err = s.repository.UpdateEventLastCheckBlock( + s.Context, + []int64{app.ID}, + model.MonitoredEvent_ClaimSubmitted, + defaultBlockNumber.Uint64(), ) - txHash, err := s.blockchain.submitClaimToBlockchain(ic, app, currEpoch) + // NOTE: there is no point in trying the other applications on a database error + // so we just return and try again later (next tick) if err != nil { - delete(computedEpochs, key) - errs = append(errs, err) - continue + return append(errs, err) } - s.claimsInFlight[key] = txHash - } else { - s.Logger.Debug("Claim submission disabled. Doing nothing", - "app", app.IApplicationAddress, - "claim_hash", fmt.Sprintf("%x", currEpoch.OutputsMerkleRoot), - "last_block", currEpoch.LastBlock, - ) + if s.submissionEnabled { + if prevEpoch != nil && prevEpoch.Status != model.EpochStatus_ClaimAccepted { + s.Logger.Debug("Waiting previous claim to be accepted before submitting new one. Previous:", + "app", app.IApplicationAddress, + "claim_hash", fmt.Sprintf("%x", prevEpoch.OutputsMerkleRoot), + "last_block", prevEpoch.LastBlock, + ) + continue + } + s.Logger.Debug("Submitting claim to blockchain", + "app", app.IApplicationAddress, + "claim_hash", fmt.Sprintf("%x", currEpoch.OutputsMerkleRoot), + "last_block", currEpoch.LastBlock, + ) + txHash, err := s.blockchain.submitClaimToBlockchain(ic, app, currEpoch) + if err != nil { + delete(computedEpochs, key) + errs = append(errs, err) + continue + } + s.claimsInFlight[key] = txHash + } else { + s.Logger.Debug("Claim submission disabled. Doing nothing", + "app", app.IApplicationAddress, + "claim_hash", fmt.Sprintf("%x", currEpoch.OutputsMerkleRoot), + "last_block", currEpoch.LastBlock, + ) + + } } } return errs } +func (s *Service) findClaimAcceptedEventAndSucc( + ctx context.Context, + app *model.Application, + prevEpoch *model.Epoch, + currEpoch *model.Epoch, + fromBlock uint64, + toBlock uint64, +) ( + *iconsensus.IConsensus, + *iconsensus.IConsensusClaimAccepted, + *iconsensus.IConsensusClaimAccepted, + error, +) { + err := checkEpochSequenceConstraint(prevEpoch, currEpoch) + if err != nil { + err = s.setApplicationInoperable( + ctx, + app, + "%v. epoch: %v (%v).", + err, + prevEpoch.Index, + prevEpoch.VirtualIndex, + ) + return nil, nil, nil, err + } + + ic, prevClaimAcceptanceEvent, currClaimAcceptanceEvent, err := + s.blockchain.findClaimAcceptedEventAndSucc(ctx, app, prevEpoch, fromBlock, toBlock) + if err != nil { + return nil, nil, nil, err + } + + if prevClaimAcceptanceEvent == nil { + err = s.setApplicationInoperable( + ctx, + app, + "application has an invalid epoch: %v (%v), missing claim acceptance event.", + prevEpoch.Index, + prevEpoch.VirtualIndex, + ) + return nil, nil, nil, err + } + if !claimAcceptedEventMatches(app, prevEpoch, prevClaimAcceptanceEvent) { + err = s.setApplicationInoperable( + ctx, + app, + "application has an invalid epoch: %v (%v). event does not match: %v", + prevEpoch.Index, + prevEpoch.VirtualIndex, + prevClaimAcceptanceEvent.Raw.TxHash, + ) + return nil, nil, nil, err + } + return ic, prevClaimAcceptanceEvent, currClaimAcceptanceEvent, nil +} + // transition claims from submitted to accepted func (s *Service) acceptClaimsAndUpdateDatabase( acceptedEpochs map[int64]*model.Epoch, submittedEpochs map[int64]*model.Epoch, apps map[int64]*model.Application, - endBlock *big.Int, + defaultBlockNumber *big.Int, ) []error { errs := []error{} var err error // check submitted epochs. NOTE: map mutation + iteration is safe in Go - for key, submittedEpoch := range submittedEpochs { + for key, currEpoch := range submittedEpochs { var prevEvent *iconsensus.IConsensusClaimAccepted var currEvent *iconsensus.IConsensusClaimAccepted app := apps[key] - acceptedEpoch, prevExists := acceptedEpochs[key] + prevEpoch, prevEpochExists := acceptedEpochs[key] // check address for changes if err := s.checkConsensusForAddressChange(app); err != nil { delete(submittedEpochs, key) errs = append(errs, err) continue } - if prevExists { - err := checkEpochSequenceConstraint(acceptedEpoch, submittedEpoch) - if err != nil { - s.Logger.Error("epoch sequence check failed.", - "app", app.IApplicationAddress, - "previous_epoch_index", acceptedEpoch.Index, - "current_epoch_index", submittedEpoch.Index, - "err", err, - ) - err = s.setApplicationInoperable( - s.Context, - app, - "%v. application: %v", - err, - app.IApplicationAddress, - ) - delete(submittedEpochs, key) - errs = append(errs, err) - continue - } - // if an epoch was accepted, there must exist a matching event for it + if prevEpochExists { + fromBlock := prevEpoch.LastBlock + 1 _, prevEvent, currEvent, err = - s.blockchain.findClaimAcceptedEventAndSucc(s.Context, app, acceptedEpoch, endBlock) - if err != nil { - delete(submittedEpochs, key) - errs = append(errs, err) - continue - } - if prevEvent == nil { - s.Logger.Error("accepted epoch has no matching event", - "app", app.IApplicationAddress, - "claim", acceptedEpoch, - "err", ErrMissingEvent, - ) - delete(submittedEpochs, key) - errs = append(errs, ErrMissingEvent) - continue - } - if !claimAcceptedEventMatches(app, acceptedEpoch, prevEvent) { - s.Logger.Error("accepted epoch does not match event", - "app", app.IApplicationAddress, - "claim", acceptedEpoch, - "event", prevEvent, - "err", ErrEventMismatch, - ) - err = s.setApplicationInoperable( - s.Context, - app, - "epoch: %v does not match event with tx_hash: %v", - acceptedEpoch.Index, - prevEvent.Raw.TxHash, - ) - delete(submittedEpochs, key) - errs = append(errs, err) - continue - } + s.findClaimAcceptedEventAndSucc(s.Context, app, prevEpoch, currEpoch, fromBlock, defaultBlockNumber.Uint64()) } else { - // first claim + fromBlock := max(app.LastAcceptedClaimCheckBlock, currEpoch.LastBlock) + 1 _, currEvent, _, err = - s.blockchain.findClaimAcceptedEventAndSucc(s.Context, app, submittedEpoch, endBlock) - if err != nil { - delete(submittedEpochs, key) - errs = append(errs, err) - continue - } + s.blockchain.findClaimAcceptedEventAndSucc(s.Context, app, currEpoch, fromBlock, defaultBlockNumber.Uint64()) + } + if err != nil { + delete(submittedEpochs, key) + errs = append(errs, err) + continue } if currEvent != nil { + checkFrom := uint64(0) + if currEvent.Raw.BlockNumber > 0 { + checkFrom = currEvent.Raw.BlockNumber - 1 + } + if prevEvent != nil { + checkFrom = prevEvent.Raw.BlockNumber + } + err = s.repository.UpdateEventLastCheckBlock( + s.Context, + []int64{app.ID}, + model.MonitoredEvent_ClaimAccepted, + checkFrom, + ) + // NOTE: there is no point in trying the other applications on a database error + // so we just return and try again later (next tick) + if err != nil { + return append(errs, err) + } + s.Logger.Debug("Found ClaimAccepted Event", "app", currEvent.AppContract, "claim_hash", fmt.Sprintf("%x", currEvent.OutputsMerkleRoot), "last_block", currEvent.LastProcessedBlockNumber.Uint64(), ) - if !claimAcceptedEventMatches(app, submittedEpoch, currEvent) { + if !claimAcceptedEventMatches(app, currEpoch, currEvent) { s.Logger.Error("event mismatch", - "claim", submittedEpoch, + "claim", currEpoch, "event", currEvent, "err", ErrEventMismatch, ) @@ -426,8 +512,8 @@ func (s *Service) acceptClaimsAndUpdateDatabase( s.Context, app, "event mismatch for epoch %v, event tx_hash: %v", - acceptedEpoch.Index, - prevEvent.Raw.TxHash, + currEpoch.Index, + currEvent.Raw.TxHash, ) delete(submittedEpochs, key) errs = append(errs, err) @@ -435,11 +521,11 @@ func (s *Service) acceptClaimsAndUpdateDatabase( } s.Logger.Debug("Updating claim status to accepted", "app", app.IApplicationAddress, - "claim_hash", fmt.Sprintf("%x", submittedEpoch.OutputsMerkleRoot), - "last_block", submittedEpoch.LastBlock, + "claim_hash", fmt.Sprintf("%x", currEpoch.OutputsMerkleRoot), + "last_block", currEpoch.LastBlock, ) txHash := currEvent.Raw.TxHash - err = s.repository.UpdateEpochWithAcceptedClaim(s.Context, submittedEpoch.ApplicationID, submittedEpoch.Index) + err = s.repository.UpdateEpochWithAcceptedClaim(s.Context, currEpoch.ApplicationID, currEpoch.Index) if err != nil { delete(submittedEpochs, key) errs = append(errs, err) @@ -452,6 +538,19 @@ func (s *Service) acceptClaimsAndUpdateDatabase( "last_block", currEvent.LastProcessedBlockNumber.Uint64(), "tx", txHash, ) + } else { + err = s.repository.UpdateEventLastCheckBlock( + s.Context, + []int64{app.ID}, + model.MonitoredEvent_ClaimAccepted, + defaultBlockNumber.Uint64(), + ) + // NOTE: there is no point in trying the other applications on a database error + // so we just return and try again later (next tick) + if err != nil { + return append(errs, err) + } + } } return errs diff --git a/internal/claimer/claimer_test.go b/internal/claimer/claimer_test.go index c782ccef0..5d4402969 100644 --- a/internal/claimer/claimer_test.go +++ b/internal/claimer/claimer_test.go @@ -28,6 +28,11 @@ type claimerRepositoryMock struct { mock.Mock } +func (m *claimerRepositoryMock) UpdateEventLastCheckBlock(ctx context.Context, appIDs []int64, event model.MonitoredEvent, blockNumber uint64) error { + args := m.Called(ctx, appIDs, event, blockNumber) + return args.Error(0) +} + func (m *claimerRepositoryMock) SelectSubmittedClaimPairsPerApp(ctx context.Context) ( map[int64]*model.Epoch, map[int64]*model.Epoch, @@ -108,14 +113,15 @@ func (m *claimerBlockchainMock) findClaimSubmittedEventAndSucc( ctx context.Context, app *model.Application, epoch *model.Epoch, - endBlock *big.Int, + fromBlock uint64, + toBlock uint64, ) ( *iconsensus.IConsensus, *iconsensus.IConsensusClaimSubmitted, *iconsensus.IConsensusClaimSubmitted, error, ) { - args := m.Called(app, epoch, endBlock) + args := m.Called(ctx, app, epoch, fromBlock, toBlock) return args.Get(0).(*iconsensus.IConsensus), args.Get(1).(*iconsensus.IConsensusClaimSubmitted), args.Get(2).(*iconsensus.IConsensusClaimSubmitted), @@ -126,14 +132,15 @@ func (m *claimerBlockchainMock) findClaimAcceptedEventAndSucc( ctx context.Context, app *model.Application, epoch *model.Epoch, - endBlock *big.Int, + fromBlock uint64, + toBlock uint64, ) ( *iconsensus.IConsensus, *iconsensus.IConsensusClaimAccepted, *iconsensus.IConsensusClaimAccepted, error, ) { - args := m.Called(ctx, app, epoch, endBlock) + args := m.Called(ctx, app, epoch, fromBlock, toBlock) return args.Get(0).(*iconsensus.IConsensus), args.Get(1).(*iconsensus.IConsensusClaimAccepted), args.Get(2).(*iconsensus.IConsensusClaimAccepted), @@ -252,7 +259,8 @@ func makeSubmittedEvent(app *model.Application, epoch *model.Epoch) *iconsensus. AppContract: app.IApplicationAddress, OutputsMerkleRoot: *epoch.OutputsMerkleRoot, Raw: types.Log{ - TxHash: common.HexToHash(epoch.ClaimTransactionHash.Hex()), + TxHash: common.HexToHash(epoch.ClaimTransactionHash.Hex()), + BlockNumber: epoch.LastBlock + 1, }, } } @@ -263,7 +271,8 @@ func makeAcceptedEvent(app *model.Application, epoch *model.Epoch) *iconsensus.I AppContract: app.IApplicationAddress, OutputsMerkleRoot: *epoch.OutputsMerkleRoot, Raw: types.Log{ - TxHash: common.HexToHash(epoch.ClaimTransactionHash.Hex()), + TxHash: common.HexToHash(epoch.ClaimTransactionHash.Hex()), + BlockNumber: epoch.LastBlock + 1, }, } } @@ -287,7 +296,7 @@ func TestSubmitFirstClaim(t *testing.T) { defer r.AssertExpectations(t) defer b.AssertExpectations(t) - endBlock := big.NewInt(0) + endBlock := big.NewInt(40) app := makeApplication(0) currEpoch := makeComputedEpoch(app, 3) var prevEvent *iconsensus.IConsensusClaimSubmitted = nil @@ -295,10 +304,12 @@ func TestSubmitFirstClaim(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimSubmittedEventAndSucc", app, currEpoch, endBlock). + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch). Return(common.HexToHash("0x10"), nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimSubmitted, mock.Anything). + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, len(errs), 0) @@ -319,10 +330,12 @@ func TestSubmitClaimWithAntecessor(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch). Return(common.HexToHash("0x10"), nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimSubmitted, mock.Anything). + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, len(errs), 0) @@ -335,7 +348,7 @@ func TestSkipSubmitFirstClaim(t *testing.T) { defer b.AssertExpectations(t) m.submissionEnabled = false - endBlock := big.NewInt(0) + endBlock := big.NewInt(40) app := makeApplication(0) currEpoch := makeComputedEpoch(app, 3) var prevEvent *iconsensus.IConsensusClaimSubmitted = nil @@ -343,8 +356,10 @@ func TestSkipSubmitFirstClaim(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimSubmittedEventAndSucc", app, currEpoch, endBlock). + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimSubmitted, mock.Anything). + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, len(errs), 0) @@ -357,7 +372,7 @@ func TestSkipSubmitClaimWithAntecessor(t *testing.T) { defer b.AssertExpectations(t) m.submissionEnabled = false - endBlock := big.NewInt(0) + endBlock := big.NewInt(40) app := makeApplication(0) prevEpoch := makeAcceptedEpoch(app, 1) currEpoch := makeComputedEpoch(app, 3) @@ -366,8 +381,10 @@ func TestSkipSubmitClaimWithAntecessor(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimSubmitted, mock.Anything). + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, len(errs), 0) @@ -405,7 +422,7 @@ func TestUpdateFirstClaim(t *testing.T) { defer r.AssertExpectations(t) defer b.AssertExpectations(t) - endBlock := big.NewInt(0) + endBlock := big.NewInt(40) app := makeApplication(0) currEpoch := makeComputedEpoch(app, 3) var prevEvent *iconsensus.IConsensusClaimSubmitted = nil @@ -413,10 +430,12 @@ func TestUpdateFirstClaim(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimSubmittedEventAndSucc", app, currEpoch, endBlock). + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, currEvent, prevEvent, nil).Once() r.On("UpdateEpochWithSubmittedClaim", mock.Anything, app.ID, currEpoch.Index, currEvent.Raw.TxHash). Return(nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimSubmitted, mock.Anything). + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, len(errs), 0) @@ -437,10 +456,12 @@ func TestUpdateClaimWithAntecessor(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() r.On("UpdateEpochWithSubmittedClaim", mock.Anything, app.ID, currEpoch.Index, currEvent.Raw.TxHash). Return(nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimSubmitted, mock.Anything). + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, len(errs), 0) @@ -458,10 +479,12 @@ func TestAcceptFirstClaim(t *testing.T) { var prevEvent *iconsensus.IConsensusClaimAccepted = nil currEvent := makeAcceptedEvent(app, currEpoch) - b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, currEpoch, endBlock). + b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimAccepted, mock.Anything). + Return(nil).Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, len(errs), 0) @@ -481,10 +504,12 @@ func TestAcceptClaimWithAntecessor(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, endBlock). + b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() r.On("UpdateEpochWithAcceptedClaim", mock.Anything, app.ID, currEpoch.Index). Return(nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimAccepted, mock.Anything). + Return(nil).Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, len(errs), 0) @@ -536,10 +561,12 @@ func TestSubmitFailedClaim(t *testing.T) { Return(false, nilReceipt, expectedErr).Once() b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() b.On("submitClaimToBlockchain", mock.Anything, app, currEpoch). Return(common.HexToHash("0x10"), nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimSubmitted, mock.Anything). + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 0, len(errs)) @@ -567,12 +594,11 @@ func TestSubmitClaimWithAntecessorMismatch(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil). Once() - b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil). Once() r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). - Return(nil). - Once() + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 1, len(errs)) @@ -584,7 +610,7 @@ func TestSubmitClaimWithEventMismatch(t *testing.T) { defer r.AssertExpectations(t) defer b.AssertExpectations(t) - endBlock := big.NewInt(0) + endBlock := big.NewInt(40) app := makeApplication(0) prevEpoch := makeAcceptedEpoch(app, 1) currEpoch := makeComputedEpoch(app, 3) @@ -593,10 +619,12 @@ func TestSubmitClaimWithEventMismatch(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, wrongEvent, nil) r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). - Return(nil) + Return(nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimSubmitted, mock.Anything). + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 1, len(errs)) @@ -615,7 +643,7 @@ func TestSubmitClaimWithAntecessorOutOfOrder(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). - Return(nil) + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), big.NewInt(0)) assert.Equal(t, 1, len(errs)) @@ -635,10 +663,10 @@ func TestErrSubmittedMissingEvent(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimSubmittedEventAndSucc", app, prevEpoch, endBlock). + b.On("findClaimSubmittedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). - Return(nil) + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 1, len(errs)) @@ -659,8 +687,7 @@ func TestConsensusAddressChangedOnSubmittedClaims(t *testing.T) { Return(wrongConsensusAddress, nil). Once() r.On("UpdateApplicationState", mock.Anything, int64(0), model.ApplicationState_Inoperable, mock.Anything). - Return(nil). - Once() + Return(nil).Once() errs := m.submitClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, len(errs), 1) @@ -683,7 +710,7 @@ func TestFindClaimAcceptedEventAndSuccFailure0(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, currEpoch, endBlock). + b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, currEpoch, currEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, expectedErr).Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) @@ -706,7 +733,7 @@ func TestFindClaimAcceptedEventAndSuccFailure1(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, endBlock). + b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, expectedErr).Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) @@ -733,11 +760,10 @@ func TestAcceptClaimWithAntecessorMismatch(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, endBlock). + b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil) r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything). - Return(nil). - Once() + Return(nil).Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 1, len(errs)) @@ -759,11 +785,12 @@ func TestAcceptClaimWithEventMismatch(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, endBlock). + b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, wrongEvent, nil) r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything). - Return(nil). - Once() + Return(nil).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimAccepted, mock.Anything). + Return(nil).Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 1, len(errs)) @@ -785,7 +812,6 @@ func TestAcceptClaimWithAntecessorOutOfOrder(t *testing.T) { Return(nil). Once() - errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(wrongEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), big.NewInt(0)) assert.Equal(t, 1, len(errs)) } @@ -804,8 +830,10 @@ func TestErrAcceptedMissingEvent(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, endBlock). + b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() + r.On("UpdateApplicationState", mock.Anything, mock.Anything, model.ApplicationState_Inoperable, mock.Anything). + Return(nil).Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 1, len(errs)) @@ -827,10 +855,12 @@ func TestUpdateEpochWithAcceptedClaimFailed(t *testing.T) { b.On("getConsensusAddress", mock.Anything, app). Return(app.IConsensusAddress, nil).Once() - b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, endBlock). + b.On("findClaimAcceptedEventAndSucc", mock.Anything, app, prevEpoch, prevEpoch.LastBlock+1, endBlock.Uint64()). Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil).Once() r.On("UpdateEpochWithAcceptedClaim", mock.Anything, app.ID, currEpoch.Index). Return(expectedErr).Once() + r.On("UpdateEventLastCheckBlock", mock.Anything, []int64{app.ID}, model.MonitoredEvent_ClaimAccepted, mock.Anything). + Return(nil).Once() errs := m.acceptClaimsAndUpdateDatabase(makeEpochMap(prevEpoch), makeEpochMap(currEpoch), makeApplicationMap(app), endBlock) assert.Equal(t, 1, len(errs)) diff --git a/internal/model/models.go b/internal/model/models.go index bb20e3cfb..0c81b553e 100644 --- a/internal/model/models.go +++ b/internal/model/models.go @@ -17,27 +17,29 @@ import ( ) type Application struct { - ID int64 `sql:"primary_key" json:"-"` - Name string `json:"name"` - IApplicationAddress common.Address `json:"iapplication_address"` - IConsensusAddress common.Address `json:"iconsensus_address"` - IInputBoxAddress common.Address `json:"iinputbox_address"` - TemplateHash common.Hash `json:"template_hash"` - TemplateURI string `json:"-"` - EpochLength uint64 `json:"epoch_length"` - DataAvailability []byte `json:"data_availability"` - ConsensusType Consensus `json:"consensus_type"` - State ApplicationState `json:"state"` - Reason *string `json:"reason"` - IInputBoxBlock uint64 `json:"iinputbox_block"` - LastEpochCheckBlock uint64 `json:"last_epoch_check_block"` - LastInputCheckBlock uint64 `json:"last_input_check_block"` - LastOutputCheckBlock uint64 `json:"last_output_check_block"` - LastTournamentCheckBlock uint64 `json:"last_tournament_check_block"` - ProcessedInputs uint64 `json:"processed_inputs"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` - ExecutionParameters ExecutionParameters `json:"execution_parameters"` + ID int64 `sql:"primary_key" json:"-"` + Name string `json:"name"` + IApplicationAddress common.Address `json:"iapplication_address"` + IConsensusAddress common.Address `json:"iconsensus_address"` + IInputBoxAddress common.Address `json:"iinputbox_address"` + TemplateHash common.Hash `json:"template_hash"` + TemplateURI string `json:"-"` + EpochLength uint64 `json:"epoch_length"` + DataAvailability []byte `json:"data_availability"` + ConsensusType Consensus `json:"consensus_type"` + State ApplicationState `json:"state"` + Reason *string `json:"reason"` + IInputBoxBlock uint64 `json:"iinputbox_block"` + LastEpochCheckBlock uint64 `json:"last_epoch_check_block"` + LastInputCheckBlock uint64 `json:"last_input_check_block"` + LastOutputCheckBlock uint64 `json:"last_output_check_block"` + LastTournamentCheckBlock uint64 `json:"last_tournament_check_block"` + LastSubmittedClaimCheckBlock uint64 `json:"last_submitted_claim_check_block"` + LastAcceptedClaimCheckBlock uint64 `json:"last_accepted_claim_check_block"` + ProcessedInputs uint64 `json:"processed_inputs"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + ExecutionParameters ExecutionParameters `json:"execution_parameters"` } // HasDataAvailabilitySelector checks if the application's DataAvailability @@ -52,24 +54,28 @@ func (a *Application) MarshalJSON() ([]byte, error) { // Define a new structure that embeds the alias but overrides the hex fields. aux := &struct { *Alias - DataAvailability string `json:"data_availability"` - IInputBoxBlock string `json:"iinputbox_block"` - LastEpochCheckBlock string `json:"last_epoch_check_block"` - LastInputCheckBlock string `json:"last_input_check_block"` - LastOutputCheckBlock string `json:"last_output_check_block"` - LastTournamentCheckBlock string `json:"last_tournament_check_block"` - EpochLength string `json:"epoch_length"` - ProcessedInputs string `json:"processed_inputs"` + DataAvailability string `json:"data_availability"` + IInputBoxBlock string `json:"iinputbox_block"` + LastEpochCheckBlock string `json:"last_epoch_check_block"` + LastInputCheckBlock string `json:"last_input_check_block"` + LastOutputCheckBlock string `json:"last_output_check_block"` + LastTournamentCheckBlock string `json:"last_tournament_check_block"` + LastSubmittedClaimCheckBlock string `json:"last_submitted_claim_check_block"` + LastAcceptedClaimCheckBlock string `json:"last_accepted_claim_check_block"` + EpochLength string `json:"epoch_length"` + ProcessedInputs string `json:"processed_inputs"` }{ - Alias: (*Alias)(a), - DataAvailability: "0x" + hex.EncodeToString(a.DataAvailability), - IInputBoxBlock: fmt.Sprintf("0x%x", a.IInputBoxBlock), - LastEpochCheckBlock: fmt.Sprintf("0x%x", a.LastEpochCheckBlock), - LastInputCheckBlock: fmt.Sprintf("0x%x", a.LastInputCheckBlock), - LastOutputCheckBlock: fmt.Sprintf("0x%x", a.LastOutputCheckBlock), - LastTournamentCheckBlock: fmt.Sprintf("0x%x", a.LastTournamentCheckBlock), - EpochLength: fmt.Sprintf("0x%x", a.EpochLength), - ProcessedInputs: fmt.Sprintf("0x%x", a.ProcessedInputs), + Alias: (*Alias)(a), + DataAvailability: "0x" + hex.EncodeToString(a.DataAvailability), + IInputBoxBlock: fmt.Sprintf("0x%x", a.IInputBoxBlock), + LastEpochCheckBlock: fmt.Sprintf("0x%x", a.LastEpochCheckBlock), + LastInputCheckBlock: fmt.Sprintf("0x%x", a.LastInputCheckBlock), + LastOutputCheckBlock: fmt.Sprintf("0x%x", a.LastOutputCheckBlock), + LastTournamentCheckBlock: fmt.Sprintf("0x%x", a.LastTournamentCheckBlock), + LastSubmittedClaimCheckBlock: fmt.Sprintf("0x%x", a.LastSubmittedClaimCheckBlock), + LastAcceptedClaimCheckBlock: fmt.Sprintf("0x%x", a.LastAcceptedClaimCheckBlock), + EpochLength: fmt.Sprintf("0x%x", a.EpochLength), + ProcessedInputs: fmt.Sprintf("0x%x", a.ProcessedInputs), } return json.Marshal(aux) } @@ -79,14 +85,16 @@ func (a *Application) UnmarshalJSON(in []byte) error { aux := &struct { *Alias - DataAvailability string `json:"data_availability"` - IInputBoxBlock string `json:"iinputbox_block"` - LastInputCheckBlock string `json:"last_input_check_block"` - LastOutputCheckBlock string `json:"last_output_check_block"` - LastEpochCheckBlock string `json:"last_epoch_check_block"` - LastTournamentCheckBlock string `json:"last_tournament_check_block"` - EpochLength string `json:"epoch_length"` - ProcessedInputs string `json:"processed_inputs"` + DataAvailability string `json:"data_availability"` + IInputBoxBlock string `json:"iinputbox_block"` + LastInputCheckBlock string `json:"last_input_check_block"` + LastOutputCheckBlock string `json:"last_output_check_block"` + LastEpochCheckBlock string `json:"last_epoch_check_block"` + LastTournamentCheckBlock string `json:"last_tournament_check_block"` + EpochLength string `json:"epoch_length"` + ProcessedInputs string `json:"processed_inputs"` + LastSubmittedClaimCheckBlock string `json:"last_submitted_claim_check_block"` + LastAcceptedClaimCheckBlock string `json:"last_accepted_claim_check_block"` }{} var err error @@ -128,6 +136,16 @@ func (a *Application) UnmarshalJSON(in []byte) error { return err } + a.LastSubmittedClaimCheckBlock, err = ParseHexUint64(aux.LastSubmittedClaimCheckBlock) + if err != nil { + return err + } + + a.LastAcceptedClaimCheckBlock, err = ParseHexUint64(aux.LastAcceptedClaimCheckBlock) + if err != nil { + return err + } + a.EpochLength, err = ParseHexUint64(aux.EpochLength) if err != nil { return err diff --git a/internal/repository/postgres/application.go b/internal/repository/postgres/application.go index b18efd6d8..7c4eeabea 100644 --- a/internal/repository/postgres/application.go +++ b/internal/repository/postgres/application.go @@ -41,6 +41,8 @@ func (r *PostgresRepository) CreateApplication( table.Application.LastInputCheckBlock, table.Application.LastOutputCheckBlock, table.Application.LastTournamentCheckBlock, + table.Application.LastSubmittedClaimCheckBlock, + table.Application.LastAcceptedClaimCheckBlock, table.Application.ProcessedInputs, ). VALUES( @@ -59,6 +61,8 @@ func (r *PostgresRepository) CreateApplication( app.LastInputCheckBlock, app.LastOutputCheckBlock, app.LastTournamentCheckBlock, + app.LastSubmittedClaimCheckBlock, + app.LastAcceptedClaimCheckBlock, app.ProcessedInputs, ). RETURNING(table.Application.ID) @@ -159,6 +163,8 @@ func (r *PostgresRepository) GetApplication( table.Application.LastInputCheckBlock, table.Application.LastOutputCheckBlock, table.Application.LastTournamentCheckBlock, + table.Application.LastSubmittedClaimCheckBlock, + table.Application.LastAcceptedClaimCheckBlock, table.Application.ProcessedInputs, table.Application.CreatedAt, table.Application.UpdatedAt, @@ -209,6 +215,8 @@ func (r *PostgresRepository) GetApplication( &app.LastInputCheckBlock, &app.LastOutputCheckBlock, &app.LastTournamentCheckBlock, + &app.LastSubmittedClaimCheckBlock, + &app.LastAcceptedClaimCheckBlock, &app.ProcessedInputs, &app.CreatedAt, &app.UpdatedAt, @@ -286,6 +294,8 @@ func (r *PostgresRepository) UpdateApplication( table.Application.LastInputCheckBlock, table.Application.LastOutputCheckBlock, table.Application.LastTournamentCheckBlock, + table.Application.LastSubmittedClaimCheckBlock, + table.Application.LastAcceptedClaimCheckBlock, table.Application.ProcessedInputs, ). SET( @@ -305,6 +315,8 @@ func (r *PostgresRepository) UpdateApplication( app.LastInputCheckBlock, app.LastOutputCheckBlock, app.LastTournamentCheckBlock, + app.LastSubmittedClaimCheckBlock, + app.LastAcceptedClaimCheckBlock, app.ProcessedInputs, ). WHERE(table.Application.ID.EQ(postgres.Int(app.ID))) @@ -356,9 +368,9 @@ func getColumnForEvent(event model.MonitoredEvent) (postgres.ColumnFloat, error) case model.MonitoredEvent_NewInnerTournament: return table.Application.LastTournamentCheckBlock, nil case model.MonitoredEvent_ClaimSubmitted: - fallthrough + return table.Application.LastSubmittedClaimCheckBlock, nil case model.MonitoredEvent_ClaimAccepted: - fallthrough + return table.Application.LastAcceptedClaimCheckBlock, nil default: return nil, fmt.Errorf("invalid monitored event type: %v", event) } @@ -530,6 +542,8 @@ func (r *PostgresRepository) ListApplications( table.Application.LastInputCheckBlock, table.Application.LastOutputCheckBlock, table.Application.LastTournamentCheckBlock, + table.Application.LastSubmittedClaimCheckBlock, + table.Application.LastAcceptedClaimCheckBlock, table.Application.ProcessedInputs, table.Application.CreatedAt, table.Application.UpdatedAt, @@ -618,6 +632,8 @@ func (r *PostgresRepository) ListApplications( &app.LastInputCheckBlock, &app.LastOutputCheckBlock, &app.LastTournamentCheckBlock, + &app.LastSubmittedClaimCheckBlock, + &app.LastAcceptedClaimCheckBlock, &app.ProcessedInputs, &app.CreatedAt, &app.UpdatedAt, diff --git a/internal/repository/postgres/claimer.go b/internal/repository/postgres/claimer.go index cd13526ee..369492d91 100644 --- a/internal/repository/postgres/claimer.go +++ b/internal/repository/postgres/claimer.go @@ -61,6 +61,8 @@ func (r *PostgresRepository) selectOldestClaimPerApp( table.Application.IinputboxBlock, table.Application.LastInputCheckBlock, table.Application.LastOutputCheckBlock, + table.Application.LastSubmittedClaimCheckBlock, + table.Application.LastAcceptedClaimCheckBlock, table.Application.ProcessedInputs, table.Application.CreatedAt, table.Application.UpdatedAt, @@ -121,6 +123,8 @@ func (r *PostgresRepository) selectOldestClaimPerApp( &application.IInputBoxBlock, &application.LastInputCheckBlock, &application.LastOutputCheckBlock, + &application.LastSubmittedClaimCheckBlock, + &application.LastAcceptedClaimCheckBlock, &application.ProcessedInputs, &application.CreatedAt, &application.UpdatedAt, diff --git a/internal/repository/postgres/db/rollupsdb/public/table/application.go b/internal/repository/postgres/db/rollupsdb/public/table/application.go index 8a855c89c..460df9a6c 100644 --- a/internal/repository/postgres/db/rollupsdb/public/table/application.go +++ b/internal/repository/postgres/db/rollupsdb/public/table/application.go @@ -17,26 +17,28 @@ type applicationTable struct { postgres.Table // Columns - ID postgres.ColumnInteger - Name postgres.ColumnString - IapplicationAddress postgres.ColumnBytea - IconsensusAddress postgres.ColumnBytea - IinputboxAddress postgres.ColumnBytea - IinputboxBlock postgres.ColumnFloat - TemplateHash postgres.ColumnBytea - TemplateURI postgres.ColumnString - EpochLength postgres.ColumnFloat - DataAvailability postgres.ColumnBytea - ConsensusType postgres.ColumnString - State postgres.ColumnString - Reason postgres.ColumnString - LastEpochCheckBlock postgres.ColumnFloat - LastInputCheckBlock postgres.ColumnFloat - LastOutputCheckBlock postgres.ColumnFloat - LastTournamentCheckBlock postgres.ColumnFloat - ProcessedInputs postgres.ColumnFloat - CreatedAt postgres.ColumnTimestampz - UpdatedAt postgres.ColumnTimestampz + ID postgres.ColumnInteger + Name postgres.ColumnString + IapplicationAddress postgres.ColumnBytea + IconsensusAddress postgres.ColumnBytea + IinputboxAddress postgres.ColumnBytea + IinputboxBlock postgres.ColumnFloat + TemplateHash postgres.ColumnBytea + TemplateURI postgres.ColumnString + EpochLength postgres.ColumnFloat + DataAvailability postgres.ColumnBytea + ConsensusType postgres.ColumnString + State postgres.ColumnString + Reason postgres.ColumnString + LastEpochCheckBlock postgres.ColumnFloat + LastInputCheckBlock postgres.ColumnFloat + LastOutputCheckBlock postgres.ColumnFloat + LastTournamentCheckBlock postgres.ColumnFloat + LastSubmittedClaimCheckBlock postgres.ColumnFloat + LastAcceptedClaimCheckBlock postgres.ColumnFloat + ProcessedInputs postgres.ColumnFloat + CreatedAt postgres.ColumnTimestampz + UpdatedAt postgres.ColumnTimestampz AllColumns postgres.ColumnList MutableColumns postgres.ColumnList @@ -78,55 +80,59 @@ func newApplicationTable(schemaName, tableName, alias string) *ApplicationTable func newApplicationTableImpl(schemaName, tableName, alias string) applicationTable { var ( - IDColumn = postgres.IntegerColumn("id") - NameColumn = postgres.StringColumn("name") - IapplicationAddressColumn = postgres.ByteaColumn("iapplication_address") - IconsensusAddressColumn = postgres.ByteaColumn("iconsensus_address") - IinputboxAddressColumn = postgres.ByteaColumn("iinputbox_address") - IinputboxBlockColumn = postgres.FloatColumn("iinputbox_block") - TemplateHashColumn = postgres.ByteaColumn("template_hash") - TemplateURIColumn = postgres.StringColumn("template_uri") - EpochLengthColumn = postgres.FloatColumn("epoch_length") - DataAvailabilityColumn = postgres.ByteaColumn("data_availability") - ConsensusTypeColumn = postgres.StringColumn("consensus_type") - StateColumn = postgres.StringColumn("state") - ReasonColumn = postgres.StringColumn("reason") - LastEpochCheckBlockColumn = postgres.FloatColumn("last_epoch_check_block") - LastInputCheckBlockColumn = postgres.FloatColumn("last_input_check_block") - LastOutputCheckBlockColumn = postgres.FloatColumn("last_output_check_block") - LastTournamentCheckBlockColumn = postgres.FloatColumn("last_tournament_check_block") - ProcessedInputsColumn = postgres.FloatColumn("processed_inputs") - CreatedAtColumn = postgres.TimestampzColumn("created_at") - UpdatedAtColumn = postgres.TimestampzColumn("updated_at") - allColumns = postgres.ColumnList{IDColumn, NameColumn, IapplicationAddressColumn, IconsensusAddressColumn, IinputboxAddressColumn, IinputboxBlockColumn, TemplateHashColumn, TemplateURIColumn, EpochLengthColumn, DataAvailabilityColumn, ConsensusTypeColumn, StateColumn, ReasonColumn, LastEpochCheckBlockColumn, LastInputCheckBlockColumn, LastOutputCheckBlockColumn, LastTournamentCheckBlockColumn, ProcessedInputsColumn, CreatedAtColumn, UpdatedAtColumn} - mutableColumns = postgres.ColumnList{NameColumn, IapplicationAddressColumn, IconsensusAddressColumn, IinputboxAddressColumn, IinputboxBlockColumn, TemplateHashColumn, TemplateURIColumn, EpochLengthColumn, DataAvailabilityColumn, ConsensusTypeColumn, StateColumn, ReasonColumn, LastEpochCheckBlockColumn, LastInputCheckBlockColumn, LastOutputCheckBlockColumn, LastTournamentCheckBlockColumn, ProcessedInputsColumn, CreatedAtColumn, UpdatedAtColumn} - defaultColumns = postgres.ColumnList{CreatedAtColumn, UpdatedAtColumn} + IDColumn = postgres.IntegerColumn("id") + NameColumn = postgres.StringColumn("name") + IapplicationAddressColumn = postgres.ByteaColumn("iapplication_address") + IconsensusAddressColumn = postgres.ByteaColumn("iconsensus_address") + IinputboxAddressColumn = postgres.ByteaColumn("iinputbox_address") + IinputboxBlockColumn = postgres.FloatColumn("iinputbox_block") + TemplateHashColumn = postgres.ByteaColumn("template_hash") + TemplateURIColumn = postgres.StringColumn("template_uri") + EpochLengthColumn = postgres.FloatColumn("epoch_length") + DataAvailabilityColumn = postgres.ByteaColumn("data_availability") + ConsensusTypeColumn = postgres.StringColumn("consensus_type") + StateColumn = postgres.StringColumn("state") + ReasonColumn = postgres.StringColumn("reason") + LastEpochCheckBlockColumn = postgres.FloatColumn("last_epoch_check_block") + LastInputCheckBlockColumn = postgres.FloatColumn("last_input_check_block") + LastOutputCheckBlockColumn = postgres.FloatColumn("last_output_check_block") + LastTournamentCheckBlockColumn = postgres.FloatColumn("last_tournament_check_block") + LastSubmittedClaimCheckBlockColumn = postgres.FloatColumn("last_submitted_claim_check_block") + LastAcceptedClaimCheckBlockColumn = postgres.FloatColumn("last_accepted_claim_check_block") + ProcessedInputsColumn = postgres.FloatColumn("processed_inputs") + CreatedAtColumn = postgres.TimestampzColumn("created_at") + UpdatedAtColumn = postgres.TimestampzColumn("updated_at") + allColumns = postgres.ColumnList{IDColumn, NameColumn, IapplicationAddressColumn, IconsensusAddressColumn, IinputboxAddressColumn, IinputboxBlockColumn, TemplateHashColumn, TemplateURIColumn, EpochLengthColumn, DataAvailabilityColumn, ConsensusTypeColumn, StateColumn, ReasonColumn, LastEpochCheckBlockColumn, LastInputCheckBlockColumn, LastOutputCheckBlockColumn, LastTournamentCheckBlockColumn, LastSubmittedClaimCheckBlockColumn, LastAcceptedClaimCheckBlockColumn, ProcessedInputsColumn, CreatedAtColumn, UpdatedAtColumn} + mutableColumns = postgres.ColumnList{NameColumn, IapplicationAddressColumn, IconsensusAddressColumn, IinputboxAddressColumn, IinputboxBlockColumn, TemplateHashColumn, TemplateURIColumn, EpochLengthColumn, DataAvailabilityColumn, ConsensusTypeColumn, StateColumn, ReasonColumn, LastEpochCheckBlockColumn, LastInputCheckBlockColumn, LastOutputCheckBlockColumn, LastTournamentCheckBlockColumn, LastSubmittedClaimCheckBlockColumn, LastAcceptedClaimCheckBlockColumn, ProcessedInputsColumn, CreatedAtColumn, UpdatedAtColumn} + defaultColumns = postgres.ColumnList{CreatedAtColumn, UpdatedAtColumn} ) return applicationTable{ Table: postgres.NewTable(schemaName, tableName, alias, allColumns...), //Columns - ID: IDColumn, - Name: NameColumn, - IapplicationAddress: IapplicationAddressColumn, - IconsensusAddress: IconsensusAddressColumn, - IinputboxAddress: IinputboxAddressColumn, - IinputboxBlock: IinputboxBlockColumn, - TemplateHash: TemplateHashColumn, - TemplateURI: TemplateURIColumn, - EpochLength: EpochLengthColumn, - DataAvailability: DataAvailabilityColumn, - ConsensusType: ConsensusTypeColumn, - State: StateColumn, - Reason: ReasonColumn, - LastEpochCheckBlock: LastEpochCheckBlockColumn, - LastInputCheckBlock: LastInputCheckBlockColumn, - LastOutputCheckBlock: LastOutputCheckBlockColumn, - LastTournamentCheckBlock: LastTournamentCheckBlockColumn, - ProcessedInputs: ProcessedInputsColumn, - CreatedAt: CreatedAtColumn, - UpdatedAt: UpdatedAtColumn, + ID: IDColumn, + Name: NameColumn, + IapplicationAddress: IapplicationAddressColumn, + IconsensusAddress: IconsensusAddressColumn, + IinputboxAddress: IinputboxAddressColumn, + IinputboxBlock: IinputboxBlockColumn, + TemplateHash: TemplateHashColumn, + TemplateURI: TemplateURIColumn, + EpochLength: EpochLengthColumn, + DataAvailability: DataAvailabilityColumn, + ConsensusType: ConsensusTypeColumn, + State: StateColumn, + Reason: ReasonColumn, + LastEpochCheckBlock: LastEpochCheckBlockColumn, + LastInputCheckBlock: LastInputCheckBlockColumn, + LastOutputCheckBlock: LastOutputCheckBlockColumn, + LastTournamentCheckBlock: LastTournamentCheckBlockColumn, + LastSubmittedClaimCheckBlock: LastSubmittedClaimCheckBlockColumn, + LastAcceptedClaimCheckBlock: LastAcceptedClaimCheckBlockColumn, + ProcessedInputs: ProcessedInputsColumn, + CreatedAt: CreatedAtColumn, + UpdatedAt: UpdatedAtColumn, AllColumns: allColumns, MutableColumns: mutableColumns, diff --git a/internal/repository/postgres/schema/migrations/000001_create_initial_schema.up.sql b/internal/repository/postgres/schema/migrations/000001_create_initial_schema.up.sql index b9c3d5cc7..daaf82479 100644 --- a/internal/repository/postgres/schema/migrations/000001_create_initial_schema.up.sql +++ b/internal/repository/postgres/schema/migrations/000001_create_initial_schema.up.sql @@ -88,6 +88,8 @@ CREATE TABLE "application" "last_input_check_block" uint64 NOT NULL, "last_output_check_block" uint64 NOT NULL, "last_tournament_check_block" uint64 NOT NULL, + "last_submitted_claim_check_block" uint64 NOT NULL, + "last_accepted_claim_check_block" uint64 NOT NULL, "processed_inputs" uint64 NOT NULL, "created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(), "updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(), diff --git a/internal/repository/repotest/application_test_cases.go b/internal/repository/repotest/application_test_cases.go index e8e39dbc8..ad4aaa084 100644 --- a/internal/repository/repotest/application_test_cases.go +++ b/internal/repository/repotest/application_test_cases.go @@ -623,6 +623,22 @@ func (s *ApplicationSuite) TestEventLastCheckBlock() { s.Require().NoError(err) s.Equal(uint64(20), block) + err = s.Repo.UpdateEventLastCheckBlock( + s.Ctx, []int64{app.ID}, MonitoredEvent_ClaimAccepted, 30) + s.Require().NoError(err) + block, err = s.Repo.GetEventLastCheckBlock( + s.Ctx, app.ID, MonitoredEvent_ClaimAccepted) + s.Require().NoError(err) + s.Equal(uint64(30), block) + + err = s.Repo.UpdateEventLastCheckBlock( + s.Ctx, []int64{app.ID}, MonitoredEvent_ClaimSubmitted, 40) + s.Require().NoError(err) + block, err = s.Repo.GetEventLastCheckBlock( + s.Ctx, app.ID, MonitoredEvent_ClaimSubmitted) + s.Require().NoError(err) + s.Equal(uint64(40), block) + // Tournament events all map to the tournament check block column tournamentEvents := []MonitoredEvent{ MonitoredEvent_CommitmentJoined, @@ -639,15 +655,6 @@ func (s *ApplicationSuite) TestEventLastCheckBlock() { s.Require().NoError(err) s.Equal(uint64(30), block) } - - // ClaimSubmitted and ClaimAccepted should return errors - _, err = s.Repo.GetEventLastCheckBlock( - s.Ctx, app.ID, MonitoredEvent_ClaimSubmitted) - s.Require().Error(err) - - _, err = s.Repo.GetEventLastCheckBlock( - s.Ctx, app.ID, MonitoredEvent_ClaimAccepted) - s.Require().Error(err) }) s.Run("UpdateMultipleAppIDs", func() {