-
Notifications
You must be signed in to change notification settings - Fork 1
feat!: libdatadog wasm #70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
5e262ed
61d4450
2746950
0530685
2ebd408
17c695e
5e54155
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| [workspace] | ||
| resolver = "2" | ||
| default-members = [ | ||
| "crates/crashtracker", | ||
| "crates/process_discovery", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| [package] | ||
| name = "libdatadog-nodejs-capabilities" | ||
| version = "0.1.0" | ||
| edition = "2021" | ||
| description = "Wasm capability implementations for libdatadog-nodejs (backed by JS transports)" | ||
|
|
||
| [lib] | ||
| crate-type = ["rlib"] | ||
|
|
||
| [dependencies] | ||
| wasm-bindgen = "0.2" | ||
| wasm-bindgen-futures = "0.4" | ||
| js-sys = "0.3" | ||
| serde_json = "1.0" | ||
| http = "1" | ||
| bytes = "1.4" | ||
| futures-core = "0.3" | ||
| anyhow = "1" | ||
| libdd-capabilities = { git = "https://github.com/DataDog/libdatadog.git", branch = "jwiriath/capability-traits-architecture" } | ||
|
|
||
| [dev-dependencies] | ||
| wasm-bindgen-test = "0.3" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| // Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| //! Wasm implementation of [`HttpClientTrait`] backed by Node.js `http.request`. | ||
| //! | ||
| //! The JS transport is imported via `wasm_bindgen(module = ...)` from | ||
| //! `http_transport.js`, which ships alongside the wasm output. | ||
|
|
||
| use std::collections::HashMap; | ||
| use std::future::Future; | ||
|
|
||
| use bytes::Bytes; | ||
| use js_sys; | ||
| use wasm_bindgen::prelude::*; | ||
| use wasm_bindgen_futures::JsFuture; | ||
|
|
||
| use libdd_capabilities::http::{HttpClientTrait, HttpError}; | ||
| use libdd_capabilities::maybe_send::MaybeSend; | ||
|
|
||
| #[wasm_bindgen(module = "/src/http_transport.js")] | ||
| extern "C" { | ||
| #[wasm_bindgen(js_name = "httpRequest")] | ||
| fn http_request( | ||
| method: &str, | ||
| url: &str, | ||
| headers_json: &str, | ||
| body: &[u8], | ||
| ) -> js_sys::Promise; | ||
| } | ||
|
|
||
| /// Wasm [`HttpClientTrait`] implementation that delegates to Node.js HTTP. | ||
| /// | ||
| /// Named `DefaultHttpClient` to match the native version's public API. | ||
| pub struct DefaultHttpClient; | ||
|
|
||
| impl HttpClientTrait for DefaultHttpClient { | ||
| fn new_client() -> Self { | ||
| Self | ||
| } | ||
|
|
||
| #[allow(clippy::manual_async_fn)] | ||
| fn request( | ||
| &self, | ||
| req: http::Request<Bytes>, | ||
| ) -> impl Future<Output = Result<http::Response<Bytes>, HttpError>> + MaybeSend { | ||
| async move { | ||
| let method = req.method().as_str().to_owned(); | ||
| let url = req.uri().to_string(); | ||
| let headers_json = serialize_headers(req.headers())?; | ||
| let body = req.into_body(); | ||
|
|
||
| let result = JsFuture::from(http_request(&method, &url, &headers_json, &body)) | ||
| .await | ||
| .map_err(|e| HttpError::Network(anyhow::anyhow!("{:?}", e)))?; | ||
|
|
||
| let status = js_sys::Reflect::get(&result, &JsValue::from_str("status")) | ||
| .map_err(|_| HttpError::Other(anyhow::anyhow!("missing status in response")))? | ||
| .as_f64() | ||
| .ok_or_else(|| HttpError::Other(anyhow::anyhow!("status is not a number")))? | ||
| as u16; | ||
|
|
||
| let headers = parse_response_headers(&result)?; | ||
|
|
||
| let body_js = js_sys::Reflect::get(&result, &JsValue::from_str("body")) | ||
| .map_err(|_| HttpError::Other(anyhow::anyhow!("missing body in response")))?; | ||
|
|
||
| let body = if body_js.is_undefined() || body_js.is_null() { | ||
| Bytes::new() | ||
| } else { | ||
| Bytes::from(js_sys::Uint8Array::new(&body_js).to_vec()) | ||
| }; | ||
|
|
||
| let mut builder = http::Response::builder().status(status); | ||
| for (name, value) in &headers { | ||
| builder = builder.header(name.as_str(), value.as_str()); | ||
| } | ||
| builder | ||
| .body(body) | ||
| .map_err(|e| HttpError::Other(e.into())) | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| /// Parse response headers from a JS object `{ "header-name": "value", ... }`. | ||
| /// | ||
| /// Node.js `res.headers` returns lowercased header names with string values. | ||
| fn parse_response_headers(result: &JsValue) -> Result<Vec<(String, String)>, HttpError> { | ||
| let headers_js = js_sys::Reflect::get(result, &JsValue::from_str("headers")) | ||
| .map_err(|_| HttpError::Other(anyhow::anyhow!("missing headers in response")))?; | ||
|
|
||
| if headers_js.is_undefined() || headers_js.is_null() { | ||
| return Ok(Vec::new()); | ||
| } | ||
|
|
||
| let entries = js_sys::Object::entries(&js_sys::Object::unchecked_from_js(headers_js)); | ||
| let mut headers = Vec::with_capacity(entries.length() as usize); | ||
| for i in 0..entries.length() { | ||
| let entry = js_sys::Array::from(&entries.get(i)); | ||
| if let (Some(key), Some(value)) = (entry.get(0).as_string(), entry.get(1).as_string()) { | ||
| headers.push((key, value)); | ||
| } | ||
| } | ||
| Ok(headers) | ||
| } | ||
|
|
||
| fn serialize_headers(headers: &http::HeaderMap) -> Result<String, HttpError> { | ||
| let mut map: HashMap<&str, Vec<&str>> = HashMap::new(); | ||
| for (name, value) in headers.iter() { | ||
| map.entry(name.as_str()) | ||
| .or_default() | ||
| .push(value.to_str().unwrap_or("")); | ||
| } | ||
| let flat: HashMap<&str, String> = map | ||
| .into_iter() | ||
| .map(|(k, v)| (k, v.join(", "))) | ||
| .collect(); | ||
| serde_json::to_string(&flat) | ||
| .map_err(|e| HttpError::InvalidRequest(e.into())) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| const http = require('http'); | ||
| const https = require('https'); | ||
|
|
||
| module.exports.httpRequest = function (method, url, headersJson, body) { | ||
| const headers = JSON.parse(headersJson || '{}'); | ||
| headers['Content-Length'] = body.length; | ||
| const parsed = new URL(url); | ||
| const transport = parsed.protocol === 'https:' ? https : http; | ||
|
|
||
| return new Promise((resolve, reject) => { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using a promise here is a bit of a code smell. That said, it's the easiest to map to Futures, so I'm fine with keeping it here.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean like using |
||
| const req = transport.request( | ||
| { | ||
| hostname: parsed.hostname, | ||
| port: parsed.port, | ||
| path: parsed.pathname + parsed.search, | ||
| method, | ||
| headers, | ||
| }, | ||
| (res) => { | ||
| const chunks = []; | ||
| res.on('data', (chunk) => chunks.push(chunk)); | ||
| res.on('end', () => { | ||
| resolve({ | ||
| status: res.statusCode, | ||
| headers: res.headers, | ||
| body: new Uint8Array(Buffer.concat(chunks)), | ||
| }); | ||
| }); | ||
| } | ||
| ); | ||
| req.on('error', reject); | ||
| req.write(body); | ||
| req.end(); | ||
| }); | ||
| }; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| // Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| //! Wasm capability implementations for libdatadog-nodejs. | ||
| //! | ||
| //! `WasmCapabilities` is the bundle struct that implements all capability | ||
| //! traits using wasm_bindgen + JS transports. The wasm binding crate pins | ||
| //! this type as the generic parameter for libdatadog structs. | ||
|
|
||
| pub mod http; | ||
|
|
||
| pub use http::DefaultHttpClient; | ||
|
|
||
| /// Bundle struct for wasm platform capabilities. | ||
| /// | ||
| /// Currently delegates to `DefaultHttpClient` for HTTP. As more capability | ||
| /// traits are added (spawn, sleep, etc.), this type will implement all of them. | ||
| pub type WasmCapabilities = DefaultHttpClient; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| [package] | ||
| name = "trace-exporter" | ||
| version = "0.1.0" | ||
| edition = "2021" | ||
| description = "Wasm binding for libdd-data-pipeline TraceExporter" | ||
|
|
||
| [lib] | ||
| crate-type = ["cdylib", "rlib"] | ||
|
|
||
| [dependencies] | ||
| wasm-bindgen = "0.2" | ||
| wasm-bindgen-futures = "0.4" | ||
| js-sys = "0.3" | ||
| libdatadog-nodejs-capabilities = { path = "../capabilities" } | ||
| libdd-capabilities = { git = "https://github.com/DataDog/libdatadog.git", branch = "jwiriath/capability-traits-architecture" } | ||
| libdd-data-pipeline = { git = "https://github.com/DataDog/libdatadog.git", branch = "jwiriath/capability-traits-architecture", default-features = false } | ||
| console_error_panic_hook = "0.1" | ||
|
|
||
| [target.'cfg(target_arch = "wasm32")'.dependencies] | ||
| getrandom = { version = "0.2", features = ["js"] } | ||
| uuid = { version = "1", features = ["js"] } | ||
|
|
||
| [dev-dependencies] | ||
| wasm-bindgen-test = "0.3" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| // Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| //! Wasm binding for `TraceExporter<WasmCapabilities>`. | ||
| //! | ||
| //! This crate exposes libdatadog's trace exporter to JavaScript via | ||
| //! `wasm_bindgen`. The generic parameter is pinned to `WasmCapabilities`, | ||
| //! which routes I/O (HTTP, etc.) back into JavaScript. | ||
|
|
||
| use libdatadog_nodejs_capabilities::WasmCapabilities; | ||
| use libdd_data_pipeline::trace_exporter::{ | ||
| agent_response::AgentResponse, TraceExporter, TraceExporterInputFormat, | ||
| TraceExporterOutputFormat, | ||
| }; | ||
| use wasm_bindgen::prelude::*; | ||
|
|
||
| #[wasm_bindgen(start)] | ||
| fn init() { | ||
| console_error_panic_hook::set_once(); | ||
| } | ||
|
|
||
| #[wasm_bindgen] | ||
| pub struct JsTraceExporter { | ||
| inner: TraceExporter<WasmCapabilities>, | ||
| } | ||
|
|
||
| #[wasm_bindgen] | ||
| impl JsTraceExporter { | ||
| #[wasm_bindgen(constructor)] | ||
| pub fn new(url: &str, service: &str) -> Result<JsTraceExporter, JsValue> { | ||
| let mut builder = TraceExporter::<WasmCapabilities>::builder(); | ||
| builder | ||
| .set_url(url) | ||
| .set_service(service) | ||
| .set_language("javascript") | ||
| .set_language_version("") | ||
| .set_language_interpreter("nodejs") | ||
| .set_input_format(TraceExporterInputFormat::V04) | ||
| .set_output_format(TraceExporterOutputFormat::V04); | ||
|
|
||
| let exporter = builder | ||
| .build() | ||
| .map_err(|e| JsValue::from_str(&format!("{:?}", e)))?; | ||
|
|
||
| Ok(JsTraceExporter { inner: exporter }) | ||
| } | ||
|
|
||
| #[wasm_bindgen] | ||
| pub async fn send(&self, data: &[u8]) -> Result<JsValue, JsValue> { | ||
| let result = self | ||
| .inner | ||
| .send_async(data) | ||
| .await | ||
| .map_err(|e| JsValue::from_str(&format!("{:?}", e)))?; | ||
|
|
||
| match result { | ||
| AgentResponse::Changed { body } => Ok(JsValue::from_str(&body)), | ||
| AgentResponse::Unchanged => Ok(JsValue::NULL), | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| [toolchain] | ||
| channel = "1.84.1" | ||
| channel = "stable" | ||
| profile = "minimal" | ||
| components = ["clippy", "rustfmt", "rust-src"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| 'use strict' | ||
|
|
||
| // This test exercises the full call flow: | ||
| // JS -> wasm (TraceExporter logic) -> JS (http_transport.js for I/O) -> wasm -> JS | ||
| // | ||
| // To run: | ||
| // 1. Build: wasm-pack build --target nodejs ./crates/trace_exporter --out-dir ../../prebuilds/trace_exporter | ||
| // 2. Run: node test_wasm.js trace_exporter | ||
|
|
||
| const http = require('http') | ||
| const loader = require('../../../load.js') | ||
| const assert = require('assert') | ||
|
|
||
| const traceExporter = loader.load('trace_exporter') | ||
| assert(traceExporter !== undefined, 'trace_exporter wasm module loaded') | ||
|
|
||
| // Start a minimal HTTP server that acts as a mock Datadog agent | ||
| const server = http.createServer((req, res) => { | ||
| let body = [] | ||
| req.on('data', chunk => body.push(chunk)) | ||
| req.on('end', () => { | ||
| // Return a minimal agent response | ||
| res.writeHead(200, { 'content-type': 'application/json' }) | ||
| res.end(JSON.stringify({ rate_by_service: {} })) | ||
| }) | ||
| }) | ||
|
|
||
| server.listen(0, '127.0.0.1', async () => { | ||
| const port = server.address().port | ||
| const url = `http://127.0.0.1:${port}` | ||
|
|
||
| try { | ||
| // Create a TraceExporter pointing at the mock agent | ||
| const exporter = new traceExporter.JsTraceExporter(url, 'test-service') | ||
| assert(exporter !== undefined, 'JsTraceExporter created') | ||
|
|
||
| // Send a minimal msgpack-encoded v0.4 trace payload (empty array of traces) | ||
| // This is msgpack for [[]] — one trace containing no spans | ||
| const payload = new Uint8Array([0x91, 0x90]) | ||
| const result = await exporter.send(payload) | ||
|
|
||
| console.log('Trace export result:', result) | ||
| console.log('PASS: wasm trace exporter integration test') | ||
| } catch (err) { | ||
| console.error('Test error:', err) | ||
| process.exitCode = 1 | ||
| } finally { | ||
| server.close() | ||
| } | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dealing w/ JsValue directly can sometimes be a perf bottleneck. It's fine for now though. We can benchmark later.
Alternatives would be sending un-parsed headers (i.e. raw packet data) in here directly. Yep, that's a thing we can do.
We can also consider the network socket interface as a potential replacement layer here. Regardless, we have tons of room to figure that out and make improvements as we go, so this is all fine for now.