Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
members = ["examples", "jito_protos", "proxy"]
resolver = "2"

[profile.debug-release]
inherits = "release"
debug = true

[workspace.package]
version = "0.2.12-triton"
description = "Fast path to receive shreds from Jito, forwarding to local consumers. See https://docs.jito.wtf/lowlatencytxnfeed/ for details."
Expand All @@ -14,6 +18,7 @@ ahash = "0.8"
arc-swap = "1.6"
bincode = "1.3.3"
borsh = "1.5.3"
bytes = "1.11.0"
clap = { version = "4", features = ["derive", "env"] }
crossbeam-channel = "0.5.8"
dashmap = "5"
Expand All @@ -24,6 +29,7 @@ jito-protos = { path = "jito_protos" }
lazy_static = "1.4.0"
libc = "0.2"
log = "0.4"
mio = "1.1.1"
prost = "0.13"
prost-types = "0.13"
prometheus = "0.14.0"
Expand Down
75 changes: 75 additions & 0 deletions data-sample.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
set 1:

shredstream_recv_interval_usec_bucket{le="1"} 41

shredstream_recv_interval_usec_bucket{le="5"} 63

shredstream_recv_interval_usec_bucket{le="10"} 125

shredstream_recv_interval_usec_bucket{le="25"} 47676

shredstream_recv_interval_usec_bucket{le="50"} 104430

shredstream_recv_interval_usec_bucket{le="100"} 162673

shredstream_recv_interval_usec_bucket{le="200"} 190777

shredstream_recv_interval_usec_bucket{le="500"} 205050

shredstream_recv_interval_usec_bucket{le="1000"} 210046

shredstream_recv_interval_usec_bucket{le="2000"} 212204

shredstream_recv_interval_usec_bucket{le="+Inf"} 214080



set 2:

shredstream_recv_interval_usec_bucket{le="1"} 0

shredstream_recv_interval_usec_bucket{le="5"} 0

shredstream_recv_interval_usec_bucket{le="10"} 22

shredstream_recv_interval_usec_bucket{le="25"} 864700

shredstream_recv_interval_usec_bucket{le="50"} 1059516

shredstream_recv_interval_usec_bucket{le="100"} 1334130

shredstream_recv_interval_usec_bucket{le="200"} 1473381

shredstream_recv_interval_usec_bucket{le="500"} 1545124

shredstream_recv_interval_usec_bucket{le="1000"} 1569639

shredstream_recv_interval_usec_bucket{le="2000"} 1580383

shredstream_recv_interval_usec_bucket{le="+Inf"} 1589948



set 3 :

shredstream_recv_interval_usec_bucket{le="1"} 0

shredstream_recv_interval_usec_bucket{le="5"} 0

shredstream_recv_interval_usec_bucket{le="10"} 2

shredstream_recv_interval_usec_bucket{le="25"} 129306

shredstream_recv_interval_usec_bucket{le="50"} 159982

shredstream_recv_interval_usec_bucket{le="100"} 202752

shredstream_recv_interval_usec_bucket{le="200"} 225469

shredstream_recv_interval_usec_bucket{le="500"} 238215

shredstream_recv_interval_usec_bucket{le="1000"} 242741

shredstream_recv_interval_usec_bucket{le="2000"} 244727

shredstream_recv_interval_usec_bucket{le="+Inf"} 246249
12 changes: 12 additions & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,22 @@ authors = { workspace = true }
homepage = { workspace = true }
edition = { workspace = true }

[[bin]]
name = "triton-shredproxy"
path = "src/main2.rs"

[[bin]]
name = "jito-shredstream-proxy"
path = "src/main.rs"



