diff --git a/Cargo.lock b/Cargo.lock index cdc663f5..31da8d7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3706,10 +3706,32 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "p2panda" +version = "0.5.1" +source = "git+https://github.com/p2panda/p2panda?rev=534363cfc33d2e3275748bf8b0a9307e6eb261c0#534363cfc33d2e3275748bf8b0a9307e6eb261c0" +dependencies = [ + "futures-util", + "hex", + "p2panda-core", + "p2panda-net", + "p2panda-store", + "p2panda-stream", + "p2panda-sync", + "pin-project", + "rand 0.10.0", + "rand_chacha 0.10.0", + "serde", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "p2panda-core" version = "0.5.1" -source = "git+https://github.com/p2panda/p2panda?rev=a1308c116c1bd345f2cd6af7df41d2a6e0a4a682#a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" +source = "git+https://github.com/p2panda/p2panda?rev=534363cfc33d2e3275748bf8b0a9307e6eb261c0#534363cfc33d2e3275748bf8b0a9307e6eb261c0" dependencies = [ "blake3", "ciborium", @@ -3724,12 +3746,13 @@ dependencies = [ [[package]] name = "p2panda-discovery" version = "0.5.1" -source = "git+https://github.com/p2panda/p2panda?rev=a1308c116c1bd345f2cd6af7df41d2a6e0a4a682#a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" +source = "git+https://github.com/p2panda/p2panda?rev=534363cfc33d2e3275748bf8b0a9307e6eb261c0#534363cfc33d2e3275748bf8b0a9307e6eb261c0" dependencies = [ "blake3", "futures-util", - "rand 0.9.2", - "rand_chacha 0.9.0", + "p2panda-core", + "p2panda-store", + "rand 0.10.0", "serde", "thiserror 2.0.18", "tokio", @@ -3738,7 +3761,7 @@ dependencies = [ [[package]] name = "p2panda-net" version = "0.5.1" -source = "git+https://github.com/p2panda/p2panda?rev=a1308c116c1bd345f2cd6af7df41d2a6e0a4a682#a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" +source = "git+https://github.com/p2panda/p2panda?rev=534363cfc33d2e3275748bf8b0a9307e6eb261c0#534363cfc33d2e3275748bf8b0a9307e6eb261c0" dependencies = [ "ciborium", "futures-channel", @@ -3752,8 +3775,8 @@ dependencies = [ "p2panda-store", "p2panda-sync", "ractor", - "rand 0.9.2", - "rand_chacha 0.9.0", + "rand 0.10.0", + "rand_chacha 0.10.0", "serde", "thiserror 2.0.18", "tokio", @@ -3765,35 +3788,32 @@ dependencies = [ [[package]] name = "p2panda-store" version = "0.5.1" -source = "git+https://github.com/p2panda/p2panda?rev=a1308c116c1bd345f2cd6af7df41d2a6e0a4a682#a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" +source = "git+https://github.com/p2panda/p2panda?rev=534363cfc33d2e3275748bf8b0a9307e6eb261c0#534363cfc33d2e3275748bf8b0a9307e6eb261c0" dependencies = [ - "ciborium", - "hex", "p2panda-core", + "serde", "sqlx", "thiserror 2.0.18", - "trait-variant", + "tokio", ] [[package]] name = "p2panda-stream" version = "0.5.1" -source = "git+https://github.com/p2panda/p2panda?rev=a1308c116c1bd345f2cd6af7df41d2a6e0a4a682#a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" +source = "git+https://github.com/p2panda/p2panda?rev=534363cfc33d2e3275748bf8b0a9307e6eb261c0#534363cfc33d2e3275748bf8b0a9307e6eb261c0" dependencies = [ - "ciborium", - "futures-channel", - "futures-util", + "futures-core", "p2panda-core", "p2panda-store", "pin-project", - "pin-utils", "thiserror 2.0.18", + "tokio", ] [[package]] name = "p2panda-sync" version = "0.5.1" -source = "git+https://github.com/p2panda/p2panda?rev=a1308c116c1bd345f2cd6af7df41d2a6e0a4a682#a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" +source = "git+https://github.com/p2panda/p2panda?rev=534363cfc33d2e3275748bf8b0a9307e6eb261c0#534363cfc33d2e3275748bf8b0a9307e6eb261c0" dependencies = [ "futures", "futures-util", @@ -4467,6 +4487,7 @@ dependencies = [ "hex", "indexmap", "loro", + "p2panda-core", "rand 0.10.0", "reflection-node", "serde", @@ -4480,17 +4501,10 @@ name = "reflection-node" version = "0.2.0" dependencies = [ "chrono", - "ciborium", - "hex", + "p2panda", "p2panda-core", - "p2panda-discovery", - "p2panda-net", "p2panda-store", - "p2panda-stream", - "p2panda-sync", - "rand_chacha 0.10.0", "serde", - "serde_bytes", "sqlx", "test-log", "thiserror 2.0.18", @@ -5838,17 +5852,6 @@ dependencies = [ "tracing-log", ] -[[package]] -name = "trait-variant" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70977707304198400eb4835a78f6a9f928bf41bba420deb8fdb175cd965d77a7" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.114", -] - [[package]] name = "try-lock" version = "0.2.5" diff --git a/reflection-doc/Cargo.toml b/reflection-doc/Cargo.toml index 48cd592f..5c496588 100644 --- a/reflection-doc/Cargo.toml +++ b/reflection-doc/Cargo.toml @@ -14,6 +14,7 @@ glib = "0.21" hex = "0.4.3" indexmap = "2.13.0" loro = { tag = "loro-crdt@1.10.6", git = "https://github.com/loro-dev/loro.git" } +p2panda-core = { git = "https://github.com/p2panda/p2panda", rev = "534363cfc33d2e3275748bf8b0a9307e6eb261c0" } rand = "0.10.0" reflection-node = { path = "../reflection-node" } serde = { version = "1.0.228", features = ["derive"] } diff --git a/reflection-doc/src/document.rs b/reflection-doc/src/document.rs index c9bf999a..2d880a25 100644 --- a/reflection-doc/src/document.rs +++ b/reflection-doc/src/document.rs @@ -9,11 +9,9 @@ use glib::{Properties, clone}; pub use hex::FromHexError; use loro::{ExportMode, LoroDoc, LoroText, event::Diff}; use p2panda_core::cbor::{decode_cbor, encode_cbor}; -use reflection_node::p2panda_core; -use reflection_node::topic::{ - SubscribableTopic, Subscription as TopicSubscription, - SubscriptionError as TopicSubscriptionError, -}; +use p2panda_core::{self, Topic}; +use reflection_node::subscription::Subscription as TopicSubscription; +use reflection_node::traits::{SubscribableTopic, SubscriptionError as TopicSubscriptionError}; use tracing::error; use crate::author::Author; @@ -37,6 +35,18 @@ impl From<[u8; 32]> for DocumentId { } } +impl From for DocumentId { + fn from(value: Topic) -> Self { + Self(value.to_bytes()) + } +} + +impl From for Topic { + fn from(value: DocumentId) -> Self { + Topic::from(value.0) + } +} + impl fmt::Display for DocumentId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(&self.to_hex()) @@ -298,7 +308,7 @@ mod imp { #[weak] subscription, async move { - if let Err(error) = subscription.send_ephemeral(cursor_bytes).await { + if let Err(error) = subscription.publish_ephemeral(cursor_bytes).await { error!("Failed to send cursor position: {}", error); } } @@ -495,7 +505,7 @@ mod imp { subscription, async move { // Broadcast a "text delta" to all peers - if let Err(error) = subscription.send_delta(delta_bytes).await { + if let Err(error) = subscription.publish_delta(delta_bytes).await { error!( "Failed to send delta of document to the network: {}", error @@ -828,7 +838,7 @@ impl Document { .export(ExportMode::Snapshot) .expect("encoded crdt snapshot"); - if let Err(error) = subscription.send_snapshot(snapshot_bytes).await { + if let Err(error) = subscription.publish_snapshot(snapshot_bytes).await { error!( "Failed to send snapshot of document to the network: {}", error @@ -872,7 +882,7 @@ impl Document { .expect("crdt_doc to be set") .export(ExportMode::Snapshot) .expect("encoded crdt snapshot"); - if let Err(error) = subscription.send_snapshot(snapshot_bytes).await { + if let Err(error) = subscription.publish_snapshot(snapshot_bytes).await { error!( "Failed to send snapshot of document to the network: {}", error diff --git a/reflection-doc/src/documents.rs b/reflection-doc/src/documents.rs index d9dc6096..03e17bbc 100644 --- a/reflection-doc/src/documents.rs +++ b/reflection-doc/src/documents.rs @@ -70,7 +70,7 @@ impl Documents { pub(crate) async fn load(&self, service: &Service) -> Result<(), StartupError> { let public_key = service.private_key().public_key(); - let documents = service.node().topics::().await?; + let documents = service.node().topics().await?; let mut list = self.imp().list.write().unwrap(); assert!(list.is_empty()); @@ -102,14 +102,14 @@ impl Documents { let obj = Document::with_state( service, - Some(&document.id), + Some(&document.id.into()), document.name.as_deref(), last_accessed.as_ref(), ); obj.authors().load(authors); - list.insert(document.id, obj); + list.insert(document.id.into(), obj); } drop(list); diff --git a/reflection-doc/src/lib.rs b/reflection-doc/src/lib.rs index 8fabd3ac..c0e9db71 100644 --- a/reflection-doc/src/lib.rs +++ b/reflection-doc/src/lib.rs @@ -5,11 +5,11 @@ pub mod documents; pub mod service; pub mod identity { + use std::fmt; use std::hash::Hash; - use reflection_node::p2panda_core; - pub use reflection_node::p2panda_core::identity::IdentityError; - use std::fmt; + use p2panda_core; + pub use p2panda_core::identity::IdentityError; #[derive(Clone, Debug, glib::Boxed)] #[boxed_type(name = "ReflectionPrivateKey", nullable)] diff --git a/reflection-doc/src/service.rs b/reflection-doc/src/service.rs index 6b737f3b..3970dd46 100644 --- a/reflection-doc/src/service.rs +++ b/reflection-doc/src/service.rs @@ -1,29 +1,26 @@ +use std::sync::{Mutex, OnceLock}; + use gio::prelude::{FileExt, ListModelExtManual, NetworkMonitorExt}; use glib::object::ObjectExt; use glib::subclass::prelude::*; use glib::{Properties, clone}; -use reflection_node::p2panda_core::Hash; -use std::sync::{Mutex, OnceLock}; +use p2panda_core::Hash; +use reflection_node::node; +use reflection_node::node::{Node, NodeError}; +use reflection_node::subscription::SubscriptionError; use thiserror::Error; use tracing::error; +use crate::document::{Document, DocumentId}; +use crate::documents::Documents; use crate::identity::PrivateKey; -use crate::{ - document::{Document, DocumentId}, - documents::Documents, -}; -use reflection_node::{ - node, - node::{Node, NodeError}, - topic::TopicError, -}; #[derive(Error, Debug)] pub enum StartupError { #[error(transparent)] Node(#[from] NodeError), #[error(transparent)] - Topic(#[from] TopicError), + Topic(#[from] SubscriptionError), } #[derive(Debug, Copy, Clone, PartialEq, Eq, glib::Enum, Default)] diff --git a/reflection-node/Cargo.toml b/reflection-node/Cargo.toml index ee817e73..0ab0ebff 100644 --- a/reflection-node/Cargo.toml +++ b/reflection-node/Cargo.toml @@ -12,21 +12,16 @@ authors = [ test_utils = [] [dependencies] -thiserror = "2.0.18" chrono = "0.4.43" -ciborium = "0.2.2" -p2panda-core = { git = "https://github.com/p2panda/p2panda", rev = "a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" } -p2panda-discovery = { git = "https://github.com/p2panda/p2panda", rev = "a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" } -p2panda-net = { git = "https://github.com/p2panda/p2panda", rev = "a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" } -p2panda-store = { git = "https://github.com/p2panda/p2panda", rev = "a1308c116c1bd345f2cd6af7df41d2a6e0a4a682", features = ["sqlite"], default-features = false } -p2panda-stream = { git = "https://github.com/p2panda/p2panda", rev = "a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" } -p2panda-sync = { git = "https://github.com/p2panda/p2panda", rev = "a1308c116c1bd345f2cd6af7df41d2a6e0a4a682" } +p2panda = { git = "https://github.com/p2panda/p2panda", rev = "534363cfc33d2e3275748bf8b0a9307e6eb261c0" } +p2panda-core = { git = "https://github.com/p2panda/p2panda", rev = "534363cfc33d2e3275748bf8b0a9307e6eb261c0" } +p2panda-store = { git = "https://github.com/p2panda/p2panda", rev = "534363cfc33d2e3275748bf8b0a9307e6eb261c0" } serde = { version = "1.0.228", features = ["derive"] } -serde_bytes = "0.11.19" sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite", "chrono"], default-features = false } +thiserror = "2.0.18" tokio = { version = "1.49.0", features = ["rt", "sync"] } tokio-stream = "0.1.18" tracing = "0.1" + +[dev-dependencies] test-log = { version = "0.2.19", default-features = false, features = ["trace", "color"] } -hex = "0.4.3" -rand_chacha = "0.10.0" diff --git a/reflection-node/migrations/20250418140035_create_tables.sql b/reflection-node/migrations/20250418140035_create_tables.sql index 186eb4c8..1f3ba8e1 100644 --- a/reflection-node/migrations/20250418140035_create_tables.sql +++ b/reflection-node/migrations/20250418140035_create_tables.sql @@ -10,4 +10,4 @@ CREATE TABLE IF NOT EXISTS topics ( id TEXT NOT NULL PRIMARY KEY, name TEXT, last_accessed INTEGER -); \ No newline at end of file +); diff --git a/reflection-node/src/author_tracker.rs b/reflection-node/src/author_tracker.rs index f665f2b1..8d5b86d3 100644 --- a/reflection-node/src/author_tracker.rs +++ b/reflection-node/src/author_tracker.rs @@ -1,53 +1,44 @@ use std::collections::HashMap; use std::ops::DerefMut; use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime}; +use std::time::{Duration, Instant}; -use crate::ephemerial_operation::EphemerialOperation; -use crate::node_inner::MessageType; -use crate::node_inner::NodeInner; -use crate::topic::SubscribableTopic; use chrono::Utc; -use p2panda_core::cbor::{DecodeError, decode_cbor, encode_cbor}; -use p2panda_core::{PrivateKey, PublicKey}; -use p2panda_net::gossip::GossipHandle; +use p2panda::streams::EphemeralStreamPublisher; +use p2panda_core::PublicKey; +use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, RwLock}; -use tracing::error; +use tracing::{error, warn}; + +use crate::message::EphemeralMessage; +use crate::node::NodeInner; +use crate::traits::SubscribableTopic; const OFFLINE_TIMEOUT: Duration = Duration::from_secs(60); -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub enum AuthorMessage { +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum AuthorTrackerMessage { Hello, Ping, Bye, } -impl std::fmt::Display for AuthorMessage { +impl std::fmt::Display for AuthorTrackerMessage { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { - AuthorMessage::Hello => write!(f, "Hello message"), - AuthorMessage::Ping => write!(f, "Ping message"), - AuthorMessage::Bye => write!(f, "Bye message"), + AuthorTrackerMessage::Hello => write!(f, "Hello message"), + AuthorTrackerMessage::Ping => write!(f, "Ping message"), + AuthorTrackerMessage::Bye => write!(f, "Bye message"), } } } -impl TryFrom<&[u8]> for AuthorMessage { - type Error = DecodeError; - - fn try_from(value: &[u8]) -> Result { - let (res, _): (AuthorMessage, SystemTime) = decode_cbor(value)?; - - Ok(res) - } -} - pub struct AuthorTracker { last_ping: Mutex>, subscribable_topic: Arc, node: Arc, - tx: RwLock>, + tx: RwLock>>, } impl AuthorTracker { @@ -60,21 +51,23 @@ impl AuthorTracker { }) } - pub async fn set_topic_tx(&self, tx: Option) { + pub async fn set_topic_tx(&self, tx: Option>) { let mut tx_guard = self.tx.write().await; + // Send good bye message to the network if let Some(tx) = tx_guard.as_ref() { - send_message(&self.node.private_key, tx, AuthorMessage::Bye).await; + send_message(tx, AuthorTrackerMessage::Bye).await; } - // Set all authors that the tracker has seen to offline, authors the tracker hasn't seen are already offline + // Set all authors that the tracker has seen to offline, authors the tracker hasn't seen + // are already offline let old_authors = std::mem::take(self.last_ping.lock().await.deref_mut()); for author in old_authors.into_keys() { self.subscribable_topic.author_left(author); self.set_last_seen(author).await; } - let this_author = self.node.private_key.public_key(); + let this_author = self.node.public_key; if tx.is_some() { self.subscribable_topic.author_joined(this_author); } else { @@ -85,23 +78,23 @@ impl AuthorTracker { *tx_guard = tx; } - pub async fn received(&self, message: AuthorMessage, author: PublicKey) { + pub async fn received(&self, author: PublicKey, message: AuthorTrackerMessage) { match message { - AuthorMessage::Hello => { + AuthorTrackerMessage::Hello => { self.join(author).await; } - AuthorMessage::Ping => { + AuthorTrackerMessage::Ping => { self.ping(author).await; } - AuthorMessage::Bye => { + AuthorTrackerMessage::Bye => { self.left(author).await; } } } - async fn send(&self, message: AuthorMessage) { + async fn send(&self, message: AuthorTrackerMessage) { if let Some(tx) = self.tx.read().await.as_ref() { - send_message(&self.node.private_key, tx, message).await; + send_message(tx, message).await; } } @@ -112,7 +105,7 @@ impl AuthorTracker { // Send a ping to the network to ensure that the new author knows we exist // Normally we send a ping every `OFFLINE_TIMEOUT / 2` - self.send(AuthorMessage::Ping).await; + self.send(AuthorTrackerMessage::Ping).await; } async fn ping(&self, author: PublicKey) { @@ -133,7 +126,7 @@ impl AuthorTracker { pub async fn spawn(&self) { // Send a hello to the network so other authors know we joined the topic - self.send(AuthorMessage::Hello).await; + self.send(AuthorTrackerMessage::Hello).await; let mut interval = tokio::time::interval(OFFLINE_TIMEOUT / 2); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -144,7 +137,7 @@ impl AuthorTracker { interval.tick().await; // Send a ping to the network so that we won't be marked as offline - self.send(AuthorMessage::Ping).await; + self.send(AuthorTrackerMessage::Ping).await; let mut expired = Vec::new(); self.last_ping.lock().await.retain(|author, instant| { if instant.elapsed() > OFFLINE_TIMEOUT { @@ -174,25 +167,11 @@ impl AuthorTracker { } } -async fn send_message(private_key: &PrivateKey, tx: &GossipHandle, message: AuthorMessage) { - // FIXME: We need to add the current time to the message, - // because iroh doesn't broadcast twice the same message message. - let author_message = match encode_cbor(&(&message, SystemTime::now())) { - Ok(result) => result, - Err(error) => { - error!("Failed to encode {message} as CBOR: {error}"); - return; - } - }; - let operation = EphemerialOperation::new(author_message, private_key); - let bytes = match encode_cbor(&MessageType::AuthorEphemeral(operation)) { - Ok(result) => result, - Err(error) => { - error!("Failed to encode {message} as CBOR: {error}"); - return; - } - }; - if let Err(error) = tx.publish(bytes).await { - error!("Failed to sent {message} to the network: {error}"); +async fn send_message( + tx: &EphemeralStreamPublisher, + message: AuthorTrackerMessage, +) { + if let Err(err) = tx.publish(EphemeralMessage::AuthorTracker(message)).await { + warn!("error occurred when sending ephemeral message: {err}"); } } diff --git a/reflection-node/src/database.rs b/reflection-node/src/database.rs new file mode 100644 index 00000000..ef55390a --- /dev/null +++ b/reflection-node/src/database.rs @@ -0,0 +1,73 @@ +use std::path::PathBuf; +use std::pin::Pin; + +use p2panda_store::sqlite::migrations as p2panda_migrations; +use sqlx::error::BoxDynError; +use sqlx::migrate::{Migration, MigrationSource, Migrator}; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; +use tracing::info; + +/// Establishes a SQLite connection pool. +/// +/// If no database path is given the database is created in memory. +pub async fn database_pool(db_file: Option) -> Result { + let connection_options = SqliteConnectOptions::new() + .shared_cache(true) + .create_if_missing(true); + + let pool = if let Some(db_file) = db_file { + info!("database file location: {db_file:?}"); + let connection_options = connection_options.filename(db_file); + SqlitePool::connect_with(connection_options).await? + } else { + let connection_options = connection_options.in_memory(true); + // FIXME: we need to set max connection to 1 for in memory sqlite DB. Probably has to + // do something with this issue: https://github.com/launchbadge/sqlx/issues/2510 + let pool_options = SqlitePoolOptions::new().max_connections(1); + pool_options.connect_with(connection_options).await? + }; + + Ok(pool) +} + +/// Run migration for p2panda and for the our TopicStore. +pub async fn run_migrations(pool: &SqlitePool) -> Result<(), sqlx::migrate::MigrateError> { + Migrator::new(CombinedMigrationSource::new(vec![ + p2panda_migrations(), + sqlx::migrate!(), + ])) + .await? + .run(pool) + .await?; + + Ok(()) +} + +type BoxFuture<'a, T> = Pin + Send + 'a>>; + +/// Combine multiple `sqlx::migrate::Migrator` into a single `sqlx::migrate::MigrationSource` +/// +/// See for more details: https://github.com/launchbadge/sqlx/discussions/3407 +#[derive(Debug)] +pub struct CombinedMigrationSource { + migrators: Vec, +} + +impl CombinedMigrationSource { + pub fn new(migrators: Vec) -> CombinedMigrationSource { + Self { migrators } + } +} + +impl<'s> MigrationSource<'s> for CombinedMigrationSource { + fn resolve(self) -> BoxFuture<'s, Result, BoxDynError>> { + Box::pin(async move { + Ok(self + .migrators + .iter() + .flat_map(|migrator| migrator.iter()) + .cloned() + .collect()) + }) + } +} diff --git a/reflection-node/src/ephemerial_operation.rs b/reflection-node/src/ephemerial_operation.rs deleted file mode 100644 index 1fd6aa3f..00000000 --- a/reflection-node/src/ephemerial_operation.rs +++ /dev/null @@ -1,34 +0,0 @@ -use p2panda_core::identity::{PrivateKey, PublicKey, Signature}; - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct EphemerialOperation { - #[serde(with = "serde_bytes")] - body: Vec, - author: PublicKey, - signature: Signature, -} - -impl EphemerialOperation { - pub fn new(body: Vec, author: &PrivateKey) -> Self { - Self { - signature: author.sign(&body), - body, - author: author.public_key(), - } - } - - /// Validates the signature and unpacks the operation - pub fn validate_and_unpack(self) -> Option<(PublicKey, Vec)> { - let EphemerialOperation { - body, - author, - signature, - } = self; - - if self.author.verify(&body[..], &signature) { - Some((author, body)) - } else { - None - } - } -} diff --git a/reflection-node/src/lib.rs b/reflection-node/src/lib.rs index d1cddf8a..ff05f7d5 100644 --- a/reflection-node/src/lib.rs +++ b/reflection-node/src/lib.rs @@ -1,30 +1,20 @@ mod author_tracker; -mod ephemerial_operation; -mod network; +mod database; +mod message; pub mod node; -mod node_inner; -mod operation; -mod operation_store; -mod subscription_inner; -pub mod topic; +pub mod subscription; mod topic_store; -mod utils; - -pub use p2panda_core; -pub use topic::SubscribableTopic; +pub mod traits; #[cfg(test)] mod tests { use std::sync::Arc; - use p2panda_core::Hash; - use p2panda_core::PrivateKey; - use p2panda_core::PublicKey; + use p2panda_core::{Hash, PrivateKey, PublicKey, Topic}; use tokio::sync::{Mutex, mpsc}; - use crate::node::ConnectionMode; - use crate::node::Node; - use crate::topic::SubscribableTopic; + use crate::node::{ConnectionMode, Node}; + use crate::traits::{SubscribableTopic, SubscriptionError}; #[tokio::test] #[test_log::test] @@ -35,10 +25,10 @@ mod tests { let id: [u8; 32] = [0; 32]; let _sub = node.subscribe(id, TestTopic::new()).await; - let topics = node.topics::<[u8; 32]>().await.unwrap(); + let topics = node.topics().await.unwrap(); assert_eq!(topics.len(), 1); - assert_eq!(topics.first().unwrap().id, id); + assert_eq!(topics.first().unwrap().id, id.into()); node.shutdown().await.unwrap(); } @@ -71,31 +61,29 @@ mod tests { fn author_joined(&self, _author: PublicKey) {} fn author_left(&self, _author: PublicKey) {} fn ephemeral_bytes_received(&self, _author: PublicKey, _data: Vec) {} - fn error(&self, _error: crate::topic::SubscriptionError) {} + fn error(&self, _error: SubscriptionError) {} } #[tokio::test] #[test_log::test] async fn subscribe_topic() { - let private_key = PrivateKey::new(); let network_id = Hash::new(b"reflection"); - let node = Node::new(private_key, network_id, None).await.unwrap(); + let topic_id: Topic = [1; 32].into(); + + let node = Node::new(PrivateKey::new(), network_id, None) + .await + .unwrap(); node.set_connection_mode(ConnectionMode::Network) .await .unwrap(); let test_topic = TestTopic::new(); - let id: [u8; 32] = [0; 32]; - let subscription = node.subscribe(id, test_topic).await.unwrap(); + let subscription = node.subscribe(topic_id, test_topic).await.unwrap(); - let topics = node.topics::<[u8; 32]>().await.unwrap(); - assert_eq!(topics.len(), 1); - assert_eq!(topics.first().unwrap().id, id); - - let private_key2 = PrivateKey::new(); - let network_id2 = Hash::new(b"reflection"); - let node2 = Node::new(private_key2, network_id2, None).await.unwrap(); + let node2 = Node::new(PrivateKey::new(), network_id, None) + .await + .unwrap(); node2 .set_connection_mode(ConnectionMode::Network) .await @@ -103,15 +91,17 @@ mod tests { let test_topic2 = TestTopic::new(); - let _subscription2 = node2.subscribe(id, test_topic2.clone()).await.unwrap(); + let _subscription2 = node2 + .subscribe(topic_id, test_topic2.clone()) + .await + .unwrap(); - let topics2 = node2.topics::<[u8; 32]>().await.unwrap(); - assert_eq!(topics2.len(), 1); - assert_eq!(topics2.first().unwrap().id, id); + // TODO: Need to sleep here to make sure tx already exists. + tokio::time::sleep(std::time::Duration::from_secs(3)).await; let test_snapshot = "test".as_bytes().to_vec(); subscription - .send_snapshot(test_snapshot.clone()) + .publish_snapshot(test_snapshot.clone()) .await .unwrap(); diff --git a/reflection-node/src/message.rs b/reflection-node/src/message.rs new file mode 100644 index 00000000..ed32603d --- /dev/null +++ b/reflection-node/src/message.rs @@ -0,0 +1,15 @@ +use serde::{Deserialize, Serialize}; + +use crate::author_tracker::AuthorTrackerMessage; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(tag = "t", content = "c")] +pub(crate) enum EphemeralMessage { + /// Custom message to be forwarded to the application-layer. + #[serde(rename = "app")] + Application(Vec), + + /// Message used to track online status of authors. + #[serde(rename = "author_tracker")] + AuthorTracker(AuthorTrackerMessage), +} diff --git a/reflection-node/src/network.rs b/reflection-node/src/network.rs deleted file mode 100644 index c9aee3f2..00000000 --- a/reflection-node/src/network.rs +++ /dev/null @@ -1,131 +0,0 @@ -use std::sync::LazyLock; - -use p2panda_net::Discovery; -use p2panda_net::discovery::DiscoveryError; -use thiserror::Error; -use tracing::error; - -use p2panda_core::Hash; -use p2panda_core::PrivateKey; -use p2panda_net::address_book::{AddressBook, AddressBookError}; -use p2panda_net::addrs::NodeInfo; -use p2panda_net::gossip::{Gossip, GossipError}; -use p2panda_net::iroh_endpoint::{Endpoint, EndpointAddr, EndpointError, RelayUrl}; -use p2panda_net::iroh_mdns::{MdnsDiscovery, MdnsDiscoveryError, MdnsDiscoveryMode}; - -use crate::operation::ReflectionExtensions; -use crate::operation_store::OperationStore; -use crate::topic_store::{LogId, TopicStore}; - -static RELAY_URL: LazyLock = LazyLock::new(|| { - "https://euc1-1.relay.n0.iroh-canary.iroh.link" - .parse() - .expect("valid relay URL") -}); - -static BOOTSTRAP_NODE: LazyLock = LazyLock::new(|| { - let endpoint_addr = EndpointAddr::new( - "9f63a15ab95959a992af96bf72fbc3e7dc98eeb4799f788bb07b20125053e795" - .parse() - .expect("valid bootstrap node id"), - ) - .with_relay_url(RELAY_URL.clone()); - NodeInfo::from(endpoint_addr).bootstrap() -}); - -pub type LogSync = p2panda_net::sync::LogSync< - p2panda_store::SqliteStore, - LogId, - ReflectionExtensions, - TopicStore, ->; -pub type LogSyncError = p2panda_net::sync::LogSyncError; - -#[derive(Error, Debug)] -pub enum NetworkError { - #[error(transparent)] - Gossip(#[from] GossipError), - #[error(transparent)] - LogSync(#[from] LogSyncError), - #[error(transparent)] - AddressBook(#[from] AddressBookError), - #[error(transparent)] - MdnsDiscovery(#[from] MdnsDiscoveryError), - #[error(transparent)] - Discovery(#[from] DiscoveryError), - #[error(transparent)] - Endpoint(#[from] EndpointError), -} - -#[allow(dead_code)] -pub struct Network { - pub(crate) mdns_discovery: MdnsDiscovery, - pub(crate) discovery: Discovery, - pub(crate) gossip: Gossip, - pub(crate) log_sync: LogSync, - pub(crate) endpoint: Endpoint, -} - -// FIXME: Endpoint, LogSync, MdnsDiscovery, and Gossip should implement debug -impl std::fmt::Debug for Network { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Network").finish() - } -} - -impl Network { - pub async fn new( - private_key: &PrivateKey, - network_id: &Hash, - topic_store: &TopicStore, - operation_store: &OperationStore, - ) -> Result { - let address_book = AddressBook::builder().spawn().await?; - - if cfg!(not(any(test, feature = "test_utils"))) - && let Err(error) = address_book.insert_node_info(BOOTSTRAP_NODE.clone()).await - { - error!("Failed to add bootstrap node to the address book: {error}"); - } - - let mut builder = Endpoint::builder(address_book.clone()) - .network_id(network_id.into()) - .private_key(private_key.clone()); - - if cfg!(not(any(test, feature = "test_utils"))) { - builder = builder.relay_url(RELAY_URL.clone()); - } - - let endpoint = builder.spawn().await?; - - let mdns_discovery = MdnsDiscovery::builder(address_book.clone(), endpoint.clone()) - .mode(MdnsDiscoveryMode::Active) - .spawn() - .await?; - - let discovery = Discovery::builder(address_book.clone(), endpoint.clone()) - .spawn() - .await?; - - let gossip = Gossip::builder(address_book.clone(), endpoint.clone()) - .spawn() - .await?; - - let log_sync = LogSync::builder( - operation_store.clone_inner(), - topic_store.clone(), - endpoint.clone(), - gossip.clone(), - ) - .spawn() - .await?; - - Ok(Network { - mdns_discovery, - discovery, - gossip, - log_sync, - endpoint, - }) - } -} diff --git a/reflection-node/src/node.rs b/reflection-node/src/node.rs index d1b27025..8c35f1b2 100644 --- a/reflection-node/src/node.rs +++ b/reflection-node/src/node.rs @@ -1,30 +1,47 @@ -use std::path::Path; -use std::sync::Arc; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, LazyLock}; use chrono::{DateTime, Utc}; -use p2panda_core::{Hash, PrivateKey}; -use p2panda_net::TopicId; +use p2panda::node::{NetworkId, RelayUrl, SpawnError}; +use p2panda_core::{PrivateKey, PublicKey}; use thiserror::Error; +use tokio::sync::{Notify, RwLock}; use tracing::info; -use crate::network::NetworkError; -use crate::node_inner::NodeInner; -use crate::topic::{SubscribableTopic, Subscription, TopicError}; +use crate::database::{database_pool, run_migrations}; +use crate::subscription::{Subscription, SubscriptionError, SubscriptionInner}; pub use crate::topic_store::Author; -use crate::topic_store::StoreTopic; +use crate::topic_store::{StoreTopic, TopicStore}; +use crate::traits::SubscribableTopic; + +static RELAY_URL: LazyLock = LazyLock::new(|| { + "https://euc1-1.relay.n0.iroh-canary.iroh.link" + .parse() + .expect("valid relay URL") +}); + +static BOOTSTRAP_NODE_ID: LazyLock = LazyLock::new(|| { + "9f63a15ab95959a992af96bf72fbc3e7dc98eeb4799f788bb07b20125053e795" + .parse() + .expect("valid bootstrap node id") +}); #[derive(Debug, Error)] pub enum NodeError { #[error(transparent)] RuntimeStartup(#[from] std::io::Error), + #[error(transparent)] RuntimeSpawn(#[from] tokio::task::JoinError), + #[error(transparent)] Datebase(#[from] sqlx::Error), + #[error(transparent)] DatebaseMigration(#[from] sqlx::migrate::MigrateError), + #[error(transparent)] - Network(#[from] NetworkError), + NodeSpawn(#[from] SpawnError), } #[derive(Debug, Copy, Clone, PartialEq, Eq, Default)] @@ -36,8 +53,8 @@ pub enum ConnectionMode { } #[derive(Clone, Debug)] -pub struct Topic { - pub id: ID, +pub struct Topic { + pub id: p2panda_core::Topic, pub name: Option, pub last_accessed: Option>, pub authors: Vec, @@ -69,7 +86,7 @@ pub struct Node { impl Node { pub async fn new( private_key: PrivateKey, - network_id: Hash, + network_id: impl Into, db_location: Option<&Path>, ) -> Result { let runtime = if let Ok(handle) = tokio::runtime::Handle::try_current() { @@ -82,10 +99,14 @@ impl Node { ) }; - let db_file = db_location.map(|location| location.join("database.sqlite")); - let inner = runtime - .spawn(async move { NodeInner::new(network_id, private_key, db_file).await }) - .await??; + let inner = { + let network_id = network_id.into(); + let db_file = db_location.map(|location| location.join("database.sqlite")); + + runtime + .spawn(NodeInner::new(private_key, network_id, db_file)) + .await?? + }; Ok(Self { inner: Arc::new(inner), @@ -97,30 +118,30 @@ impl Node { &self, connection_mode: ConnectionMode, ) -> Result<(), NodeError> { - let inner_clone = self.inner.clone(); + let inner = self.inner.clone(); self.runtime - .spawn(async move { inner_clone.set_connection_mode(connection_mode).await }) + .spawn(async move { inner.set_connection_mode(connection_mode).await }) .await??; Ok(()) } pub async fn shutdown(&self) -> Result<(), NodeError> { - let inner_clone = self.inner.clone(); + let inner = self.inner.clone(); self.runtime .spawn(async move { - inner_clone.shutdown().await; + inner.shutdown().await; }) .await?; Ok(()) } - pub async fn topics>(&self) -> Result>, TopicError> { - let inner_clone = self.inner.clone(); + pub async fn topics(&self) -> Result, SubscriptionError> { + let inner = self.inner.clone(); let topics = self .runtime - .spawn(async move { inner_clone.topic_store.topics().await }) + .spawn(async move { inner.topic_store.topics().await }) .await??; let topics = topics @@ -133,7 +154,7 @@ impl Node { authors, } = topic; Topic { - id: id.into(), + id, name, last_accessed, authors, @@ -144,30 +165,121 @@ impl Node { Ok(topics) } - pub async fn subscribe, T: SubscribableTopic + 'static>( + pub async fn subscribe( &self, - id: ID, - topic_handle: T, - ) -> Result, TopicError> { - let id: TopicId = id.into(); - let topic_handle = Arc::new(topic_handle); - let inner_clone = self.inner.clone(); + id: impl Into, + subscribable_topic: T, + ) -> Result, SubscriptionError> + where + T: SubscribableTopic + 'static, + { + let id = id.into(); + let subscribable_topic = Arc::new(subscribable_topic); + let inner = self.inner.clone(); let inner_subscription = self .runtime - .spawn(async move { inner_clone.subscribe(id, topic_handle).await }) + .spawn(async move { inner.subscribe(id, subscribable_topic).await }) .await??; let subscription = Subscription::new(self.runtime.clone(), inner_subscription).await; - info!("Subscribed to topic {}", hex::encode(id)); + info!(%id, "subscribed to topic"); Ok(subscription) } - pub async fn delete_topic>(&self, id: ID) -> Result<(), TopicError> { - let id: TopicId = id.into(); - let inner_clone = self.inner.clone(); + pub async fn delete_topic( + &self, + id: impl Into, + ) -> Result<(), SubscriptionError> { + let id = id.into(); + let inner = self.inner.clone(); self.runtime - .spawn(async move { inner_clone.delete_topic(id).await }) + .spawn(async move { inner.delete_topic(id).await }) .await? } } + +#[derive(Debug)] +pub(crate) struct NodeInner { + pub(crate) network: RwLock, + pub(crate) shutdown_notifier: Notify, + pub(crate) topic_store: TopicStore, + pub(crate) public_key: PublicKey, +} + +impl NodeInner { + pub async fn new( + private_key: PrivateKey, + network_id: impl Into, + db_file: Option, + ) -> Result { + let public_key = private_key.public_key(); + + let pool = database_pool(db_file).await?; + run_migrations(&pool).await?; + + let topic_store = TopicStore::from_pool(pool.clone()); + + let mut builder = p2panda::Node::builder() + .network_id(network_id.into()) + .private_key(private_key) + .database_pool(pool); + + // Don't connect to any servers during testing. + if cfg!(not(test)) { + builder = builder + .bootstrap(*BOOTSTRAP_NODE_ID, RELAY_URL.clone()) + .relay_url(RELAY_URL.clone()); + } + + let node = builder.spawn().await?; + + Ok(Self { + network: RwLock::new(node), + shutdown_notifier: Notify::new(), + topic_store, + public_key, + }) + } + + pub async fn set_connection_mode( + &self, + _connection_mode: ConnectionMode, + ) -> Result<(), NodeError> { + // TODO: This is a no-op currently and requires work in `p2panda-net` upstream. + // See related issue: https://github.com/p2panda/p2panda/issues/1093 + Ok(()) + } + + pub async fn shutdown(&self) { + // Wake up all subscriptions that may still exist. + self.shutdown_notifier.notify_waiters(); + } + + pub async fn subscribe( + self: Arc, + id: impl Into, + subscribable_topic: Arc, + ) -> Result, SubscriptionError> + where + T: SubscribableTopic + 'static, + { + let id = id.into(); + + self.topic_store.add_topic(&id).await?; + + // Add ourselves as an author to the topic store. + self.topic_store.add_author(&id, &self.public_key).await?; + + Ok(SubscriptionInner::new(self.clone(), id, subscribable_topic)) + } + + pub async fn delete_topic( + self: Arc, + id: impl Into, + ) -> Result<(), SubscriptionError> { + let id = id.into(); + self.topic_store.delete_topic(&id).await?; + Ok(()) + } +} diff --git a/reflection-node/src/node_inner.rs b/reflection-node/src/node_inner.rs deleted file mode 100644 index ba951f82..00000000 --- a/reflection-node/src/node_inner.rs +++ /dev/null @@ -1,148 +0,0 @@ -use std::path::PathBuf; -use std::sync::Arc; - -use crate::ephemerial_operation::EphemerialOperation; -use crate::network::{Network, NetworkError}; -use crate::node::{ConnectionMode, NodeError}; -use crate::operation_store::OperationStore; -use crate::subscription_inner::SubscriptionInner; -use crate::topic::{SubscribableTopic, TopicError}; -use crate::topic_store::TopicStore; -use crate::utils::CombinedMigrationSource; - -use p2panda_core::{Hash, PrivateKey}; -use p2panda_net::TopicId; -use p2panda_store::sqlite::store::migrations as operation_store_migrations; -use sqlx::{migrate::Migrator, sqlite}; -use tokio::sync::{Notify, RwLock}; -use tracing::info; - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub(crate) enum MessageType { - Ephemeral(EphemerialOperation), - AuthorEphemeral(EphemerialOperation), -} - -#[derive(Debug)] -pub struct NodeInner { - pub(crate) operation_store: OperationStore, - pub(crate) topic_store: TopicStore, - pub(crate) private_key: PrivateKey, - pub(crate) network_id: Hash, - pub(crate) network: RwLock>, - pub(crate) network_notifier: Notify, -} - -impl NodeInner { - pub async fn new( - network_id: Hash, - private_key: PrivateKey, - db_file: Option, - ) -> Result { - let connection_options = sqlx::sqlite::SqliteConnectOptions::new() - .shared_cache(true) - .create_if_missing(true); - let pool = if let Some(db_file) = db_file { - info!("Database file location: {db_file:?}"); - let connection_options = connection_options.filename(db_file); - sqlx::sqlite::SqlitePool::connect_with(connection_options).await? - } else { - let connection_options = connection_options.in_memory(true); - // FIXME: we need to set max connection to 1 for in memory sqlite DB. - // Probably has to do something with this issue: https://github.com/launchbadge/sqlx/issues/2510 - let pool_options = sqlite::SqlitePoolOptions::new().max_connections(1); - pool_options.connect_with(connection_options).await? - }; - - // Run migration for p2panda OperationStore and for the our TopicStore - Migrator::new(CombinedMigrationSource::new(vec![ - operation_store_migrations(), - sqlx::migrate!(), - ])) - .await? - .run(&pool) - .await?; - - let operation_store = OperationStore::new(pool.clone()); - let topic_store = TopicStore::new(pool); - - Ok(Self { - operation_store, - topic_store, - private_key, - network_id, - network: RwLock::new(None), - network_notifier: Notify::new(), - }) - } - - pub async fn set_connection_mode( - &self, - connection_mode: ConnectionMode, - ) -> Result<(), NetworkError> { - // Subscriptions will tear down the network subscription and drop the read lock, - // so that we can acquire the write lock and then shutdown the network. - self.network_notifier.notify_waiters(); - - let mut network_guard = self.network.write().await; - - match connection_mode { - ConnectionMode::None => { - *network_guard = None; - } - ConnectionMode::Bluetooth => { - unimplemented!("Bluetooth is currently not implemented") - } - ConnectionMode::Network => { - let network = Network::new( - &self.private_key, - &self.network_id, - &self.topic_store, - &self.operation_store, - ) - .await?; - - *network_guard = Some(network); - } - } - - Ok(()) - } - - pub async fn shutdown(&self) { - // Wake up all subscriptions that may still exist - self.network_notifier.notify_waiters(); - self.network.write().await.take(); - } - - pub async fn subscribe( - self: Arc, - id: TopicId, - subscribable_topic: Arc, - ) -> Result, TopicError> { - self.topic_store.add_topic(&id).await?; - // Add ourselves as an author to the topic store. - self.topic_store - .add_author(&id, &self.private_key.public_key()) - .await?; - let stored_operations = self - .topic_store - .operations_for_topic(&self.operation_store, &id) - .await?; - - for operation in stored_operations { - // Send all stored operation bytes to the app, - // it doesn't matter if the app already knows some or all of them - if let Some(body) = operation.body { - subscribable_topic.bytes_received(operation.header.public_key, body.to_bytes()); - } - } - - Ok(SubscriptionInner::new(self.clone(), id, subscribable_topic)) - } - - pub async fn delete_topic(self: Arc, id: TopicId) -> Result<(), TopicError> { - self.topic_store.delete_topic(&id).await?; - Ok(()) - } -} diff --git a/reflection-node/src/operation.rs b/reflection-node/src/operation.rs deleted file mode 100644 index beb5c99b..00000000 --- a/reflection-node/src/operation.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::hash::Hash as StdHash; - -use p2panda_core::{Extension, Header, PruneFlag}; -use p2panda_net::TopicId; -use serde::{Deserialize, Serialize}; - -use crate::topic_store::LogId; - -/// Custom extensions for p2panda header. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct ReflectionExtensions { - /// If flag is true we can remove all previous operations in this log. - /// - /// This usually indicates that a "snapshot" has been inserted into the body of this operation, - /// containing all required state to reconstruct the full version including all previous edits - /// of this topic. - /// - /// In our case of a text-editor, this would be the encoded payload of a state-based CRDT. - #[serde( - rename = "p", - skip_serializing_if = "PruneFlag::is_not_set", - default = "PruneFlag::default" - )] - pub prune_flag: PruneFlag, - - /// Operations can be organised in separate logs. With a "log id" we can declare where this - /// operation belongs to. - /// - /// We organise two logs per author per topic, one for "short lived" / ephemeral deltas - /// (small text changes) and one for persisted snapshots (full topic history). These are two - /// distinct "log types". - #[serde(rename = "t")] - pub log_type: LogType, - - /// Identifier of the topic this operation relates to. - #[serde(rename = "d")] - pub topic: TopicId, -} - -#[derive(Copy, Clone, Default, Debug, PartialEq, Eq, StdHash, Serialize, Deserialize)] -pub enum LogType { - Snapshot, - #[default] - Delta, -} - -impl Extension for ReflectionExtensions { - fn extract(header: &Header) -> Option { - Some(header.extensions.prune_flag.clone()) - } -} - -impl Extension for ReflectionExtensions { - fn extract(header: &Header) -> Option { - Some(header.extensions.log_type) - } -} - -impl Extension for ReflectionExtensions { - fn extract(header: &Header) -> Option { - Some(header.extensions.topic) - } -} - -impl Extension for ReflectionExtensions { - fn extract(header: &Header) -> Option { - let log_type: LogType = header.extension()?; - let id: TopicId = header.extension()?; - - Some(LogId::new(log_type, &id)) - } -} diff --git a/reflection-node/src/operation_store.rs b/reflection-node/src/operation_store.rs deleted file mode 100644 index 44b72191..00000000 --- a/reflection-node/src/operation_store.rs +++ /dev/null @@ -1,129 +0,0 @@ -use std::sync::Arc; -use std::time::{SystemTime, SystemTimeError}; - -use crate::topic_store::LogId; -use p2panda_core::{Body, Header, Operation, PrivateKey, PruneFlag}; -use p2panda_net::TopicId; -use p2panda_store::{ - LogStore, OperationStore as TraitOperationStore, SqliteStore, SqliteStoreError, -}; -use thiserror::Error; -use tokio::sync::Semaphore; - -use crate::operation::{LogType, ReflectionExtensions}; - -#[derive(Debug, Error)] -pub enum CreationError { - #[error(transparent)] - SytemTime(#[from] SystemTimeError), - #[error(transparent)] - Store(#[from] SqliteStoreError), -} - -#[derive(Debug)] -pub struct OperationStore { - inner: SqliteStore, - // FIXME: This makes sure we only create one operation at the time and not in parallel - // Since we would mess up the sequence of operations - semaphore_operation_store: Arc, -} - -impl OperationStore { - pub fn new(pool: sqlx::SqlitePool) -> Self { - Self { - inner: SqliteStore::new(pool), - semaphore_operation_store: Arc::new(Semaphore::new(1)), - } - } - - pub fn clone_inner(&self) -> SqliteStore { - self.inner.clone() - } - - pub fn inner(&self) -> &SqliteStore { - &self.inner - } - - /// Creates, signs and stores new operation in the author's append-only log. - /// - /// If no topic is specified we create a new operation in a new log. The resulting hash of the - /// header can be used to identify that new topic. - pub async fn create_operation( - &self, - private_key: &PrivateKey, - log_type: LogType, - topic: TopicId, - body: Option<&[u8]>, - prune_flag: bool, - ) -> Result, CreationError> { - let _permit = self - .semaphore_operation_store - .acquire() - .await - .expect("OperationStore semaphore not to be closed"); - - let body = body.map(Body::new); - let public_key = private_key.public_key(); - - let log_id = LogId::new(log_type, &topic); - let latest_operation = self.inner.latest_operation(&public_key, &log_id).await?; - - let (seq_num, backlink) = match latest_operation { - Some((header, _)) => (header.seq_num + 1, Some(header.hash())), - None => (0, None), - }; - - let timestamp = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH)? - .as_secs(); - - let extensions = ReflectionExtensions { - prune_flag: PruneFlag::new(prune_flag), - log_type, - topic, - }; - - let mut header = Header { - version: 1, - public_key, - signature: None, - payload_size: body.as_ref().map_or(0, |body| body.size()), - payload_hash: body.as_ref().map(|body| body.hash()), - timestamp, - seq_num, - backlink, - previous: vec![], - extensions, - }; - header.sign(private_key); - - let operation = Operation { - hash: header.hash(), - header, - body, - }; - - let mut inner_clone = self.clone_inner(); - inner_clone - .insert_operation( - operation.hash, - &operation.header, - operation.body.as_ref(), - operation.header.to_bytes().as_slice(), - &log_id, - ) - .await?; - - if prune_flag { - inner_clone - .delete_operations( - &operation.header.public_key, - &log_id, - operation.header.seq_num, - ) - .await?; - } - - Ok(operation) - } -} diff --git a/reflection-node/src/persistent_operation.rs b/reflection-node/src/persistent_operation.rs deleted file mode 100644 index dd90a215..00000000 --- a/reflection-node/src/persistent_operation.rs +++ /dev/null @@ -1,64 +0,0 @@ -use p2panda_core::{ - Body, Header, Operation, - cbor::{DecodeError, decode_cbor}, -}; -use thiserror::Error; - -use crate::topic::TopicId; -use crate::operation::ReflectionExtensions; - -type OperationWithRawHeader = (Header, Option, Vec); - -#[derive(Debug, Error)] -pub enum UnpackError { - #[error(transparent)] - Cbor(#[from] DecodeError), - #[error("Operation with invalid topic id")] - InvalidTopicId, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct PersistentOperation { - #[serde(with = "serde_bytes")] - header: Vec, - body: Option, -} - -impl PersistentOperation { - pub fn new(operation: Operation) -> Self { - Self { - header: operation.header.to_bytes(), - body: operation.body.map(|body| body.to_bytes().into()), - } - } - - pub fn from_serialized(header: Vec, body: Option>) -> Self { - Self { - header, - body: body.map(Into::into), - } - } - - /// Validates and unpacks the operation - pub fn validate_and_unpack( - self, - id: TopicId, - ) -> Result { - let PersistentOperation { header, body } = self; - - // The header is serialized by Header::to_bytes() as cbor - let header_deserialized: Header = decode_cbor(&header[..])?; - let body_deserialized = body.map(|body| Body::from(body.into_vec())); - - let Some(operation_id): Option = header_deserialized.extension() - else { - return Err(UnpackError::InvalidTopicId); - }; - - if operation_id != id { - return Err(UnpackError::InvalidTopicId); - } - - Ok((header_deserialized, body_deserialized, header)) - } -} diff --git a/reflection-node/src/subscription.rs b/reflection-node/src/subscription.rs new file mode 100644 index 00000000..2f5a7d29 --- /dev/null +++ b/reflection-node/src/subscription.rs @@ -0,0 +1,394 @@ +use std::mem::take; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + +use chrono::Utc; +use p2panda::node::CreateStreamError; +use p2panda::streams::{EphemeralStreamPublisher, Offset, StreamEvent, StreamPublisher}; +use p2panda_core::Topic; +use thiserror::Error; +use tokio::sync::{RwLock, oneshot}; +use tokio::task::{AbortHandle, JoinError}; +use tokio_stream::StreamExt; +use tracing::{error, info}; + +use crate::author_tracker::AuthorTracker; +use crate::message::EphemeralMessage; +use crate::node::NodeInner; +use crate::traits::SubscribableTopic; + +#[derive(Debug, Error)] +pub enum SubscriptionError { + #[error(transparent)] + Runtime(#[from] JoinError), + + #[error(transparent)] + TopicStore(#[from] sqlx::Error), + + #[error(transparent)] + StreamPublish(#[from] p2panda::streams::PublishError), + + #[error(transparent)] + EphemeralStreamPublish(#[from] p2panda::streams::EphemeralPublishError), + + #[error("streams to publish data into network are not available due to a setup error")] + BrokenStream, +} + +pub struct Subscription { + inner: Arc>, + runtime: tokio::runtime::Handle, + network_monitor_task: AbortHandle, +} + +impl Drop for Subscription { + fn drop(&mut self) { + self.network_monitor_task.abort(); + } +} + +impl Subscription +where + T: SubscribableTopic + 'static, +{ + pub(crate) async fn new(runtime: tokio::runtime::Handle, inner: SubscriptionInner) -> Self { + let (ready_tx, ready_rx) = oneshot::channel(); + + // Spawn task to establish streams to publish and subscribe to messages, the same task will + // also await a shutdown signal to drop the streams. + let inner = Arc::new(inner); + let inner_clone = inner.clone(); + let network_monitor_task = runtime + .spawn(async move { + inner_clone.spawn_network_monitor(ready_tx).await; + }) + .abort_handle(); + + // Wait until streams with network have been established. + let _ = ready_rx.await; + + Subscription { + inner, + runtime, + network_monitor_task, + } + } + + pub async fn publish_delta(&self, data: Vec) -> Result<(), SubscriptionError> { + let inner = self.inner.clone(); + self.runtime + .spawn(async move { inner.publish_delta(data).await }) + .await? + } + + pub async fn publish_snapshot(&self, data: Vec) -> Result<(), SubscriptionError> { + let inner = self.inner.clone(); + self.runtime + .spawn(async move { inner.publish_snapshot(data).await }) + .await? + } + + pub async fn publish_ephemeral(&self, data: Vec) -> Result<(), SubscriptionError> { + let inner = self.inner.clone(); + self.runtime + .spawn(async move { inner.publish_ephemeral(data).await }) + .await? + } + + pub async fn unsubscribe(self) -> Result<(), SubscriptionError> { + self.network_monitor_task.abort(); + + let inner = self.inner.clone(); + self.runtime + .spawn(async move { inner.unsubscribe().await }) + .await??; + + info!("unsubscribed from topic {}", self.inner.id); + + Ok(()) + } + + /// Set the name for a given topic. + /// + /// This information will be written to the database. + pub async fn set_name(&self, name: Option) -> Result<(), SubscriptionError> { + let inner = self.inner.clone(); + self.runtime + .spawn(async move { inner.set_name(name).await }) + .await? + } +} + +pub(crate) struct SubscriptionInner { + tx: RwLock>>>, + ephemeral_tx: RwLock>>, + node: Arc, + id: Topic, + subscribable_topic: Arc, + author_tracker: Arc>, + abort_handles: RwLock>, +} + +impl Drop for SubscriptionInner { + fn drop(&mut self) { + for handle in self.abort_handles.get_mut() { + handle.abort(); + } + } +} + +impl SubscriptionInner +where + T: SubscribableTopic + 'static, +{ + pub fn new(node: Arc, id: Topic, subscribable_topic: Arc) -> Self { + let author_tracker = AuthorTracker::new(node.clone(), subscribable_topic.clone()); + + SubscriptionInner { + tx: RwLock::new(None), + ephemeral_tx: RwLock::new(None), + node, + id, + abort_handles: RwLock::new(Vec::new()), + subscribable_topic, + author_tracker, + } + } + + pub async fn spawn_network_monitor(&self, ready_signal: oneshot::Sender<()>) { + // Hold a read lock to the network, so that the network won't be dropped or shutdown. + let network_guard = self.node.network.read().await; + + let result = setup_streams( + &self.node, + network_guard.deref(), + self.id, + &self.subscribable_topic, + &self.author_tracker, + ) + .await; + + match result { + Ok((tx, ephemeral_tx, abort_handles)) => { + *self.tx.write().await = Some(tx); + *self.ephemeral_tx.write().await = Some(ephemeral_tx); + *self.abort_handles.write().await = abort_handles; + } + Err(error) => { + self.subscribable_topic.error(error.into()); + } + } + + drop(network_guard); + + // Inform caller that we're done with setting up the streams. They are ready now to be used + // for publishing and receiving messages. + let _ = ready_signal.send(()); + + // Wait until we've received signal from node to shut down. + let shutdown_notification = self.node.shutdown_notifier.notified(); + shutdown_notification.await; + + let _ = self.unsubscribe().await; + } + + pub async fn unsubscribe(&self) -> Result<(), SubscriptionError> { + let mut tx_guard = self.tx.write().await; + let mut ephemeral_tx_guard = self.ephemeral_tx.write().await; + let mut abort_handles_guard = self.abort_handles.write().await; + + let tx = take(tx_guard.deref_mut()); + let ephemeral_tx = take(ephemeral_tx_guard.deref_mut()); + let abort_handles = take(abort_handles_guard.deref_mut()); + + self.node + .topic_store + .set_last_accessed_for_topic(&self.id, Some(Utc::now())) + .await?; + + teardown_streams( + &self.id, + &self.author_tracker, + tx, + ephemeral_tx, + abort_handles, + ) + .await; + + Ok(()) + } + + pub async fn publish_delta(&self, data: Vec) -> Result<(), SubscriptionError> { + if let Some(tx) = self.tx.read().await.as_ref() { + info!("delta operation sent for topic with id {}", self.id); + tx.publish(data).await?; + } else { + return Err(SubscriptionError::BrokenStream); + } + + Ok(()) + } + + pub async fn publish_snapshot(&self, data: Vec) -> Result<(), SubscriptionError> { + if let Some(tx) = self.tx.read().await.as_ref() { + info!("snapshot saved for topic with id {}", self.id); + + // Append an operation to our log and set the prune flag to true. This will remove + // previous entries. + tx.prune(Some(data)).await?; + } else { + return Err(SubscriptionError::BrokenStream); + } + + Ok(()) + } + + pub async fn publish_ephemeral(&self, data: Vec) -> Result<(), SubscriptionError> { + if let Some(ephemeral_tx) = self.ephemeral_tx.read().await.as_ref() { + ephemeral_tx + .publish(EphemeralMessage::Application(data)) + .await?; + } else { + return Err(SubscriptionError::BrokenStream); + } + + Ok(()) + } + + pub async fn set_name(&self, name: Option) -> Result<(), SubscriptionError> { + self.node + .topic_store + .set_name_for_topic(&self.id, name) + .await?; + + Ok(()) + } +} + +async fn setup_streams( + node: &Arc, + network: &p2panda::Node, + id: Topic, + subscribable_topic: &Arc, + author_tracker: &Arc>, +) -> Result< + ( + StreamPublisher>, + EphemeralStreamPublisher, + Vec, + ), + CreateStreamError, +> +where + T: SubscribableTopic + 'static, +{ + let mut abort_handles = Vec::with_capacity(3); + + // 1. Handle incoming operations from eventually consistent topic stream. + + // Always start from re-playing _all_ operations in the beginning. This is due to Reflection + // not keeping materialised document state around and we need to repeat materialising the + // document at the beginning (in memory). This cost is acceptable since we're frequently + // pruning the log and the number of operations to process is rather small. + let offset = Offset::Start; + + let (topic_tx, mut topic_rx) = network.stream_from::>(id, offset).await?; + + let node_clone = node.clone(); + let subscribable_topic_clone = subscribable_topic.clone(); + let abort_handle = tokio::spawn(async move { + while let Some(event) = topic_rx.next().await { + match event { + StreamEvent::Processed(operation) => { + let author = operation.author(); + + // When we discover a new author we need to add them to our topic store. + if let Err(error) = node_clone.topic_store.add_author(&id, &author).await { + error!("can't store author to database: {error}"); + } + + // Forward the message payload up to the app layer. + subscribable_topic_clone.bytes_received(author, operation.message().to_owned()); + } + StreamEvent::DecodingFailed { error, .. } => { + error!("failed decoding incoming operation from stream: {error}"); + } + StreamEvent::ReplayFailed { error, .. } => { + error!("error occurred while replaying operation stream: {error}"); + } + StreamEvent::SyncStarted { .. } | StreamEvent::SyncEnded { .. } => { + // TODO: Handle sync events. + } + } + } + }) + .abort_handle(); + + abort_handles.push(abort_handle); + + // 2. Handle incoming messages from ephemeral topic stream. + + let (ephemeral_tx, mut ephemeral_rx) = network.ephemeral_stream::(id).await?; + + author_tracker + .set_topic_tx(Some(ephemeral_tx.clone())) + .await; + + let author_tracker_clone = author_tracker.clone(); + let subscribable_topic_clone = subscribable_topic.clone(); + let abort_handle = tokio::spawn(async move { + while let Some(message) = ephemeral_rx.next().await { + match message.body() { + EphemeralMessage::Application(bytes) => { + subscribable_topic_clone + .ephemeral_bytes_received(message.author(), bytes.to_owned()); + } + EphemeralMessage::AuthorTracker(tracker) => { + author_tracker_clone + .received(message.author(), tracker.to_owned()) + .await; + } + } + } + }) + .abort_handle(); + + abort_handles.push(abort_handle); + + // 3. Run task to track online status of authors. + + let author_tracker_clone = author_tracker.clone(); + let abort_handle = tokio::spawn(async move { + author_tracker_clone.spawn().await; + }) + .abort_handle(); + + abort_handles.push(abort_handle); + + info!("network streams set up for topic {}", id); + + Ok((topic_tx, ephemeral_tx, abort_handles)) +} + +async fn teardown_streams( + id: &Topic, + author_tracker: &Arc>, + tx: Option>>, + ephemeral_tx: Option>, + abort_handles: Vec, +) where + T: SubscribableTopic + 'static, +{ + for handle in abort_handles { + handle.abort(); + } + + author_tracker.set_topic_tx(None).await; + + if tx.is_some() { + info!("network streams torn down for topic {}", id); + } + + drop(tx); + drop(ephemeral_tx); +} diff --git a/reflection-node/src/subscription_inner.rs b/reflection-node/src/subscription_inner.rs deleted file mode 100644 index 931f2c7a..00000000 --- a/reflection-node/src/subscription_inner.rs +++ /dev/null @@ -1,469 +0,0 @@ -use std::mem::take; -use std::ops::{Deref, DerefMut, Drop}; -use std::sync::Arc; - -use chrono::Utc; -use p2panda_core::Operation; -use p2panda_core::{ - Body, Header, - cbor::{decode_cbor, encode_cbor}, -}; -use p2panda_net::{TopicId, gossip::GossipHandle}; -use p2panda_stream::IngestExt; -use p2panda_sync::protocols::TopicLogSyncEvent as Event; -use tokio::{ - sync::{RwLock, mpsc}, - task::{AbortHandle, spawn}, -}; -use tokio_stream::{StreamExt, wrappers::ReceiverStream}; -use tracing::{error, info, warn}; - -use crate::author_tracker::{AuthorMessage, AuthorTracker}; -use crate::ephemerial_operation::EphemerialOperation; -use crate::network::Network; -use crate::node_inner::MessageType; -use crate::node_inner::NodeInner; -use crate::operation::{LogType, ReflectionExtensions}; -use crate::topic::{SubscribableTopic, SubscriptionError, TopicError}; - -pub type SyncHandle = - p2panda_net::sync::SyncHandle, Event>; - -pub struct SubscriptionInner { - ephemeral_tx: RwLock>, - tx: RwLock>, - pub(crate) node: Arc, - pub(crate) id: TopicId, - pub(crate) subscribable_topic: Arc, - author_tracker: Arc>, - abort_handles: RwLock>, -} - -impl Drop for SubscriptionInner { - fn drop(&mut self) { - for handle in self.abort_handles.get_mut() { - handle.abort(); - } - } -} - -impl SubscriptionInner { - pub fn new(node: Arc, id: TopicId, subscribable_topic: Arc) -> Self { - let author_tracker = AuthorTracker::new(node.clone(), subscribable_topic.clone()); - SubscriptionInner { - tx: RwLock::new(None), - ephemeral_tx: RwLock::new(None), - node, - id, - abort_handles: RwLock::new(Vec::new()), - subscribable_topic, - author_tracker, - } - } - - pub async fn spawn_network_monitor(&self) { - // We need to hold a read lock to the network, so that the network won't be dropped - // or shutdown. - let mut notify = Some(self.node.network_notifier.notified()); - let mut network_guard = Some(self.node.network.read().await); - - let (tx, ephemeral_tx, abort_handles) = - if let Some(network) = network_guard.as_ref().unwrap().deref() { - match setup_network( - &self.node, - network, - self.id, - &self.subscribable_topic, - &self.author_tracker, - ) - .await - { - Ok((sync_handle, gossip_handle, abort_handles)) => { - (Some(sync_handle), Some(gossip_handle), abort_handles) - } - Err(error) => { - self.subscribable_topic.error(error); - (None, None, Vec::new()) - } - } - } else { - (None, None, Vec::new()) - }; - - *self.tx.write().await = tx; - *self.ephemeral_tx.write().await = ephemeral_tx; - *self.abort_handles.write().await = abort_handles; - - loop { - if let Some(notify) = notify { - notify.await; - } - - let mut abort_handles_guard = self.abort_handles.write().await; - let mut tx_guard = self.tx.write().await; - let mut ephemeral_tx_guard = self.ephemeral_tx.write().await; - - let old_tx = take(tx_guard.deref_mut()); - let old_ephemeral_tx = take(ephemeral_tx_guard.deref_mut()); - let old_abort_handles = take(abort_handles_guard.deref_mut()); - - teardown_network( - &self.id, - &self.author_tracker, - old_tx, - old_ephemeral_tx, - old_abort_handles, - ) - .await; - // Release network lock and get a new one, so that the network can be change between them - network_guard.take(); - notify = Some(self.node.network_notifier.notified()); - network_guard = Some(self.node.network.read().await); - - let (tx, ephemeral_tx, abort_handles) = - if let Some(network) = network_guard.as_ref().unwrap().deref() { - match setup_network( - &self.node, - network, - self.id, - &self.subscribable_topic, - &self.author_tracker, - ) - .await - { - Ok((sync_handle, gossip_handle, abort_handles)) => { - (Some(sync_handle), Some(gossip_handle), abort_handles) - } - Err(error) => { - self.subscribable_topic.error(error); - (None, None, Vec::new()) - } - } - } else { - (None, None, Vec::new()) - }; - - *tx_guard = tx; - *ephemeral_tx_guard = ephemeral_tx; - *abort_handles_guard = abort_handles; - } - } - - pub async fn unsubscribe(&self) -> Result<(), TopicError> { - let mut tx_guard = self.tx.write().await; - let mut ephemeral_tx_guard = self.ephemeral_tx.write().await; - let mut abort_handles_guard = self.abort_handles.write().await; - - let tx = take(tx_guard.deref_mut()); - let ephemeral_tx = take(ephemeral_tx_guard.deref_mut()); - let abort_handles = take(abort_handles_guard.deref_mut()); - - self.node - .topic_store - .set_last_accessed_for_topic(&self.id, Some(Utc::now())) - .await?; - - teardown_network( - &self.id, - &self.author_tracker, - tx, - ephemeral_tx, - abort_handles, - ) - .await; - - Ok(()) - } - - pub async fn send_delta(&self, data: Vec) -> Result<(), TopicError> { - let operation = - // Append one operation to our "ephemeral" delta log. - self.node.operation_store - .create_operation( - &self.node.private_key, - LogType::Delta, - self.id, - Some(&data), - false, - ) - .await?; - - info!( - "Delta operation sent for topic with id {}", - hex::encode(self.id) - ); - - if let Some(tx) = self.tx.read().await.as_ref() { - tx.publish(operation).await?; - } - - Ok(()) - } - - pub async fn send_snapshot(&self, data: Vec) -> Result<(), TopicError> { - // Append an operation to our "snapshot" log and set the prune flag to - // true. This will remove previous snapshots. - // - // Snapshots are not broadcasted on the gossip overlay as they would be - // too large. Peers will sync them up when they join the topic. - self.node - .operation_store - .create_operation( - &self.node.private_key, - LogType::Snapshot, - self.id, - Some(&data), - true, - ) - .await?; - - // Append an operation to our "ephemeral" delta log and set the prune - // flag to true. - // - // This signals removing all previous "delta" operations now. This is - // some sort of garbage collection whenever we snapshot. Snapshots - // already contain all history, there is no need to keep duplicate - // "delta" data around. - let operation = self - .node - .operation_store - .create_operation(&self.node.private_key, LogType::Delta, self.id, None, true) - .await?; - - info!("Snapshot saved for topic with id {}", hex::encode(self.id)); - - if let Some(tx) = self.tx.read().await.as_ref() { - tx.publish(operation).await?; - } - - Ok(()) - } - - pub async fn send_ephemeral(&self, data: Vec) -> Result<(), TopicError> { - if let Some(ephemeral_tx) = self.ephemeral_tx.read().await.as_ref() { - let operation = EphemerialOperation::new(data, &self.node.private_key); - let bytes = encode_cbor(&MessageType::Ephemeral(operation))?; - ephemeral_tx.publish(bytes).await?; - } - - Ok(()) - } - - /// Set the name for a given topic - /// - /// This information will be written to the database - pub async fn set_name(&self, name: Option) -> Result<(), TopicError> { - self.node - .topic_store - .set_name_for_topic(&self.id, name) - .await?; - - Ok(()) - } -} - -async fn setup_network( - node: &Arc, - network: &Network, - id: TopicId, - subscribable_topic: &Arc, - author_tracker: &Arc>, -) -> Result<(SyncHandle, GossipHandle, Vec), SubscriptionError> { - let mut abort_handles = Vec::with_capacity(3); - - let stream = network.log_sync.stream(id, true).await?; - let mut topic_rx = stream.subscribe().await?; - let topic_tx = stream; - - let (persistent_tx, persistent_rx) = - mpsc::channel::<(Header, Option, Vec)>(128); - - let abort_handle = spawn(async move { - while let Some(event) = topic_rx.next().await { - let event = match event { - Ok(event) => event, - Err(error) => { - error!("Error while receiving sync message: {error}"); - continue; - } - }; - match event.event() { - Event::Operation(operation) => { - match validate_and_unpack(operation.as_ref().to_owned(), id) { - Ok(data) => { - persistent_tx.send(data).await.unwrap(); - } - Err(err) => { - error!("Failed to unpack operation: {err}"); - } - } - } - _ => { - // TODO: Handle sync events - } - } - } - }) - .abort_handle(); - - abort_handles.push(abort_handle); - - let ephemeral_stream = network.gossip.stream(id).await?; - let mut ephemeral_rx = ephemeral_stream.subscribe(); - let ephemeral_tx = ephemeral_stream; - - author_tracker.set_topic_tx(Some(ephemeral_tx)).await; - - let author_tracker_clone = author_tracker.clone(); - let subscribable_topic_clone = subscribable_topic.clone(); - let abort_handle = spawn(async move { - while let Some(bytes) = ephemeral_rx.next().await { - let bytes = match bytes { - Ok(bytes) => bytes, - Err(error) => { - error!("Error while receiving ephemeral message: {error}"); - continue; - } - }; - match decode_cbor(&bytes[..]) { - Ok(MessageType::Ephemeral(operation)) => { - if let Some((author, body)) = operation.validate_and_unpack() { - subscribable_topic_clone.ephemeral_bytes_received(author, body); - } else { - warn!("Got ephemeral operation with a bad signature"); - } - } - Ok(MessageType::AuthorEphemeral(operation)) => { - if let Some((author, body)) = operation.validate_and_unpack() { - match AuthorMessage::try_from(&body[..]) { - Ok(message) => { - author_tracker_clone.received(message, author).await; - } - Err(error) => { - warn!("Failed to deserialize AuthorMessage: {error}"); - } - } - } else { - warn!("Got internal ephemeral operation with a bad signature"); - } - } - Err(err) => { - error!("Failed to decode gossip message: {err}"); - } - } - } - }) - .abort_handle(); - - abort_handles.push(abort_handle); - - let stream = ReceiverStream::new(persistent_rx); - - // Ingest does multiple things for us: - // - // - Validate operation- and log integrity and authenticity - // - De-duplicate already known operations - // - Out-of-order buffering - // - Pruning when flag is set - // - Persist operation in store - let mut stream = stream - // NOTE(adz): The persisting part should happen later, we want to check the payload on - // application layer first. In general "ingest" does too much at once and is - // inflexible. Related issue: https://github.com/p2panda/p2panda/issues/696 - .ingest(node.operation_store.clone_inner(), 128) - .filter_map(|result| match result { - Ok(operation) => Some(operation), - Err(err) => { - error!("ingesting operation failed: {err}"); - None - } - }); - - let node = node.clone(); - let subscribable_topic_clone = subscribable_topic.clone(); - // Send checked and ingested operations for this topic to application layer. - let abort_handle = spawn(async move { - while let Some(operation) = stream.next().await { - // When we discover a new author we need to add them to our topic store. - if let Err(error) = node - .topic_store - .add_author(&id, &operation.header.public_key) - .await - { - error!("Can't store author to database: {error}"); - } - - // Forward the payload up to the app. - if let Some(body) = operation.body { - subscribable_topic_clone - .bytes_received(operation.header.public_key, body.to_bytes()); - } - } - }) - .abort_handle(); - - abort_handles.push(abort_handle); - let author_tracker_clone = author_tracker.clone(); - let abort_handle = spawn(async move { - author_tracker_clone.spawn().await; - }) - .abort_handle(); - - abort_handles.push(abort_handle); - - info!("Network subscription set up for topic {}", hex::encode(id)); - - let ephemeral_tx = network.gossip.stream(id).await?; - - Ok((topic_tx, ephemeral_tx, abort_handles)) -} - -async fn teardown_network( - id: &TopicId, - author_tracker: &Arc>, - tx: Option, - ephemeral_tx: Option, - abort_handles: Vec, -) { - for handle in abort_handles { - handle.abort(); - } - - author_tracker.set_topic_tx(None).await; - - if tx.is_some() { - info!( - "Network subscription torn down for topic {}", - hex::encode(id) - ); - } - drop(tx); - drop(ephemeral_tx); -} - -type OperationWithRawHeader = (Header, Option, Vec); - -#[derive(Debug, thiserror::Error)] -pub enum UnpackError { - #[error(transparent)] - Cbor(#[from] p2panda_core::cbor::DecodeError), - #[error("Operation with invalid topic id")] - InvalidTopicId, -} - -fn validate_and_unpack( - operation: p2panda_core::Operation, - id: TopicId, -) -> Result { - let p2panda_core::Operation:: { header, body, .. } = operation; - - let Some(operation_id): Option = header.extension() else { - return Err(UnpackError::InvalidTopicId); - }; - - if operation_id != id { - return Err(UnpackError::InvalidTopicId); - } - - Ok((header.clone(), body, header.to_bytes())) -} diff --git a/reflection-node/src/topic.rs b/reflection-node/src/topic.rs deleted file mode 100644 index 20a6a4f4..00000000 --- a/reflection-node/src/topic.rs +++ /dev/null @@ -1,130 +0,0 @@ -use std::sync::Arc; - -use crate::operation::ReflectionExtensions; -use crate::operation_store::CreationError; - -use crate::network::LogSyncError; -use crate::subscription_inner::SubscriptionInner; -use p2panda_core::{Operation, PublicKey}; -use p2panda_sync::protocols::TopicLogSyncEvent; - -use p2panda_net::gossip::GossipError; -use thiserror::Error; -use tokio::sync::mpsc; -use tokio::task::{AbortHandle, JoinError}; -use tracing::info; - -pub type SyncHandleError = p2panda_net::sync::SyncHandleError< - Operation, - TopicLogSyncEvent, ->; - -#[derive(Debug, Error)] -pub enum TopicError { - #[error(transparent)] - TopicStore(#[from] sqlx::Error), - #[error(transparent)] - OperationStore(#[from] CreationError), - #[error(transparent)] - Encode(#[from] p2panda_core::cbor::EncodeError), - #[error(transparent)] - Publish(#[from] SyncHandleError), - #[error(transparent)] - PublishEphemeral(#[from] mpsc::error::SendError>), - #[error(transparent)] - Runtime(#[from] JoinError), -} - -#[derive(Debug, Error)] -pub enum SubscriptionError { - #[error(transparent)] - Gossip(#[from] GossipError), - #[error(transparent)] - LogSync(#[from] LogSyncError), - #[error(transparent)] - SyncHandle(#[from] SyncHandleError), -} - -pub trait SubscribableTopic: Sync + Send { - fn bytes_received(&self, author: PublicKey, data: Vec); - fn author_joined(&self, author: PublicKey); - fn author_left(&self, author: PublicKey); - fn ephemeral_bytes_received(&self, author: PublicKey, data: Vec); - fn error(&self, error: SubscriptionError); -} - -pub struct Subscription { - pub(crate) inner: Arc>, - pub(crate) runtime: tokio::runtime::Handle, - network_monitor_task: AbortHandle, -} - -impl Drop for Subscription { - fn drop(&mut self) { - self.network_monitor_task.abort(); - } -} - -impl Subscription { - pub(crate) async fn new(runtime: tokio::runtime::Handle, inner: SubscriptionInner) -> Self { - let inner = Arc::new(inner); - - let inner_clone = inner.clone(); - let network_monitor_task = runtime - .spawn(async move { - inner_clone.spawn_network_monitor().await; - }) - .abort_handle(); - - Subscription { - inner, - runtime, - network_monitor_task, - } - } - - pub async fn send_delta(&self, data: Vec) -> Result<(), TopicError> { - let inner = self.inner.clone(); - self.runtime - .spawn(async move { inner.send_delta(data).await }) - .await? - } - - pub async fn send_snapshot(&self, data: Vec) -> Result<(), TopicError> { - let inner = self.inner.clone(); - self.runtime - .spawn(async move { inner.send_snapshot(data).await }) - .await? - } - - pub async fn send_ephemeral(&self, data: Vec) -> Result<(), TopicError> { - let inner = self.inner.clone(); - self.runtime - .spawn(async move { inner.send_ephemeral(data).await }) - .await? - } - - pub async fn unsubscribe(self) -> Result<(), TopicError> { - let id = self.inner.id; - - self.network_monitor_task.abort(); - let inner = self.inner.clone(); - self.runtime - .spawn(async move { inner.unsubscribe().await }) - .await??; - - info!("Unsubscribed from topic {}", hex::encode(id)); - - Ok(()) - } - - /// Set the name for a given topic - /// - /// This information will be written to the database - pub async fn set_name(&self, name: Option) -> Result<(), TopicError> { - let inner = self.inner.clone(); - self.runtime - .spawn(async move { inner.set_name(name).await }) - .await? - } -} diff --git a/reflection-node/src/topic_store.rs b/reflection-node/src/topic_store.rs index b757abf1..a07a4e53 100644 --- a/reflection-node/src/topic_store.rs +++ b/reflection-node/src/topic_store.rs @@ -1,23 +1,13 @@ use std::collections::HashMap; -use std::hash::Hash as StdHash; use chrono::{DateTime, Utc}; -use p2panda_core::PublicKey; -use p2panda_net::TopicId; -use p2panda_store::LogStore; -use p2panda_sync::protocols::Logs; -use p2panda_sync::traits::TopicMap; -use serde::{Deserialize, Serialize}; +use p2panda_core::{PublicKey, Topic}; use sqlx::{FromRow, Row}; -use tracing::error; - -use crate::operation::{LogType, ReflectionExtensions}; -use crate::operation_store::OperationStore; #[derive(Debug, FromRow)] pub struct StoreTopic { #[sqlx(try_from = "Vec")] - pub id: TopicId, + pub id: Topic, #[sqlx(default)] pub name: Option, pub last_accessed: Option>, @@ -37,22 +27,10 @@ pub struct TopicStore { } impl TopicStore { - pub fn new(pool: sqlx::SqlitePool) -> Self { + pub fn from_pool(pool: sqlx::SqlitePool) -> Self { Self { pool } } - async fn authors(&self, id: &TopicId) -> sqlx::Result> { - let list = sqlx::query("SELECT public_key FROM authors WHERE topic_id = ?") - .bind(id.as_slice()) - .fetch_all(&self.pool) - .await?; - - Ok(list - .iter() - .filter_map(|row| PublicKey::try_from(row.get::<&[u8], _>("public_key")).ok()) - .collect()) - } - pub async fn topics(&self) -> sqlx::Result> { let mut topics: Vec = sqlx::query_as("SELECT id, name, last_accessed FROM topics") @@ -63,7 +41,7 @@ impl TopicStore { .await?; let mut authors_per_topic = authors.iter().fold(HashMap::new(), |mut acc, row| { - let Ok(id) = TopicId::try_from(row.get::<&[u8], _>("topic_id")) else { + let Ok(id) = Topic::try_from(row.get::<&[u8], _>("topic_id")) else { return acc; }; let Ok(public_key) = PublicKey::try_from(row.get::<&[u8], _>("public_key")) else { @@ -88,31 +66,32 @@ impl TopicStore { Ok(topics) } - pub async fn add_topic(&self, id: &TopicId) -> sqlx::Result<()> { - // The id is the primary key in the table therefore ignore insertion when the topic exists already + pub async fn add_topic(&self, topic: &Topic) -> sqlx::Result<()> { + // The id is the primary key in the table therefore ignore insertion when the topic exists + // already sqlx::query( " INSERT OR IGNORE INTO topics ( id ) VALUES ( ? ) ", ) - .bind(id.as_slice()) + .bind(topic.as_bytes().as_slice()) .execute(&self.pool) .await?; Ok(()) } - pub async fn delete_topic(&self, id: &TopicId) -> sqlx::Result<()> { + pub async fn delete_topic(&self, topic: &Topic) -> sqlx::Result<()> { sqlx::query("DELETE FROM topics WHERE id = ?") - .bind(id.as_slice()) + .bind(topic.as_bytes().as_slice()) .execute(&self.pool) .await?; Ok(()) } - pub async fn add_author(&self, id: &TopicId, public_key: &PublicKey) -> sqlx::Result<()> { + pub async fn add_author(&self, topic: &Topic, public_key: &PublicKey) -> sqlx::Result<()> { // The author/id pair is required to be unique therefore ignore if the insertion fails sqlx::query( " @@ -121,7 +100,7 @@ impl TopicStore { ", ) .bind(public_key.as_bytes().as_slice()) - .bind(id.as_slice()) + .bind(topic.as_bytes().as_slice()) .execute(&self.pool) .await?; @@ -148,7 +127,11 @@ impl TopicStore { Ok(()) } - pub async fn set_name_for_topic(&self, id: &TopicId, name: Option) -> sqlx::Result<()> { + pub async fn set_name_for_topic( + &self, + topic: &Topic, + name: Option, + ) -> sqlx::Result<()> { sqlx::query( " UPDATE topics @@ -157,7 +140,7 @@ impl TopicStore { ", ) .bind(name) - .bind(id.as_slice()) + .bind(topic.as_bytes().as_slice()) .execute(&self.pool) .await?; @@ -166,7 +149,7 @@ impl TopicStore { pub async fn set_last_accessed_for_topic( &self, - id: &TopicId, + topic: &Topic, last_accessed: Option>, ) -> sqlx::Result<()> { sqlx::query( @@ -177,81 +160,10 @@ impl TopicStore { ", ) .bind(last_accessed) - .bind(id.as_slice()) + .bind(topic.as_bytes().as_slice()) .execute(&self.pool) .await?; Ok(()) } - - pub async fn operations_for_topic( - &self, - operation_store: &OperationStore, - id: &TopicId, - ) -> sqlx::Result>> { - let operation_store = operation_store.inner(); - let authors = self.authors(id).await?; - - let log_ids = [ - LogId::new(LogType::Delta, id), - LogId::new(LogType::Snapshot, id), - ]; - - let mut result = Vec::new(); - - for author in authors.iter() { - for log_id in &log_ids { - let operations = match operation_store.get_log(author, log_id, None).await { - Ok(Some(operations)) => { - operations - .into_iter() - .map(|(header, body)| p2panda_core::Operation { - hash: header.hash(), - header, - body, - }) - } - Ok(None) => { - continue; - } - Err(error) => { - error!( - "Failed to load operation for {author} with log type {log_id:?}: {error}" - ); - continue; - } - }; - - result.extend(operations); - } - } - - Ok(result) - } -} - -#[derive(Clone, Debug, PartialEq, Eq, StdHash, Serialize, Deserialize)] -pub struct LogId(LogType, TopicId); - -impl LogId { - pub fn new(log_type: LogType, topic: &TopicId) -> Self { - Self(log_type, *topic) - } -} - -impl TopicMap> for TopicStore { - type Error = sqlx::Error; - - async fn get(&self, topic: &TopicId) -> Result, Self::Error> { - let authors = self.authors(topic).await?; - - let log_ids = [ - LogId::new(LogType::Delta, topic), - LogId::new(LogType::Snapshot, topic), - ]; - Ok(authors - .into_iter() - .map(|author| (author, log_ids.to_vec())) - .collect()) - } } diff --git a/reflection-node/src/traits.rs b/reflection-node/src/traits.rs new file mode 100644 index 00000000..9ac44e06 --- /dev/null +++ b/reflection-node/src/traits.rs @@ -0,0 +1,17 @@ +use p2panda::node::CreateStreamError; +use p2panda_core::PublicKey; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum SubscriptionError { + #[error(transparent)] + CreateStream(#[from] CreateStreamError), +} + +pub trait SubscribableTopic: Sync + Send { + fn bytes_received(&self, author: PublicKey, data: Vec); + fn author_joined(&self, author: PublicKey); + fn author_left(&self, author: PublicKey); + fn ephemeral_bytes_received(&self, author: PublicKey, data: Vec); + fn error(&self, error: SubscriptionError); +} diff --git a/reflection-node/src/utils.rs b/reflection-node/src/utils.rs deleted file mode 100644 index a2989333..00000000 --- a/reflection-node/src/utils.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::pin::Pin; - -use sqlx::error::BoxDynError; -use sqlx::migrate::{Migration, MigrationSource, Migrator}; - -type BoxFuture<'a, T> = Pin + Send + 'a>>; - -/// Combine multiple `sqlx::migrate::Migrator` into a single `sqlx::migrate::MigrationSource` -/// -/// See for more details: https://github.com/launchbadge/sqlx/discussions/3407 -#[derive(Debug)] -pub struct CombinedMigrationSource { - migrators: Vec, -} - -impl CombinedMigrationSource { - pub fn new(migrators: Vec) -> CombinedMigrationSource { - Self { migrators } - } -} - -impl<'s> MigrationSource<'s> for CombinedMigrationSource { - fn resolve(self) -> BoxFuture<'s, Result, BoxDynError>> { - Box::pin(async move { - Ok(self - .migrators - .iter() - .flat_map(|migrator| migrator.iter()) - .cloned() - .collect()) - }) - } -}