Skip to content

Commit 4aca936

Browse files
committed
p2p_http: add live logs sink and document P2P HTTP usage
1 parent 0bf96b4 commit 4aca936

4 files changed

Lines changed: 166 additions & 60 deletions

File tree

README.md

Lines changed: 82 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -108,54 +108,113 @@ Validation rules:
108108

109109
---
110110

111-
## Usage Example
111+
## Minimal P2P example (no HTTP)
112+
113+
This example starts two P2P nodes and connects one to the other over TCP.
114+
### Node A (listener)
112115

113116
```cpp
114117
#include <vix.hpp>
115118
#include <vix/console.hpp>
116119
#include <vix/p2p/Node.hpp>
117120
#include <vix/p2p/P2P.hpp>
118-
#include <vix/p2p_http/P2PHttp.hpp>
119121

120122
using namespace vix;
121123

122124
int main()
123125
{
124-
App app;
125-
126126
vix::p2p::NodeConfig cfg;
127-
cfg.node_id = "A";
127+
cfg.node_id = "node-A";
128128
cfg.listen_port = 9001;
129129

130+
cfg.on_log = [](std::string_view s) {
131+
console.info(std::string(s));
132+
};
133+
130134
auto node = vix::p2p::make_tcp_node(cfg);
131135
vix::p2p::P2PRuntime runtime(node);
136+
132137
runtime.start();
133138

134-
vix::p2p_http::P2PHttpOptions opt;
135-
opt.prefix = "/api/p2p";
136-
opt.enable_ping = true;
137-
opt.enable_status = true;
138-
opt.enable_peers = true;
139-
opt.enable_logs = true;
140-
opt.enable_live_logs = true;
141-
opt.stats_every_ms = 250;
139+
console.info("Node A running on port 9001");
140+
runtime.wait(); // blocks
141+
142+
return 0;
143+
}
144+
```
145+
### Node B (connects to A)
146+
147+
```cpp
148+
#include <vix.hpp>
149+
#include <vix/console.hpp>
150+
#include <vix/p2p/Node.hpp>
151+
#include <vix/p2p/P2P.hpp>
152+
153+
using namespace vix;
154+
155+
int main()
156+
{
157+
vix::p2p::NodeConfig cfg;
158+
cfg.node_id = "node-B";
159+
160+
cfg.on_log = [](std::string_view s) {
161+
console.info(std::string(s));
162+
};
142163

143-
vix::p2p_http::registerRoutes(app, runtime, opt);
164+
auto node = vix::p2p::make_tcp_node(cfg);
165+
vix::p2p::P2PRuntime runtime(node);
144166

145-
app.static_dir("./public");
146-
app.get("/", [](Request &, Response &res)
147-
{ res.file("./public/index.html"); });
167+
runtime.start();
168+
169+
vix::p2p::PeerEndpoint ep;
170+
ep.host = "127.0.0.1";
171+
ep.port = 9001;
172+
ep.scheme = "tcp";
148173

149-
app.get("/connect", [](Request &, Response &res)
150-
{ res.file("./public/connect.html"); });
174+
node->connect(ep);
151175

152-
app.listen(5178, [](const vix::utils::ServerReadyInfo &info)
153-
{ console.info("UI API listening on", info.port); });
176+
console.info("Node B connecting to node A...");
177+
runtime.wait(); // blocks
154178

155-
app.wait();
156-
runtime.stop();
179+
return 0;
157180
}
158181
```
182+
## What this demonstrates
183+
184+
✅ Pure P2P (no HTTP, no UI)
185+
✅ Asynchronous TCP transport
186+
✅ Secure handshake (Hello / Ack / Finish)
187+
✅ Heartbeats + peer lifecycle
188+
✅ Logs via cfg.on_log
189+
190+
## How to run
191+
```bash
192+
# terminal 1
193+
vix run node_a.cpp
194+
195+
# terminal 2
196+
vix run node_b.cpp
197+
```
198+
## You should see logs like:
199+
200+
```bash
201+
[p2p] connect() requested: tcp://127.0.0.1:9001
202+
[p2p] connect() ready: peer_id=...
203+
[p2p] handshake completed
204+
```
205+
206+
## Why this example is important
207+
208+
- Shows the smallest useful P2P setup
209+
- No framework noise
210+
- Easy mental model
211+
212+
Perfect starting point for:
213+
214+
- messaging
215+
- replication
216+
- offline-first sync
217+
- custom protocols
159218

160219
---
161220

include/vix/p2p_http/P2PHttp.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ namespace vix::p2p_http
3232
vix::App &app,
3333
vix::p2p::P2PRuntime &runtime,
3434
const P2PHttpOptions &opt);
35+
36+
void shutdown_live_logs();
37+
void set_live_log_sink(std::function<void(std::string)> sink);
3538
}
3639

3740
#endif // VIX_P2P_HTTP_HPP

include/vix/p2p_http/P2PHttpOptions.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
namespace vix::p2p_http
2424
{
2525
using AuthHookCtx = std::function<bool(vix::mw::Context &)>;
26-
2726
using AuthHookLegacy = std::function<bool(vix::vhttp::Request &, vix::vhttp::ResponseWrapper &)>;
27+
using LogSink = std::function<void(std::string_view)>;
2828

2929
struct P2PHttpOptions
3030
{
@@ -40,6 +40,8 @@ namespace vix::p2p_http
4040

4141
AuthHookCtx auth_ctx = nullptr;
4242
AuthHookLegacy auth_legacy = nullptr;
43+
44+
LogSink log_sink = nullptr;
4345
};
4446

4547
}

