Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 114 additions & 39 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type TransactionStreamerConfig struct {
ExecuteMessageLoopDelay time.Duration `koanf:"execute-message-loop-delay" reload:"hot"`
SyncTillBlock uint64 `koanf:"sync-till-block"`
TrackBlockMetadataFrom uint64 `koanf:"track-block-metadata-from"`
BulkDigestMessage bool `koanf:"bulk-digest-message"`
}

type TransactionStreamerConfigFetcher func() *TransactionStreamerConfig
Expand All @@ -89,6 +90,7 @@ var DefaultTransactionStreamerConfig = TransactionStreamerConfig{
ExecuteMessageLoopDelay: time.Millisecond * 100,
SyncTillBlock: 0,
TrackBlockMetadataFrom: 0,
BulkDigestMessage: false,
}

var TestTransactionStreamerConfig = TransactionStreamerConfig{
Expand All @@ -97,6 +99,7 @@ var TestTransactionStreamerConfig = TransactionStreamerConfig{
ExecuteMessageLoopDelay: time.Millisecond,
SyncTillBlock: 0,
TrackBlockMetadataFrom: 0,
BulkDigestMessage: false,
}

func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -105,6 +108,7 @@ func TransactionStreamerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Duration(prefix+".execute-message-loop-delay", DefaultTransactionStreamerConfig.ExecuteMessageLoopDelay, "delay when polling calls to execute messages")
f.Uint64(prefix+".sync-till-block", DefaultTransactionStreamerConfig.SyncTillBlock, "node will not sync past this block")
f.Uint64(prefix+".track-block-metadata-from", DefaultTransactionStreamerConfig.TrackBlockMetadataFrom, "block number to start saving blockmetadata, 0 to disable")
f.Bool(prefix+".bulk-digest-message", DefaultTransactionStreamerConfig.BulkDigestMessage, "call batched DigestMessage variant")
}

func NewTransactionStreamer(
Expand Down Expand Up @@ -1359,58 +1363,129 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
return false
}

log.Info("ExecuteNextMsg exec pre check", "execHeadMsgIdx", execHeadMsgIdx, "consensus", consensusHeadMsgIdx, "syncTillMessage", s.syncTillMessage)

if execHeadMsgIdx >= consensusHeadMsgIdx ||
(s.syncTillMessage > 0 && execHeadMsgIdx >= s.syncTillMessage) {
return false
}
msgIdxToExecute := execHeadMsgIdx + 1