[dependencies]
ahash = { workspace = true }
arc-swap = { workspace = true }
bincode = { workspace = true }
borsh = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
Expand All @@ -21,6 +32,7 @@ jito-protos = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
libc = { workspace = true }
mio = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
Expand Down
67 changes: 39 additions & 28 deletions proxy/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crossbeam_channel::{Receiver, RecvError};
use dashmap::DashMap;
use itertools::Itertools;
use jito_protos::shredstream::{Entry as PbEntry, TraceShred};
use log::{debug, error, info, warn};
use libc;
use log::{debug, error, info, warn};
use prost::Message;
use socket2::{Domain, Protocol, Socket, Type};
use solana_client::client_error::reqwest;
Expand All @@ -35,15 +35,13 @@ use solana_streamer::{
use tokio::sync::broadcast::Sender;

use crate::{
ShredstreamProxyError,
deshred::{self, ComparableShred, ShredsStateTracker},
prom::{
observe_dedup_time, observe_send_packet_count, observe_send_duration,
observe_recv_interval, observe_recv_packet_count,
inc_packets_received, inc_packets_deduped, inc_packets_forwarded,
inc_packets_forward_failed, inc_packets_by_source,
inc_packets_by_source, inc_packets_deduped, inc_packets_forward_failed,
inc_packets_forwarded, inc_packets_received, observe_dedup_time, observe_recv_interval,
observe_recv_packet_count, observe_send_duration, observe_send_packet_count,
},
resolve_hostname_port,
resolve_hostname_port, ShredstreamProxyError,
};

// values copied from https://github.com/solana-labs/solana/blob/33bde55bbdde13003acf45bb6afe6db4ab599ae4/core/src/sigverify_shreds.rs#L20
Expand Down Expand Up @@ -169,7 +167,6 @@ pub fn start_forwarder_threads(
let reconstruct_tx = reconstruct_tx.clone();
let exit = exit.clone();


let send_thread = Builder::new()
.name(format!("ssPxyTx_{thread_id}"))
.spawn(move || {
Expand Down Expand Up @@ -259,8 +256,7 @@ pub fn start_forwarder_threads(

///
/// Try to create an IPv6 UDP socket bound to the given address.
///
fn try_create_ipv6_socket(addr: SocketAddr) -> Result<UdpSocket, std::io::Error> {
pub fn try_create_ipv6_socket(addr: SocketAddr) -> Result<UdpSocket, std::io::Error> {
let ipv6_socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
ipv6_socket.set_multicast_hops_v6(IP_MULTICAST_TTL)?;
ipv6_socket.bind(&addr.into())?;
Expand All @@ -280,16 +276,14 @@ fn recv_from_channel_and_send_multiple_dest<F>(
reconstruct_tx: &crossbeam_channel::Sender<PacketBatch>,
debug_trace_shred: bool,
metrics: &ShredMetrics,
) -> Result<(), ShredstreamProxyError>
) -> Result<(), ShredstreamProxyError>
where
F: Fn(IpAddr, SocketAddr) -> bool,
{
let packet_batch = maybe_packet_batch.map_err(ShredstreamProxyError::RecvError)?;
let trace_shred_received_time = SystemTime::now();
let batch_len = packet_batch.len() as u64;
metrics
.received
.fetch_add(batch_len, Ordering::Relaxed);
metrics.received.fetch_add(batch_len, Ordering::Relaxed);
inc_packets_received(batch_len);
observe_recv_packet_count(batch_len as f64);
debug!(
Expand All @@ -310,7 +304,9 @@ where
&mut packet_batch_vec,
);
let t_dedup_usecs = t.elapsed().as_micros() as u64;
metrics.dedup_time_spent.fetch_add(t_dedup_usecs, Ordering::Relaxed);
metrics
.dedup_time_spent
.fetch_add(t_dedup_usecs, Ordering::Relaxed);
observe_dedup_time(t_dedup_usecs as f64);
inc_packets_deduped(num_deduped);

Expand All @@ -326,10 +322,12 @@ where
*discarded += is_discarded as u64;
*not_discarded += (!is_discarded) as u64;
})
.or_insert_with(|| {
(is_discarded as u64, (!is_discarded) as u64)
});
let status = if is_discarded { "discarded" } else { "forwarded" };
.or_insert_with(|| (is_discarded as u64, (!is_discarded) as u64));
let status = if is_discarded {
"discarded"
} else {
"forwarded"
};
inc_packets_by_source(&addr.to_string(), status, 1);
});
});
Expand Down Expand Up @@ -358,10 +356,14 @@ where
.fetch_add(packets_with_dest.len() as u64, Ordering::Relaxed);
metrics.send_batch_count.fetch_add(1, Ordering::Relaxed);
const MAX_IOV: usize = libc::UIO_MAXIOV as usize;
let max_iov_count = packets_with_dest.len() / MAX_IOV;
let max_iov_count = packets_with_dest.len() / MAX_IOV;
let unsaturated_iov_count = packets_with_dest.len() % MAX_IOV;
metrics.saturated_iov_count.fetch_add(max_iov_count as u64, Ordering::Relaxed);
metrics.unsaturated_iov_count.fetch_add(unsaturated_iov_count as u64, Ordering::Relaxed);
metrics
.saturated_iov_count
.fetch_add(max_iov_count as u64, Ordering::Relaxed);
metrics
.unsaturated_iov_count
.fetch_add(unsaturated_iov_count as u64, Ordering::Relaxed);
observe_send_packet_count(packets_with_dest.len() as f64);
match batch_send(send_socket, &packets_with_dest) {
Ok(_) => {
Expand All @@ -387,7 +389,9 @@ where
}
}
let t_send_usecs = t.elapsed().as_micros() as u64;
metrics.batch_send_time_spent.fetch_add(t_send_usecs, Ordering::Relaxed);
metrics
.batch_send_time_spent
.fetch_add(t_send_usecs, Ordering::Relaxed);
observe_send_duration(t_send_usecs as f64);
});

Expand Down Expand Up @@ -645,7 +649,11 @@ impl ShredMetrics {

datapoint_info!(
"shredstream_proxy-sendmmsg_iov_metrics",
("max_iov_count", self.saturated_iov_count.load(Ordering::Relaxed), i64),
(
"max_iov_count",
self.saturated_iov_count.load(Ordering::Relaxed),
i64
),
(
"unsaturated_iov_count",
self.unsaturated_iov_count.load(Ordering::Relaxed),
Expand All @@ -654,12 +662,16 @@ impl ShredMetrics {
);

datapoint_info!(
"shredstream_proxy-batch_send_metrics",
"shredstream_proxy-batch_send_metrics",
(
"send_batch_size_sum", self.send_batch_size_sum.load(Ordering::Relaxed), i64
"send_batch_size_sum",
self.send_batch_size_sum.load(Ordering::Relaxed),
i64
),
(
"send_batch_count", self.send_batch_count.load(Ordering::Relaxed), i64
"send_batch_count",
self.send_batch_count.load(Ordering::Relaxed),
i64
)
);

Expand All @@ -677,7 +689,6 @@ impl ShredMetrics {
),
);


if self.enabled_grpc_service {
datapoint_info!(
"shredstream_proxy-service_metrics",
Expand Down
3 changes: 3 additions & 0 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ mod multicast_config;
mod server;
mod token_authenticator;
mod prom;
mod recv_mmsg;
mod mem;
mod triton_forwarder;

#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
Expand Down
Loading
Loading