Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dmq-node/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import Cardano.KESAgent.Protocols.StandardCrypto (StandardCrypto)
import DMQ.Configuration
import DMQ.Configuration.CLIOptions (parseCLIOptions)
import DMQ.Configuration.Topology (readTopologyFileOrError)
import DMQ.Policy qualified as Policy
import DMQ.Diffusion.Applications (diffusionApplications)
import DMQ.Diffusion.Arguments
import DMQ.Diffusion.NodeKernel
Expand Down Expand Up @@ -179,7 +180,7 @@ runDMQ commandLineConfig = do
(encodeRemoteAddress (maxBound @NodeToNodeVersion))
(decodeRemoteAddress (maxBound @NodeToNodeVersion)))
dmqLimitsAndTimeouts
defaultSigDecisionPolicy
Policy.sigDecisionPolicy
ntcValidationTracer = if validationTracer
then WithEventType "NtC Validation" >$< tracer
else nullTracer
Expand Down
1 change: 1 addition & 0 deletions dmq-node/dmq-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ library
DMQ.Configuration
DMQ.Configuration.CLIOptions
DMQ.Configuration.Topology
DMQ.Policy
DMQ.Diffusion.Applications
DMQ.Diffusion.Arguments
DMQ.Diffusion.NodeKernel
Expand Down
16 changes: 0 additions & 16 deletions dmq-node/src/DMQ/Configuration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ module DMQ.Configuration
, I (..)
, readConfigurationFileOrError
, mkDiffusionConfiguration
, defaultSigDecisionPolicy
, defaultConfiguration
, NoExtraConfig (..)
, NoExtraFlags (..)
Expand Down Expand Up @@ -73,7 +72,6 @@ import Ouroboros.Network.PeerSelection.LedgerPeers.Type
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..))
import Ouroboros.Network.Snocket (LocalAddress (..), RemoteAddress)
import Ouroboros.Network.TxSubmission.Inbound.V2 (TxDecisionPolicy (..))

import DMQ.Configuration.Topology (NoExtraConfig (..), NoExtraFlags (..))

Expand Down Expand Up @@ -611,20 +609,6 @@ mkDiffusionConfiguration
}


-- TODO: review this once we know what is the size of a `Sig`.
-- TODO: parts of should be configurable
defaultSigDecisionPolicy :: TxDecisionPolicy
defaultSigDecisionPolicy = TxDecisionPolicy {
maxNumTxIdsToRequest = 10,
maxUnacknowledgedTxIds = 40,
txsSizeInflightPerPeer = 100_000,
maxTxsSizeInflight = 250_000,
txInflightMultiplicity = 1,
bufferedTxsMinLifetime = 0,
scoreRate = 0.1,
scoreMax = 15 * 60
}

data ConfigurationError =
NoAddressInformation -- ^ dmq was not configured with IPv4 or IPv6 address
deriving Show
Expand Down
3 changes: 2 additions & 1 deletion dmq-node/src/DMQ/Diffusion/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..),
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool

import DMQ.Configuration
import DMQ.Policy qualified as Policy
import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt, sigId), SigId)
import DMQ.Tracer

Expand Down Expand Up @@ -207,7 +208,7 @@ withNodeKernel tracer
then WithEventType "SigSubmission.Logic" >$< tracer
else nullTracer)
nullTracer
defaultSigDecisionPolicy
Policy.sigDecisionPolicy
sigChannelVar
sigSharedTxStateVar)
$ \sigLogicThread ->
Expand Down
7 changes: 2 additions & 5 deletions dmq-node/src/DMQ/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import Network.TypedProtocol.Codec (AnnotatedCodec, Codec)
import Cardano.KESAgent.KES.Crypto (Crypto (..))

import DMQ.Configuration (Configuration, Configuration' (..), I (..))
import DMQ.Policy qualified as Policy
import DMQ.Diffusion.NodeKernel (NodeKernel (..))
import DMQ.NodeToNode.Version
import DMQ.Protocol.SigSubmission.Codec
Expand Down Expand Up @@ -586,11 +587,7 @@ data LimitsAndTimeouts crypto addr =
dmqLimitsAndTimeouts :: LimitsAndTimeouts crypto addr
dmqLimitsAndTimeouts =
LimitsAndTimeouts {
sigSubmissionLimits =
MiniProtocolLimits {
-- TODO
maximumIngressQueue = maxBound
}
sigSubmissionLimits = Policy.sigSubmissionIngressLimit
, sigSubmissionTimeLimits = timeLimitsSigSubmission
, sigSubmissionSizeLimits = byteLimitsSigSubmission size

Expand Down
59 changes: 59 additions & 0 deletions dmq-node/src/DMQ/Policy.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{-# LANGUAGE NumericUnderscores #-}

module DMQ.Policy
( sigDecisionPolicy
, sigSubmissionIngressLimit
) where

import DMQ.Protocol.SigSubmission.Type (NumTxIdsToReq)
import Network.Mux.Types (MiniProtocolLimits (..))
import Ouroboros.Network.SizeInBytes (SizeInBytes)
import Ouroboros.Network.TxSubmission.Inbound.V2

-- TODO: make it configurable
maxSigSize :: SizeInBytes
maxSigSize = 2800

-- | Maximum numbers signatures in-flight per peer.
--
-- TODO: make it configurable
maxSigsInflight :: NumTxIdsToReq
maxSigsInflight = 33

-- | The `TxDecisionPolicy` used by `SigSubmission`.
--
-- * `maxUnacknowledgedTxIds` is 4 times `maxSigsInflight` (@132@).
-- * `txsSizeInflightPerPeer` is `maxSigSize * maxSigsInflight` (@92.4kB@).
-- * By default there are 25 hot peers, so at most
-- @
-- 25 * txsSizeInflightPerPeer
-- @
-- bytes (@2310kB@) in-flight for NodeToNodeV_2 or higher.
--
sigDecisionPolicy :: TxDecisionPolicy
sigDecisionPolicy = TxDecisionPolicy {
maxNumTxIdsToRequest = maxSigsInflight,
maxUnacknowledgedTxIds = 4 * maxSigsInflight,
txsSizeInflightPerPeer = maxSigSize * fromIntegral maxSigsInflight,
maxTxsSizeInflight = 250_000,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using the text in the comment as maxTxsSizeInflight inflight limit instead of the lower 250k?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maxTxSizeInflight will go away soon, so I phrased it to be future-proof.

txInflightMultiplicity = 1,
bufferedTxsMinLifetime = 0,
scoreRate = 0.1,
scoreMax = 15 * 60
}


-- | Ingress limit for SigSubmission mini-protocol, derived from
-- `sigDecisionPolicy` (`txsSizeInflightPerPeer`).
--
sigSubmissionIngressLimit :: MiniProtocolLimits
sigSubmissionIngressLimit = MiniProtocolLimits {
maximumIngressQueue = addMargin (fromIntegral sigSizeInflight)
}
where
TxDecisionPolicy { txsSizeInflightPerPeer = sigSizeInflight }
= sigDecisionPolicy

addMargin :: Int -> Int
addMargin = \x -> x + x `div` 10

Loading