msgAndBlockInfo, err := s.getMessageWithMetadataAndBlockInfo(msgIdxToExecute)
if err != nil {
log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute", msgIdxToExecute)
return false
}
var msgForPrefetch *arbostypes.MessageWithMetadata
if msgIdxToExecute+1 <= consensusHeadMsgIdx {
msg, err := s.GetMessage(msgIdxToExecute + 1)
if s.config().BulkDigestMessage {
var msgAndBlockInfos []*arbostypes.MessageWithMetadataAndBlockInfo
var msgsOnly []*arbostypes.MessageWithMetadata
var msgForPrefetch *arbostypes.MessageWithMetadata
msgIdxToExecute := execHeadMsgIdx + 1

var batchSize arbutil.MessageIndex = 100
if batchSize > (consensusHeadMsgIdx - execHeadMsgIdx) {
batchSize = consensusHeadMsgIdx - execHeadMsgIdx
}

log.Info("ExecuteNextMsg batch size", "batchSize", batchSize, "consensus", consensusHeadMsgIdx, "execHeadMsgIdx", execHeadMsgIdx)

for i := execHeadMsgIdx + 1; i <= execHeadMsgIdx+batchSize; i++ {
msgAndBlockInfo, err := s.getMessageWithMetadataAndBlockInfo(i)
if err != nil {
log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute", i)
return false
}
msgAndBlockInfos = append(msgAndBlockInfos, msgAndBlockInfo)
msgsOnly = append(msgsOnly, &msgAndBlockInfo.MessageWithMeta)

//if msgIdxToExecute+1 <= consensusHeadMsgIdx {
// msg, err := s.GetMessage(msgIdxToExecute + 1)
// if err != nil {
// log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute+1", msgIdxToExecute+1)
// return false
// }
// msgForPrefetch = msg
//}
}

msgResult, err := s.exec.DigestMessages(msgIdxToExecute, msgsOnly, msgForPrefetch).Await(ctx)
if err != nil {
log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute+1", msgIdxToExecute+1)
logger := log.Warn
if (prevHeadMsgIdx == nil) || (*prevHeadMsgIdx < consensusHeadMsgIdx) {
logger = log.Debug
}
logger("ExecuteNextMsg failed to send message to execEngine", "err", err, "msgIdxToExecute", msgIdxToExecute)
return false
}
msgForPrefetch = msg
}
msgResult, err := s.exec.DigestMessage(msgIdxToExecute, &msgAndBlockInfo.MessageWithMeta, msgForPrefetch).Await(ctx)
if err != nil {
logger := log.Warn
if (prevHeadMsgIdx == nil) || (*prevHeadMsgIdx < consensusHeadMsgIdx) {
logger = log.Debug

for r := 0; r < len(msgResult.Results); r++ {
s.checkResult(msgIdxToExecute+arbutil.MessageIndex(r), &msgResult.Results[r], msgAndBlockInfos[r])

batch := s.db.NewBatch()
err = s.storeResult(msgIdxToExecute+arbutil.MessageIndex(r), msgResult.Results[r], batch)
if err != nil {
log.Error("ExecuteNextMsg failed to store result", "err", err)
return false
}
err = batch.Write()
if err != nil {
log.Error("ExecuteNextMsg failed to store result", "err", err)
return false
}

msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: msgAndBlockInfos[r].MessageWithMeta,
BlockHash: &msgResult.Results[r].BlockHash,
BlockMetadata: msgAndBlockInfos[r].BlockMetadata,
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute+arbutil.MessageIndex(r))
}
log.Info("ExecuteNextMsg return", "return", execHeadMsgIdx+batchSize <= consensusHeadMsgIdx)
return execHeadMsgIdx+batchSize <= consensusHeadMsgIdx
} else {
msgIdxToExecute := execHeadMsgIdx + 1

msgAndBlockInfo, err := s.getMessageWithMetadataAndBlockInfo(msgIdxToExecute)
if err != nil {
log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute", msgIdxToExecute)
return false
}
var msgForPrefetch *arbostypes.MessageWithMetadata
if msgIdxToExecute+1 <= consensusHeadMsgIdx {
msg, err := s.GetMessage(msgIdxToExecute + 1)
if err != nil {
log.Error("ExecuteNextMsg failed to readMessage", "err", err, "msgIdxToExecute+1", msgIdxToExecute+1)
return false
}
msgForPrefetch = msg
}
msgResult, err := s.exec.DigestMessage(msgIdxToExecute, &msgAndBlockInfo.MessageWithMeta, msgForPrefetch).Await(ctx)
if err != nil {
logger := log.Warn
if (prevHeadMsgIdx == nil) || (*prevHeadMsgIdx < consensusHeadMsgIdx) {
logger = log.Debug
}
logger("ExecuteNextMsg failed to send message to execEngine", "err", err, "msgIdxToExecute", msgIdxToExecute)
return false
}
logger("ExecuteNextMsg failed to send message to execEngine", "err", err, "msgIdxToExecute", msgIdxToExecute)
return false
}

s.checkResult(msgIdxToExecute, msgResult, msgAndBlockInfo)
s.checkResult(msgIdxToExecute, msgResult, msgAndBlockInfo)

batch := s.db.NewBatch()
err = s.storeResult(msgIdxToExecute, *msgResult, batch)
if err != nil {
log.Error("ExecuteNextMsg failed to store result", "err", err)
return false
}
err = batch.Write()
if err != nil {
log.Error("ExecuteNextMsg failed to store result", "err", err)
return false
}
batch := s.db.NewBatch()
err = s.storeResult(msgIdxToExecute, *msgResult, batch)
if err != nil {
log.Error("ExecuteNextMsg failed to store result", "err", err)
return false
}
err = batch.Write()
if err != nil {
log.Error("ExecuteNextMsg failed to store result", "err", err)
return false
}

msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: msgAndBlockInfo.MessageWithMeta,
BlockHash: &msgResult.BlockHash,
BlockMetadata: msgAndBlockInfo.BlockMetadata,
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute)
msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: msgAndBlockInfo.MessageWithMeta,
BlockHash: &msgResult.BlockHash,
BlockMetadata: msgAndBlockInfo.BlockMetadata,
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute)

return msgIdxToExecute+1 <= consensusHeadMsgIdx
return msgIdxToExecute+1 <= consensusHeadMsgIdx
}
}

func (s *TransactionStreamer) executeMessages(ctx context.Context, ignored struct{}) time.Duration {
Expand Down
10 changes: 10 additions & 0 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,16 @@ func (n *ExecutionNode) DigestMessage(index arbutil.MessageIndex, msg *arbostype
}()
return &promise
}

func (n *ExecutionNode) DigestMessages(index arbutil.MessageIndex, msg []*arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.BulkMessageResult] {
promise := containers.NewPromise[*execution.BulkMessageResult](nil)
go func() {
var res *execution.BulkMessageResult = nil
promise.Produce(res)
}()
return &promise
}

