Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions broker/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-li
# vls-protocol-client = { path = "../../vls/vls-protocol-client" }
# vls-proxy = { path = "../../vls/vls-proxy" }

lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "1abce4dedfc6be8cb261e4faa11d9a753ee323ce" }
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "1abce4dedfc6be8cb261e4faa11d9a753ee323ce" }
lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "64d5c8aa166c4ff46b0817bc4011f39a3c949de7" }
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "64d5c8aa166c4ff46b0817bc4011f39a3c949de7" }
# lss-connector = { path = "../../sphinx-rs/lss-connector" }
# sphinx-signer = { path = "../../sphinx-rs/signer" }

Expand Down
40 changes: 26 additions & 14 deletions broker/src/conn.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use anyhow::Result;
use rocket::tokio::sync::{mpsc, oneshot};
use serde::{Deserialize, Serialize};
use sphinx_signer::sphinx_glyph::types::SignerType;
use std::collections::HashMap;

#[derive(Debug, Serialize, Deserialize)]
pub struct Connections {
pub pubkey: Option<String>,
pub clients: HashMap<String, bool>,
pub clients: HashMap<String, SignerType>,
pub current: Option<String>,
}

Expand All @@ -27,21 +28,16 @@ impl Connections {
pub fn set_current(&mut self, cid: String) {
self.current = Some(cid);
}
fn add_client(&mut self, cid: &str) {
self.clients.insert(cid.to_string(), true);
pub fn add_client(&mut self, cid: &str, signer_type: SignerType) {
self.clients.insert(cid.to_string(), signer_type);
self.current = Some(cid.to_string());
}
fn remove_client(&mut self, cid: &str) {
pub fn remove_client(&mut self, cid: &str) {
self.clients.remove(cid);
if self.current == Some(cid.to_string()) {
self.current = None;
}
}
pub fn client_action(&mut self, cid: &str, connected: bool) {
if connected {
self.add_client(cid);
} else {
self.remove_client(cid);
if let Some(id) = &self.current {
if id == cid {
self.current = None;
}
}
}
}
Expand All @@ -58,6 +54,7 @@ pub struct ChannelRequest {
pub message: Vec<u8>,
pub reply_tx: oneshot::Sender<ChannelReply>,
pub cid: Option<String>, // if it exists, only try the one client
pub signer_type: Option<SignerType>, // if it exists, only try clients of these types
}
impl ChannelRequest {
pub fn new(topic: &str, message: Vec<u8>) -> (Self, oneshot::Receiver<ChannelReply>) {
Expand All @@ -67,6 +64,7 @@ impl ChannelRequest {
message,
reply_tx,
cid: None,
signer_type: None,
};
(cr, reply_rx)
}
Expand All @@ -81,6 +79,7 @@ impl ChannelRequest {
message,
reply_tx,
cid: None,
signer_type: None,
};
let _ = sender.send(req).await;
let reply = reply_rx.await?;
Expand All @@ -98,13 +97,14 @@ impl ChannelRequest {
message,
reply_tx,
cid: Some(cid.to_string()),
signer_type: None,
};
let _ = sender.send(req).await;
let reply = reply_rx.await?;
Ok(reply.reply)
}
pub fn for_cid(&mut self, cid: &str) {
self.cid = Some(cid.to_string())
self.cid = Some(cid.to_string());
}
pub fn new_for(
cid: &str,
Expand All @@ -115,6 +115,18 @@ impl ChannelRequest {
cr.for_cid(cid);
(cr, reply_rx)
}
pub fn for_type(&mut self, signer_type: SignerType) {
self.signer_type = Some(signer_type);
}
pub fn new_for_type(
signer_type: SignerType,
topic: &str,
message: Vec<u8>,
) -> (Self, oneshot::Receiver<ChannelReply>) {
let (mut cr, reply_rx) = ChannelRequest::new(topic, message);
cr.for_type(signer_type);
(cr, reply_rx)
}
}

// mpsc reply
Expand Down
39 changes: 31 additions & 8 deletions broker/src/looper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use bitcoin::blockdata::constants::ChainHash;
use log::*;
use rocket::tokio::sync::mpsc;
use secp256k1::PublicKey;
use sphinx_signer::{parser, sphinx_glyph::topics};
use sphinx_signer::{
parser,
sphinx_glyph::{topics, types::SignerType},
};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -123,7 +126,7 @@ impl<C: 'static + Client> SignerLoop<C> {
}
msg => {
let mut catch_init = false;
if let Message::HsmdInit(m) = msg {
if let Message::HsmdInit(ref m) = msg {
catch_init = true;
if let Some(set) = settings {
if ChainHash::using_genesis_block(set.network).as_bytes()
Expand All @@ -135,15 +138,27 @@ impl<C: 'static + Client> SignerLoop<C> {
panic!("Got HsmdInit without settings - likely because HsmdInit was sent after startup");
}
}
let reply = self.handle_message(raw_msg, catch_init)?;
let reply = if let Message::PreapproveInvoice(_)
| Message::PreapproveKeysend(_) = msg
{
self.handle_message(raw_msg, catch_init, Some(SignerType::ReceiveSend))?
} else {
// None for signer_type means no restrictions on which signer type to send the message to
self.handle_message(raw_msg, catch_init, None)?
};
// Write the reply to CLN
self.client.write_vec(reply)?;
}
}
}
}