src/P2PHttp.cpp

Lines changed: 78 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,48 @@ namespace vix::p2p_http
7575

7676
static std::atomic<bool> g_tick_started{false};
7777
static std::atomic<bool> g_tick_stop{false};
78+
static std::thread g_tick_thread;
79+
static std::mutex g_tick_mu;
80+
81+
static std::function<void(std::string)> g_external_sink = nullptr;
82+
83+
static void p2p_http_sink(std::string line)
84+
{
85+
if (g_external_sink)
86+
{
87+
g_external_sink(std::move(line));
88+
return;
89+
}
90+
g_logs.push(std::move(line));
91+
}
92+
93+
void set_live_log_sink(std::function<void(std::string)> sink)
94+
{
95+
g_external_sink = std::move(sink);
96+
}
97+
98+
static void push_log(const P2PHttpOptions *opt, std::string line)
99+
{
100+
if (opt && opt->log_sink)
101+
{
102+
opt->log_sink(line);
103+
return;
104+
}
105+
g_logs.push(std::move(line));
106+
}
107+
108+
void shutdown_live_logs()
109+
{
110+
std::lock_guard<std::mutex> lk(g_tick_mu);
111+
112+
g_tick_stop.store(true);
113+
114+
if (g_tick_thread.joinable())
115+
g_tick_thread.join();
116+
117+
vix::p2p::clear_global_log_sink();
118+
g_tick_started.store(false);
119+
}
78120

79121
static std::string stats_line_plain(const vix::p2p::RuntimeStats &st)
80122
{
@@ -202,49 +244,49 @@ namespace vix::p2p_http
202244
{
203245
const std::string base = (opt.prefix.empty() ? "/p2p" : opt.prefix);
204246

205-
g_logs.push("[p2p_http] routes registered");
247+
push_log(&opt, "[p2p_http] routes registered");
248+
vix::p2p::set_global_log_sink([](std::string_view s)
249+
{ p2p_http_sink(std::string(s)); });
206250

207251
if (opt.enable_live_logs && opt.enable_logs)
208252
{
209-
bool expected = false;
210-
if (g_tick_started.compare_exchange_strong(expected, true))
211-
{
212-
g_tick_stop.store(false);
253+
const int every = (opt.stats_every_ms <= 0 ? 1000 : opt.stats_every_ms);
254+
auto *rt = &runtime;
255+
256+
std::lock_guard<std::mutex> lk(g_tick_mu);
213257

214-
const int every = (opt.stats_every_ms <= 0 ? 1000 : opt.stats_every_ms);
258+
if (g_tick_started.load())
259+
return;
215260

216-
auto *rt = &runtime;
261+
g_tick_started.store(true);
262+
g_tick_stop.store(false);
217263

218-
std::thread(
219-
[rt, every]()
220-
{
221-
vix::p2p::RuntimeStats last{};
222-
while (!g_tick_stop.load())
223-
{
224-
const auto st = rt->runtime_stats();
225-
226-
const bool changed =
227-
(st.peers_total != last.peers_total) ||
228-
(st.peers_connected != last.peers_connected) ||
229-
(st.handshakes_started != last.handshakes_started) ||
230-
(st.handshakes_completed != last.handshakes_completed) ||
231-
232-
(st.connect.connect_attempts != last.connect.connect_attempts) ||
233-
(st.connect.connect_deduped != last.connect.connect_deduped) ||
234-
(st.connect.connect_failures != last.connect.connect_failures) ||
235-
(st.connect.backoff_skips != last.connect.backoff_skips) ||
236-
(st.connect.tracked_endpoints != last.connect.tracked_endpoints);
237-
238-
if (changed)
239-
{
240-
g_logs.push(std::string("[p2p] ") + stats_line_plain(st));
241-
last = st;
242-
}
264+
g_tick_thread = std::thread([rt, every]()
265+
{
266+
vix::p2p::RuntimeStats last{};
267+
while (!g_tick_stop.load())
268+
{
269+
const auto st = rt->runtime_stats();
270+
271+
const bool changed =
272+
(st.peers_total != last.peers_total) ||
273+
(st.peers_connected != last.peers_connected) ||
274+
(st.handshakes_started != last.handshakes_started) ||
275+
(st.handshakes_completed != last.handshakes_completed) ||
276+
(st.connect.connect_attempts != last.connect.connect_attempts) ||
277+
(st.connect.connect_deduped != last.connect.connect_deduped) ||
278+
(st.connect.connect_failures != last.connect.connect_failures) ||
279+
(st.connect.backoff_skips != last.connect.backoff_skips) ||
280+
(st.connect.tracked_endpoints != last.connect.tracked_endpoints);
281+
282+
if (changed)
283+
{
284+
p2p_http_sink(std::string("[p2p] ") + stats_line_plain(st));
285+
last = st;
286+
}
243287

244-
std::this_thread::sleep_for(std::chrono::milliseconds(every));
245-
} })
246-
.detach();
247-
}
288+
std::this_thread::sleep_for(std::chrono::milliseconds(every));
289+
} });
248290
}
249291

250292
// GET /p2p/ping

0 commit comments

Comments
 (0)