From a535e2d618e5badbe2a07c7a1dc16b780cb259a7 Mon Sep 17 00:00:00 2001 From: Damian Orzechowski Date: Tue, 28 Oct 2025 14:20:46 +0100 Subject: [PATCH] Bulk version of DigestMessage --- arbnode/transaction_streamer.go | 153 ++++++++++++++++++------- execution/gethexec/node.go | 10 ++ execution/interface.go | 5 + execution/nethexec/compare_client.go | 7 ++ execution/nethexec/execution_client.go | 13 +++ execution/nethexec/nethrpcclient.go | 28 +++++ 6 files changed, 177 insertions(+), 39 deletions(-) diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index cf1675a30a1..d44f1c825e0 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -79,6 +79,7 @@ type TransactionStreamerConfig struct { ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"` SyncTillBlock uint64 `koanf:"sync-till-block"` TrackBlockMetadataFrom uint64 `koanf:"track-block-metadata-from"` + BulkDigestMessage bool `koanf:"bulk-digest-message"` } type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig @@ -89,6 +90,7 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{ ExecuteMessageLoopDelay: time.Millisecond * 100, SyncTillBlock: 0, TrackBlockMetadataFrom: 0, + BulkDigestMessage: false, } var TestTransactionStreamerConfig = TransactionStreamerConfig{ @@ -97,6 +99,7 @@ var TestTransactionStreamerConfig = TransactionStreamerConfig{ ExecuteMessageLoopDelay: time.Millisecond, SyncTillBlock: 0, TrackBlockMetadataFrom: 0, + BulkDigestMessage: false, } func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -105,6 +108,7 @@ func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages") f.Uint64(prefix+".sync-till-block", DefaultTransactionStreamerConfig.SyncTillBlock, "node will not sync past this block") f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "block number to start saving blockmetadata, 0 to disable") + f.Bool(prefix+".bulk-digest-message", DefaultTransactionStreamerConfig.BulkDigestMessage, "call batched DigestMessage variant") } func NewTransactionStreamer( @@ -1359,58 +1363,129 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool { return false } + log.Info("ExecuteNextMsg exec pre check", "execHeadMsgIdx", execHeadMsgIdx, "consensus", consensusHeadMsgIdx, "syncTillMessage", s.syncTillMessage) + if execHeadMsgIdx >= consensusHeadMsgIdx || (s.syncTillMessage > 0 && execHeadMsgIdx >= s.syncTillMessage) { return false } - msgIdxToExecute := execHeadMsgIdx + 1 - msgAndBlockInfo, err := s.getMessageWithMetadataAndBlockInfo(msgIdxToExecute) - if err != nil { - log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute", msgIdxToExecute) - return false - } - var msgForPrefetch *arbostypes.MessageWithMetadata - if msgIdxToExecute+1 <= consensusHeadMsgIdx { - msg, err := s.GetMessage(msgIdxToExecute + 1) + if s.config().BulkDigestMessage { + var msgAndBlockInfos []*arbostypes.MessageWithMetadataAndBlockInfo + var msgsOnly []*arbostypes.MessageWithMetadata + var msgForPrefetch *arbostypes.MessageWithMetadata + msgIdxToExecute := execHeadMsgIdx + 1 + + var batchSize arbutil.MessageIndex = 100 + if batchSize > (consensusHeadMsgIdx - execHeadMsgIdx) { + batchSize = consensusHeadMsgIdx - execHeadMsgIdx + } + + log.Info("ExecuteNextMsg batch size", "batchSize", batchSize, "consensus", consensusHeadMsgIdx, "execHeadMsgIdx", execHeadMsgIdx) + + for i := execHeadMsgIdx + 1; i <= execHeadMsgIdx+batchSize; i++ { + msgAndBlockInfo, err := s.getMessageWithMetadataAndBlockInfo(i) + if err != nil { + log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute", i) + return false + } + msgAndBlockInfos = append(msgAndBlockInfos, msgAndBlockInfo) + msgsOnly = append(msgsOnly, &msgAndBlockInfo.MessageWithMeta) + + //if msgIdxToExecute+1 <= consensusHeadMsgIdx { + // msg, err := s.GetMessage(msgIdxToExecute + 1) + // if err != nil { + // log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute+1", msgIdxToExecute+1) + // return false + // } + // msgForPrefetch = msg + //} + } + + msgResult, err := s.exec.DigestMessages(msgIdxToExecute, msgsOnly, msgForPrefetch).Await(ctx) if err != nil { - log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute+1", msgIdxToExecute+1) + logger := log.Warn + if (prevHeadMsgIdx == nil) || (*prevHeadMsgIdx < consensusHeadMsgIdx) { + logger = log.Debug + } + logger("ExecuteNextMsg failed to send message to execEngine", "err", err, "msgIdxToExecute", msgIdxToExecute) return false } - msgForPrefetch = msg - } - msgResult, err := s.exec.DigestMessage(msgIdxToExecute, &msgAndBlockInfo.MessageWithMeta, msgForPrefetch).Await(ctx) - if err != nil { - logger := log.Warn - if (prevHeadMsgIdx == nil) || (*prevHeadMsgIdx < consensusHeadMsgIdx) { - logger = log.Debug + + for r := 0; r < len(msgResult.Results); r++ { + s.checkResult(msgIdxToExecute+arbutil.MessageIndex(r), &msgResult.Results[r], msgAndBlockInfos[r]) + + batch := s.db.NewBatch() + err = s.storeResult(msgIdxToExecute+arbutil.MessageIndex(r), msgResult.Results[r], batch) + if err != nil { + log.Error("ExecuteNextMsg failed to store result", "err", err) + return false + } + err = batch.Write() + if err != nil { + log.Error("ExecuteNextMsg failed to store result", "err", err) + return false + } + + msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{ + MessageWithMeta: msgAndBlockInfos[r].MessageWithMeta, + BlockHash: &msgResult.Results[r].BlockHash, + BlockMetadata: msgAndBlockInfos[r].BlockMetadata, + } + s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute+arbutil.MessageIndex(r)) + } + log.Info("ExecuteNextMsg return", "return", execHeadMsgIdx+batchSize <= consensusHeadMsgIdx) + return execHeadMsgIdx+batchSize <= consensusHeadMsgIdx + } else { + msgIdxToExecute := execHeadMsgIdx + 1 + + msgAndBlockInfo, err := s.getMessageWithMetadataAndBlockInfo(msgIdxToExecute) + if err != nil { + log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute", msgIdxToExecute) + return false + } + var msgForPrefetch *arbostypes.MessageWithMetadata + if msgIdxToExecute+1 <= consensusHeadMsgIdx { + msg, err := s.GetMessage(msgIdxToExecute + 1) + if err != nil { + log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute+1", msgIdxToExecute+1) + return false + } + msgForPrefetch = msg + } + msgResult, err := s.exec.DigestMessage(msgIdxToExecute, &msgAndBlockInfo.MessageWithMeta, msgForPrefetch).Await(ctx) + if err != nil { + logger := log.Warn + if (prevHeadMsgIdx == nil) || (*prevHeadMsgIdx < consensusHeadMsgIdx) { + logger = log.Debug + } + logger("ExecuteNextMsg failed to send message to execEngine", "err", err, "msgIdxToExecute", msgIdxToExecute) + return false } - logger("ExecuteNextMsg failed to send message to execEngine", "err", err, "msgIdxToExecute", msgIdxToExecute) - return false - } - s.checkResult(msgIdxToExecute, msgResult, msgAndBlockInfo) + s.checkResult(msgIdxToExecute, msgResult, msgAndBlockInfo) - batch := s.db.NewBatch() - err = s.storeResult(msgIdxToExecute, *msgResult, batch) - if err != nil { - log.Error("ExecuteNextMsg failed to store result", "err", err) - return false - } - err = batch.Write() - if err != nil { - log.Error("ExecuteNextMsg failed to store result", "err", err) - return false - } + batch := s.db.NewBatch() + err = s.storeResult(msgIdxToExecute, *msgResult, batch) + if err != nil { + log.Error("ExecuteNextMsg failed to store result", "err", err) + return false + } + err = batch.Write() + if err != nil { + log.Error("ExecuteNextMsg failed to store result", "err", err) + return false + } - msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{ - MessageWithMeta: msgAndBlockInfo.MessageWithMeta, - BlockHash: &msgResult.BlockHash, - BlockMetadata: msgAndBlockInfo.BlockMetadata, - } - s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute) + msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{ + MessageWithMeta: msgAndBlockInfo.MessageWithMeta, + BlockHash: &msgResult.BlockHash, + BlockMetadata: msgAndBlockInfo.BlockMetadata, + } + s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute) - return msgIdxToExecute+1 <= consensusHeadMsgIdx + return msgIdxToExecute+1 <= consensusHeadMsgIdx + } } func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struct{}) time.Duration { diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 1b9a84d310b..f4b3f649a2b 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -475,6 +475,16 @@ func (n *ExecutionNode) DigestMessage(index arbutil.MessageIndex, msg *arbostype }() return &promise } + +func (n *ExecutionNode) DigestMessages(index arbutil.MessageIndex, msg []*arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.BulkMessageResult] { + promise := containers.NewPromise[*execution.BulkMessageResult](nil) + go func() { + var res *execution.BulkMessageResult = nil + promise.Produce(res) + }() + return &promise +} + func (n *ExecutionNode) Reorg(newHeadMsgIdx arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockInfo, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[[]*execution.MessageResult] { return containers.NewReadyPromise(n.ExecEngine.Reorg(newHeadMsgIdx, newMessages, oldMessages)) } diff --git a/execution/interface.go b/execution/interface.go index 3e691fdd339..a7e662d4292 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -21,6 +21,10 @@ type MessageResult struct { SendRoot common.Hash } +type BulkMessageResult struct { + Results []MessageResult +} + type RecordResult struct { Index arbutil.MessageIndex BlockHash common.Hash @@ -39,6 +43,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken") // always needed type ExecutionClient interface { DigestMessage(msgIdx arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*MessageResult] + DigestMessages(msgIdx arbutil.MessageIndex, msg []*arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*BulkMessageResult] Reorg(msgIdxOfFirstMsgToAdd arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockInfo, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[[]*MessageResult] HeadMessageIndex() containers.PromiseInterface[arbutil.MessageIndex] ResultAtMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[*MessageResult] diff --git a/execution/nethexec/compare_client.go b/execution/nethexec/compare_client.go index 97a3dcfb319..5e91ab577e4 100644 --- a/execution/nethexec/compare_client.go +++ b/execution/nethexec/compare_client.go @@ -108,6 +108,13 @@ func (w *compareExecutionClient) DigestMessage(index arbutil.MessageIndex, msg * return result } +func (w *compareExecutionClient) DigestMessages(index arbutil.MessageIndex, msg []*arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.BulkMessageResult] { + promise := containers.NewPromise[*execution.BulkMessageResult](nil) + var res *execution.BulkMessageResult = nil + promise.Produce(res) + return &promise +} + func (w *compareExecutionClient) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockInfo, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[[]*execution.MessageResult] { start := time.Now() log.Info("CompareExecutionClient: Reorg", "count", count, "newMessagesCount", len(newMessages), "oldMessagesCount", len(oldMessages)) diff --git a/execution/nethexec/execution_client.go b/execution/nethexec/execution_client.go index 875a115479e..c2520f4f726 100644 --- a/execution/nethexec/execution_client.go +++ b/execution/nethexec/execution_client.go @@ -49,6 +49,19 @@ func (p *nethermindExecutionClient) DigestMessage(index arbutil.MessageIndex, ms return &promise } +func (p *nethermindExecutionClient) DigestMessages(index arbutil.MessageIndex, msg []*arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.BulkMessageResult] { + promise := containers.NewPromise[*execution.BulkMessageResult](nil) + go func() { + res := p.rpcClient.DigestMessages(context.Background(), index, msg, msgForPrefetch) + if res == nil { + promise.ProduceError(fmt.Errorf("external DigestMessage returned nil")) + return + } + promise.Produce(res) + }() + return &promise +} + func (p *nethermindExecutionClient) SetFinalityData(ctx context.Context, safeFinalityData *arbutil.FinalityData, finalizedFinalityData *arbutil.FinalityData, validatedFinalityData *arbutil.FinalityData) containers.PromiseInterface[struct{}] { promise := containers.NewPromise[struct{}](nil) go func() { diff --git a/execution/nethexec/nethrpcclient.go b/execution/nethexec/nethrpcclient.go index c65a97385ab..c66ffb6dea2 100644 --- a/execution/nethexec/nethrpcclient.go +++ b/execution/nethexec/nethrpcclient.go @@ -34,6 +34,12 @@ type messageParams struct { MessageForPrefetch *arbostypes.MessageWithMetadata `json:"messageForPrefetch,omitempty"` } +type bulkMessageParams struct { + StartIndex arbutil.MessageIndex `json:"startIndex"` + Messages []*arbostypes.MessageWithMetadata `json:"messages"` + MessageForPrefetch *arbostypes.MessageWithMetadata `json:"messageForPrefetch,omitempty"` +} + type initializeMessageParams struct { InitialL1BaseFee *big.Int `json:"initialL1BaseFee"` SerializedChainConfig []byte `json:"serializedChainConfig"` @@ -142,6 +148,28 @@ func (c *nethRpcClient) DigestMessage(ctx context.Context, index arbutil.Message return &result } +func (c *nethRpcClient) DigestMessages(ctx context.Context, index arbutil.MessageIndex, msg []*arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) *execution.BulkMessageResult { + params := bulkMessageParams{ + StartIndex: index, + Messages: msg, + MessageForPrefetch: msgForPrefetch, + } + + //log.Debug("Making JSON-RPC call to DigestMessage", + // "url", c.url, + // "index", index, + // "messageType", msg.Message.Header.Kind, + //) + + var result execution.BulkMessageResult + if err := c.client.CallContext(ctx, &result, "DigestMessages", params); err != nil { + log.Error("Failed to call DigestMessage", "error", err) + return nil + } + + return &result +} + func (c *nethRpcClient) DigestInitMessage(ctx context.Context, initialL1BaseFee *big.Int, serializedChainConfig []byte) *execution.MessageResult { var result execution.MessageResult