fn handle_message(&mut self, message: Vec<u8>, catch_init: bool) -> Result<Vec<u8>> {
fn handle_message(
&mut self,
message: Vec<u8>,
catch_init: bool,
signer_type: Option<SignerType>,
) -> Result<Vec<u8>> {
// wait until not busy
loop {
match try_to_get_busy() {
Expand All @@ -166,15 +181,15 @@ impl<C: 'static + Client> SignerLoop<C> {
)?;
// send to signer
log::info!("SEND ON {}", topics::VLS);
let (res_topic, res) = self.send_request_wait(topics::VLS, md)?;
let (res_topic, res) = self.send_request_wait(topics::VLS, md, signer_type)?;
log::info!("GOT ON {}", res_topic);
let the_res = if res_topic == topics::LSS_RES {
// send reply to LSS to store muts
let lss_reply = self.send_lss(res)?;
log::info!("LSS REPLY LEN {}", &lss_reply.len());
// send to signer for HMAC validation, and get final reply
log::info!("SEND ON {}", topics::LSS_MSG);
let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply)?;
let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply, None)?;
log::info!("GOT ON {}, send to CLN", res_topic2);
if res_topic2 != topics::VLS_RES {
log::warn!("got a topic NOT on {}", topics::VLS_RES);
Expand Down Expand Up @@ -213,9 +228,17 @@ impl<C: 'static + Client> SignerLoop<C> {

// returns (topic, payload)
// might halt if signer is offline
fn send_request_wait(&mut self, topic: &str, message: Vec<u8>) -> Result<(String, Vec<u8>)> {
fn send_request_wait(
&mut self,
topic: &str,
message: Vec<u8>,
signer_type: Option<SignerType>,
) -> Result<(String, Vec<u8>)> {
// Send a request to the MQTT handler to send to signer
let (request, reply_rx) = ChannelRequest::new(topic, message);
let (request, reply_rx) = match signer_type {
Some(st) => ChannelRequest::new_for_type(st, topic, message),
None => ChannelRequest::new(topic, message),
};
// This can fail if MQTT shuts down
self.chan
.sender
Expand Down
25 changes: 17 additions & 8 deletions broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,27 +163,36 @@ pub async fn broker_setup(
let conns_ = conns.clone();
std::thread::spawn(move || {
log::info!("=> waiting first connection...");
while let Ok((cid, connected)) = status_rx.recv() {
while let Ok((cid, connected, signer_type_opt)) = status_rx.recv() {
log::info!("=> connection status: {}: {}", cid, connected);
let mut cs = conns_.lock().unwrap();
// drop it from list until ready
cs.client_action(&cid, false);
cs.remove_client(&cid);
drop(cs);
if connected {
// In mqtt.rs, we always send a signer type if connected == true
let signer_type = signer_type_opt.unwrap();
let (dance_complete_tx, dance_complete_rx) = std_oneshot::channel::<bool>();
let _ = conn_tx.blocking_send((cid.clone(), dance_complete_tx));
let dance_complete = dance_complete_rx.recv().unwrap_or_else(|e| {
log::info!(
log::warn!(
"dance_complete channel died before receiving response: {}",
e
);
false
});
log::info!("adding client to the list? {}", dance_complete);
let mut cs = conns_.lock().unwrap();
cs.client_action(&cid, dance_complete);
log::debug!("List: {:?}, action: {}", cs, dance_complete);
drop(cs);
if dance_complete {
log::info!(
"adding client to the list: {}, type: {:?}",
&cid,
signer_type
);
let mut cs = conns_.lock().unwrap();
cs.add_client(&cid, signer_type);
drop(cs);
} else {
log::warn!("dance failed, client not added to the list");
}
}
}
});
Expand Down
51 changes: 42 additions & 9 deletions broker/src/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::util::Settings;
use rocket::tokio::{sync::broadcast, sync::mpsc};
use rumqttd::{local::LinkTx, AuthMsg, Broker, Config, Notification};
use sphinx_signer::sphinx_glyph::sphinx_auther::token::Token;
use sphinx_signer::sphinx_glyph::topics;
use sphinx_signer::sphinx_glyph::{topics, types::SignerType};
use std::sync::{Arc, Mutex};
use std::time::Duration;

Expand All @@ -15,7 +15,7 @@ pub fn start_broker(
settings: Settings,
mut receiver: mpsc::Receiver<ChannelRequest>,
mut init_receiver: mpsc::Receiver<ChannelRequest>,
status_sender: std::sync::mpsc::Sender<(String, bool)>,
status_sender: std::sync::mpsc::Sender<(String, bool, Option<SignerType>)>,
error_sender: broadcast::Sender<Vec<u8>>,
auth_sender: std::sync::mpsc::Sender<AuthMsg>,
connections: Arc<Mutex<Connections>>,
Expand All @@ -39,18 +39,19 @@ pub fn start_broker(
});

// connected/disconnected status alerts
let (internal_status_tx, internal_status_rx) = std::sync::mpsc::channel::<(bool, String)>();
let (internal_status_tx, internal_status_rx) =
std::sync::mpsc::channel::<(bool, String, Option<SignerType>)>();

// track connections
let link_tx_ = link_tx.clone();
let _conns_task = std::thread::spawn(move || {
while let Ok((is, cid)) = internal_status_rx.recv() {
while let Ok((is, cid, signer_type)) = internal_status_rx.recv() {
if is {
subs(&cid, link_tx_.clone());
} else {
unsubs(&cid, link_tx_.clone());
}
let _ = status_sender.send((cid, is));
let _ = status_sender.send((cid, is, signer_type));
}
});

Expand Down Expand Up @@ -112,9 +113,25 @@ pub fn start_broker(
let topic_end = ts[1].to_string();

if topic.ends_with(topics::HELLO) {
let _ = internal_status_tx.send((true, cid));
let signer_type = match f.publish.payload.get(0) {
Some(byte) => match SignerType::from_byte(*byte) {
Ok(signer_type) => signer_type,
Err(e) => {
log::warn!("Could not deserialize signer type: {}", e);
continue;
}
},
// This is the ReceiveSend signer type
None => SignerType::default(),
};
log::debug!(
"caught hello message for id: {}, type: {:?}",
cid,
signer_type
);
let _ = internal_status_tx.send((true, cid, Some(signer_type)));
} else if topic.ends_with(topics::BYE) {
let _ = internal_status_tx.send((false, cid));
let _ = internal_status_tx.send((false, cid, None));
} else {
// VLS, CONTROL, LSS
let pld = f.publish.payload.to_vec();
Expand Down Expand Up @@ -174,10 +191,25 @@ fn pub_and_wait(
} else {
let current = current.unwrap();
// Try the current connection
let mut rep = pub_timeout(&current, &msg.topic, &msg.message, &msg_rx, link_tx);
// This returns None if 1) signer_type is set, and not equal to the current signer
// 2) If pub_timeout times out
let mut rep = if client_list.get(&current).unwrap()
== msg
.signer_type
.as_ref()
.unwrap_or(client_list.get(&current).unwrap())
{
pub_timeout(&current, &msg.topic, &msg.message, &msg_rx, link_tx)
} else {
None
};

// If that failed, try looking for some other signer
if rep.is_none() {
for cid in client_list.into_keys().filter(|k| k != &current) {
// If signer_type is set, we also filter for only these types
for (cid, _) in client_list.into_iter().filter(|(k, v)| {
k != &current && v == msg.signer_type.as_ref().unwrap_or(v)
}) {
rep = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx);
if rep.is_some() {
let mut cs = conns_.lock().unwrap();
Expand All @@ -199,6 +231,7 @@ fn pub_and_wait(
break;
} else {
log::debug!("couldn't reach any clients...");
std::thread::sleep(Duration::from_secs(1));
}
if let Some(max) = retries {
log::debug!("counter: {}, retries: {}", counter, max);
Expand Down