func (n *ExecutionNode) Reorg(newHeadMsgIdx arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockInfo, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[[]*execution.MessageResult] {
return containers.NewReadyPromise(n.ExecEngine.Reorg(newHeadMsgIdx, newMessages, oldMessages))
}
Expand Down
5 changes: 5 additions & 0 deletions execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type MessageResult struct {
SendRoot common.Hash
}

type BulkMessageResult struct {
Results []MessageResult
}

type RecordResult struct {
Index arbutil.MessageIndex
BlockHash common.Hash
Expand All @@ -39,6 +43,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken")
// always needed
type ExecutionClient interface {
DigestMessage(msgIdx arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*MessageResult]
DigestMessages(msgIdx arbutil.MessageIndex, msg []*arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*BulkMessageResult]
Reorg(msgIdxOfFirstMsgToAdd arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockInfo, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[[]*MessageResult]
HeadMessageIndex() containers.PromiseInterface[arbutil.MessageIndex]
ResultAtMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[*MessageResult]
Expand Down
7 changes: 7 additions & 0 deletions execution/nethexec/compare_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ func (w *compareExecutionClient) DigestMessage(index arbutil.MessageIndex, msg *
return result
}

func (w *compareExecutionClient) DigestMessages(index arbutil.MessageIndex, msg []*arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.BulkMessageResult] {
promise := containers.NewPromise[*execution.BulkMessageResult](nil)
var res *execution.BulkMessageResult = nil
promise.Produce(res)
return &promise
}

func (w *compareExecutionClient) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockInfo, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[[]*execution.MessageResult] {
start := time.Now()
log.Info("CompareExecutionClient: Reorg", "count", count, "newMessagesCount", len(newMessages), "oldMessagesCount", len(oldMessages))
Expand Down
13 changes: 13 additions & 0 deletions execution/nethexec/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ func (p *nethermindExecutionClient) DigestMessage(index arbutil.MessageIndex, ms
return &promise
}

func (p *nethermindExecutionClient) DigestMessages(index arbutil.MessageIndex, msg []*arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.BulkMessageResult] {
promise := containers.NewPromise[*execution.BulkMessageResult](nil)
go func() {
res := p.rpcClient.DigestMessages(context.Background(), index, msg, msgForPrefetch)
if res == nil {
promise.ProduceError(fmt.Errorf("external DigestMessage returned nil"))
return
}
promise.Produce(res)
}()
return &promise
}

func (p *nethermindExecutionClient) SetFinalityData(ctx context.Context, safeFinalityData *arbutil.FinalityData, finalizedFinalityData *arbutil.FinalityData, validatedFinalityData *arbutil.FinalityData) containers.PromiseInterface[struct{}] {
promise := containers.NewPromise[struct{}](nil)
go func() {
Expand Down
28 changes: 28 additions & 0 deletions execution/nethexec/nethrpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ type messageParams struct {
MessageForPrefetch *arbostypes.MessageWithMetadata `json:"messageForPrefetch,omitempty"`
}

type bulkMessageParams struct {
StartIndex arbutil.MessageIndex `json:"startIndex"`
Messages []*arbostypes.MessageWithMetadata `json:"messages"`
MessageForPrefetch *arbostypes.MessageWithMetadata `json:"messageForPrefetch,omitempty"`
}

type initializeMessageParams struct {
InitialL1BaseFee *big.Int `json:"initialL1BaseFee"`
SerializedChainConfig []byte `json:"serializedChainConfig"`
Expand Down Expand Up @@ -142,6 +148,28 @@ func (c *nethRpcClient) DigestMessage(ctx context.Context, index arbutil.Message
return &result
}

func (c *nethRpcClient) DigestMessages(ctx context.Context, index arbutil.MessageIndex, msg []*arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) *execution.BulkMessageResult {
params := bulkMessageParams{
StartIndex: index,
Messages: msg,
MessageForPrefetch: msgForPrefetch,
}

//log.Debug("Making JSON-RPC call to DigestMessage",
// "url", c.url,
// "index", index,
// "messageType", msg.Message.Header.Kind,
//)

var result execution.BulkMessageResult
if err := c.client.CallContext(ctx, &result, "DigestMessages", params); err != nil {
log.Error("Failed to call DigestMessage", "error", err)
return nil
}

return &result
}

func (c *nethRpcClient) DigestInitMessage(ctx context.Context, initialL1BaseFee *big.Int, serializedChainConfig []byte) *execution.MessageResult {
var result execution.MessageResult

Expand Down