diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index e7a5b58..c439811 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -60,9 +60,37 @@ jobs:
run: ./gradlew --stacktrace --no-problems-report native-cli:distro
shell: bash
+ # Install Python build dependencies (setuptools/wheel may be missing on Windows runners)
+ - name: Install Python build dependencies
+ run: python3 -m pip install --upgrade setuptools wheel
+ shell: bash
+
+ # Generate native-lib python wheel
+ - name: Create Native Lib Python Wheel
+ run: ./gradlew --stacktrace --no-problems-report native-lib:buildPythonWheel
+ shell: bash
+
# Upload the artifact file
- name: Upload generated script
uses: actions/upload-artifact@v4
with:
name: dw-${{env.NATIVE_VERSION}}-${{runner.os}}
path: native-cli/build/distributions/native-cli-${{env.NATIVE_VERSION}}-native-distro-${{ matrix.script_name }}.zip
+
+ # Upload the Python wheel
+ - name: Upload Python wheel
+ uses: actions/upload-artifact@v4
+ with:
+ name: dw-python-wheel-${{env.NATIVE_VERSION}}-${{runner.os}}
+ path: native-lib/python/dist/dataweave_native-0.0.1-py3-*.whl
+
+ # Upload the native shared library + header together per OS
+ - name: Upload native shared library
+ uses: actions/upload-artifact@v4
+ with:
+ name: dwlib-${{env.NATIVE_VERSION}}-${{runner.os}}
+ path: |
+ native-lib/python/src/dataweave/native/dwlib.dylib
+ native-lib/python/src/dataweave/native/dwlib.so
+ native-lib/python/src/dataweave/native/dwlib.dll
+ native-lib/python/src/dataweave/native/dwlib.h
diff --git a/.gitignore b/.gitignore
index 6091d76..327b866 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,4 +19,9 @@ out/
.DS_Store
# GraalVM
-.graalvm
\ No newline at end of file
+.graalvm
+
+grimoires/
+
+.windsurf/
+.claude/
diff --git a/build.gradle b/build.gradle
index 9792b28..944d683 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,3 +1,13 @@
+buildscript {
+ repositories {
+ gradlePluginPortal()
+ mavenCentral()
+ }
+ dependencies {
+ classpath "org.graalvm.buildtools.native:org.graalvm.buildtools.native.gradle.plugin:0.11.2"
+ }
+}
+
plugins {
id "scala"
id "maven-publish"
@@ -11,6 +21,8 @@ subprojects {
apply plugin: 'maven-publish'
apply plugin: 'scala'
+ apply plugin: 'org.graalvm.buildtools.native'
+
group = 'org.mule.weave.native'
version = nativeVersion
@@ -21,8 +33,8 @@ subprojects {
compileJava {
- sourceCompatibility = '11'
- targetCompatibility = '11'
+ sourceCompatibility = '17'
+ targetCompatibility = '17'
}
repositories {
diff --git a/gradle.properties b/gradle.properties
index 01b5261..b3d45c4 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,10 +1,10 @@
-weaveVersion=2.11.0-20251023
-weaveTestSuiteVersion=2.11.0-20251023
+weaveVersion=2.12.0-20260413
+weaveTestSuiteVersion=2.12.0-20260413
nativeVersion=100.100.100
scalaVersion=2.12.18
-ioVersion=2.11.0-SNAPSHOT
+ioVersion=2.12.0-20260408
graalvmVersion=24.0.2
-weaveSuiteVersion=2.11.0-20251023
+weaveSuiteVersion=2.12.0-20260413
#Libaries
scalaTestVersion=3.2.15
scalaTestPluginVersion=0.33
diff --git a/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala b/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala
index 29a1ac0..66c23ba 100644
--- a/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala
+++ b/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala
@@ -259,6 +259,7 @@ class TCKCliTest extends AnyFunSpec with Matchers
"lazy_metadata_definition",
"module-singleton",
"multipart-write-binary",
+ "private_scope_directives",
"read-binary-files",
"underflow",
"try",
@@ -345,12 +346,16 @@ class TCKCliTest extends AnyFunSpec with Matchers
baseArray ++
Array(
"math-toRadians",
- "try-handle-lazy-values-with-failures"
+ "try-handle-lazy-values-with-failures",
+ "weave_ast_example",
+ "weave_ast_module"
)
} else if (versionString == "2.10") {
baseArray ++
Array(
- "try-handle-lazy-values-with-failures"
+ "try-handle-lazy-values-with-failures",
+ "weave_ast_example",
+ "weave_ast_module"
)
} else {
baseArray
diff --git a/native-cli/build.gradle b/native-cli/build.gradle
index b567687..5ce3b5e 100644
--- a/native-cli/build.gradle
+++ b/native-cli/build.gradle
@@ -2,8 +2,6 @@ plugins {
id "com.github.maiflai.scalatest" version "${scalaTestPluginVersion}"
id 'application'
- // Apply GraalVM Native Image plugin
- id 'org.graalvm.buildtools.native' version '0.11.2'
}
sourceSets {
diff --git a/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala b/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala
index 6a80b38..f5fbca8 100644
--- a/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala
+++ b/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala
@@ -211,9 +211,3 @@ case class WeaveFailureResult(message: String) extends WeaveExecutionResult {
override def result(): String = message
}
-
-
-class CustomWeaveDataFormat(moduleManager: ModuleLoaderManager) extends WeaveDataFormat {
- override def createModuleLoader(): ModuleLoaderManager = moduleManager
-}
-
diff --git a/native-lib/.gitignore b/native-lib/.gitignore
new file mode 100644
index 0000000..0e845cc
--- /dev/null
+++ b/native-lib/.gitignore
@@ -0,0 +1,4 @@
+python/src/dataweave/native/
+python/src/dataweave_native.egg-info/
+python/dist/
+python/build/
diff --git a/native-lib/README.md b/native-lib/README.md
new file mode 100644
index 0000000..e829ecf
--- /dev/null
+++ b/native-lib/README.md
@@ -0,0 +1,323 @@
+# native-lib
+
+## Overview
+
+`native-lib` builds a **GraalVM native shared library** that embeds the MuleSoft **DataWeave runtime** and exposes a small C-compatible API.
+
+The main purpose is to allow non-JVM consumers (most notably the Python package in `native-lib/python`) to execute DataWeave scripts **without running a JVM**, while still using the official DataWeave runtime.
+
+## Architecture (GraalVM + FFI)
+
+```
+┌─────────────────────────────────────────────┐
+│ Python Process │
+│ │
+│ ┌────────────────────────────────────────┐ │
+│ │ Application Script │ │
+│ │ - Python: ctypes │ │
+│ └──────────────┬─────────────────────────┘ │
+│ │ │
+│ │ FFI Call │
+│ ▼ │
+│ ┌────────────────────────────────────────┐ │
+│ │ Native Shared Library (dwlib) │ │
+│ │ ┌──────────────────────────────────┐ │ │
+│ │ │ GraalVM Isolate │ │ │
+│ │ │ - NativeLib.run_script() │ │ │
+│ │ │ - DataWeave script execution │ │ │
+│ │ └──────────────────────────────────┘ │ │
+│ └────────────────────────────────────────┘ │
+└─────────────────────────────────────────────┘
+```
+
+## Building with Gradle
+
+### Prerequisites
+
+- A GraalVM distribution installed that includes `native-image`.
+- Enough memory for native-image (this build config uses `-J-Xmx6G`).
+
+### Build the shared library
+
+From the repository root:
+
+```bash
+./gradlew :native-lib:nativeCompile
+```
+
+The shared library is produced under:
+
+- `native-lib/build/native/nativeCompile/`
+
+and is named:
+
+- macOS: `dwlib.dylib`
+- Linux: `dwlib.so`
+- Windows: `dwlib.dll`
+
+### Stage the library into the Python package (dev workflow)
+
+```bash
+./gradlew :native-lib:stagePythonNativeLib
+```
+
+This copies `dwlib.*` into:
+
+- `native-lib/python/src/dataweave/native/`
+
+### Build a Python wheel (bundles the native library)
+
+```bash
+./gradlew :native-lib:buildPythonWheel
+```
+
+The wheel will be created in:
+
+- `native-lib/python/dist/`
+
+## Installing for use in a Python project
+
+### Option A: Install the produced wheel (recommended)
+
+After `:native-lib:buildPythonWheel`:
+
+```bash
+python3 -m pip install native-lib/python/dist/dataweave_native-0.0.1-*.whl
+```
+
+This wheel includes the `dwlib.*` shared library inside the Python package.
+
+### Option B: Editable install for development
+
+1. Stage the native library:
+
+```bash
+./gradlew :native-lib:stagePythonNativeLib
+```
+
+2. Install the Python package in editable mode:
+
+```bash
+python3 -m pip install -e native-lib/python
+```
+
+### Option C: Use an externally-built library via an environment variable
+
+If you want to point Python at a specific built artifact, set:
+
+- `DATAWEAVE_NATIVE_LIB=/absolute/path/to/dwlib.(dylib|so|dll)`
+
+The Python module will also try a few fallbacks (including the wheel-bundled location).
+
+## Using the library (Python examples)
+
+All examples below assume:
+
+```python
+import dataweave
+```
+
+### 1) Simple script
+
+```python
+result = dataweave.run("2 + 2")
+assert result.success is True
+print(result.get_string()) # "4"
+```
+
+### 2) Script with inputs (auto-detected types)
+
+Inputs can be plain Python values. The module auto-encodes them as JSON or text.
+
+```python
+result = dataweave.run(
+ "num1 + num2",
+ {"num1": 25, "num2": 17},
+)
+print(result.get_string()) # "42"
+```
+
+### 3) Script with inputs (explicit mime type, charset, properties)
+
+Use an explicit input dict when you need full control over how DataWeave interprets bytes.
+
+```python
+script = "payload.person"
+xml_bytes = b"<person><name>Billy</name><age>31</age></person>".decode("utf-8").encode("utf-16")
+
+result = dataweave.run(
+ script,
+ {
+ "payload": {
+ "content": xml_bytes,
+ "mimeType": "application/xml",
+ "charset": "UTF-16",
+ "properties": {
+ "nullValueOn": "empty",
+ "maxAttributeSize": 256
+ },
+ }
+ },
+)
+
+if result.success:
+ print(result.get_string())
+else:
+ print(result.error)
+```
+
+You can also use `InputValue` for the same purpose:
+
+```python
+input_value = dataweave.InputValue(
+ content="1234567",
+ mime_type="application/csv",
+ properties={"header": False, "separator": "4"},
+)
+
+result = dataweave.run("in0.column_1[0]", {"in0": input_value})
+print(result.get_string()) # '"567"'
+```
+
+### 4) Context manager (explicit lifecycle)
+
+The module-level API (`dataweave.run(...)`) uses a shared singleton. Use `DataWeave` directly when you need explicit control over isolate lifecycle or want multiple independent instances:
+
+```python
+with dataweave.DataWeave() as dw:
+ r1 = dw.run("2 + 2")
+ r2 = dw.run("x + y", {"x": 10, "y": 32})
+
+ print(r1.get_string()) # "4"
+ print(r2.get_string()) # "42"
+```
+
+### 5) Error handling
+
+There are three error types:
+
+- `DataWeaveLibraryNotFoundError` — the native library cannot be located/loaded.
+- `DataWeaveScriptError` — script compilation or runtime error (subclass of `DataWeaveError`). Carries the full result on `.result`.
+- `DataWeaveError` — FFI-level failures (isolate creation, library calls).
+
+**Option A: Use `raise_on_error=True` for a single try/except (recommended)**
+
+```python
+try:
+ result = dataweave.run("invalid syntax here", raise_on_error=True)
+ print(result.get_string())
+
+except dataweave.DataWeaveScriptError as e:
+ print(f"Script error: {e.result.error}")
+
+except dataweave.DataWeaveLibraryNotFoundError:
+ # Build it first: ./gradlew :native-lib:nativeCompile
+ raise
+```
+
+**Option B: Check `result.success` manually (default, backward-compatible)**
+
+```python
+result = dataweave.run("invalid syntax here")
+
+if not result.success:
+ print(f"Error: {result.error}")
+else:
+ print(result.get_string())
+```
+
+### 6) Output streaming
+
+Use `run_streaming` to execute a script and receive output chunks as they are produced, without buffering the entire result in memory.
+
+```python
+with dataweave.DataWeave() as dw:
+ stream = dw.run_streaming("output application/json --- (1 to 10000) map {id: $}")
+ for chunk in stream:
+ sys.stdout.buffer.write(chunk)
+ metadata = stream.metadata # StreamingResult with mime_type, charset, etc.
+ print(f"\nDone: {metadata.mime_type}, {metadata.charset}")
+```
+
+Or with the module-level API:
+
+```python
+stream = dataweave.run_streaming("output application/csv --- payload", {"payload": [1, 2, 3]})
+output = b"".join(stream)
+```
+
+### 7) Input and output streaming
+
+Use `run_transform` to stream both input and output — feed an iterable of bytes in, receive a generator of bytes out. Ideal for processing large files or network streams with constant memory.
+
+```python
+# Stream a file through DataWeave
+with open("large.json", "rb") as f:
+ stream = dataweave.run_transform(
+ "output application/csv --- payload",
+ input_stream=iter(lambda: f.read(8192), b""),
+ input_mime_type="application/json",
+ )
+ with open("output.csv", "wb") as out:
+ for chunk in stream:
+ out.write(chunk)
+ metadata = stream.metadata
+```
+
+Works with any iterable — generators, lists, network sockets:
+
+```python
+# From an in-memory list
+stream = dataweave.run_transform(
+ "output application/json --- payload map ($ * $)",
+ input_stream=[b"[1,2,3,4,5]"],
+ input_mime_type="application/json",
+)
+print(b"".join(stream)) # [1,4,9,16,25]
+```
+
+```python
+# From a generator producing chunks
+def read_from_network(sock):
+ while chunk := sock.recv(4096):
+ yield chunk
+
+stream = dataweave.run_transform(
+ "output application/json --- sizeOf(payload)",
+ input_stream=read_from_network(conn),
+ input_mime_type="application/json",
+)
+for chunk in stream:
+ process(chunk)
+```
+
+### 8) I/O streaming with callbacks (low-level)
+
+Use `run_input_output_callback` when you need direct callback control (e.g. integration with event-driven frameworks). For most use cases, prefer `run_transform` above.
+
+```python
+json_input = b'[1,2,3,4,5]'
+pos = 0
+
+def read_cb(buf_size):
+ global pos
+ chunk = json_input[pos:pos + buf_size]
+ pos += len(chunk)
+ return chunk # return b"" when done
+
+chunks = []
+def write_cb(data):
+ chunks.append(data)
+ return 0 # 0 = success
+
+result = dataweave.run_input_output_callback(
+ "output application/json deferred=true --- payload map ($ * $)",
+ input_name="payload",
+ input_mime_type="application/json",
+ read_callback=read_cb,
+ write_callback=write_cb,
+)
+
+print(result) # StreamingResult(success=True, ...)
+print(b"".join(chunks)) # [1,4,9,16,25]
+```
diff --git a/native-lib/build.gradle b/native-lib/build.gradle
new file mode 100644
index 0000000..e495f06
--- /dev/null
+++ b/native-lib/build.gradle
@@ -0,0 +1,116 @@
+dependencies {
+ api group: 'org.mule.weave', name: 'runtime', version: weaveVersion
+ api group: 'org.mule.weave', name: 'core-modules', version: weaveVersion
+
+ implementation group: 'org.mule.weave', name: 'parser', version: weaveVersion
+ implementation group: 'org.mule.weave', name: 'wlang', version: weaveVersion
+ compileOnly group: 'org.graalvm.sdk', name: 'graal-sdk', version: graalvmVersion
+ compileOnly group: 'org.graalvm.nativeimage', name: 'svm', version: graalvmVersion
+
+ implementation "org.scala-lang:scala-library:${scalaVersion}"
+ implementation 'org.json:json:20240303'
+
+ testImplementation platform('org.junit:junit-bom:5.10.0')
+ testImplementation 'org.junit.jupiter:junit-jupiter'
+ testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
+}
+
+test {
+ useJUnitPlatform()
+}
+
+tasks.matching { it.name == 'nativeCompileClasspathJar' }.configureEach { t ->
+ t.exclude('META-INF/services/org.mule.weave.v2.module.DataFormat')
+ t.from("${projectDir}/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat") {
+ into('META-INF/services')
+ rename { 'org.mule.weave.v2.module.DataFormat' }
+ }
+ t.exclude('META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader')
+ t.from("${projectDir}/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader") {
+ into('META-INF/services')
+ rename { 'org.mule.weave.v2.parser.phase.ModuleLoader' }
+ }
+}
+
+// Configure GraalVM native-image to build a shared library
+graalvmNative {
+// toolchainDetection = true
+ binaries {
+ main {
+ sharedLibrary = true
+ debug = true
+ verbose = true
+ fallback = false
+ //agent = false
+ useFatJar = true
+ //buildArgs.add('-Ob') // quick build mode to speed up builds during development
+ buildArgs.add('--no-fallback')
+ buildArgs.add('-H:Name=dwlib')
+ buildArgs.add('--verbose')
+ buildArgs.add("--report-unsupported-elements-at-runtime")
+ buildArgs.add("-J-Xmx6G")
+
+ buildArgs.add("-H:+ReportExceptionStackTraces")
+ buildArgs.add("-H:+UnlockExperimentalVMOptions")
+ buildArgs.add("--initialize-at-build-time=sun.instrument.InstrumentationImpl")
+ buildArgs.add("-H:DeadlockWatchdogInterval=1000")
+ buildArgs.add("-H:CompilationExpirationPeriod=0")
+ buildArgs.add("-H:+AddAllCharsets")
+ buildArgs.add("-H:+IncludeAllLocales")
+ // Pass project directory as system property for header path resolution
+ buildArgs.add("-Dproject.root=${projectDir}")
+ }
+ }
+}
+
+def pythonExe = (project.findProperty('pythonExe') ?: 'python3') as String
+
+tasks.register('stagePythonNativeLib', Copy) {
+ dependsOn tasks.named('nativeCompile')
+ from("${buildDir}/native/nativeCompile") {
+ include('dwlib.*')
+ }
+ into("${projectDir}/python/src/dataweave/native")
+}
+
+tasks.register('buildPythonWheel', Exec) {
+ dependsOn tasks.named('stagePythonNativeLib')
+ workingDir("${projectDir}/python")
+ // Track inputs so Gradle rebuilds when native lib or Python sources change
+ inputs.dir("${projectDir}/python/src/dataweave")
+ inputs.file("${projectDir}/python/setup.py")
+ inputs.file("${projectDir}/python/setup.cfg")
+ inputs.file("${projectDir}/python/pyproject.toml")
+ outputs.dir("${projectDir}/python/dist")
+ doFirst {
+ // Clean old wheels and build artifacts to ensure fresh build
+ delete("${projectDir}/python/dist")
+ delete("${projectDir}/python/build")
+ file("${projectDir}/python/dist").mkdirs()
+ }
+ // Use setup.py bdist_wheel to generate platform-specific wheel tags
+ // The custom setup.py overrides bdist_wheel to set correct platform tags
+ // for the bundled native library (dwlib)
+ commandLine(pythonExe, 'setup.py', 'bdist_wheel', '-d', 'dist')
+}
+
+tasks.register('pythonTest', Exec) {
+ if (project.findProperty('skipPythonTests')?.toString()?.toBoolean() == true) {
+ enabled = false
+ }
+
+ dependsOn tasks.named('stagePythonNativeLib')
+ workingDir("${projectDir}/python")
+ commandLine(pythonExe, 'tests/test_dataweave_module.py')
+}
+
+tasks.named('test') {
+ dependsOn tasks.named('pythonTest')
+}
+
+tasks.named('clean') {
+ delete("${projectDir}/python/dist")
+ delete("${projectDir}/python/build")
+ delete("${projectDir}/python/src/dataweave/native")
+ delete("${projectDir}/python/src/dataweave_native.egg-info")
+}
diff --git a/native-lib/example_dataweave_module.py b/native-lib/example_dataweave_module.py
new file mode 100755
index 0000000..d1740a2
--- /dev/null
+++ b/native-lib/example_dataweave_module.py
@@ -0,0 +1,200 @@
+#!/usr/bin/env python3
+"""
+Example demonstrating the simplified DataWeave Python module.
+
+This shows how easy it is to use DataWeave without dealing with
+any GraalVM or native library complexity.
+"""
+
+import sys
+from pathlib import Path
+
+_PYTHON_SRC_DIR = Path(__file__).resolve().parent / "python" / "src"
+sys.path.insert(0, str(_PYTHON_SRC_DIR))
+
+import dataweave
+
+def example_simple_functions():
+ """Example using simple function API"""
+ print("="*70)
+ print("Example 1: Simple Function API")
+ print("="*70)
+
+ ok = True
+
+ # Simple script execution
+ print("\n[*] Simple arithmetic:")
+ script = "2 + 2"
+ result = dataweave.run_script(script)
+ ok = assert_result(script, result, "4") and ok
+
+ print("\n[*] Square root:")
+ script = "sqrt(144)"
+ result = dataweave.run_script(script)
+ ok = assert_result(script, result, "12") and ok
+
+ print("\n[*] Array operations:")
+ script = "[1, 2, 3] map $ * 2"
+ result = dataweave.run_script(script)
+ ok = assert_result(script, result, "[\n 2, \n 4, \n 6\n]") and ok
+
+ print("\n[*] String operations:")
+ script = "upper('hello world')"
+ result = dataweave.run_script(script)
+ ok = assert_result(script, result, '"HELLO WORLD"') and ok
+
+ # Script with inputs (simple values - auto-converted)
+ print("\n[*] Script with inputs (auto-converted):")
+ script = "num1 + num2"
+ result = dataweave.run_script(script, {"num1": 25, "num2": 17})
+ ok = assert_result(script, result, "42") and ok
+
+ # Script with complex inputs
+ print("\n[*] Script with complex object:")
+ script = "payload.name"
+ result = dataweave.run_script(script, {"payload": {"content": '{"name": "John", "age": 30}', "mimeType": "application/json"}})
+ ok = assert_result(script, result, '"John"') and ok
+
+ # Script with mixed input types
+ print("\n[*] Script with mixed input types:")
+ script = "greeting ++ ' ' ++ payload.name"
+ result = dataweave.run_script(script, {"greeting": "Hello", "payload": {"content": '{"name": "Alice", "role": "Developer"}', "mimeType": "application/json"}})
+ ok = assert_result(script, result, '"Hello Alice"') and ok
+
+ # Binary output
+ print("\n[*] Binary output:")
+ script = "output application/octet-stream\n---\ndw::core::Binaries::fromBase64(\"holamund\")"
+ result = dataweave.run_script(script)
+ ok = assert_result(script, result, "holamund") and ok
+
+ # Script with InputValue
+ print("\n[*] Inputs:")
+ input_value = dataweave.InputValue(
+ content="1234567",
+ mimeType="application/csv",
+ properties={"header": False, "separator": "4"}
+ )
+ script = "in0.column_1[0]"
+ result = dataweave.run_script(script, {"in0": input_value})
+ ok = assert_result(script, result, '"567"') and ok
+
+ # Cleanup when done
+ dataweave.cleanup()
+ print("\n[OK] Cleanup completed")
+
+ return ok
+
+
+def assert_result(script, result, expected):
+ print(f" {script} = {result}")
+ ok = result.get_string() == expected
+ if ok:
+ status = "[OK]"
+ else:
+ status = f"[FAIL] (expected: {expected})"
+ print(f" result as string = {result.get_string()} {status}")
+ print(f" result as bytes = {result.get_bytes()}")
+ return ok
+
+
+def example_context_manager():
+ """Example using context manager (recommended)"""
+ print("\n" + "="*70)
+ print("Example 2: Context Manager API (Recommended)")
+ print("="*70)
+
+ ok = True
+
+ with dataweave.DataWeave() as dw:
+ print("\n[*] Multiple operations with same runtime:")
+
+ script = "2 + 2"
+ result = dw.run(script)
+ ok = assert_result(script, result, "4") and ok
+
+ script = "x + y + z"
+ result = dw.run(script, {"x": 1, "y": 2, "z": 3})
+ ok = assert_result(script, result, "6") and ok
+
+ script = "numbers map $ * multiplier"
+ result = dw.run(script, {"numbers": [1, 2, 3, 4, 5], "multiplier": 10})
+ ok = assert_result(script, result, "[\n 10, \n 20, \n 30, \n 40, \n 50\n]") and ok
+
+ print("\n[OK] Context manager automatically cleaned up resources")
+
+ return ok
+
+
+def example_explicit_format():
+ """Example using explicit content/mimeType format"""
+ print("\n" + "="*70)
+ print("Example 3: Explicit Format (Advanced)")
+ print("="*70)
+
+ print("\n[*] Using explicit content and mimeType:")
+
+ ok = True
+
+ script = "payload.message"
+ result = dataweave.run_script(script, {"payload": {"content": '{"message": "Hello from JSON!", "value": 42}', "mimeType": "application/json"}})
+ ok = assert_result(script, result, '"Hello from JSON!"') and ok
+
+ script = "payload.value + offset"
+ result = dataweave.run_script(script, {"payload": {"content": '{"value": 100}', "mimeType": "application/json"}, "offset": 50})
+ ok = assert_result(script, result, "150") and ok
+
+ return ok
+
+
+def example_error_handling():
+ """Example with error handling"""
+ print("\n" + "="*70)
+ print("Example 4: Error Handling")
+ print("="*70)
+
+ try:
+ print("\n[*] Invalid script (will show error):")
+ result = dataweave.run_script("invalid syntax here", {})
+ print(f" Result: {result} {'[OK]' if result.success == False else '[FAIL]'}")
+
+ except dataweave.DataWeaveLibraryNotFoundError as e:
+ print(f"[ERROR] Library not found: {e}")
+ print(" Please build the library first: ./gradlew nativeCompile")
+ except dataweave.DataWeaveError as e:
+ print(f"[ERROR] DataWeave error: {e}")
+
+
+def main():
+ """Run all examples"""
+ print("\n" + "="*70)
+ print("DataWeave Python Module - Examples")
+ print("="*70)
+ print("\nThis module abstracts all GraalVM/native complexity!")
+ print("Just import and use - no ctypes, no manual memory management.\n")
+
+ try:
+ all_ok = True
+ all_ok = example_simple_functions() and all_ok
+ all_ok = example_context_manager() and all_ok
+ all_ok = example_explicit_format() and all_ok
+ example_error_handling()
+
+ print("\n" + "="*70)
+ if all_ok:
+ print("[OK] All examples completed successfully!")
+ else:
+ print("[FAIL] One or more examples failed")
+ print("="*70)
+
+ except dataweave.DataWeaveLibraryNotFoundError as e:
+ print(f"\n[ERROR] {e}")
+ print("\nPlease build the native library first:")
+ print(" ./gradlew nativeCompile")
+ except Exception as e:
+ print(f"\n[ERROR] Unexpected error: {e}")
+ import traceback
+ traceback.print_exc()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/native-lib/example_streaming.py b/native-lib/example_streaming.py
new file mode 100755
index 0000000..9e55ba9
--- /dev/null
+++ b/native-lib/example_streaming.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python3
+
+import sys
+from pathlib import Path
+
+_PYTHON_SRC_DIR = Path(__file__).resolve().parent / "python" / "src"
+sys.path.insert(0, str(_PYTHON_SRC_DIR))
+
+import dataweave
+import resource
+import psutil, os
+import time
+
+def example_streaming_input_output_callback():
+ print("\nTesting streaming input and output using callbacks (square numbers)...")
+ try:
+ start_time = time.monotonic()
+ num_elements = 1_000_000 * 50
+
+ script = """output application/json deferred=true
+---
+payload map ($ * $)"""
+
+ # -- input generator (called by native on a background thread) --
+ input_iter = iter(range(num_elements))
+ input_started = False
+ input_done = False
+ pending_token = None
+
+ def read_callback(buf_size):
+ nonlocal input_started, input_done, pending_token
+ if input_done:
+ return b""
+ parts = []
+ if not input_started:
+ parts.append(b"[")
+ input_started = True
+ remaining = buf_size - sum(len(p) for p in parts)
+ if pending_token is not None:
+ if len(pending_token) <= remaining:
+ parts.append(pending_token)
+ remaining -= len(pending_token)
+ pending_token = None
+ else:
+ return b"".join(parts)
+ try:
+ while remaining > 0:
+ i = next(input_iter)
+ token = (b"," if i > 0 else b"") + str(i).encode("utf-8")
+ if len(token) > remaining:
+ pending_token = token
+ break
+ parts.append(token)
+ remaining -= len(token)
+ except StopIteration:
+ parts.append(b"]")
+ input_done = True
+ return b"".join(parts)
+
+ # -- output collector (called by native on the calling thread) --
+ chunk_count = 0
+ total_bytes = 0
+
+ def write_callback(data):
+ nonlocal chunk_count, total_bytes
+ chunk_count += 1
+ total_bytes += len(data)
+ if chunk_count % 5000 == 0:
+ usage = resource.getrusage(resource.RUSAGE_SELF)
+ current_rss = psutil.Process(os.getpid()).memory_info().rss
+ print(f"--- chunk {chunk_count}: {len(data)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
+ return 0
+
+ usage = resource.getrusage(resource.RUSAGE_SELF)
+ current_rss = psutil.Process(os.getpid()).memory_info().rss
+ print(f">>> Before run_input_output_callback, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
+
+ result = dataweave.run_input_output_callback(
+ script,
+ input_name="payload",
+ input_mime_type="application/json",
+ read_callback=read_callback,
+ write_callback=write_callback,
+ input_charset="utf-8",
+ )
+
+ if not result.success:
+ raise Exception(result.error or "Unknown error")
+
+ peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576
+ elapsed = time.monotonic() - start_time
+ mins, secs = divmod(elapsed, 60)
+ print(f"\n[OK] Streaming input/output callback done ({chunk_count} chunks, {total_bytes / 1048576:.1f} MB, {num_elements:,} elements) - Time: {int(mins)}:{secs:06.3f}")
+ print(f"Peak memory (max RSS): {peak_rss:.1f} MB")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming input/output callback failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+
+def example_streaming_run_transform():
+ print("\nTesting streaming input and output using run_transform (square numbers)...")
+ try:
+ start_time = time.monotonic()
+ num_elements = 1_000_000 * 50
+
+ script = """output application/json deferred=true
+---
+payload map ($ * $)"""
+
+ # -- input as a generator of byte chunks --
+ def input_chunks():
+ buf_size = 8192
+ input_iter = iter(range(num_elements))
+ started = False
+ done = False
+ pending_token = None
+
+ while not done:
+ parts = []
+ if not started:
+ parts.append(b"[")
+ started = True
+ remaining = buf_size - sum(len(p) for p in parts)
+ if pending_token is not None:
+ if len(pending_token) <= remaining:
+ parts.append(pending_token)
+ remaining -= len(pending_token)
+ pending_token = None
+ else:
+ yield b"".join(parts)
+ continue
+ try:
+ while remaining > 0:
+ i = next(input_iter)
+ token = (b"," if i > 0 else b"") + str(i).encode("utf-8")
+ if len(token) > remaining:
+ pending_token = token
+ break
+ parts.append(token)
+ remaining -= len(token)
+ except StopIteration:
+ parts.append(b"]")
+ done = True
+ if parts:
+ yield b"".join(parts)
+
+ usage = resource.getrusage(resource.RUSAGE_SELF)
+ current_rss = psutil.Process(os.getpid()).memory_info().rss
+ print(f">>> Before run_transform, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
+
+ stream = dataweave.run_transform(
+ script,
+ input_stream=input_chunks(),
+ input_mime_type="application/json",
+ input_charset="utf-8",
+ )
+
+ chunk_count = 0
+ total_bytes = 0
+
+ for data in stream:
+ chunk_count += 1
+ total_bytes += len(data)
+ if chunk_count % 5000 == 0:
+ usage = resource.getrusage(resource.RUSAGE_SELF)
+ current_rss = psutil.Process(os.getpid()).memory_info().rss
+ print(f"--- chunk {chunk_count}: {len(data)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
+
+ if not stream.metadata.success:
+ raise Exception(stream.metadata.error)
+
+ peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576
+ elapsed = time.monotonic() - start_time
+ mins, secs = divmod(elapsed, 60)
+ print(f"\n[OK] run_transform done ({chunk_count} chunks, {total_bytes / 1048576:.1f} MB, {num_elements:,} elements) - Time: {int(mins)}:{secs:06.3f}")
+ print(f"Peak memory (max RSS): {peak_rss:.1f} MB")
+ return True
+ except Exception as e:
+ print(f"[FAIL] run_transform failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+
+def doc_example():
+ json_input = b'[1,2,3,4,5]'
+ pos = 0
+
+ def read_cb(buf_size):
+ nonlocal pos
+ chunk = json_input[pos:pos + buf_size]
+ pos += len(chunk)
+ return chunk # return b"" when done
+
+ chunks = []
+ def write_cb(data):
+ chunks.append(data)
+ return 0 # 0 = success
+
+ result = dataweave.run_input_output_callback(
+ "output application/json deferred=true --- payload map ($ * $)",
+ input_name="payload",
+ input_mime_type="application/json",
+ read_callback=read_cb,
+ write_callback=write_cb,
+ )
+
+ print(result) # StreamingResult(success=True, ...)
+ print(b"".join(chunks)) # [1,4,9,16,25]
+
+
+def main():
+ print("=" * 70)
+ print("run_input_output_callback (low-level callbacks)")
+ print("=" * 70)
+ example_streaming_input_output_callback()
+
+ print("\n")
+ print("=" * 70)
+ print("run_transform (Pythonic generator API)")
+ print("=" * 70)
+ example_streaming_run_transform()
+
+ print("\n")
+ doc_example()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/native-lib/python/pyproject.toml b/native-lib/python/pyproject.toml
new file mode 100644
index 0000000..642ab3c
--- /dev/null
+++ b/native-lib/python/pyproject.toml
@@ -0,0 +1,3 @@
+[build-system]
+requires = ["setuptools>=68", "wheel"]
+build-backend = "setuptools.build_meta"
diff --git a/native-lib/python/setup.cfg b/native-lib/python/setup.cfg
new file mode 100644
index 0000000..a1635ae
--- /dev/null
+++ b/native-lib/python/setup.cfg
@@ -0,0 +1,18 @@
+[metadata]
+name = dataweave-native
+version = 0.0.1
+description = Python bindings for the DataWeave native library
+
+[options]
+package_dir =
+ = src
+packages = find:
+include_package_data = True
+python_requires = >=3.9
+
+[options.packages.find]
+where = src
+
+[options.package_data]
+dataweave =
+ native/*
diff --git a/native-lib/python/setup.py b/native-lib/python/setup.py
new file mode 100644
index 0000000..8c32d49
--- /dev/null
+++ b/native-lib/python/setup.py
@@ -0,0 +1,109 @@
+"""
+Custom setup.py to build platform-specific wheels for dataweave-native.
+
+Since this package bundles a native shared library (dwlib), the wheel must
+be tagged with the correct platform (e.g., macosx_11_0_arm64, manylinux, etc.)
+rather than the generic 'any' platform.
+
+This is achieved by overriding the bdist_wheel command to set platform-specific
+tags based on the current build environment.
+"""
+
+import platform
+import struct
+import sys
+
+from setuptools import setup
+
+try:
+ from wheel.bdist_wheel import bdist_wheel as _bdist_wheel
+except ImportError:
+ _bdist_wheel = None
+
+
def get_platform_tag():
    """
    Determine the wheel platform tag for the current build environment.

    :return: a PEP 425 platform tag such as ``macosx_11_0_arm64``,
        ``manylinux2014_x86_64`` or ``win_amd64``; ``None`` when the system
        is unrecognized (the caller then keeps the default tag).
    """
    system = platform.system().lower()
    machine = platform.machine().lower()

    # Normalize machine architecture names.
    if machine in ("x86_64", "amd64"):
        machine = "x86_64"
    elif machine in ("arm64", "aarch64"):
        # Canonical spelling differs per OS: macOS wheel tags use "arm64",
        # while manylinux tags use "aarch64" (mapped in the Linux branch).
        machine = "arm64"
    elif machine in ("i386", "i686"):
        machine = "i686"

    if system == "darwin":
        # macOS: tag with the minimum supported deployment target.
        mac_ver = platform.mac_ver()[0]
        if mac_ver:
            parts = mac_ver.split(".")
            major = int(parts[0])
            minor = int(parts[1]) if len(parts) > 1 else 0
            # arm64 only exists from 11.0; x86_64 floor is 10.9.
            if machine == "arm64":
                major = max(major, 11)
                minor = 0
            else:
                major = max(major, 10)
                minor = max(minor, 9) if major == 10 else 0
        else:
            major, minor = (11, 0) if machine == "arm64" else (10, 9)

        return f"macosx_{major}_{minor}_{machine}"

    elif system == "linux":
        # manylinux2014 (PEP 599) supports glibc 2.17+.
        # BUG FIX: manylinux tags spell 64-bit ARM as "aarch64" — pip rejects
        # a "manylinux2014_arm64" wheel on ARM Linux hosts.
        if machine == "arm64":
            machine = "aarch64"
        return f"manylinux2014_{machine}"

    elif system == "windows":
        # Pointer width distinguishes win_amd64 from win32.
        bits = struct.calcsize("P") * 8
        return "win_amd64" if bits == 64 else "win32"

    else:
        # Unknown OS: let bdist_wheel keep its default platform tag.
        return None
+
+
if _bdist_wheel is not None:

    class bdist_wheel(_bdist_wheel):
        """bdist_wheel variant that always emits platform-specific tags.

        The package bundles a native shared library, so the wheel must not
        be tagged as pure-Python ('any').
        """

        def finalize_options(self):
            super().finalize_options()
            # A bundled native library makes the wheel impure.
            self.root_is_pure = False

        def get_tag(self):
            # Start from the defaults, then pin the platform portion.
            python, abi, plat = super().get_tag()
            override = get_platform_tag()
            if override:
                plat = override
            # No compiled Python extension: any CPython 3, no ABI constraint.
            return "py3", "none", plat

else:
    # The wheel package is unavailable; no custom command is registered.
    bdist_wheel = None


cmdclass = {}
if bdist_wheel is not None:
    cmdclass["bdist_wheel"] = bdist_wheel

setup(cmdclass=cmdclass)
diff --git a/native-lib/python/src/dataweave/__init__.py b/native-lib/python/src/dataweave/__init__.py
new file mode 100644
index 0000000..977361e
--- /dev/null
+++ b/native-lib/python/src/dataweave/__init__.py
@@ -0,0 +1,1002 @@
+"""
+DataWeave Python Module
+
+Execute DataWeave scripts from Python via a GraalVM native shared library.
+Supports buffered execution, output streaming, and bidirectional streaming
+with constant memory overhead.
+
+Basic usage:
+ import dataweave
+
+ result = dataweave.run("2 + 2")
+ print(result.get_string()) # "4"
+
+Output streaming (yields chunks as produced):
+ stream = dataweave.run_streaming("output json --- (1 to 10000) map {id: $}")
+ for chunk in stream:
+ sys.stdout.buffer.write(chunk)
+
+Bidirectional streaming (iterable in, generator out):
+ with open("large.json", "rb") as f:
+ stream = dataweave.run_transform(
+ "output csv --- payload",
+ input_stream=iter(lambda: f.read(8192), b""),
+ input_mime_type="application/json",
+ )
+ for chunk in stream:
+ process(chunk)
+
+Context manager (explicit lifecycle control):
+ from dataweave import DataWeave
+
+ with DataWeave() as dw:
+ result = dw.run("2 + 2")
+ print(result.get_string())
+
+Error handling:
+ try:
+ result = dataweave.run("invalid", raise_on_error=True)
+ except dataweave.DataWeaveScriptError as e:
+ print(e.result.error)
+
+Native resources are released automatically at interpreter exit via atexit.
+Call dataweave.cleanup() to release them earlier if needed.
+"""
+
+import base64
+import ctypes
+import json
+from dataclasses import dataclass
+from pathlib import Path
+from queue import Queue
+from threading import Thread
+from typing import Any, Callable, Dict, Generator, Iterable, Optional, Union
+
+
class DataWeaveError(Exception):
    """Base error for failures raised by the DataWeave bindings."""
+
+
class DataWeaveScriptError(DataWeaveError):
    """Raised when a DataWeave script fails (compile or runtime error).

    Carries the full result object so callers can inspect details.
    """

    def __init__(self, result):
        # Keep the result so callers can read error/mime_type/etc.
        self.result = result
        message = result.error or "Script execution failed"
        super().__init__(message)
+
+
class DataWeaveLibraryNotFoundError(Exception):
    """Raised when no dwlib shared library can be located.

    NOTE(review): this is not a DataWeaveError subclass, so callers catching
    DataWeaveError will not catch it — confirm that is intentional.
    """
+
+
+# ctypes callback signatures matching NativeCallbacks.WriteCallback / ReadCallback.
+# Buffer parameters use c_void_p (not c_char_p) because ctypes gives c_char_p
+# special treatment that prevents writing into the buffer.
+# int (*WriteCallback)(void *ctx, const char *buffer, int length)
+WRITE_CALLBACK = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int)
+# int (*ReadCallback)(void *ctx, char *buffer, int bufferSize)
+READ_CALLBACK = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int)
+
+
+WriteCallback = Callable[[bytes], int]
+ReadCallback = Callable[[int], bytes]
+
+_ENV_NATIVE_LIB = "DATAWEAVE_NATIVE_LIB"
+
+
@dataclass
class InputValue:
    """Explicit input binding: payload plus optional MIME/charset/properties."""

    content: Union[str, bytes]  # raw payload, text or bytes
    mime_type: Optional[str] = None  # e.g. "application/json"
    charset: Optional[str] = None  # encoding used when content is str
    properties: Optional[Dict[str, Union[str, int, bool]]] = None

    def encode_content(self) -> str:
        """Return the content base64-encoded as an ASCII string."""
        if isinstance(self.content, bytes):
            payload = self.content
        else:
            payload = self.content.encode(self.charset or "utf-8")
        return base64.b64encode(payload).decode("ascii")
+
+
@dataclass(repr=False)
class ExecutionResult:
    """Outcome of a buffered run: base64 payload plus output metadata."""

    success: bool  # True when the script executed without error
    result: Optional[str]  # base64-encoded output; None on failure
    error: Optional[str]  # error message when success is False
    binary: bool  # native flag marking binary output
    mime_type: Optional[str]
    charset: Optional[str]

    def __repr__(self):
        if not self.success:
            return f"ExecutionResult(success=False, error={self.error!r})"
        preview = self.result
        if preview is not None and len(preview) > 50:
            preview = preview[:50] + "..."
        return (
            f"ExecutionResult(success=True, mime_type={self.mime_type!r}, "
            f"charset={self.charset!r}, result={preview!r})"
        )

    def get_bytes(self) -> Optional[bytes]:
        """Decode the base64 payload; None on failure or missing result."""
        if not self.success or self.result is None:
            return None
        return base64.b64decode(self.result)

    def get_string(self) -> Optional[str]:
        """Return the output as text.

        NOTE(review): when ``binary`` is True this returns ``result`` as-is
        (the base64 text) rather than decoding it — presumably deliberate,
        but confirm against the native envelope contract.
        """
        if not self.success or self.result is None:
            return None
        if self.binary:
            return self.result
        return self.get_bytes().decode(self.charset or "utf-8")
+
+
@dataclass
class StreamingResult:
    """Metadata returned after a streaming execution completes."""

    success: bool  # True when the stream finished without error
    error: Optional[str]  # error message when success is False
    mime_type: Optional[str]  # output MIME type reported by native code
    charset: Optional[str]  # output charset reported by native code
    binary: bool  # True when the output is binary
+
+
class Stream:
    """Iterator over streaming output chunks that records final metadata.

    Iterate to consume ``bytes`` chunks; once exhausted, ``metadata`` holds
    the :class:`StreamingResult` the underlying generator returned.
    """

    def __init__(self, gen: Generator[bytes, None, StreamingResult]):
        self._gen = gen
        self._metadata: Optional[StreamingResult] = None

    def __iter__(self):
        return self

    def __next__(self) -> bytes:
        try:
            return next(self._gen)
        except StopIteration as stop:
            # A generator's return value travels on StopIteration.value.
            self._metadata = stop.value
            raise

    @property
    def metadata(self) -> Optional[StreamingResult]:
        """Final StreamingResult, or None until iteration completes."""
        return self._metadata
+
+
def _parse_native_encoded_response(raw: str) -> ExecutionResult:
    """Decode the JSON envelope returned by the native run_script call."""

    def _failure(message):
        # Every failure path shares the same shape: no payload, no metadata.
        return ExecutionResult(False, None, message, False, None, None)

    if raw is None:
        return _failure("Native returned null")
    if raw == "":
        return _failure("Native returned empty response")

    try:
        payload = json.loads(raw)
    except Exception as e:
        return _failure(f"Failed to parse native JSON response: {e}")

    if not isinstance(payload, dict):
        return _failure("Native response JSON is not an object")

    if not bool(payload.get("success", False)):
        return _failure(payload.get("error"))

    return ExecutionResult(
        success=True,
        result=payload.get("result"),
        error=None,
        binary=bool(payload.get("binary", False)),
        mime_type=payload.get("mimeType"),
        charset=payload.get("charset"),
    )
+
+
def _parse_streaming_result(meta: dict) -> StreamingResult:
    """Build a StreamingResult from the metadata dict emitted by native code."""
    if meta.get("success", False):
        return StreamingResult(
            success=True,
            error=None,
            mime_type=meta.get("mimeType"),
            charset=meta.get("charset"),
            binary=meta.get("binary", False),
        )
    # Failure: only the error message is meaningful.
    return StreamingResult(
        success=False,
        error=meta.get("error"),
        mime_type=None,
        charset=None,
        binary=False,
    )
+
+
def _candidate_library_paths() -> list[Path]:
    """Return candidate dwlib locations, highest priority first.

    Order: the $DATAWEAVE_NATIVE_LIB override, the packaged ``native/``
    directory, a dev-tree Gradle build directory, then the current working
    directory.
    """
    import os  # local import; keeps the module's import surface unchanged

    # The three platform-specific library names, in the original order.
    names = ("dwlib.dylib", "dwlib.so", "dwlib.dll")
    paths: list[Path] = []

    # Fix: use a regular import instead of the __import__("os") hack.
    override = (os.environ.get(_ENV_NATIVE_LIB) or "").strip()
    if override:
        paths.append(Path(override))

    pkg_dir = Path(__file__).resolve().parent
    native_dir = pkg_dir / "native"
    paths.extend(native_dir / name for name in names)

    # Dev fallback: if this package is being used from the data-weave-cli repo
    # tree, locate native-lib/build/native/nativeCompile.
    for parent in pkg_dir.parents:
        build_dir = parent / "build" / "native" / "nativeCompile"
        if build_dir.exists():
            paths.extend(build_dir / name for name in names)
            break

    # CWD fallback
    paths.extend(Path(name) for name in names)

    return paths
+
+
def _find_library() -> str:
    """Return the first candidate path that is an existing file, or raise."""
    for candidate in _candidate_library_paths():
        # Path.is_file() is False for nonexistent paths, so no exists() needed.
        if candidate.is_file():
            return str(candidate)

    raise DataWeaveLibraryNotFoundError(
        "Could not find DataWeave native library (dwlib). "
        f"Set {_ENV_NATIVE_LIB} to an absolute path or install a wheel that bundles the native library."
    )
+
+
def _normalize_input_value(value: Any, mime_type: Optional[str] = None) -> Dict[str, Any]:
    """Convert a user-supplied input into the native wire format.

    Accepts an explicit dict ({content, mimeType[, charset, properties]}),
    an :class:`InputValue`, or a plain Python value (str / number / bool /
    None / JSON-serializable object). Returns a dict with base64 ``content``
    plus MIME metadata.
    """
    if isinstance(value, dict):
        allowed = {"content", "mimeType", "charset", "properties"}
        unknown = set(value.keys()) - allowed
        if unknown:
            raise DataWeaveError(
                "Explicit input dict contains unsupported keys: " + ", ".join(sorted(unknown))
            )

        if "content" in value or "mimeType" in value:
            # Explicit form: both halves are mandatory.
            if "content" not in value or "mimeType" not in value:
                raise DataWeaveError(
                    "Explicit input dict must include both 'content' and 'mimeType'"
                )

            raw_content = value.get("content")
            enc = value.get("charset") or "utf-8"
            if isinstance(raw_content, bytes):
                b64 = base64.b64encode(raw_content).decode("ascii")
            else:
                b64 = base64.b64encode(str(raw_content).encode(enc)).decode("ascii")

            normalized: Dict[str, Any] = {
                "content": b64,
                "mimeType": value.get("mimeType"),
            }
            if "charset" in value:
                normalized["charset"] = value.get("charset")
            if "properties" in value:
                normalized["properties"] = value.get("properties")
            return normalized
        # A dict with neither key deliberately falls through and is
        # serialized as JSON content below.

    if isinstance(value, InputValue):
        out: Dict[str, Any] = {
            "content": value.encode_content(),
            "mimeType": value.mime_type or mime_type,
        }
        if value.charset is not None:
            out["charset"] = value.charset
        if value.properties is not None:
            out["properties"] = value.properties
        return out

    # Plain Python values: pick a textual representation and a default MIME.
    if isinstance(value, str):
        content, default_mime = value, "text/plain"
    elif isinstance(value, (int, float, bool)):
        content, default_mime = json.dumps(value), "application/json"
    elif value is None:
        content, default_mime = "null", "application/json"
    else:
        try:
            content, default_mime = json.dumps(value), "application/json"
        except (TypeError, ValueError):
            content, default_mime = str(value), "text/plain"

    encoding = "utf-8"
    return {
        "content": base64.b64encode(content.encode(encoding)).decode("ascii"),
        "mimeType": mime_type or default_mime,
        "charset": encoding,
    }
+
+
+class DataWeave:
+ def __init__(self, lib_path: Optional[str] = None):
+ self._lib_path = lib_path or _find_library()
+ self._lib = None
+ self._isolate = None
+ self._thread = None
+ self._initialized = False
+
+ def _load_library(self):
+ try:
+ self._lib = ctypes.CDLL(self._lib_path)
+ except OSError as e:
+ raise DataWeaveError(f"Failed to load library from {self._lib_path}: {e}")
+
+ def _setup_graal_structures(self):
+ class graal_isolate_t(ctypes.Structure):
+ pass
+
+ class graal_isolatethread_t(ctypes.Structure):
+ pass
+
+ self._graal_isolate_t_ptr = ctypes.POINTER(graal_isolate_t)
+ self._graal_isolatethread_t_ptr = ctypes.POINTER(graal_isolatethread_t)
+
+ def _create_isolate(self):
+ self._lib.graal_create_isolate.argtypes = [
+ ctypes.c_void_p,
+ ctypes.POINTER(self._graal_isolate_t_ptr),
+ ctypes.POINTER(self._graal_isolatethread_t_ptr),
+ ]
+ self._lib.graal_create_isolate.restype = ctypes.c_int
+
+ self._isolate = self._graal_isolate_t_ptr()
+ self._thread = self._graal_isolatethread_t_ptr()
+
+ result = self._lib.graal_create_isolate(None, ctypes.byref(self._isolate), ctypes.byref(self._thread))
+ if result != 0:
+ raise DataWeaveError(f"Failed to create GraalVM isolate. Error code: {result}")
+
+ def _setup_functions(self):
+ if not hasattr(self._lib, "run_script"):
+ raise DataWeaveError("Native library does not export run_script")
+
+ self._lib.run_script.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ]
+ self._lib.run_script.restype = ctypes.c_void_p
+
+ if hasattr(self._lib, "free_cstring"):
+ self._lib.free_cstring.argtypes = [self._graal_isolatethread_t_ptr, ctypes.c_void_p]
+ self._lib.free_cstring.restype = None
+
+ # Thread attachment for background threads
+ if hasattr(self._lib, "graal_attach_thread"):
+ self._lib.graal_attach_thread.argtypes = [self._graal_isolate_t_ptr, ctypes.POINTER(self._graal_isolatethread_t_ptr)]
+ self._lib.graal_attach_thread.restype = ctypes.c_int
+ if hasattr(self._lib, "graal_detach_thread"):
+ self._lib.graal_detach_thread.argtypes = [self._graal_isolatethread_t_ptr]
+ self._lib.graal_detach_thread.restype = ctypes.c_int
+ if hasattr(self._lib, "graal_tear_down_isolate"):
+ self._lib.graal_tear_down_isolate.argtypes = [self._graal_isolatethread_t_ptr]
+ self._lib.graal_tear_down_isolate.restype = ctypes.c_int
+
+ # Callback-based Streaming API
+ if hasattr(self._lib, "run_script_callback"):
+ self._lib.run_script_callback.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ WRITE_CALLBACK,
+ ctypes.c_void_p,
+ ]
+ self._lib.run_script_callback.restype = ctypes.c_void_p
+
+ self._has_callback_streaming = True
+ else:
+ self._has_callback_streaming = False
+
+ if hasattr(self._lib, "run_script_input_output_callback"):
+ self._lib.run_script_input_output_callback.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ READ_CALLBACK,
+ WRITE_CALLBACK,
+ ctypes.c_void_p,
+ ]
+ self._lib.run_script_input_output_callback.restype = ctypes.c_void_p
+
+ self._has_callback_input_output = True
+ else:
+ self._has_callback_input_output = False
+
+ def _decode_and_free(self, ptr: Optional[int]) -> str:
+ if not ptr:
+ return ""
+
+ try:
+ result_bytes = ctypes.string_at(ptr)
+ return result_bytes.decode("utf-8")
+ finally:
+ if self._lib is not None and hasattr(self._lib, "free_cstring"):
+ self._lib.free_cstring(self._thread, ptr)
+
+ def initialize(self):
+ if self._initialized:
+ return
+
+ self._load_library()
+ self._setup_graal_structures()
+ self._create_isolate()
+ self._setup_functions()
+ self._initialized = True
+
+ def cleanup(self):
+ if not self._initialized:
+ return
+
+ if hasattr(self._lib, "graal_tear_down_isolate") and self._thread:
+ try:
+ self._lib.graal_tear_down_isolate(self._thread)
+ except Exception:
+ pass
+ elif hasattr(self._lib, "graal_detach_thread") and self._thread:
+ try:
+ self._lib.graal_detach_thread(self._thread)
+ except Exception:
+ pass
+
+ self._initialized = False
+ self._thread = None
+ self._isolate = None
+ self._lib = None
+
+ def run_callback(
+ self,
+ script: str,
+ write_callback: WriteCallback,
+ inputs: Optional[Dict[str, Any]] = None,
+ ) -> StreamingResult:
+ """Execute a DataWeave script and stream the output via a write callback.
+
+ The native side reads the output internally and invokes *write_callback*
+ for each chunk.
+
+ :param script: the DataWeave script source
+ :param write_callback: callable ``(data: bytes) -> int`` invoked with each
+ output chunk. Must return ``0`` on success or non-zero to abort.
+ :param inputs: optional input bindings (same format as :meth:`run`)
+ :return: a :class:`StreamingResult` with metadata
+ :raises DataWeaveError: if the runtime is not initialized or the callback API
+ is not available
+ """
+ if not self._initialized:
+ raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.")
+ if not self._has_callback_streaming:
+ raise DataWeaveError(
+ "Native library does not support callback streaming API (run_script_callback not found)."
+ )
+
+ if inputs is None:
+ inputs = {}
+
+ normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()}
+ inputs_json = json.dumps(normalized_inputs)
+
+ @WRITE_CALLBACK
+ def _write_cb(_ctx, buf, length):
+ try:
+ data = ctypes.string_at(buf, length)
+ return write_callback(data)
+ except Exception:
+ return -1
+
+ try:
+ result_ptr = self._lib.run_script_callback(
+ self._thread,
+ script.encode("utf-8"),
+ inputs_json.encode("utf-8"),
+ _write_cb,
+ None,
+ )
+ raw = self._decode_and_free(result_ptr)
+ meta = json.loads(raw) if raw else {"success": False, "error": "Empty response"}
+ except Exception as e:
+ raise DataWeaveError(f"Failed to execute callback streaming: {e}")
+
+ return _parse_streaming_result(meta)
+
+ def run_streaming(
+ self,
+ script: str,
+ inputs: Optional[Dict[str, Any]] = None,
+ ) -> Stream:
+ """Execute a DataWeave script and yield output chunks as they arrive.
+
+ Chunks are yielded in real-time as the native engine produces them,
+ using a background thread and queue. The caller sees data before the
+ script finishes executing.
+
+ Usage::
+
+ with DataWeave() as dw:
+ stream = dw.run_streaming("output json --- {items: (1 to 100)}")
+ for chunk in stream:
+ sys.stdout.buffer.write(chunk)
+ metadata = stream.metadata # StreamingResult with mime_type, charset, etc.
+
+ :param script: the DataWeave script source
+ :param inputs: optional input bindings (same format as :meth:`run`)
+ :return: a :class:`Stream` yielding ``bytes`` chunks; after iteration,
+ ``.metadata`` holds a :class:`StreamingResult`
+ :raises DataWeaveError: if the runtime is not initialized or the callback API
+ is not available
+ """
+ return Stream(self._run_streaming_gen(script, inputs))
+
+ def _run_streaming_gen(
+ self,
+ script: str,
+ inputs: Optional[Dict[str, Any]] = None,
+ ) -> Generator[bytes, None, StreamingResult]:
+ if not self._initialized:
+ raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.")
+ if not self._has_callback_streaming:
+ raise DataWeaveError(
+ "Native library does not support callback streaming API (run_script_callback not found)."
+ )
+
+ if inputs is None:
+ inputs = {}
+
+ normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()}
+ inputs_json = json.dumps(normalized_inputs)
+
+ _SENTINEL = object()
+ q: Queue = Queue()
+
+ @WRITE_CALLBACK
+ def _write_cb(_ctx, buf, length):
+ try:
+ q.put(ctypes.string_at(buf, length))
+ return 0
+ except Exception:
+ return -1
+
+ def _run_native():
+ worker_thread = self._graal_isolatethread_t_ptr()
+ rc = self._lib.graal_attach_thread(self._isolate, ctypes.byref(worker_thread))
+ if rc != 0:
+ q.put({"success": False, "error": f"Failed to attach worker thread to isolate (code {rc})"})
+ q.put(_SENTINEL)
+ return
+ try:
+ result_ptr = self._lib.run_script_callback(
+ worker_thread,
+ script.encode("utf-8"),
+ inputs_json.encode("utf-8"),
+ _write_cb,
+ None,
+ )
+ raw_ptr = result_ptr
+ if raw_ptr:
+ raw = ctypes.string_at(raw_ptr).decode("utf-8")
+ self._lib.free_cstring(worker_thread, raw_ptr)
+ else:
+ raw = ""
+ meta = json.loads(raw) if raw else {"success": False, "error": "Empty response"}
+ q.put(meta)
+ except Exception as e:
+ q.put({"success": False, "error": str(e)})
+ finally:
+ self._lib.graal_detach_thread(worker_thread)
+ q.put(_SENTINEL)
+
+ worker = Thread(target=_run_native, name="dw-streaming-worker", daemon=True)
+ worker.start()
+
+ meta = None
+ while True:
+ item = q.get()
+ if item is _SENTINEL:
+ break
+ if isinstance(item, dict):
+ meta = item
+ else:
+ yield item
+
+ worker.join()
+
+ if meta is None:
+ meta = {"success": False, "error": "No metadata received from native call"}
+
+ success = meta.get("success", False)
+ if not success:
+ return StreamingResult(
+ success=False,
+ error=meta.get("error"),
+ mime_type=None,
+ charset=None,
+ binary=False,
+ )
+
+ return StreamingResult(
+ success=True,
+ error=None,
+ mime_type=meta.get("mimeType"),
+ charset=meta.get("charset"),
+ binary=meta.get("binary", False),
+ )
+
+ def run_transform(
+ self,
+ script: str,
+ input_stream: Iterable[bytes],
+ input_name: str = "payload",
+ input_mime_type: str = "application/json",
+ input_charset: Optional[str] = None,
+ inputs: Optional[Dict[str, Any]] = None,
+ ) -> Stream:
+ """Execute a DataWeave script with streaming input and output.
+
+ Input data is pulled from *input_stream* (any iterable of bytes) and
+ output chunks are yielded as they are produced — fully streaming in
+ both directions with constant memory overhead.
+
+ Usage::
+
+ with DataWeave() as dw:
+ with open("large.json", "rb") as f:
+ stream = dw.run_transform(
+ "output application/csv --- payload",
+ input_stream=iter(lambda: f.read(8192), b""),
+ input_mime_type="application/json",
+ )
+ for chunk in stream:
+ sys.stdout.buffer.write(chunk)
+ metadata = stream.metadata
+
+ :param script: the DataWeave script source
+ :param input_stream: iterable yielding ``bytes`` chunks for the input binding
+ :param input_name: binding name for the streamed input (default ``"payload"``)
+ :param input_mime_type: MIME type of the streamed input
+ :param input_charset: charset of the streamed input (default UTF-8)
+ :param inputs: optional additional input bindings (same format as :meth:`run`)
+ :return: a :class:`Stream` yielding ``bytes`` output chunks; after iteration,
+ ``.metadata`` holds a :class:`StreamingResult`
+ :raises DataWeaveError: if the runtime is not initialized or the API is missing
+ """
+ return Stream(self._run_transform_gen(
+ script, input_stream, input_name, input_mime_type, input_charset, inputs,
+ ))
+
+ def _run_transform_gen(
+ self,
+ script: str,
+ input_stream: Iterable[bytes],
+ input_name: str = "payload",
+ input_mime_type: str = "application/json",
+ input_charset: Optional[str] = None,
+ inputs: Optional[Dict[str, Any]] = None,
+ ) -> Generator[bytes, None, StreamingResult]:
+ if not self._initialized:
+ raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.")
+ if not self._has_callback_input_output:
+ raise DataWeaveError(
+ "Native library does not support callback input/output API "
+ "(run_script_input_output_callback not found)."
+ )
+
+ if inputs is None:
+ inputs = {}
+
+ normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()}
+ inputs_json = json.dumps(normalized_inputs)
+
+ _SENTINEL = object()
+ q: Queue = Queue()
+
+ @WRITE_CALLBACK
+ def _write_cb(_ctx, buf, length):
+ try:
+ q.put(ctypes.string_at(buf, length))
+ return 0
+ except Exception:
+ return -1
+
+ input_iter = iter(input_stream)
+
+ @READ_CALLBACK
+ def _read_cb(_ctx, buf, buf_size):
+ try:
+ data = next(input_iter, b"")
+ if not data:
+ return 0
+ n = min(len(data), buf_size)
+ ctypes.memmove(buf, data, n)
+ return n
+ except Exception:
+ return -1
+
+ def _run_native():
+ worker_thread = self._graal_isolatethread_t_ptr()
+ rc = self._lib.graal_attach_thread(self._isolate, ctypes.byref(worker_thread))
+ if rc != 0:
+ q.put({"success": False, "error": f"Failed to attach worker thread to isolate (code {rc})"})
+ q.put(_SENTINEL)
+ return
+ try:
+ result_ptr = self._lib.run_script_input_output_callback(
+ worker_thread,
+ script.encode("utf-8"),
+ inputs_json.encode("utf-8"),
+ input_name.encode("utf-8"),
+ input_mime_type.encode("utf-8"),
+ input_charset.encode("utf-8") if input_charset else None,
+ _read_cb,
+ _write_cb,
+ None,
+ )
+ if result_ptr:
+ raw = ctypes.string_at(result_ptr).decode("utf-8")
+ self._lib.free_cstring(worker_thread, result_ptr)
+ else:
+ raw = ""
+ meta = json.loads(raw) if raw else {"success": False, "error": "Empty response"}
+ q.put(meta)
+ except Exception as e:
+ q.put({"success": False, "error": str(e)})
+ finally:
+ self._lib.graal_detach_thread(worker_thread)
+ q.put(_SENTINEL)
+
+ worker = Thread(target=_run_native, name="dw-transform-worker", daemon=True)
+ worker.start()
+
+ meta = None
+ while True:
+ item = q.get()
+ if item is _SENTINEL:
+ break
+ if isinstance(item, dict):
+ meta = item
+ else:
+ yield item
+
+ worker.join()
+
+ if meta is None:
+ meta = {"success": False, "error": "No metadata received from native call"}
+
+ success = meta.get("success", False)
+ if not success:
+ return StreamingResult(
+ success=False,
+ error=meta.get("error"),
+ mime_type=None,
+ charset=None,
+ binary=False,
+ )
+
+ return StreamingResult(
+ success=True,
+ error=None,
+ mime_type=meta.get("mimeType"),
+ charset=meta.get("charset"),
+ binary=meta.get("binary", False),
+ )
+
+ def run_input_output_callback(
+ self,
+ script: str,
+ input_name: str,
+ input_mime_type: str,
+ read_callback: ReadCallback,
+ write_callback: WriteCallback,
+ input_charset: Optional[str] = None,
+ inputs: Optional[Dict[str, Any]] = None,
+ ) -> StreamingResult:
+ """Execute a DataWeave script with callback-driven input *and* output streaming.
+
+ The native side calls *read_callback* on a background thread to pull input
+ data for the binding named *input_name*, and calls *write_callback* on the
+ calling thread to push output chunks.
+
+ :param script: the DataWeave script source
+ :param input_name: the binding name for the callback-supplied input
+ :param input_mime_type: MIME type of the callback-supplied input
+ :param read_callback: callable ``(buf_size: int) -> bytes`` returning the
+ next chunk, empty bytes ``b""`` on EOF, or raising on error
+ :param write_callback: callable ``(data: bytes) -> int`` returning ``0`` on
+ success or non-zero to abort
+ :param input_charset: charset of the callback-supplied input (default UTF-8)
+ :param inputs: optional additional input bindings
+ :return: a :class:`StreamingResult` with metadata
+ :raises DataWeaveError: if the runtime is not initialized or the API is missing
+ """
+ if not self._initialized:
+ raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.")
+ if not self._has_callback_input_output:
+ raise DataWeaveError(
+ "Native library does not support callback input/output API "
+ "(run_script_input_output_callback not found)."
+ )
+
+ if inputs is None:
+ inputs = {}
+
+ normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()}
+ inputs_json = json.dumps(normalized_inputs)
+
+ @READ_CALLBACK
+ def _read_cb(_ctx, buf, buf_size):
+ try:
+ data = read_callback(buf_size)
+ if not data:
+ return 0 # EOF
+ n = min(len(data), buf_size)
+ ctypes.memmove(buf, data, n)
+ return n
+ except Exception:
+ return -1
+
+ @WRITE_CALLBACK
+ def _write_cb(_ctx, buf, length):
+ try:
+ data = ctypes.string_at(buf, length)
+ return write_callback(data)
+ except Exception:
+ return -1
+
+ try:
+ result_ptr = self._lib.run_script_input_output_callback(
+ self._thread,
+ script.encode("utf-8"),
+ inputs_json.encode("utf-8"),
+ input_name.encode("utf-8"),
+ input_mime_type.encode("utf-8"),
+ input_charset.encode("utf-8") if input_charset else None,
+ _read_cb,
+ _write_cb,
+ None,
+ )
+ raw = self._decode_and_free(result_ptr)
+ meta = json.loads(raw) if raw else {"success": False, "error": "Empty response"}
+ except Exception as e:
+ raise DataWeaveError(f"Failed to execute callback input/output streaming: {e}")
+
+ return _parse_streaming_result(meta)
+
+ def run(self, script: str, inputs: Optional[Dict[str, Any]] = None, raise_on_error: bool = False) -> ExecutionResult:
+ if not self._initialized:
+ raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.")
+
+ if inputs is None:
+ inputs = {}
+
+ normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()}
+ inputs_json = json.dumps(normalized_inputs)
+
+ try:
+ result_ptr = self._lib.run_script(
+ self._thread,
+ script.encode("utf-8"),
+ inputs_json.encode("utf-8"),
+ )
+ raw = self._decode_and_free(result_ptr)
+ result = _parse_native_encoded_response(raw)
+ except Exception as e:
+ raise DataWeaveError(f"Failed to execute script: {e}")
+
+ if raise_on_error and not result.success:
+ raise DataWeaveScriptError(result)
+ return result
+
+ def __enter__(self):
+ self.initialize()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.cleanup()
+ return False
+
+
+_global_instance: Optional[DataWeave] = None
+
+
def _get_global_instance() -> DataWeave:
    """Lazily create, initialize, and memoize the module-wide instance.

    Fix: the global is only published after ``initialize()`` succeeds; the
    old code cached the instance first, so a failed initialization left a
    permanently broken instance that every later call reused.
    """
    global _global_instance
    if _global_instance is None:
        import atexit

        instance = DataWeave()
        instance.initialize()
        atexit.register(cleanup)
        _global_instance = instance
    return _global_instance
+
+
def run(script: str, inputs: Optional[Dict[str, Any]] = None, raise_on_error: bool = False) -> ExecutionResult:
    """Run *script* on the shared global instance. See :meth:`DataWeave.run`."""
    return _get_global_instance().run(script, inputs, raise_on_error=raise_on_error)
+
+
def run_streaming(
    script: str, inputs: Optional[Dict[str, Any]] = None,
) -> Stream:
    """Stream output from the shared instance. See :meth:`DataWeave.run_streaming`."""
    return _get_global_instance().run_streaming(script, inputs)
+
+
def run_callback(script: str, write_callback: WriteCallback, inputs: Optional[Dict[str, Any]] = None) -> StreamingResult:
    """Stream output through *write_callback* on the shared instance. See :meth:`DataWeave.run_callback`."""
    return _get_global_instance().run_callback(script, write_callback, inputs)
+
+
def run_transform(
    script: str,
    input_stream: Iterable[bytes],
    input_name: str = "payload",
    input_mime_type: str = "application/json",
    input_charset: Optional[str] = None,
    inputs: Optional[Dict[str, Any]] = None,
) -> Stream:
    """Bidirectional streaming on the shared instance. See :meth:`DataWeave.run_transform`."""
    return _get_global_instance().run_transform(
        script, input_stream, input_name, input_mime_type, input_charset, inputs,
    )
+
+
def run_input_output_callback(
    script: str,
    input_name: str,
    input_mime_type: str,
    read_callback: ReadCallback,
    write_callback: WriteCallback,
    input_charset: Optional[str] = None,
    inputs: Optional[Dict[str, Any]] = None,
) -> StreamingResult:
    """Callback-driven I/O on the shared instance. See :meth:`DataWeave.run_input_output_callback`."""
    return _get_global_instance().run_input_output_callback(
        script, input_name, input_mime_type, read_callback, write_callback, input_charset, inputs,
    )
+
+
def cleanup():
    """Tear down the shared global instance, if one exists."""
    global _global_instance
    instance, _global_instance = _global_instance, None
    if instance is not None:
        instance.cleanup()
+
+
+__all__ = [
+ "DataWeave",
+ "DataWeaveError",
+ "DataWeaveLibraryNotFoundError",
+ "DataWeaveScriptError",
+ "ExecutionResult",
+ "InputValue",
+ "ReadCallback",
+ "Stream",
+ "StreamingResult",
+ "WriteCallback",
+ "READ_CALLBACK",
+ "WRITE_CALLBACK",
+ "run",
+ "run_callback",
+ "run_input_output_callback",
+ "run_streaming",
+ "run_transform",
+ "cleanup",
+]
diff --git a/native-lib/python/tests/person.xml b/native-lib/python/tests/person.xml
new file mode 100644
index 0000000..376a6b7
Binary files /dev/null and b/native-lib/python/tests/person.xml differ
diff --git a/native-lib/python/tests/test_dataweave_module.py b/native-lib/python/tests/test_dataweave_module.py
new file mode 100755
index 0000000..bbf4a32
--- /dev/null
+++ b/native-lib/python/tests/test_dataweave_module.py
@@ -0,0 +1,494 @@
+#!/usr/bin/env python3
+"""
+Quick test script for the DataWeave Python module.
+"""
+
+import sys
+from pathlib import Path
+
+_PYTHON_SRC_DIR = Path(__file__).resolve().parents[1] / "src"
+sys.path.insert(0, str(_PYTHON_SRC_DIR))
+
+import dataweave
+
+def test_basic():
+ """Test basic functionality"""
+ print("Testing basic script execution...")
+ try:
+ result = dataweave.run("2 + 2", {})
+ assert result.get_string() == "4", f"Expected '4', got '{result.get_string()}'"
+ print("[OK] Basic script execution works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Basic script execution failed: {e}")
+ return False
+
+def test_with_inputs():
+ """Test script with inputs"""
+ print("\nTesting script with inputs...")
+ try:
+ result = dataweave.run("num1 + num2", {"num1": 25, "num2": 17})
+ assert result.get_string() == "42", f"Expected '42', got '{result.get_string()}'"
+ print("[OK] Script with inputs works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Script with inputs failed: {e}")
+ return False
+
+def test_context_manager():
+ """Test context manager"""
+ print("\nTesting with context manager...")
+ try:
+ with dataweave.DataWeave() as dw:
+
+ result = dw.run("sqrt(144)")
+ assert result.get_string() == "12", f"Expected '12', got '{result.get_string()}'"
+ result = dw.run("sqrt(10000)")
+ assert result.get_string() == "100", f"Expected '100', got '{result.get_string()}'"
+ print("[OK] Script execution with context manager works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Script execution with context manager failed: {e}")
+ return False
+
+def test_encoding():
+ """Test reading UTF-16 XML input and producing CSV output"""
+ print("\nTesting encoding (UTF-16 XML -> CSV)...")
+ try:
+ xml_path = (
+ Path(__file__).resolve().parent / "person.xml"
+ )
+ xml_bytes = xml_path.read_bytes()
+
+ script = """output application/csv header=true
+---
+[payload.person]
+"""
+
+ result = dataweave.run(
+ script,
+ {
+ "payload": {
+ "content": xml_bytes,
+ "mimeType": "application/xml",
+ "charset": "UTF-16",
+ }
+ },
+ )
+
+ out = result.get_string() or ""
+ print(f"out: \n{out}")
+ assert result.success is True, f"Expected success=true, got: {result}"
+ assert "name" in out and "age" in out, f"CSV header missing, got: {out!r}"
+ assert "Billy" in out, f"Expected name 'Billy' in CSV, got: {out!r}"
+ assert "31" in out, f"Expected age '31' in CSV, got: {out!r}"
+
+ print("[OK] Encoding conversion works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Encoding conversion failed: {e}")
+ return False
+
+def test_auto_conversion():
+ """Test auto-conversion of different types"""
+ print("\nTesting auto-conversion...")
+ try:
+
+ # Test array
+ result = dataweave.run(
+ "numbers[0]",
+ {"numbers": [1, 2, 3]}
+ )
+ assert result.get_string() == "1", f"Expected '1', got '{result.get_string()}'"
+
+ print("[OK] Auto-conversion works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Auto-conversion failed: {e}")
+ return False
+
+def test_callback_output_basic():
+ """Test callback-based output streaming"""
+ print("\nTesting callback output basic...")
+ try:
+ chunks = []
+
+ def on_write(data: bytes) -> int:
+ chunks.append(data)
+ return 0
+
+ result = dataweave.run_callback("2 + 2", on_write)
+ assert result.success is True, f"Expected success, got: {result}"
+ full = b"".join(chunks)
+ text = full.decode(result.charset or "utf-8")
+ assert text == "4", f"Expected '4', got '{text}'"
+ print(f"[OK] Callback output basic works (chunks={len(chunks)}, result='{text}')")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Callback output basic failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_callback_output_with_inputs():
+ """Test callback-based output streaming with inputs"""
+ print("\nTesting callback output with inputs...")
+ try:
+ chunks = []
+
+ def on_write(data: bytes) -> int:
+ chunks.append(data)
+ return 0
+
+ result = dataweave.run_callback("num1 + num2", on_write, inputs={"num1": 25, "num2": 17})
+ assert result.success is True, f"Expected success, got: {result}"
+ full = b"".join(chunks)
+ text = full.decode(result.charset or "utf-8")
+ assert text == "42", f"Expected '42', got '{text}'"
+ print(f"[OK] Callback output with inputs works (result='{text}')")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Callback output with inputs failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_callback_input_output():
+ """Test callback-based input and output streaming"""
+ print("\nTesting callback input+output...")
+ try:
+ import io as _io
+
+ source = _io.BytesIO(b'[10, 20, 30, 40, 50]')
+ output_chunks = []
+
+ def on_read(buf_size: int) -> bytes:
+ return source.read(buf_size)
+
+ def on_write(data: bytes) -> int:
+ output_chunks.append(data)
+ return 0
+
+ script = "output application/json\n---\npayload map ($ * 2)"
+ result = dataweave.run_input_output_callback(
+ script,
+ input_name="payload",
+ input_mime_type="application/json",
+ read_callback=on_read,
+ write_callback=on_write,
+ )
+ assert result.success is True, f"Expected success, got: {result}"
+ full = b"".join(output_chunks)
+ text = full.decode(result.charset or "utf-8")
+ assert "20" in text, f"Expected 20 in result (10*2), got: {text}"
+ assert "100" in text, f"Expected 100 in result (50*2), got: {text}"
+ print(f"[OK] Callback input+output works (chunks={len(output_chunks)}, result={text.strip()[:80]}...)")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Callback input+output failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_run_streaming_basic():
+ """Test run_streaming yields chunks and returns metadata"""
+ print("\nTesting run_streaming basic...")
+ try:
+ stream = dataweave.run_streaming("output application/json --- {a: 1, b: 2}")
+ chunks = []
+ try:
+ while True:
+ chunks.append(next(stream))
+ except StopIteration as e:
+ metadata = e.value
+
+ full = b"".join(chunks)
+ assert len(full) > 0, "Expected non-empty output"
+ text = full.decode(metadata.charset or "utf-8")
+ assert '"a": 1' in text or '"a":1' in text, f"Expected key 'a' in JSON, got: {text}"
+ assert metadata.success is True, f"Expected success, got: {metadata}"
+ assert metadata.mime_type == "application/json", f"Expected json mime, got: {metadata.mime_type}"
+ print(f"[OK] run_streaming basic works (chunks={len(chunks)}, result={text.strip()[:60]})")
+ return True
+ except Exception as e:
+ print(f"[FAIL] run_streaming basic failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_run_streaming_large():
+ """Test run_streaming with large output to verify true streaming (multiple chunks)"""
+ print("\nTesting run_streaming large...")
+ try:
+ script = 'output application/json --- (1 to 5000) map {id: $, name: "item_" ++ $}'
+ stream = dataweave.run_streaming(script)
+ chunks = []
+ try:
+ while True:
+ chunks.append(next(stream))
+ except StopIteration as e:
+ metadata = e.value
+
+ full = b"".join(chunks)
+ text = full.decode(metadata.charset or "utf-8")
+ assert metadata.success is True, f"Expected success, got: {metadata}"
+ assert len(chunks) > 1, f"Expected multiple chunks for large output, got {len(chunks)}"
+ assert '"id": 5000' in text or '"id":5000' in text, f"Expected last item in output"
+ print(f"[OK] run_streaming large works (chunks={len(chunks)}, bytes={len(full)})")
+ return True
+ except Exception as e:
+ print(f"[FAIL] run_streaming large failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_run_streaming_error():
+ """Test run_streaming with an invalid script returns error metadata"""
+ print("\nTesting run_streaming error...")
+ try:
+ stream = dataweave.run_streaming("output application/json --- invalid_var")
+ chunks = []
+ try:
+ while True:
+ chunks.append(next(stream))
+ except StopIteration as e:
+ metadata = e.value
+
+ assert metadata.success is False, f"Expected failure, got: {metadata}"
+ assert metadata.error is not None, "Expected error message"
+ assert len(chunks) == 0, f"Expected no chunks on error, got {len(chunks)}"
+ print(f"[OK] run_streaming error works (error={metadata.error[:60]}...)")
+ return True
+ except Exception as e:
+ print(f"[FAIL] run_streaming error failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_run_streaming_with_inputs():
+ """Test run_streaming with input bindings"""
+ print("\nTesting run_streaming with inputs...")
+ try:
+ stream = dataweave.run_streaming("num1 + num2", {"num1": 25, "num2": 17})
+ chunks = []
+ try:
+ while True:
+ chunks.append(next(stream))
+ except StopIteration as e:
+ metadata = e.value
+
+ full = b"".join(chunks)
+ text = full.decode(metadata.charset or "utf-8")
+ assert metadata.success is True, f"Expected success, got: {metadata}"
+ assert text.strip() == "42", f"Expected '42', got '{text.strip()}'"
+ print(f"[OK] run_streaming with inputs works (result='{text.strip()}')")
+ return True
+ except Exception as e:
+ print(f"[FAIL] run_streaming with inputs failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_callback_input_output_large():
+ """Test callback-based input+output streaming with large data"""
+ print("\nTesting callback input+output large...")
+ try:
+ import io as _io
+
+ # Build a large JSON array
+ parts = [b"["]
+ for i in range(1, 1001):
+ if i > 1:
+ parts.append(b",")
+ parts.append(f'{{"id":{i}}}'.encode())
+ parts.append(b"]")
+ source = _io.BytesIO(b"".join(parts))
+ output_chunks = []
+
+ def on_read(buf_size: int) -> bytes:
+ return source.read(buf_size)
+
+ def on_write(data: bytes) -> int:
+ output_chunks.append(data)
+ return 0
+
+ result = dataweave.run_input_output_callback(
+ "output application/json\n---\nsizeOf(payload)",
+ input_name="payload",
+ input_mime_type="application/json",
+ read_callback=on_read,
+ write_callback=on_write,
+ )
+ assert result.success is True, f"Expected success, got: {result}"
+ full = b"".join(output_chunks)
+ text = full.decode(result.charset or "utf-8")
+ assert text == "1000", f"Expected '1000', got '{text}'"
+ print(f"[OK] Callback input+output large works (result='{text}')")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Callback input+output large failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_run_transform_basic():
+ """Test run_transform with an iterable input and streaming output"""
+ print("\nTesting run_transform basic...")
+ try:
+ input_data = [b'[10, 20, 30, 40, 50]']
+ script = "output application/json\n---\npayload map ($ * 2)"
+ stream = dataweave.run_transform(script, input_stream=input_data, input_mime_type="application/json")
+ chunks = []
+ try:
+ while True:
+ chunks.append(next(stream))
+ except StopIteration as e:
+ metadata = e.value
+
+ full = b"".join(chunks)
+ text = full.decode(metadata.charset or "utf-8")
+ assert metadata.success is True, f"Expected success, got: {metadata}"
+ assert "20" in text, f"Expected 20 in result (10*2), got: {text}"
+ assert "100" in text, f"Expected 100 in result (50*2), got: {text}"
+ print(f"[OK] run_transform basic works (chunks={len(chunks)}, result={text.strip()[:60]})")
+ return True
+ except Exception as e:
+ print(f"[FAIL] run_transform basic failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_run_transform_large():
+ """Test run_transform with large chunked input to verify streaming both directions"""
+ print("\nTesting run_transform large...")
+ try:
+ import io as _io
+
+ # Build a large JSON array as chunked input
+ parts = [b"["]
+ for i in range(1, 1001):
+ if i > 1:
+ parts.append(b",")
+ parts.append(f'{{"id":{i}}}'.encode())
+ parts.append(b"]")
+ full_input = b"".join(parts)
+
+ # Feed in 4KB chunks (simulating a file/network read)
+ def chunked(data, size=4096):
+ for i in range(0, len(data), size):
+ yield data[i:i+size]
+
+ script = "output application/json\n---\nsizeOf(payload)"
+ stream = dataweave.run_transform(
+ script,
+ input_stream=chunked(full_input),
+ input_mime_type="application/json",
+ )
+ chunks = []
+ try:
+ while True:
+ chunks.append(next(stream))
+ except StopIteration as e:
+ metadata = e.value
+
+ full = b"".join(chunks)
+ text = full.decode(metadata.charset or "utf-8")
+ assert metadata.success is True, f"Expected success, got: {metadata}"
+ assert text == "1000", f"Expected '1000', got '{text}'"
+ print(f"[OK] run_transform large works (result='{text}')")
+ return True
+ except Exception as e:
+ print(f"[FAIL] run_transform large failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_run_transform_with_file():
+ """Test run_transform reading from a file-like object"""
+ print("\nTesting run_transform with file...")
+ try:
+ from pathlib import Path
+
+ xml_path = Path(__file__).resolve().parent / "person.xml"
+
+ with open(xml_path, "rb") as f:
+ stream = dataweave.run_transform(
+ "output application/csv header=true\n---\n[payload.person]",
+ input_stream=iter(lambda: f.read(4096), b""),
+ input_mime_type="application/xml",
+ input_charset="UTF-16",
+ )
+ chunks = []
+ try:
+ while True:
+ chunks.append(next(stream))
+ except StopIteration as e:
+ metadata = e.value
+
+ full = b"".join(chunks)
+ text = full.decode(metadata.charset or "utf-8")
+ assert metadata.success is True, f"Expected success, got: {metadata}"
+ assert "Billy" in text, f"Expected 'Billy' in CSV, got: {text}"
+ assert "31" in text, f"Expected '31' in CSV, got: {text}"
+ print(f"[OK] run_transform with file works (result={text.strip()[:60]})")
+ return True
+ except Exception as e:
+ print(f"[FAIL] run_transform with file failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def main():
+ """Run all tests"""
+ print("="*70)
+ print("DataWeave Python Module - Test Suite")
+ print("="*70)
+
+ try:
+ results = []
+ results.append(test_basic())
+ results.append(test_with_inputs())
+ results.append(test_context_manager())
+ results.append(test_encoding())
+ results.append(test_auto_conversion())
+ results.append(test_callback_output_basic())
+ results.append(test_callback_output_with_inputs())
+ results.append(test_callback_input_output())
+ results.append(test_callback_input_output_large())
+ results.append(test_run_streaming_basic())
+ results.append(test_run_streaming_large())
+ results.append(test_run_streaming_error())
+ results.append(test_run_streaming_with_inputs())
+ results.append(test_run_transform_basic())
+ results.append(test_run_transform_large())
+ results.append(test_run_transform_with_file())
+
+ # Cleanup
+ dataweave.cleanup()
+
+ print("\n" + "="*70)
+ passed = sum(results)
+ total = len(results)
+ print(f"Results: {passed}/{total} tests passed")
+ print("="*70)
+
+ if passed == total:
+ print("\n[OK] All tests passed!")
+ sys.exit(0)
+ else:
+ print(f"\n[FAIL] {total - passed} test(s) failed")
+ sys.exit(1)
+
+ except dataweave.DataWeaveLibraryNotFoundError as e:
+ print(f"\n[ERROR] {e}")
+ print("\nPlease build the native library first:")
+ print(" ./gradlew nativeCompile")
+ sys.exit(2)
+ except Exception as e:
+ print(f"\n[ERROR] Unexpected error: {e}")
+ import traceback
+ traceback.print_exc()
+ sys.exit(1)
+
+if __name__ == "__main__":
+ main()
diff --git a/native-lib/src/main/java/org/mule/weave/lib/InputStreamSession.java b/native-lib/src/main/java/org/mule/weave/lib/InputStreamSession.java
new file mode 100644
index 0000000..6d1537b
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/InputStreamSession.java
@@ -0,0 +1,125 @@
+package org.mule.weave.lib;
+
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manages a {@link PipedInputStream}/{@link PipedOutputStream} pair that allows FFI callers
+ * to stream data into the DataWeave engine as an input binding.
+ *
+ *
+ * The caller writes bytes into the {@link PipedOutputStream} via {@link #write(byte[], int)}
+ * while the DW engine reads from the paired {@link PipedInputStream} on a separate thread.
+ *
+ * Instances are stored in a static registry keyed by a monotonically increasing handle so that
+ * native callers can reference them across {@code @CEntryPoint} invocations.
+ */
+public class InputStreamSession {
+
+ private static final ConcurrentHashMap&lt;Long, InputStreamSession&gt; SESSIONS = new ConcurrentHashMap&lt;&gt;();
+ private static final AtomicLong NEXT_HANDLE = new AtomicLong(1);
+
+ private static final int PIPE_BUFFER_SIZE = 64 * 1024;
+
+ private final PipedInputStream pipedInputStream;
+ private final PipedOutputStream pipedOutputStream;
+ private final String mimeType;
+ private final String charset;
+
+ /**
+ * Creates a new input stream session with the given metadata.
+ *
+ * @param mimeType the MIME type of the input data
+ * @param charset the character set of the input data (may be {@code null}, defaults to UTF-8)
+ */
+ public InputStreamSession(String mimeType, String charset) {
+ try {
+ this.pipedInputStream = new PipedInputStream(PIPE_BUFFER_SIZE);
+ this.pipedOutputStream = new PipedOutputStream(pipedInputStream);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create piped streams", e);
+ }
+ this.mimeType = mimeType;
+ this.charset = charset != null ? charset : "UTF-8";
+ }
+
+ /**
+ * Registers this session and returns its handle.
+ *
+ * @return a unique handle that callers use to reference this session
+ */
+ public long register() {
+ long handle = NEXT_HANDLE.getAndIncrement();
+ SESSIONS.put(handle, this);
+ return handle;
+ }
+
+ /**
+ * Looks up a previously registered session.
+ *
+ * @param handle the handle returned by {@link #register()}
+ * @return the session, or {@code null} if not found
+ */
+ public static InputStreamSession get(long handle) {
+ return SESSIONS.get(handle);
+ }
+
+ /**
+ * Removes a session from the registry and closes both ends of the pipe.
+ *
+ * @param handle the session handle
+ */
+ public static void close(long handle) {
+ InputStreamSession session = SESSIONS.remove(handle);
+ if (session != null) {
+ try {
+ session.pipedOutputStream.close();
+ } catch (IOException ignored) {
+ }
+ try {
+ session.pipedInputStream.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
+ /**
+ * Writes bytes into the pipe. The DW engine will read these from the paired
+ * {@link PipedInputStream}.
+ *
+ * @param data the byte array to write from
+ * @param length the number of bytes to write
+ * @throws IOException if an I/O error occurs
+ */
+ public void write(byte[] data, int length) throws IOException {
+ pipedOutputStream.write(data, 0, length);
+ }
+
+ /**
+ * Closes the write end of the pipe, signalling EOF to the reader.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ public void closeWriter() throws IOException {
+ pipedOutputStream.close();
+ }
+
+ /**
+ * Returns the {@link PipedInputStream} that the DW engine should read from.
+ *
+ * @return the read end of the pipe
+ */
+ public PipedInputStream getInputStream() {
+ return pipedInputStream;
+ }
+
+ public String getMimeType() {
+ return mimeType;
+ }
+
+ public String getCharset() {
+ return charset;
+ }
+}
diff --git a/native-lib/src/main/java/org/mule/weave/lib/NativeCallbacks.java b/native-lib/src/main/java/org/mule/weave/lib/NativeCallbacks.java
new file mode 100644
index 0000000..3872354
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/NativeCallbacks.java
@@ -0,0 +1,49 @@
+package org.mule.weave.lib;
+
+import org.graalvm.nativeimage.c.function.CFunctionPointer;
+import org.graalvm.nativeimage.c.function.InvokeCFunctionPointer;
+import org.graalvm.nativeimage.c.type.CCharPointer;
+import org.graalvm.word.PointerBase;
+
+/**
+ * Function-pointer (callback) interfaces used by the callback-based streaming API in
+ * {@link NativeLib}.
+ *
+ * FFI callers pass C function pointers that conform to these signatures. The Java side
+ * invokes them via {@link InvokeCFunctionPointer} to push/pull data without requiring
+ * the session-based open/read|write/close round-trips.
+ */
+public final class NativeCallbacks {
+
+ private NativeCallbacks() {
+ }
+
+ /**
+ * Callback the native caller provides to receive output data.
+ *
+ * Signature in C:
+ *
+ * {@code int (*WriteCallback)(void *ctx, const char *buffer, int length);}
+ *
+ * The Java side calls this repeatedly with chunks of the script result.
+ * The callback must return {@code 0} on success or a non-zero value to abort.
+ */
+ public interface WriteCallback extends CFunctionPointer {
+ @InvokeCFunctionPointer
+ int invoke(PointerBase ctx, CCharPointer buffer, int length);
+ }
+
+ /**
+ * Callback the native caller provides to supply input data.
+ *
+ * Signature in C:
+ *
+ * {@code int (*ReadCallback)(void *ctx, char *buffer, int bufferSize);}
+ *
+ * The Java side calls this to pull the next chunk of input bytes. The callback must
+ * write up to {@code bufferSize} bytes into {@code buffer} and return the number of
+ * bytes written, {@code 0} on EOF, or {@code -1} on error.
+ */
+ public interface ReadCallback extends CFunctionPointer {
+ @InvokeCFunctionPointer
+ int invoke(PointerBase ctx, CCharPointer buffer, int bufferSize);
+ }
+}
diff --git a/native-lib/src/main/java/org/mule/weave/lib/NativeLib.java b/native-lib/src/main/java/org/mule/weave/lib/NativeLib.java
new file mode 100644
index 0000000..de776e7
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/NativeLib.java
@@ -0,0 +1,333 @@
+package org.mule.weave.lib;
+
+import org.graalvm.nativeimage.IsolateThread;
+import org.graalvm.nativeimage.UnmanagedMemory;
+import org.graalvm.nativeimage.c.function.CEntryPoint;
+import org.graalvm.nativeimage.c.type.CCharPointer;
+import org.graalvm.nativeimage.c.type.CTypeConversion;
+import org.graalvm.word.PointerBase;
+import org.graalvm.word.WordFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * GraalVM native entry points exposed for FFI consumers.
+ *
+ * This class provides C-callable functions to execute DataWeave scripts and to free the returned
+ * unmanaged strings.
+ */
+public class NativeLib {
+
+ /**
+ * Native method that executes a DataWeave script with inputs and returns the result.
+ * Can be called from Python via FFI.
+ *
+ * @param thread the isolate thread (automatically provided by GraalVM)
+ * @param script the DataWeave script to execute (C string pointer)
+ * @param inputsJson JSON string containing the inputs map with content (base64 encoded), mimeType, properties and charset for each binding
+ * @return the script execution result base64 encoded (C string pointer)
+ */
+ @CEntryPoint(name = "run_script")
+ public static CCharPointer runDwScriptEncoded(IsolateThread thread, CCharPointer script, CCharPointer inputsJson) {
+ String dwScript = CTypeConversion.toJavaString(script);
+ String inputs = CTypeConversion.toJavaString(inputsJson);
+
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+ String result = runtime.run(dwScript, inputs);
+ return toUnmanagedCString(result);
+ }
+
+ /**
+ * Frees a C string previously returned by {@link #runDwScriptEncoded(IsolateThread, CCharPointer, CCharPointer)}.
+ *
+ * @param thread the isolate thread (automatically provided by GraalVM)
+ * @param pointer the pointer to the unmanaged C string to free; if null, this is a no-op
+ */
+ @CEntryPoint(name = "free_cstring")
+ public static void freeCString(IsolateThread thread, CCharPointer pointer) {
+ if (pointer.isNull()) {
+ return;
+ }
+ UnmanagedMemory.free(pointer);
+ }
+
+ // ── Callback-based Streaming API ─────────────────────────────────────
+
+ private static final int CALLBACK_BUFFER_SIZE = 8 * 1024;
+
+ /**
+ * Executes a DataWeave script and streams the result to a caller-supplied write callback.
+ *
+ * Instead of the session-based open/read/close cycle, the caller passes a
+ * {@code WriteCallback} function pointer. The Java side reads the output stream in chunks
+ * and invokes the callback for each chunk until the stream is exhausted.
+ *
+ * The returned C string is a JSON object with the execution metadata:
+ *
+ * - On success: {@code {"success":true,"mimeType":"...","charset":"...","binary":true/false}}
+ * - On error: {@code {"success":false,"error":"..."}}
+ *
+ * The caller must free the returned pointer with {@link #freeCString}.
+ *
+ * @param thread the isolate thread
+ * @param script the DataWeave script (C string)
+ * @param inputsJson JSON-encoded inputs map (C string), may be null
+ * @param writeCallback function pointer invoked with each output chunk; must return 0 on success
+ * @param ctx opaque context pointer forwarded to every callback invocation
+ * @return an unmanaged C string with JSON metadata/error
+ */
+ @CEntryPoint(name = "run_script_callback")
+ public static CCharPointer runScriptCallback(
+ IsolateThread thread,
+ CCharPointer script,
+ CCharPointer inputsJson,
+ NativeCallbacks.WriteCallback writeCallback,
+ PointerBase ctx) {
+
+ String dwScript = CTypeConversion.toJavaString(script);
+ String inputs = inputsJson.isNull() ? null : CTypeConversion.toJavaString(inputsJson);
+
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+ StreamSession session = runtime.runStreaming(dwScript, inputs);
+
+ if (session.isError()) {
+ return toUnmanagedCString("{\"success\":false,\"error\":\""
+ + escapeJsonString(session.getError()) + "\"}");
+ }
+
+ try {
+ byte[] buf = new byte[CALLBACK_BUFFER_SIZE];
+ CCharPointer nativeBuf = UnmanagedMemory.malloc(CALLBACK_BUFFER_SIZE);
+ try {
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ for (int i = 0; i < n; i++) {
+ nativeBuf.write(i, buf[i]);
+ }
+ int rc = writeCallback.invoke(ctx, nativeBuf, n);
+ if (rc != 0) {
+ return toUnmanagedCString("{\"success\":false,\"error\":\""
+ + "Write callback returned error: " + rc + "\"}");
+ }
+ }
+ } finally {
+ UnmanagedMemory.free(nativeBuf);
+ }
+ } catch (IOException e) {
+ return toUnmanagedCString("{\"success\":false,\"error\":\""
+ + escapeJsonString(e.getMessage()) + "\"}");
+ } finally {
+ session.closeStream();
+ }
+
+ return toUnmanagedCString("{\"success\":true"
+ + ",\"mimeType\":\"" + session.getMimeType() + "\""
+ + ",\"charset\":\"" + session.getCharset() + "\""
+ + ",\"binary\":" + session.isBinary()
+ + "}");
+ }
+
+ /**
+ * Executes a DataWeave script whose output is streamed via a write callback, and whose
+ * input named {@code inputName} is fed via a read callback.
+ *
+ * The read callback is invoked on a background thread to pull input data while the
+ * output is pushed to the write callback on the calling thread. This allows fully
+ * callback-driven input and output streaming in a single call.
+ *
+ * The returned C string follows the same JSON schema as
+ * {@link #runScriptCallback}.
+ *
+ * @param thread the isolate thread
+ * @param script the DataWeave script (C string)
+ * @param inputsJson JSON-encoded inputs map (C string), may be null; entries for
+ * {@code inputName} are ignored since the read callback supplies that input
+ * @param inputName the binding name for the callback-supplied input (C string)
+ * @param inputMimeType the MIME type of the callback-supplied input (C string)
+ * @param inputCharset the charset of the callback-supplied input (C string), may be null for UTF-8
+ * @param readCallback function pointer invoked to read the next chunk; must return bytes written,
+ * 0 on EOF, or -1 on error
+ * @param writeCallback function pointer invoked with each output chunk; must return 0 on success
+ * @param ctx opaque context pointer forwarded to every callback invocation
+ * @return an unmanaged C string with JSON metadata/error
+ */
+ @CEntryPoint(name = "run_script_input_output_callback")
+ public static CCharPointer runScriptInputOutputCallback(
+ IsolateThread thread,
+ CCharPointer script,
+ CCharPointer inputsJson,
+ CCharPointer inputName,
+ CCharPointer inputMimeType,
+ CCharPointer inputCharset,
+ NativeCallbacks.ReadCallback readCallback,
+ NativeCallbacks.WriteCallback writeCallback,
+ PointerBase ctx) {
+
+ String dwScript = CTypeConversion.toJavaString(script);
+ String inputs = inputsJson.isNull() ? null : CTypeConversion.toJavaString(inputsJson);
+ String inName = CTypeConversion.toJavaString(inputName);
+ String inMime = CTypeConversion.toJavaString(inputMimeType);
+ String inCharset = inputCharset.isNull() ? null : CTypeConversion.toJavaString(inputCharset);
+
+ // Create a piped input stream session for the callback-supplied input
+ InputStreamSession inputSession = new InputStreamSession(inMime, inCharset);
+ long inputHandle = inputSession.register();
+
+ // Merge the stream handle into the inputs JSON
+ String streamEntry = "{\"streamHandle\":\"" + inputHandle + "\",\"mimeType\":\"" + inMime + "\""
+ + (inCharset != null ? ",\"charset\":\"" + inCharset + "\"" : "") + "}";
+ String mergedInputs = mergeInputEntry(inputs, inName, streamEntry);
+
+ // Start a background thread that calls the readCallback and feeds data into the pipe.
+ // Word types (CCharPointer, CFunctionPointer, PointerBase) cannot be captured in
+ // lambdas in GraalVM Native Image, so we use an explicit Runnable that stores their
+ // raw addresses and reconstitutes them via WordFactory.
+ final long readCallbackAddr = readCallback.rawValue();
+ final long ctxAddr = ctx.rawValue();
+ Thread feeder = new Thread(new InputCallbackFeeder(
+ readCallbackAddr, ctxAddr, inputSession), "dw-input-callback-feeder");
+ feeder.setDaemon(true);
+ feeder.start();
+
+ // Execute the script and stream output via the writeCallback
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+ StreamSession session = runtime.runStreaming(dwScript, mergedInputs);
+
+ if (session.isError()) {
+ cleanupFeeder(feeder, inputHandle);
+ return toUnmanagedCString("{\"success\":false,\"error\":\""
+ + escapeJsonString(session.getError()) + "\"}");
+ }
+
+ try {
+ byte[] buf = new byte[CALLBACK_BUFFER_SIZE];
+ CCharPointer writeBuf = UnmanagedMemory.malloc(CALLBACK_BUFFER_SIZE);
+ try {
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ for (int i = 0; i < n; i++) {
+ writeBuf.write(i, buf[i]);
+ }
+ int rc = writeCallback.invoke(ctx, writeBuf, n);
+ if (rc != 0) {
+ cleanupFeeder(feeder, inputHandle);
+ return toUnmanagedCString("{\"success\":false,\"error\":\""
+ + "Write callback returned error: " + rc + "\"}");
+ }
+ }
+ } finally {
+ UnmanagedMemory.free(writeBuf);
+ }
+ } catch (IOException e) {
+ cleanupFeeder(feeder, inputHandle);
+ return toUnmanagedCString("{\"success\":false,\"error\":\""
+ + escapeJsonString(e.getMessage()) + "\"}");
+ } finally {
+ session.closeStream();
+ }
+
+ cleanupFeeder(feeder, inputHandle);
+
+ return toUnmanagedCString("{\"success\":true"
+ + ",\"mimeType\":\"" + session.getMimeType() + "\""
+ + ",\"charset\":\"" + session.getCharset() + "\""
+ + ",\"binary\":" + session.isBinary()
+ + "}");
+ }
+
+ /**
+ * Merges a single input entry into an existing JSON inputs string.
+ */
+ private static String mergeInputEntry(String existingJson, String name, String entryJson) {
+ org.json.JSONObject obj = (existingJson == null || existingJson.trim().isEmpty())
+ ? new org.json.JSONObject()
+ : new org.json.JSONObject(existingJson);
+ obj.put(name, new org.json.JSONObject(entryJson));
+ return obj.toString();
+ }
+
+ /**
+ * Waits for the feeder thread to finish and closes the input session.
+ */
+ private static void cleanupFeeder(Thread feeder, long inputHandle) {
+ try {
+ feeder.join(5000);
+ } catch (InterruptedException ignored) {
+ }
+ InputStreamSession.close(inputHandle);
+ }
+
+ /**
+ * Explicit {@link Runnable} that drives the read-callback loop on a background thread.
+ *
+ * GraalVM Native Image forbids capturing {@code Word} types (such as
+ * {@link CCharPointer} or {@link CFunctionPointer}) inside lambdas. This class stores
+ * the raw addresses as plain {@code long} values and reconstitutes the pointers via
+ * {@link WordFactory#pointer(long)} inside {@link #run()}.
+ *
+ * The feeder allocates its own native read buffer and frees it in its {@code finally}
+ * block, ensuring no shared native memory between threads.
+ */
+ private static final class InputCallbackFeeder implements Runnable {
+ private final long readCallbackAddr;
+ private final long ctxAddr;
+ private final InputStreamSession inputSession;
+
+ InputCallbackFeeder(long readCallbackAddr, long ctxAddr,
+ InputStreamSession inputSession) {
+ this.readCallbackAddr = readCallbackAddr;
+ this.ctxAddr = ctxAddr;
+ this.inputSession = inputSession;
+ }
+
+ @Override
+ public void run() {
+ NativeCallbacks.ReadCallback cb = WordFactory.pointer(readCallbackAddr);
+ PointerBase ctx = WordFactory.pointer(ctxAddr);
+ CCharPointer buf = UnmanagedMemory.malloc(CALLBACK_BUFFER_SIZE);
+ try {
+ while (true) {
+ int n = cb.invoke(ctx, buf, CALLBACK_BUFFER_SIZE);
+ if (n <= 0) {
+ break; // 0 = EOF, negative = error
+ }
+ byte[] tmp = new byte[n];
+ for (int i = 0; i < n; i++) {
+ tmp[i] = buf.read(i);
+ }
+ inputSession.write(tmp, n);
+ }
+ } catch (IOException e) {
+ // pipe broken – DW engine will see the error
+ } finally {
+ UnmanagedMemory.free(buf);
+ try {
+ inputSession.closeWriter();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+ }
+
+ private static String escapeJsonString(String input) {
+ if (input == null) return "";
+ return input
+ .replace("\\", "\\\\")
+ .replace("\"", "\\\"")
+ .replace("\n", "\\n")
+ .replace("\r", "\\r")
+ .replace("\t", "\\t");
+ }
+
+ private static CCharPointer toUnmanagedCString(String value) {
+ byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+ CCharPointer ptr = UnmanagedMemory.malloc(bytes.length + 1);
+ for (int i = 0; i < bytes.length; i++) {
+ ptr.write(i, bytes[i]);
+ }
+ ptr.write(bytes.length, (byte) 0);
+ return ptr;
+ }
+
+}
diff --git a/native-lib/src/main/java/org/mule/weave/lib/ScriptRuntime.java b/native-lib/src/main/java/org/mule/weave/lib/ScriptRuntime.java
new file mode 100644
index 0000000..9cb4007
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/ScriptRuntime.java
@@ -0,0 +1,234 @@
+package org.mule.weave.lib;
+
+import org.json.JSONObject;
+import org.mule.weave.v2.runtime.BindingValue;
+import org.mule.weave.v2.runtime.DataWeaveResult;
+import org.mule.weave.v2.runtime.ScriptingBindings;
+import org.mule.weave.v2.runtime.api.DWResult;
+import org.mule.weave.v2.runtime.api.DWScript;
+import org.mule.weave.v2.runtime.api.DWScriptingEngine;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.immutable.Map;
+import scala.collection.immutable.Map$;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Base64;
+
+/**
+ * Singleton wrapper around a {@link DWScriptingEngine} used to compile and execute DataWeave scripts.
+ *
+ * Execution results are returned as a JSON string containing a base64-encoded payload plus metadata
+ * (mime type, charset, and whether the result is binary). Errors are returned as a JSON string with
+ * {@code success=false} and an escaped error message.
+ */
+public class ScriptRuntime {
+
+ private static final ScriptRuntime INSTANCE = new ScriptRuntime();
+
+ /**
+ * Returns the singleton instance.
+ *
+ * @return the shared {@link ScriptRuntime}
+ */
+ public static ScriptRuntime getInstance() {
+ return INSTANCE;
+ }
+
+ private DWScriptingEngine engine;
+
+ private ScriptRuntime() {
+ engine = DWScriptingEngine.builder().build();
+ }
+
+ /**
+ * Executes a DataWeave script with no input bindings.
+ *
+ * @param script the DataWeave script source
+ * @return a JSON string describing either the successful result or an error
+ */
+ public String run(String script) {
+ return run(script, null);
+ }
+
+ /**
+ * Executes a DataWeave script with optional input bindings encoded as JSON.
+ *
+ * The expected JSON structure maps binding names to an object containing {@code content}
+ * (base64), {@code mimeType}, optional {@code charset}, and optional {@code properties}.
+ *
+ * @param script the DataWeave script source
+ * @param inputsJson JSON string encoding the input bindings map, or {@code null}
+ * @return a JSON string describing either the successful result or an error
+ */
+ public String run(String script, String inputsJson) {
+ ScriptingBindings bindings = parseJsonInputsToBindings(inputsJson);
+ String[] inputs = bindings.bindingNames();
+
+ try {
+ DWScript compiled = engine.compileDWScript(script, inputs);
+ DWResult dwResult = compiled.writeDWResult(bindings);
+
+ String encodedResult;
+ if (dwResult.getContent() instanceof InputStream) {
+ try {
+ byte[] ba = ((InputStream) dwResult.getContent()).readAllBytes();
+ encodedResult = Base64.getEncoder().encodeToString(ba);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new RuntimeException("Result is not an InputStream: " + dwResult.getContent().getClass().getName());
+ }
+
+ return "{"
+ + "\"success\":true,"
+ + "\"result\":\"" + encodedResult + "\","
+ + "\"mimeType\":\"" + dwResult.getMimeType() + "\","
+ + "\"charset\":\"" + dwResult.getCharset() + "\","
+ + "\"binary\":" + ((DataWeaveResult) dwResult).isBinary()
+ + "}";
+ } catch (Exception e) {
+ String message = e.getMessage();
+ if (message == null || message.trim().isEmpty()) {
+ message = e.toString();
+ }
+
+ return "{"
+ + "\"success\":false,"
+ + "\"error\":\"" + escapeJsonString(message) + "\""
+ + "}";
+ }
+ }
+
+ /**
+ * Executes a DataWeave script and returns a {@link StreamSession} whose {@link java.io.InputStream}
+ * can be read incrementally, avoiding loading the entire result into memory.
+ *
+ * @param script the DataWeave script source
+ * @param inputsJson JSON string encoding the input bindings map, or {@code null}
+ * @return a {@link StreamSession} with the result stream and metadata, or an error session
+ */
+ public StreamSession runStreaming(String script, String inputsJson) {
+ ScriptingBindings bindings = parseJsonInputsToBindings(inputsJson);
+ String[] inputs = bindings.bindingNames();
+
+ try {
+ DWScript compiled = engine.compileDWScript(script, inputs);
+ DWResult dwResult = compiled.writeDWResult(bindings);
+
+ if (dwResult.getContent() instanceof InputStream) {
+ return new StreamSession(
+ (InputStream) dwResult.getContent(),
+ dwResult.getMimeType(),
+ dwResult.getCharset().name(),
+ ((DataWeaveResult) dwResult).isBinary()
+ );
+ } else {
+ return StreamSession.ofError("Result is not an InputStream: " + dwResult.getContent().getClass().getName());
+ }
+ } catch (Exception e) {
+ String message = e.getMessage();
+ if (message == null || message.trim().isEmpty()) {
+ message = e.toString();
+ }
+ return StreamSession.ofError(message);
+ }
+ }
+
+ private ScriptingBindings parseJsonInputsToBindings(String inputsJson) {
+ ScriptingBindings bindings = new ScriptingBindings();
+
+ if (inputsJson == null || inputsJson.trim().isEmpty()) {
+ return bindings;
+ }
+
+ try {
+ JSONObject root = new JSONObject(inputsJson);
+
+ for (String name : root.keySet()) {
+ JSONObject entry = root.getJSONObject(name);
+
+ if (entry.has("streamHandle")) {
+ long streamHandle = Long.parseLong(entry.getString("streamHandle"));
+ InputStreamSession inputSession = InputStreamSession.get(streamHandle);
+ if (inputSession == null) {
+ throw new RuntimeException("Invalid streamHandle " + streamHandle + " for input '" + name + "'");
+ }
+ String mimeTypeRaw = entry.optString("mimeType", inputSession.getMimeType());
+ String charsetRaw = entry.optString("charset", inputSession.getCharset());
+ Charset charset = Charset.forName(charsetRaw);
+ Option mimeType = Option.apply(mimeTypeRaw);
+
+ BindingValue bindingValue = new BindingValue(inputSession.getInputStream(), mimeType, Map$.MODULE$.empty(), charset);
+ bindings.addBinding(name, bindingValue);
+
+ } else if (entry.has("content")) {
+ String contentRaw = entry.getString("content");
+ String mimeTypeRaw = entry.optString("mimeType", null);
+ String charsetRaw = entry.optString("charset", "UTF-8");
+
+ Map properties = Map$.MODULE$.empty();
+ if (entry.has("properties") && !entry.isNull("properties")) {
+ JSONObject propsObj = entry.getJSONObject("properties");
+ properties = parseJsonProperties(propsObj);
+ }
+
+ Charset charset = Charset.forName(charsetRaw);
+ Option mimeType = Option.apply(mimeTypeRaw);
+
+ byte[] content = Base64.getDecoder().decode(contentRaw);
+ BindingValue bindingValue = new BindingValue(content, mimeType, properties, charset);
+ bindings.addBinding(name, bindingValue);
+ }
+ }
+ } catch (Exception e) {
+ System.err.println("Error parsing JSON inputs: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ return bindings;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map parseJsonProperties(JSONObject propsObj) {
+ Map result = Map$.MODULE$.empty();
+
+ for (String key : propsObj.keySet()) {
+ Object val = propsObj.get(key);
+ if (val instanceof String || val instanceof Boolean) {
+ result = (Map) result.$plus(new Tuple2<>(key, val));
+ } else if (val instanceof Number) {
+ Number num = (Number) val;
+ Object boxed;
+ if (val instanceof Double || val instanceof Float) {
+ boxed = num.doubleValue();
+ } else {
+ boxed = num.longValue();
+ }
+ result = (Map) result.$plus(new Tuple2<>(key, boxed));
+ } else if (val == JSONObject.NULL) {
+ throw new IllegalArgumentException("properties values cannot be null (key '" + key + "')");
+ } else {
+ throw new IllegalArgumentException("properties values must be primitive (string/number/boolean) (key '" + key + "')");
+ }
+ }
+
+ return result;
+ }
+
+ private String escapeJsonString(String input) {
+ if (input == null) {
+ return "";
+ }
+
+ return input
+ .replace("\\", "\\\\")
+ .replace("\"", "\\\"")
+ .replace("\n", "\\n")
+ .replace("\r", "\\r")
+ .replace("\t", "\\t");
+ }
+}
diff --git a/native-lib/src/main/java/org/mule/weave/lib/StreamSession.java b/native-lib/src/main/java/org/mule/weave/lib/StreamSession.java
new file mode 100644
index 0000000..b54f299
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/StreamSession.java
@@ -0,0 +1,136 @@
+package org.mule.weave.lib;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Holds an open {@link InputStream} and associated metadata for a streaming script execution.
+ *
+ * Instances are stored in a static registry keyed by a monotonically increasing handle so that
+ * native callers can reference them across {@code @CEntryPoint} invocations.
+ */
+public class StreamSession {
+
+ private static final ConcurrentHashMap SESSIONS = new ConcurrentHashMap<>();
+ private static final AtomicLong NEXT_HANDLE = new AtomicLong(1);
+
+ private final InputStream inputStream;
+ private final String mimeType;
+ private final String charset;
+ private final boolean binary;
+ private final String error;
+
+ StreamSession(InputStream inputStream, String mimeType, String charset, boolean binary) {
+ this.inputStream = inputStream;
+ this.mimeType = mimeType;
+ this.charset = charset;
+ this.binary = binary;
+ this.error = null;
+ }
+
+ private StreamSession(String error) {
+ this.inputStream = null;
+ this.mimeType = null;
+ this.charset = null;
+ this.binary = false;
+ this.error = error;
+ }
+
+ /**
+ * Creates an error session that carries only an error message.
+ *
+ * @param error the error message
+ * @return an error session
+ */
+ public static StreamSession ofError(String error) {
+ return new StreamSession(error);
+ }
+
+ /**
+ * Registers this session and returns its handle.
+ *
+ * @return a unique handle that callers use to reference this session
+ */
+ public long register() {
+ long handle = NEXT_HANDLE.getAndIncrement();
+ SESSIONS.put(handle, this);
+ return handle;
+ }
+
+ /**
+ * Looks up a previously registered session.
+ *
+ * @param handle the handle returned by {@link #register()}
+ * @return the session, or {@code null} if not found
+ */
+ public static StreamSession get(long handle) {
+ return SESSIONS.get(handle);
+ }
+
+ /**
+ * Removes and closes a session.
+ *
+ * @param handle the session handle
+ */
+ public static void close(long handle) {
+ StreamSession session = SESSIONS.remove(handle);
+ if (session != null && session.inputStream != null) {
+ try {
+ session.inputStream.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
+ /**
+ * Reads up to {@code len} bytes into the provided byte array.
+ *
+ * @param buf destination buffer
+ * @param len maximum number of bytes to read
+ * @return number of bytes actually read, or {@code -1} on EOF
+ * @throws IOException if an I/O error occurs
+ */
+ public int read(byte[] buf, int len) throws IOException {
+ return inputStream.read(buf, 0, len);
+ }
+
+ public String getMimeType() {
+ return mimeType;
+ }
+
+ public String getCharset() {
+ return charset;
+ }
+
+ public boolean isBinary() {
+ return binary;
+ }
+
+ /**
+ * Closes the underlying input stream. Safe to call on error sessions (no-op).
+ */
+ public void closeStream() {
+ if (inputStream != null) {
+ try {
+ inputStream.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
+ /**
+ * Returns the error message if this is an error session, or {@code null} otherwise.
+ */
+ public String getError() {
+ return error;
+ }
+
+ /**
+ * Returns {@code true} if this session represents a failed execution.
+ */
+ public boolean isError() {
+ return error != null;
+ }
+}
diff --git a/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat
new file mode 100644
index 0000000..b1a22df
--- /dev/null
+++ b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat
@@ -0,0 +1,9 @@
+org.mule.weave.v2.interpreted.module.WeaveDataFormat
+org.mule.weave.v2.module.core.json.JsonDataFormat
+org.mule.weave.v2.module.core.xml.XmlDataFormat
+org.mule.weave.v2.module.core.csv.CSVDataFormat
+org.mule.weave.v2.module.core.octetstream.OctetStreamDataFormat
+org.mule.weave.v2.module.core.textplain.TextPlainDataFormat
+org.mule.weave.v2.module.core.urlencoded.UrlEncodedDataFormat
+org.mule.weave.v2.module.core.multipart.MultiPartDataFormat
+org.mule.weave.v2.module.core.properties.PropertiesDataFormat
\ No newline at end of file
diff --git a/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader
new file mode 100644
index 0000000..ef3215b
--- /dev/null
+++ b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader
@@ -0,0 +1 @@
+org.mule.weave.v2.compilation.loader.WeaveBinaryResourceModuleLoader
\ No newline at end of file
diff --git a/native-lib/src/test/java/org/mule/weave/lib/ScriptRuntimeTest.java b/native-lib/src/test/java/org/mule/weave/lib/ScriptRuntimeTest.java
new file mode 100644
index 0000000..70f8044
--- /dev/null
+++ b/native-lib/src/test/java/org/mule/weave/lib/ScriptRuntimeTest.java
@@ -0,0 +1,613 @@
+package org.mule.weave.lib;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Base64;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+class ScriptRuntimeTest {
+
+ @Test
+ void runSimpleScript() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Running sqrt(144) 10 times with timing:");
+ System.out.println("=".repeat(50));
+
+ for (int i = 1; i <= 20; i++) {
+ long startTime = System.nanoTime();
+ String result = runtime.run("sqrt(144)");
+ long endTime = System.nanoTime();
+ double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ assertEquals("12", Result.parse(result).result);
+ System.out.printf("Run %2d: %.3f ms - Result: %s%n", i, executionTimeMs, result);
+ }
+
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void runParseError() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ // Fixed copy-pasted banner: this test runs an unresolvable script,
+ // not the sqrt(144) timing loop the old message claimed.
+ System.out.println("Running an invalid script, expecting an error payload:");
+ System.out.println("=".repeat(50));
+
+ String result = runtime.run("invalid syntax here");
+
+ // The engine reports the unknown identifiers as unresolved references.
+ String error = Result.parse(result).error;
+ assertTrue(error.contains("Unable to resolve reference"));
+ System.out.printf("Error: %s%n", result);
+
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void runWithInputs() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing runWithInputs with two integer numbers:");
+ System.out.println("=".repeat(50));
+
+ // Test 1: Sum 25 + 17
+ int num1 = 25;
+ int num2 = 17;
+ int expected = num1 + num2;
+
+ // Create inputs JSON with content and mimeType for each binding
+ String inputsJson = String.format(
+ "{\"num1\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}, " +
+ "\"num2\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+ encode(num1), encode(num2)
+ );
+
+ String script = "num1 + num2";
+
+ System.out.printf("Test 1: %d + %d%n", num1, num2);
+ System.out.printf("Script: %s%n", script);
+ System.out.printf("Inputs: %s%n", inputsJson);
+
+ long startTime = System.nanoTime();
+ String result = Result.parse(runtime.run(script, inputsJson)).result;
+ long endTime = System.nanoTime();
+ double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ System.out.printf("Result: %s%n", result);
+ System.out.printf("Expected: %d%n", expected);
+ System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+ assertEquals(String.valueOf(expected), result);
+ System.out.println("✓ Test 1 passed!");
+
+ System.out.println("-".repeat(50));
+
+ // Test 2: Sum 100 + 250
+ num1 = 100;
+ num2 = 250;
+ expected = num1 + num2;
+
+ inputsJson = String.format(
+ "{\"num1\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}, " +
+ "\"num2\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+ encode(num1), encode(num2)
+ );
+
+ System.out.printf("Test 2: %d + %d%n", num1, num2);
+ System.out.printf("Script: %s%n", script);
+
+ startTime = System.nanoTime();
+ result = Result.parse(runtime.run(script, inputsJson)).result;
+ endTime = System.nanoTime();
+ executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ System.out.printf("Result: %s%n", result);
+ System.out.printf("Expected: %d%n", expected);
+ System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+ assertEquals(String.valueOf(expected), result);
+ System.out.println("✓ Test 2 passed!");
+
+ System.out.println("=".repeat(50));
+ }
+
+ private String encode(Object value) {
+ byte[] bytes = value instanceof byte[] ? (byte[]) value : String.valueOf(value).getBytes();
+ return Base64.getEncoder().encodeToString(bytes);
+
+ }
+
+ @Test
+ void runWithXmlInput() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing runWithInputs with XML input to calculate average age:");
+ System.out.println("=".repeat(50));
+
+ // XML input with two people. NOTE(review): the original text block had lost
+ // its markup (angle-bracket content stripped in transit, leaving only the
+ // text nodes 19/john and 25/jane); reconstructed to match the selector
+ // people.people.*person.age used by the script below — confirm against VCS.
+ String xmlInput = """
+ <people>
+ <person>
+ <age>19</age>
+ <name>john</name>
+ </person>
+ <person>
+ <age>25</age>
+ <name>jane</name>
+ </person>
+ </people>
+ """;
+
+ String inputsJson = String.format(
+ "{\"people\": {\"content\": \"%s\", \"mimeType\": \"application/xml\"}}",
+ encode(xmlInput)
+ );
+
+ // DataWeave script to calculate average age
+ String script = """
+ output application/json
+ ---
+ avg(people.people.*person.age)
+ """;
+
+ System.out.printf("XML Input:%n%s%n", xmlInput);
+ System.out.printf("Script:%n%s%n", script);
+
+ long startTime = System.nanoTime();
+ String result = runtime.run(script, inputsJson);
+ long endTime = System.nanoTime();
+ double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ System.out.printf("Result: %s%n", result);
+ System.out.printf("Expected: 22 (average of 19 and 25)%n");
+ System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+ // The average of 19 and 25 is 22
+ assertEquals("22", Result.parse(result).result);
+ System.out.println("✓ Test passed!");
+
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void runWithJsonObjectInput() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing runWithInputs with JSON object input:");
+ System.out.println("=".repeat(50));
+
+ String jsonInput = "{\"name\": \"John\", \"age\": 30}";
+
+ String inputsJson = String.format(
+ "{\"payload\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+ encode(jsonInput)
+ );
+
+ // DataWeave script to extract name
+ String script = "output application/json\n---\npayload.name";
+
+ System.out.printf("JSON Input: %s%n", jsonInput);
+ System.out.printf("Script: %s%n", script);
+
+ long startTime = System.nanoTime();
+ String result = Result.parse(runtime.run(script, inputsJson)).result;
+ long endTime = System.nanoTime();
+ double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ System.out.printf("Result: %s%n", result);
+ System.out.printf("Expected: \"John\"%n");
+ System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+ assertEquals("\"John\"", result);
+ System.out.println("✓ Test passed!");
+
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void runWithBinaryResult() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Running fromBase64 10 times with timing:");
+ System.out.println("=".repeat(50));
+
+ for (int i = 1; i <= 1; i++) {
+ long startTime = System.nanoTime();
+ Result result = Result.parse(runtime.run("import fromBase64 from dw::core::Binaries\n" +
+ "output application/octet-stream\n" +
+ "---\n" +
+ "fromBase64(\"12345678\")", ""));
+ long endTime = System.nanoTime();
+ double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ assertEquals("12345678", result.result);
+ System.out.printf("Run %2d: %.3f ms - Result: %s%n", i, executionTimeMs, result.result);
+ }
+
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void runWithInputProperties() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+ String encodedIn0 = Base64.getEncoder().encodeToString("1234567".getBytes());
+ Result result = Result.parse(runtime.run("in0.column_1[0] as Number",
+ "{\"in0\": " +
+ "{\"content\": \"" + encodedIn0 + "\", " +
+ "\"mimeType\": \"application/csv\", " +
+ "\"properties\": {\"header\": false, \"separator\": \"4\"}}}"));
+ assertEquals("567", result.result);
+
+ }
+
+ @Test
+ void streamSimpleScript() throws IOException {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming simple script:");
+ System.out.println("=".repeat(50));
+
+ StreamSession session = runtime.runStreaming("sqrt(144)", null);
+ assertFalse(session.isError(), "Expected successful session");
+ assertNull(session.getError());
+ assertNotNull(session.getMimeType());
+
+ byte[] buf = new byte[64];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ bos.write(buf, 0, n);
+ }
+ String result = bos.toString(session.getCharset());
+ assertEquals("12", result);
+ StreamSession.close(session.register()); // clean up handle
+
+ System.out.println("Result: " + result);
+ System.out.println("✓ Streaming simple script passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void streamWithInputs() throws IOException {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming with inputs:");
+ System.out.println("=".repeat(50));
+
+ String inputsJson = String.format(
+ "{\"num1\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}, " +
+ "\"num2\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+ encode(25), encode(17)
+ );
+
+ StreamSession session = runtime.runStreaming("num1 + num2", inputsJson);
+ assertFalse(session.isError());
+
+ byte[] buf = new byte[64];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ bos.write(buf, 0, n);
+ }
+ String result = bos.toString(session.getCharset());
+ assertEquals("42", result);
+ StreamSession.close(session.register());
+
+ System.out.println("Result: " + result);
+ System.out.println("✓ Streaming with inputs passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void streamChunkedRead() throws IOException {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming chunked read:");
+ System.out.println("=".repeat(50));
+
+ String script = "output application/json\n---\n{items: (1 to 100) map {id: $, name: \"item_\" ++ $}}";
+
+ StreamSession session = runtime.runStreaming(script, null);
+ assertFalse(session.isError());
+
+ byte[] smallBuf = new byte[32];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int n;
+ int chunkCount = 0;
+ while ((n = session.read(smallBuf, smallBuf.length)) > 0) {
+ bos.write(smallBuf, 0, n);
+ chunkCount++;
+ }
+ String result = bos.toString(session.getCharset());
+ assertTrue(chunkCount > 1, "Expected multiple chunks, got " + chunkCount);
+ assertTrue(result.contains("item_1"));
+ assertTrue(result.contains("item_100"));
+ StreamSession.close(session.register());
+
+ System.out.printf("Read %d chunks, total %d bytes%n", chunkCount, bos.size());
+ System.out.println("✓ Streaming chunked read passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void streamWithStreamingInput() throws Exception {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming with streaming input:");
+ System.out.println("=".repeat(50));
+
+ // Create an input stream session for JSON data
+ InputStreamSession inputSession = new InputStreamSession("application/json", "UTF-8");
+ long inputHandle = inputSession.register();
+
+ // Build inputs JSON referencing the streamHandle
+ String inputsJson = "{\"payload\": {\"streamHandle\": \"" + inputHandle + "\", \"mimeType\": \"application/json\"}}";
+
+ // The DW engine will read from the PipedInputStream on the main thread,
+ // so we must feed data from a separate thread.
+ CountDownLatch started = new CountDownLatch(1);
+ AtomicReference feedError = new AtomicReference<>();
+
+ Thread feeder = new Thread(() -> {
+ try {
+ started.countDown();
+ String jsonData = "{\"name\": \"Alice\", \"age\": 30}";
+ byte[] bytes = jsonData.getBytes("UTF-8");
+ inputSession.write(bytes, bytes.length);
+ inputSession.closeWriter();
+ } catch (Exception e) {
+ feedError.set(e);
+ }
+ });
+ feeder.start();
+ started.await();
+
+ // Run streaming with the piped input
+ StreamSession session = runtime.runStreaming("output application/json\n---\npayload.name", inputsJson);
+ assertFalse(session.isError(), "Expected successful session but got: " + session.getError());
+
+ byte[] buf = new byte[64];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ bos.write(buf, 0, n);
+ }
+ String result = bos.toString(session.getCharset());
+ assertEquals("\"Alice\"", result);
+ StreamSession.close(session.register());
+ InputStreamSession.close(inputHandle);
+ feeder.join(5000);
+ assertNull(feedError.get(), "Feeder thread threw: " + feedError.get());
+
+ System.out.println("Result: " + result);
+ System.out.println("✓ Streaming with streaming input passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void streamWithLargeStreamingInput() throws Exception {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming with large streaming input:");
+ System.out.println("=".repeat(50));
+
+ InputStreamSession inputSession = new InputStreamSession("application/json", "UTF-8");
+ long inputHandle = inputSession.register();
+
+ String inputsJson = "{\"payload\": {\"streamHandle\": \"" + inputHandle + "\", \"mimeType\": \"application/json\"}}";
+
+ // Feed a large JSON array from a separate thread
+ AtomicReference feedError = new AtomicReference<>();
+ Thread feeder = new Thread(() -> {
+ try {
+ StringBuilder sb = new StringBuilder("[");
+ for (int i = 1; i <= 1000; i++) {
+ if (i > 1) sb.append(",");
+ sb.append("{\"id\":").append(i).append(",\"val\":\"item_").append(i).append("\"}");
+ }
+ sb.append("]");
+ byte[] bytes = sb.toString().getBytes("UTF-8");
+ // Write in chunks to simulate streaming
+ int chunkSize = 4096;
+ for (int off = 0; off < bytes.length; off += chunkSize) {
+ int len = Math.min(chunkSize, bytes.length - off);
+ byte[] chunk = new byte[len];
+ System.arraycopy(bytes, off, chunk, 0, len);
+ inputSession.write(chunk, len);
+ }
+ inputSession.closeWriter();
+ } catch (Exception e) {
+ feedError.set(e);
+ }
+ });
+ feeder.start();
+
+ StreamSession session = runtime.runStreaming("output application/json\n---\nsizeOf(payload)", inputsJson);
+ assertFalse(session.isError(), "Expected successful session but got: " + session.getError());
+
+ byte[] buf = new byte[256];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ bos.write(buf, 0, n);
+ }
+ String result = bos.toString(session.getCharset());
+ assertEquals("1000", result);
+ StreamSession.close(session.register());
+ InputStreamSession.close(inputHandle);
+ feeder.join(10000);
+ assertNull(feedError.get(), "Feeder thread threw: " + feedError.get());
+
+ System.out.println("Result: " + result);
+ System.out.println("✓ Streaming with large streaming input passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void streamErrorSession() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming error session:");
+ System.out.println("=".repeat(50));
+
+ StreamSession session = runtime.runStreaming("invalid syntax here", null);
+ assertTrue(session.isError());
+ assertNotNull(session.getError());
+ assertTrue(session.getError().contains("Unable to resolve reference"));
+
+ System.out.println("Error: " + session.getError());
+ System.out.println("✓ Streaming error session passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ // ── Callback-based streaming pattern tests ──────────────────────────
+
+ @Test
+ void callbackOutputStreaming() throws IOException {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing callback-based output streaming:");
+ System.out.println("=".repeat(50));
+
+ StreamSession session = runtime.runStreaming("output application/json\n---\n{items: (1 to 50) map {id: $}}", null);
+ assertFalse(session.isError());
+
+ // Simulate the write-callback pattern: read chunks and collect them
+ ByteArrayOutputStream collected = new ByteArrayOutputStream();
+ byte[] buf = new byte[64];
+ int callbackCount = 0;
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ // This is what the write callback would receive
+ collected.write(buf, 0, n);
+ callbackCount++;
+ }
+ String result = collected.toString(session.getCharset());
+ assertTrue(result.contains("\"id\": 1"), "Expected id 1 in result");
+ assertTrue(result.contains("\"id\": 50"), "Expected id 50 in result");
+ assertTrue(callbackCount > 0, "Expected at least one callback invocation");
+ StreamSession.close(session.register());
+
+ System.out.printf("Callback invoked %d times, total %d bytes%n", callbackCount, collected.size());
+ System.out.println("✓ Callback output streaming passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void callbackInputOutputStreaming() throws Exception {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing callback-based input+output streaming:");
+ System.out.println("=".repeat(50));
+
+ // Simulate the read-callback pattern: a feeder thread pulls from a data source
+ // and pushes into an InputStreamSession, while the main thread reads the output.
+ InputStreamSession inputSession = new InputStreamSession("application/json", "UTF-8");
+ long inputHandle = inputSession.register();
+
+ String inputsJson = "{\"payload\": {\"streamHandle\": \"" + inputHandle + "\", \"mimeType\": \"application/json\"}}";
+
+ // Simulate read callback: feeds a JSON array in chunks
+ byte[] sourceData = "[10, 20, 30, 40, 50]".getBytes("UTF-8");
+ AtomicReference feedError = new AtomicReference<>();
+
+ Thread feeder = new Thread(() -> {
+ try {
+ int chunkSize = 8;
+ for (int off = 0; off < sourceData.length; off += chunkSize) {
+ int len = Math.min(chunkSize, sourceData.length - off);
+ byte[] chunk = new byte[len];
+ System.arraycopy(sourceData, off, chunk, 0, len);
+ inputSession.write(chunk, len);
+ }
+ inputSession.closeWriter();
+ } catch (Exception e) {
+ feedError.set(e);
+ }
+ }, "test-read-callback-feeder");
+ feeder.start();
+
+ StreamSession session = runtime.runStreaming("output application/json\n---\npayload map ($ * 2)", inputsJson);
+ assertFalse(session.isError(), "Expected successful session but got: " + session.getError());
+
+ // Simulate write callback: collect output chunks
+ ByteArrayOutputStream collected = new ByteArrayOutputStream();
+ byte[] buf = new byte[32];
+ int callbackCount = 0;
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ collected.write(buf, 0, n);
+ callbackCount++;
+ }
+ String result = collected.toString(session.getCharset());
+ assertTrue(result.contains("20"), "Expected 20 in result (10*2)");
+ assertTrue(result.contains("100"), "Expected 100 in result (50*2)");
+
+ StreamSession.close(session.register());
+ InputStreamSession.close(inputHandle);
+ feeder.join(5000);
+ assertNull(feedError.get(), "Feeder thread threw: " + feedError.get());
+
+ System.out.printf("Read callback fed %d bytes, write callback invoked %d times, output: %s%n",
+ sourceData.length, callbackCount, result.trim());
+ System.out.println("✓ Callback input+output streaming passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void callbackOutputStreamingError() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing callback-based output streaming with error:");
+ System.out.println("=".repeat(50));
+
+ StreamSession session = runtime.runStreaming("invalid syntax here", null);
+ assertTrue(session.isError());
+ assertNotNull(session.getError());
+
+ System.out.println("Error correctly returned: " + session.getError());
+ System.out.println("✓ Callback output streaming error passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ // Test helper that decodes the JSON envelope produced by ScriptRuntime.run()
+ // so assertions can target the decoded payload directly.
+ static class Result {
+ boolean success;
+ // Decoded result payload (kept base64-encoded when binary is true).
+ String result;
+ String error;
+ boolean binary;
+ String mimeType;
+ String charset;
+
+ static Result parse(String json) {
+ Result result = new Result();
+ org.json.JSONObject obj = new org.json.JSONObject(json);
+
+ result.success = obj.getBoolean("success");
+ if (result.success) {
+ result.binary = obj.getBoolean("binary");
+ result.mimeType = obj.getString("mimeType");
+ result.charset = obj.getString("charset");
+ String encoded = obj.getString("result");
+ if (result.binary) {
+ // Binary payloads stay base64-encoded for the caller to handle.
+ result.result = encoded;
+ } else {
+ // Text payloads are decoded using the charset reported by the engine.
+ result.result = new String(Base64.getDecoder().decode(encoded), Charset.forName(result.charset));
+ }
+ } else {
+ result.error = obj.getString("error");
+ }
+ return result;
+ }
+ }
+
+}
diff --git a/settings.gradle b/settings.gradle
index a47c02c..befbb96 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,3 +1,3 @@
include 'native-cli'
include 'native-cli-integration-tests'
-
+include 'native-lib'