diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e7a5b58..c439811 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -60,9 +60,37 @@ jobs: run: ./gradlew --stacktrace --no-problems-report native-cli:distro shell: bash + # Install Python build dependencies (setuptools/wheel may be missing on Windows runners) + - name: Install Python build dependencies + run: python3 -m pip install --upgrade setuptools wheel + shell: bash + + # Generate native-lib python wheel + - name: Create Native Lib Python Wheel + run: ./gradlew --stacktrace --no-problems-report native-lib:buildPythonWheel + shell: bash + # Upload the artifact file - name: Upload generated script uses: actions/upload-artifact@v4 with: name: dw-${{env.NATIVE_VERSION}}-${{runner.os}} path: native-cli/build/distributions/native-cli-${{env.NATIVE_VERSION}}-native-distro-${{ matrix.script_name }}.zip + + # Upload the Python wheel + - name: Upload Python wheel + uses: actions/upload-artifact@v4 + with: + name: dw-python-wheel-${{env.NATIVE_VERSION}}-${{runner.os}} + path: native-lib/python/dist/dataweave_native-0.0.1-py3-*.whl + + # Upload the native shared library + header together per OS + - name: Upload native shared library + uses: actions/upload-artifact@v4 + with: + name: dwlib-${{env.NATIVE_VERSION}}-${{runner.os}} + path: | + native-lib/python/src/dataweave/native/dwlib.dylib + native-lib/python/src/dataweave/native/dwlib.so + native-lib/python/src/dataweave/native/dwlib.dll + native-lib/python/src/dataweave/native/dwlib.h diff --git a/.gitignore b/.gitignore index 6091d76..327b866 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,9 @@ out/ .DS_Store # GraalVM -.graalvm \ No newline at end of file +.graalvm + +grimoires/ + +.windsurf/ +.claude/ diff --git a/build.gradle b/build.gradle index 9792b28..944d683 100644 --- a/build.gradle +++ b/build.gradle @@ -1,3 +1,13 @@ +buildscript { + repositories { + gradlePluginPortal() + mavenCentral() + } + 
dependencies { + classpath "org.graalvm.buildtools.native:org.graalvm.buildtools.native.gradle.plugin:0.11.2" + } +} + plugins { id "scala" id "maven-publish" @@ -11,6 +21,8 @@ subprojects { apply plugin: 'maven-publish' apply plugin: 'scala' + apply plugin: 'org.graalvm.buildtools.native' + group = 'org.mule.weave.native' version = nativeVersion @@ -21,8 +33,8 @@ subprojects { compileJava { - sourceCompatibility = '11' - targetCompatibility = '11' + sourceCompatibility = '17' + targetCompatibility = '17' } repositories { diff --git a/gradle.properties b/gradle.properties index 01b5261..b3d45c4 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,10 +1,10 @@ -weaveVersion=2.11.0-20251023 -weaveTestSuiteVersion=2.11.0-20251023 +weaveVersion=2.12.0-20260413 +weaveTestSuiteVersion=2.12.0-20260413 nativeVersion=100.100.100 scalaVersion=2.12.18 -ioVersion=2.11.0-SNAPSHOT +ioVersion=2.12.0-20260408 graalvmVersion=24.0.2 -weaveSuiteVersion=2.11.0-20251023 +weaveSuiteVersion=2.12.0-20260413 #Libaries scalaTestVersion=3.2.15 scalaTestPluginVersion=0.33 diff --git a/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala b/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala index 29a1ac0..66c23ba 100644 --- a/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala +++ b/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala @@ -259,6 +259,7 @@ class TCKCliTest extends AnyFunSpec with Matchers "lazy_metadata_definition", "module-singleton", "multipart-write-binary", + "private_scope_directives", "read-binary-files", "underflow", "try", @@ -345,12 +346,16 @@ class TCKCliTest extends AnyFunSpec with Matchers baseArray ++ Array( "math-toRadians", - "try-handle-lazy-values-with-failures" + "try-handle-lazy-values-with-failures", + "weave_ast_example", + "weave_ast_module" ) } else if (versionString == "2.10") { baseArray ++ Array( - 
"try-handle-lazy-values-with-failures" + "try-handle-lazy-values-with-failures", + "weave_ast_example", + "weave_ast_module" ) } else { baseArray diff --git a/native-cli/build.gradle b/native-cli/build.gradle index b567687..5ce3b5e 100644 --- a/native-cli/build.gradle +++ b/native-cli/build.gradle @@ -2,8 +2,6 @@ plugins { id "com.github.maiflai.scalatest" version "${scalaTestPluginVersion}" id 'application' - // Apply GraalVM Native Image plugin - id 'org.graalvm.buildtools.native' version '0.11.2' } sourceSets { diff --git a/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala b/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala index 6a80b38..f5fbca8 100644 --- a/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala +++ b/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala @@ -211,9 +211,3 @@ case class WeaveFailureResult(message: String) extends WeaveExecutionResult { override def result(): String = message } - - -class CustomWeaveDataFormat(moduleManager: ModuleLoaderManager) extends WeaveDataFormat { - override def createModuleLoader(): ModuleLoaderManager = moduleManager -} - diff --git a/native-lib/.gitignore b/native-lib/.gitignore new file mode 100644 index 0000000..0e845cc --- /dev/null +++ b/native-lib/.gitignore @@ -0,0 +1,4 @@ +python/src/dataweave/native/ +python/src/dataweave_native.egg-info/ +python/dist/ +python/build/ diff --git a/native-lib/README.md b/native-lib/README.md new file mode 100644 index 0000000..e829ecf --- /dev/null +++ b/native-lib/README.md @@ -0,0 +1,323 @@ +# native-lib + +## Overview + +`native-lib` builds a **GraalVM native shared library** that embeds the MuleSoft **DataWeave runtime** and exposes a small C-compatible API. + +The main purpose is to allow non-JVM consumers (most notably the Python package in `native-lib/python`) to execute DataWeave scripts **without running a JVM**, while still using the official DataWeave runtime. 
+ +## Architecture (GraalVM + FFI) + +``` +┌─────────────────────────────────────────────┐ +│ Python Process │ +│ │ +│ ┌────────────────────────────────────────┐ │ +│ │ Application Script │ │ +│ │ - Python: ctypes │ │ +│ └──────────────┬─────────────────────────┘ │ +│ │ │ +│ │ FFI Call │ +│ ▼ │ +│ ┌────────────────────────────────────────┐ │ +│ │ Native Shared Library (dwlib) │ │ +│ │ ┌──────────────────────────────────┐ │ │ +│ │ │ GraalVM Isolate │ │ │ +│ │ │ - NativeLib.run_script() │ │ │ +│ │ │ - DataWeave script execution │ │ │ +│ │ └──────────────────────────────────┘ │ │ +│ └────────────────────────────────────────┘ │ +└─────────────────────────────────────────────┘ +``` + +## Building with Gradle + +### Prerequisites + +- A GraalVM distribution installed that includes `native-image`. +- Enough memory for native-image (this build config uses `-J-Xmx6G`). + +### Build the shared library + +From the repository root: + +```bash +./gradlew :native-lib:nativeCompile +``` + +The shared library is produced under: + +- `native-lib/build/native/nativeCompile/` + +and is named: + +- macOS: `dwlib.dylib` +- Linux: `dwlib.so` +- Windows: `dwlib.dll` + +### Stage the library into the Python package (dev workflow) + +```bash +./gradlew :native-lib:stagePythonNativeLib +``` + +This copies `dwlib.*` into: + +- `native-lib/python/src/dataweave/native/` + +### Build a Python wheel (bundles the native library) + +```bash +./gradlew :native-lib:buildPythonWheel +``` + +The wheel will be created in: + +- `native-lib/python/dist/` + +## Installing for use in a Python project + +### Option A: Install the produced wheel (recommended) + +After `:native-lib:buildPythonWheel`: + +```bash +python3 -m pip install native-lib/python/dist/dataweave_native-0.0.1-*.whl +``` + +This wheel includes the `dwlib.*` shared library inside the Python package. + +### Option B: Editable install for development + +1. 
Stage the native library: + +```bash +./gradlew :native-lib:stagePythonNativeLib +``` + +2. Install the Python package in editable mode: + +```bash +python3 -m pip install -e native-lib/python +``` + +### Option C: Use an externally-built library via an environment variable + +If you want to point Python at a specific built artifact, set: + +- `DATAWEAVE_NATIVE_LIB=/absolute/path/to/dwlib.(dylib|so|dll)` + +The Python module will also try a few fallbacks (including the wheel-bundled location). + +## Using the library (Python examples) + +All examples below assume: + +```python +import dataweave +``` + +### 1) Simple script + +```python +result = dataweave.run("2 + 2") +assert result.success is True +print(result.get_string()) # "4" +``` + +### 2) Script with inputs (auto-detected types) + +Inputs can be plain Python values. The module auto-encodes them as JSON or text. + +```python +result = dataweave.run( + "num1 + num2", + {"num1": 25, "num2": 17}, +) +print(result.get_string()) # "42" +``` + +### 3) Script with inputs (explicit mime type, charset, properties) + +Use an explicit input dict when you need full control over how DataWeave interprets bytes. 
+ +```python +script = "payload.person" +xml_bytes = b"<person><name>Billy</name><age>31</age></person>".decode("utf-8").encode("utf-16") + +result = dataweave.run( + script, + { + "payload": { + "content": xml_bytes, + "mimeType": "application/xml", + "charset": "UTF-16", + "properties": { + "nullValueOn": "empty", + "maxAttributeSize": 256 + }, + } + }, +) + +if result.success: + print(result.get_string()) +else: + print(result.error) +``` + +You can also use `InputValue` for the same purpose: + +```python +input_value = dataweave.InputValue( + content="1234567", + mime_type="application/csv", + properties={"header": False, "separator": "4"}, +) + +result = dataweave.run("in0.column_1[0]", {"in0": input_value}) +print(result.get_string()) # '"567"' +``` + +### 4) Context manager (explicit lifecycle) + +The module-level API (`dataweave.run(...)`) uses a shared singleton. Use `DataWeave` directly when you need explicit control over isolate lifecycle or want multiple independent instances: + +```python +with dataweave.DataWeave() as dw: + r1 = dw.run("2 + 2") + r2 = dw.run("x + y", {"x": 10, "y": 32}) + + print(r1.get_string()) # "4" + print(r2.get_string()) # "42" +``` + +### 5) Error handling + +There are three error types: + +- `DataWeaveLibraryNotFoundError` — the native library cannot be located/loaded. +- `DataWeaveScriptError` — script compilation or runtime error (subclass of `DataWeaveError`). Carries the full result on `.result`. +- `DataWeaveError` — FFI-level failures (isolate creation, library calls). 
+ +**Option A: Use `raise_on_error=True` for a single try/except (recommended)** + +```python +try: + result = dataweave.run("invalid syntax here", raise_on_error=True) + print(result.get_string()) + +except dataweave.DataWeaveScriptError as e: + print(f"Script error: {e.result.error}") + +except dataweave.DataWeaveLibraryNotFoundError: + # Build it first: ./gradlew :native-lib:nativeCompile + raise +``` + +**Option B: Check `result.success` manually (default, backward-compatible)** + +```python +result = dataweave.run("invalid syntax here") + +if not result.success: + print(f"Error: {result.error}") +else: + print(result.get_string()) +``` + +### 6) Output streaming + +Use `run_streaming` to execute a script and receive output chunks as they are produced, without buffering the entire result in memory. + +```python +with dataweave.DataWeave() as dw: + stream = dw.run_streaming("output application/json --- (1 to 10000) map {id: $}") + for chunk in stream: + sys.stdout.buffer.write(chunk) + metadata = stream.metadata # StreamingResult with mime_type, charset, etc. + print(f"\nDone: {metadata.mime_type}, {metadata.charset}") +``` + +Or with the module-level API: + +```python +stream = dataweave.run_streaming("output application/csv --- payload", {"payload": [1, 2, 3]}) +output = b"".join(stream) +``` + +### 7) Input and output streaming + +Use `run_transform` to stream both input and output — feed an iterable of bytes in, receive a generator of bytes out. Ideal for processing large files or network streams with constant memory. 
+ +```python +# Stream a file through DataWeave +with open("large.json", "rb") as f: + stream = dataweave.run_transform( + "output application/csv --- payload", + input_stream=iter(lambda: f.read(8192), b""), + input_mime_type="application/json", + ) + with open("output.csv", "wb") as out: + for chunk in stream: + out.write(chunk) + metadata = stream.metadata +``` + +Works with any iterable — generators, lists, network sockets: + +```python +# From an in-memory list +stream = dataweave.run_transform( + "output application/json --- payload map ($ * $)", + input_stream=[b"[1,2,3,4,5]"], + input_mime_type="application/json", +) +print(b"".join(stream)) # [1,4,9,16,25] +``` + +```python +# From a generator producing chunks +def read_from_network(sock): + while chunk := sock.recv(4096): + yield chunk + +stream = dataweave.run_transform( + "output application/json --- sizeOf(payload)", + input_stream=read_from_network(conn), + input_mime_type="application/json", +) +for chunk in stream: + process(chunk) +``` + +### 8) I/O streaming with callbacks (low-level) + +Use `run_input_output_callback` when you need direct callback control (e.g. integration with event-driven frameworks). For most use cases, prefer `run_transform` above. + +```python +json_input = b'[1,2,3,4,5]' +pos = 0 + +def read_cb(buf_size): + nonlocal pos + chunk = json_input[pos:pos + buf_size] + pos += len(chunk) + return chunk # return b"" when done + +chunks = [] +def write_cb(data): + chunks.append(data) + return 0 # 0 = success + +result = dataweave.run_input_output_callback( + "output application/json deferred=true --- payload map ($ * $)", + input_name="payload", + input_mime_type="application/json", + read_callback=read_cb, + write_callback=write_cb, +) + +print(result) # StreamingResult(success=True, ...) 
+print(b"".join(chunks)) # [1,4,9,16,25] +``` diff --git a/native-lib/build.gradle b/native-lib/build.gradle new file mode 100644 index 0000000..e495f06 --- /dev/null +++ b/native-lib/build.gradle @@ -0,0 +1,116 @@ +dependencies { + api group: 'org.mule.weave', name: 'runtime', version: weaveVersion + api group: 'org.mule.weave', name: 'core-modules', version: weaveVersion + + implementation group: 'org.mule.weave', name: 'parser', version: weaveVersion + implementation group: 'org.mule.weave', name: 'wlang', version: weaveVersion + compileOnly group: 'org.graalvm.sdk', name: 'graal-sdk', version: graalvmVersion + compileOnly group: 'org.graalvm.nativeimage', name: 'svm', version: graalvmVersion + + implementation "org.scala-lang:scala-library:${scalaVersion}" + implementation 'org.json:json:20240303' + + testImplementation platform('org.junit:junit-bom:5.10.0') + testImplementation 'org.junit.jupiter:junit-jupiter' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' +} + +test { + useJUnitPlatform() +} + +tasks.matching { it.name == 'nativeCompileClasspathJar' }.configureEach { t -> + t.exclude('META-INF/services/org.mule.weave.v2.module.DataFormat') + t.from("${projectDir}/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat") { + into('META-INF/services') + rename { 'org.mule.weave.v2.module.DataFormat' } + } + t.exclude('META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader') + t.from("${projectDir}/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader") { + into('META-INF/services') + rename { 'org.mule.weave.v2.parser.phase.ModuleLoader' } + } +} + +// Configure GraalVM native-image to build a shared library +graalvmNative { +// toolchainDetection = true + binaries { + main { + sharedLibrary = true + debug = true + verbose = true + fallback = false + //agent = false + useFatJar = true + //buildArgs.add('-Ob') // quick build mode to speed up builds during development + 
buildArgs.add('--no-fallback') + buildArgs.add('-H:Name=dwlib') + buildArgs.add('--verbose') + buildArgs.add("--report-unsupported-elements-at-runtime") + buildArgs.add("-J-Xmx6G") + + buildArgs.add("-H:+ReportExceptionStackTraces") + buildArgs.add("-H:+UnlockExperimentalVMOptions") + buildArgs.add("--initialize-at-build-time=sun.instrument.InstrumentationImpl") + buildArgs.add("-H:DeadlockWatchdogInterval=1000") + buildArgs.add("-H:CompilationExpirationPeriod=0") + buildArgs.add("-H:+AddAllCharsets") + buildArgs.add("-H:+IncludeAllLocales") + // Pass project directory as system property for header path resolution + buildArgs.add("-Dproject.root=${projectDir}") + } + } +} + +def pythonExe = (project.findProperty('pythonExe') ?: 'python3') as String + +tasks.register('stagePythonNativeLib', Copy) { + dependsOn tasks.named('nativeCompile') + from("${buildDir}/native/nativeCompile") { + include('dwlib.*') + } + into("${projectDir}/python/src/dataweave/native") +} + +tasks.register('buildPythonWheel', Exec) { + dependsOn tasks.named('stagePythonNativeLib') + workingDir("${projectDir}/python") + // Track inputs so Gradle rebuilds when native lib or Python sources change + inputs.dir("${projectDir}/python/src/dataweave") + inputs.file("${projectDir}/python/setup.py") + inputs.file("${projectDir}/python/setup.cfg") + inputs.file("${projectDir}/python/pyproject.toml") + outputs.dir("${projectDir}/python/dist") + doFirst { + // Clean old wheels and build artifacts to ensure fresh build + delete("${projectDir}/python/dist") + delete("${projectDir}/python/build") + file("${projectDir}/python/dist").mkdirs() + } + // Use setup.py bdist_wheel to generate platform-specific wheel tags + // The custom setup.py overrides bdist_wheel to set correct platform tags + // for the bundled native library (dwlib) + commandLine(pythonExe, 'setup.py', 'bdist_wheel', '-d', 'dist') +} + +tasks.register('pythonTest', Exec) { + if (project.findProperty('skipPythonTests')?.toString()?.toBoolean() 
== true) { + enabled = false + } + + dependsOn tasks.named('stagePythonNativeLib') + workingDir("${projectDir}/python") + commandLine(pythonExe, 'tests/test_dataweave_module.py') +} + +tasks.named('test') { + dependsOn tasks.named('pythonTest') +} + +tasks.named('clean') { + delete("${projectDir}/python/dist") + delete("${projectDir}/python/build") + delete("${projectDir}/python/src/dataweave/native") + delete("${projectDir}/python/src/dataweave_native.egg-info") +} diff --git a/native-lib/example_dataweave_module.py b/native-lib/example_dataweave_module.py new file mode 100755 index 0000000..d1740a2 --- /dev/null +++ b/native-lib/example_dataweave_module.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +""" +Example demonstrating the simplified DataWeave Python module. + +This shows how easy it is to use DataWeave without dealing with +any GraalVM or native library complexity. +""" + +import sys +from pathlib import Path + +_PYTHON_SRC_DIR = Path(__file__).resolve().parent / "python" / "src" +sys.path.insert(0, str(_PYTHON_SRC_DIR)) + +import dataweave + +def example_simple_functions(): + """Example using simple function API""" + print("="*70) + print("Example 1: Simple Function API") + print("="*70) + + ok = True + + # Simple script execution + print("\n[*] Simple arithmetic:") + script = "2 + 2" + result = dataweave.run_script(script) + ok = assert_result(script, result, "4") and ok + + print("\n[*] Square root:") + script = "sqrt(144)" + result = dataweave.run_script(script) + ok = assert_result(script, result, "12") and ok + + print("\n[*] Array operations:") + script = "[1, 2, 3] map $ * 2" + result = dataweave.run_script(script) + ok = assert_result(script, result, "[\n 2, \n 4, \n 6\n]") and ok + + print("\n[*] String operations:") + script = "upper('hello world')" + result = dataweave.run_script(script) + ok = assert_result(script, result, '"HELLO WORLD"') and ok + + # Script with inputs (simple values - auto-converted) + print("\n[*] Script with inputs 
(auto-converted):") + script = "num1 + num2" + result = dataweave.run_script(script, {"num1": 25, "num2": 17}) + ok = assert_result(script, result, "42") and ok + + # Script with complex inputs + print("\n[*] Script with complex object:") + script = "payload.name" + result = dataweave.run_script(script, {"payload": {"content": '{"name": "John", "age": 30}', "mimeType": "application/json"}}) + ok = assert_result(script, result, '"John"') and ok + + # Script with mixed input types + print("\n[*] Script with mixed input types:") + script = "greeting ++ ' ' ++ payload.name" + result = dataweave.run_script(script, {"greeting": "Hello", "payload": {"content": '{"name": "Alice", "role": "Developer"}', "mimeType": "application/json"}}) + ok = assert_result(script, result, '"Hello Alice"') and ok + + # Binary output + print("\n[*] Binary output:") + script = "output application/octet-stream\n---\ndw::core::Binaries::fromBase64(\"holamund\")" + result = dataweave.run_script(script) + ok = assert_result(script, result, "holamund") and ok + + # Script with InputValue + print("\n[*] Inputs:") + input_value = dataweave.InputValue( + content="1234567", + mimeType="application/csv", + properties={"header": False, "separator": "4"} + ) + script = "in0.column_1[0]" + result = dataweave.run_script(script, {"in0": input_value}) + ok = assert_result(script, result, '"567"') and ok + + # Cleanup when done + dataweave.cleanup() + print("\n[OK] Cleanup completed") + + return ok + + +def assert_result(script, result, expected): + print(f" {script} = {result}") + ok = result.get_string() == expected + if ok: + status = "[OK]" + else: + status = f"[FAIL] (expected: {expected})" + print(f" result as string = {result.get_string()} {status}") + print(f" result as bytes = {result.get_bytes()}") + return ok + + +def example_context_manager(): + """Example using context manager (recommended)""" + print("\n" + "="*70) + print("Example 2: Context Manager API (Recommended)") + print("="*70) + + ok = 
True + + with dataweave.DataWeave() as dw: + print("\n[*] Multiple operations with same runtime:") + + script = "2 + 2" + result = dw.run(script) + ok = assert_result(script, result, "4") and ok + + script = "x + y + z" + result = dw.run(script, {"x": 1, "y": 2, "z": 3}) + ok = assert_result(script, result, "6") and ok + + script = "numbers map $ * multiplier" + result = dw.run(script, {"numbers": [1, 2, 3, 4, 5], "multiplier": 10}) + ok = assert_result(script, result, "[\n 10, \n 20, \n 30, \n 40, \n 50\n]") and ok + + print("\n[OK] Context manager automatically cleaned up resources") + + return ok + + +def example_explicit_format(): + """Example using explicit content/mimeType format""" + print("\n" + "="*70) + print("Example 3: Explicit Format (Advanced)") + print("="*70) + + print("\n[*] Using explicit content and mimeType:") + + ok = True + + script = "payload.message" + result = dataweave.run_script(script, {"payload": {"content": '{"message": "Hello from JSON!", "value": 42}', "mimeType": "application/json"}}) + ok = assert_result(script, result, '"Hello from JSON!"') and ok + + script = "payload.value + offset" + result = dataweave.run_script(script, {"payload": {"content": '{"value": 100}', "mimeType": "application/json"}, "offset": 50}) + ok = assert_result(script, result, "150") and ok + + return ok + + +def example_error_handling(): + """Example with error handling""" + print("\n" + "="*70) + print("Example 4: Error Handling") + print("="*70) + + try: + print("\n[*] Invalid script (will show error):") + result = dataweave.run_script("invalid syntax here", {}) + print(f" Result: {result} {'[OK]' if result.success == False else '[FAIL]'}") + + except dataweave.DataWeaveLibraryNotFoundError as e: + print(f"[ERROR] Library not found: {e}") + print(" Please build the library first: ./gradlew nativeCompile") + except dataweave.DataWeaveError as e: + print(f"[ERROR] DataWeave error: {e}") + + +def main(): + """Run all examples""" + print("\n" + "="*70) + 
print("DataWeave Python Module - Examples") + print("="*70) + print("\nThis module abstracts all GraalVM/native complexity!") + print("Just import and use - no ctypes, no manual memory management.\n") + + try: + all_ok = True + all_ok = example_simple_functions() and all_ok + all_ok = example_context_manager() and all_ok + all_ok = example_explicit_format() and all_ok + example_error_handling() + + print("\n" + "="*70) + if all_ok: + print("[OK] All examples completed successfully!") + else: + print("[FAIL] One or more examples failed") + print("="*70) + + except dataweave.DataWeaveLibraryNotFoundError as e: + print(f"\n[ERROR] {e}") + print("\nPlease build the native library first:") + print(" ./gradlew nativeCompile") + except Exception as e: + print(f"\n[ERROR] Unexpected error: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + main() diff --git a/native-lib/example_streaming.py b/native-lib/example_streaming.py new file mode 100755 index 0000000..9e55ba9 --- /dev/null +++ b/native-lib/example_streaming.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python3 + +import sys +from pathlib import Path + +_PYTHON_SRC_DIR = Path(__file__).resolve().parent / "python" / "src" +sys.path.insert(0, str(_PYTHON_SRC_DIR)) + +import dataweave +import resource +import psutil, os +import time + +def example_streaming_input_output_callback(): + print("\nTesting streaming input and output using callbacks (square numbers)...") + try: + start_time = time.monotonic() + num_elements = 1_000_000 * 50 + + script = """output application/json deferred=true +--- +payload map ($ * $)""" + + # -- input generator (called by native on a background thread) -- + input_iter = iter(range(num_elements)) + input_started = False + input_done = False + pending_token = None + + def read_callback(buf_size): + nonlocal input_started, input_done, pending_token + if input_done: + return b"" + parts = [] + if not input_started: + parts.append(b"[") + input_started = True + remaining = 
buf_size - sum(len(p) for p in parts) + if pending_token is not None: + if len(pending_token) <= remaining: + parts.append(pending_token) + remaining -= len(pending_token) + pending_token = None + else: + return b"".join(parts) + try: + while remaining > 0: + i = next(input_iter) + token = (b"," if i > 0 else b"") + str(i).encode("utf-8") + if len(token) > remaining: + pending_token = token + break + parts.append(token) + remaining -= len(token) + except StopIteration: + parts.append(b"]") + input_done = True + return b"".join(parts) + + # -- output collector (called by native on the calling thread) -- + chunk_count = 0 + total_bytes = 0 + + def write_callback(data): + nonlocal chunk_count, total_bytes + chunk_count += 1 + total_bytes += len(data) + if chunk_count % 5000 == 0: + usage = resource.getrusage(resource.RUSAGE_SELF) + current_rss = psutil.Process(os.getpid()).memory_info().rss + print(f"--- chunk {chunk_count}: {len(data)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---") + return 0 + + usage = resource.getrusage(resource.RUSAGE_SELF) + current_rss = psutil.Process(os.getpid()).memory_info().rss + print(f">>> Before run_input_output_callback, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---") + + result = dataweave.run_input_output_callback( + script, + input_name="payload", + input_mime_type="application/json", + read_callback=read_callback, + write_callback=write_callback, + input_charset="utf-8", + ) + + if not result.success: + raise Exception(result.error or "Unknown error") + + peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576 + elapsed = time.monotonic() - start_time + mins, secs = divmod(elapsed, 60) + print(f"\n[OK] Streaming input/output callback done ({chunk_count} chunks, {total_bytes / 1048576:.1f} MB, {num_elements:,} elements) - Time: {int(mins)}:{secs:06.3f}") + print(f"Peak memory 
(max RSS): {peak_rss:.1f} MB") + return True + except Exception as e: + print(f"[FAIL] Streaming input/output callback failed: {e}") + import traceback + traceback.print_exc() + return False + + +def example_streaming_run_transform(): + print("\nTesting streaming input and output using run_transform (square numbers)...") + try: + start_time = time.monotonic() + num_elements = 1_000_000 * 50 + + script = """output application/json deferred=true +--- +payload map ($ * $)""" + + # -- input as a generator of byte chunks -- + def input_chunks(): + buf_size = 8192 + input_iter = iter(range(num_elements)) + started = False + done = False + pending_token = None + + while not done: + parts = [] + if not started: + parts.append(b"[") + started = True + remaining = buf_size - sum(len(p) for p in parts) + if pending_token is not None: + if len(pending_token) <= remaining: + parts.append(pending_token) + remaining -= len(pending_token) + pending_token = None + else: + yield b"".join(parts) + continue + try: + while remaining > 0: + i = next(input_iter) + token = (b"," if i > 0 else b"") + str(i).encode("utf-8") + if len(token) > remaining: + pending_token = token + break + parts.append(token) + remaining -= len(token) + except StopIteration: + parts.append(b"]") + done = True + if parts: + yield b"".join(parts) + + usage = resource.getrusage(resource.RUSAGE_SELF) + current_rss = psutil.Process(os.getpid()).memory_info().rss + print(f">>> Before run_transform, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---") + + stream = dataweave.run_transform( + script, + input_stream=input_chunks(), + input_mime_type="application/json", + input_charset="utf-8", + ) + + chunk_count = 0 + total_bytes = 0 + + for data in stream: + chunk_count += 1 + total_bytes += len(data) + if chunk_count % 5000 == 0: + usage = resource.getrusage(resource.RUSAGE_SELF) + current_rss = psutil.Process(os.getpid()).memory_info().rss + print(f"--- chunk {chunk_count}: 
{len(data)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---") + + if not stream.metadata.success: + raise Exception(stream.metadata.error) + + peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576 + elapsed = time.monotonic() - start_time + mins, secs = divmod(elapsed, 60) + print(f"\n[OK] run_transform done ({chunk_count} chunks, {total_bytes / 1048576:.1f} MB, {num_elements:,} elements) - Time: {int(mins)}:{secs:06.3f}") + print(f"Peak memory (max RSS): {peak_rss:.1f} MB") + return True + except Exception as e: + print(f"[FAIL] run_transform failed: {e}") + import traceback + traceback.print_exc() + return False + + +def doc_example(): + json_input = b'[1,2,3,4,5]' + pos = 0 + + def read_cb(buf_size): + nonlocal pos + chunk = json_input[pos:pos + buf_size] + pos += len(chunk) + return chunk # return b"" when done + + chunks = [] + def write_cb(data): + chunks.append(data) + return 0 # 0 = success + + result = dataweave.run_input_output_callback( + "output application/json deferred=true --- payload map ($ * $)", + input_name="payload", + input_mime_type="application/json", + read_callback=read_cb, + write_callback=write_cb, + ) + + print(result) # {"success": True} + print(b"".join(chunks)) # [1,4,9,16,25] + + +def main(): + print("=" * 70) + print("run_input_output_callback (low-level callbacks)") + print("=" * 70) + example_streaming_input_output_callback() + + print("\n") + print("=" * 70) + print("run_transform (Pythonic generator API)") + print("=" * 70) + example_streaming_run_transform() + + print("\n") + doc_example() + + +if __name__ == "__main__": + main() diff --git a/native-lib/python/pyproject.toml b/native-lib/python/pyproject.toml new file mode 100644 index 0000000..642ab3c --- /dev/null +++ b/native-lib/python/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" diff 
--git a/native-lib/python/setup.cfg b/native-lib/python/setup.cfg new file mode 100644 index 0000000..a1635ae --- /dev/null +++ b/native-lib/python/setup.cfg @@ -0,0 +1,18 @@ +[metadata] +name = dataweave-native +version = 0.0.1 +description = Python bindings for the DataWeave native library + +[options] +package_dir = + = src +packages = find: +include_package_data = True +python_requires = >=3.9 + +[options.packages.find] +where = src + +[options.package_data] +dataweave = + native/* diff --git a/native-lib/python/setup.py b/native-lib/python/setup.py new file mode 100644 index 0000000..8c32d49 --- /dev/null +++ b/native-lib/python/setup.py @@ -0,0 +1,109 @@ +""" +Custom setup.py to build platform-specific wheels for dataweave-native. + +Since this package bundles a native shared library (dwlib), the wheel must +be tagged with the correct platform (e.g., macosx_11_0_arm64, manylinux, etc.) +rather than the generic 'any' platform. + +This is achieved by overriding the bdist_wheel command to set platform-specific +tags based on the current build environment. +""" + +import platform +import struct +import sys + +from setuptools import setup + +try: + from wheel.bdist_wheel import bdist_wheel as _bdist_wheel +except ImportError: + _bdist_wheel = None + + +def get_platform_tag(): + """ + Determine the platform tag for the wheel based on the current system. 
+ """ + system = platform.system().lower() + machine = platform.machine().lower() + + # Normalize machine architecture names + if machine in ("x86_64", "amd64"): + machine = "x86_64" + elif machine in ("arm64", "aarch64"): + machine = "arm64" + elif machine in ("i386", "i686"): + machine = "i686" + + if system == "darwin": + # macOS: use macosx_11_0 as minimum for universal compatibility + # Adjust based on actual deployment target if needed + mac_ver = platform.mac_ver()[0] + if mac_ver: + parts = mac_ver.split(".") + major = int(parts[0]) + minor = int(parts[1]) if len(parts) > 1 else 0 + # Use at least 11.0 for arm64, 10.9 for x86_64 + if machine == "arm64": + major = max(major, 11) + minor = 0 + else: + major = max(major, 10) + minor = max(minor, 9) if major == 10 else 0 + else: + major, minor = (11, 0) if machine == "arm64" else (10, 9) + + return f"macosx_{major}_{minor}_{machine}" + + elif system == "linux": + # Linux: use manylinux2014 for broad compatibility + # manylinux2014 supports glibc 2.17+ + return f"manylinux2014_{machine}" + + elif system == "windows": + # Windows: win_amd64 or win32 + bits = struct.calcsize("P") * 8 + if bits == 64: + return "win_amd64" + else: + return "win32" + + else: + # Fallback: use the platform module's platform tag + return None + + +if _bdist_wheel is not None: + + class bdist_wheel(_bdist_wheel): + """ + Custom bdist_wheel that forces platform-specific tags for native library wheels. 
+ """ + + def finalize_options(self): + super().finalize_options() + # Mark as platform-specific (not pure Python) + self.root_is_pure = False + + def get_tag(self): + # Get the default tags + python, abi, plat = super().get_tag() + + # Override with platform-specific tag + platform_tag = get_platform_tag() + if platform_tag: + plat = platform_tag + + # Use py3 and none for Python/ABI since we don't have compiled Python extensions + return "py3", "none", plat + +else: + bdist_wheel = None + + +cmdclass = {} +if bdist_wheel is not None: + cmdclass["bdist_wheel"] = bdist_wheel + +setup(cmdclass=cmdclass) diff --git a/native-lib/python/src/dataweave/__init__.py b/native-lib/python/src/dataweave/__init__.py new file mode 100644 index 0000000..977361e --- /dev/null +++ b/native-lib/python/src/dataweave/__init__.py @@ -0,0 +1,1002 @@ +""" +DataWeave Python Module + +Execute DataWeave scripts from Python via a GraalVM native shared library. +Supports buffered execution, output streaming, and bidirectional streaming +with constant memory overhead. 
+ +Basic usage: + import dataweave + + result = dataweave.run("2 + 2") + print(result.get_string()) # "4" + +Output streaming (yields chunks as produced): + stream = dataweave.run_streaming("output json --- (1 to 10000) map {id: $}") + for chunk in stream: + sys.stdout.buffer.write(chunk) + +Bidirectional streaming (iterable in, generator out): + with open("large.json", "rb") as f: + stream = dataweave.run_transform( + "output csv --- payload", + input_stream=iter(lambda: f.read(8192), b""), + input_mime_type="application/json", + ) + for chunk in stream: + process(chunk) + +Context manager (explicit lifecycle control): + from dataweave import DataWeave + + with DataWeave() as dw: + result = dw.run("2 + 2") + print(result.get_string()) + +Error handling: + try: + result = dataweave.run("invalid", raise_on_error=True) + except dataweave.DataWeaveScriptError as e: + print(e.result.error) + +Native resources are released automatically at interpreter exit via atexit. +Call dataweave.cleanup() to release them earlier if needed. +""" + +import base64 +import ctypes +import json +from dataclasses import dataclass +from pathlib import Path +from queue import Queue +from threading import Thread +from typing import Any, Callable, Dict, Generator, Iterable, Optional, Union + + +class DataWeaveError(Exception): + pass + + +class DataWeaveScriptError(DataWeaveError): + """Raised when a DataWeave script fails (compile or runtime error). + + Carries the full result object so callers can inspect details. + """ + + def __init__(self, result): + self.result = result + super().__init__(result.error or "Script execution failed") + + +class DataWeaveLibraryNotFoundError(Exception): + pass + + +# ctypes callback signatures matching NativeCallbacks.WriteCallback / ReadCallback. +# Buffer parameters use c_void_p (not c_char_p) because ctypes gives c_char_p +# special treatment that prevents writing into the buffer. 
+# int (*WriteCallback)(void *ctx, const char *buffer, int length) +WRITE_CALLBACK = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int) +# int (*ReadCallback)(void *ctx, char *buffer, int bufferSize) +READ_CALLBACK = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int) + + +WriteCallback = Callable[[bytes], int] +ReadCallback = Callable[[int], bytes] + +_ENV_NATIVE_LIB = "DATAWEAVE_NATIVE_LIB" + + +@dataclass +class InputValue: + content: Union[str, bytes] + mime_type: Optional[str] = None + charset: Optional[str] = None + properties: Optional[Dict[str, Union[str, int, bool]]] = None + + def encode_content(self) -> str: + if isinstance(self.content, bytes): + raw = self.content + else: + raw = self.content.encode(self.charset or "utf-8") + return base64.b64encode(raw).decode("ascii") + + +@dataclass(repr=False) +class ExecutionResult: + success: bool + result: Optional[str] + error: Optional[str] + binary: bool + mime_type: Optional[str] + charset: Optional[str] + + def __repr__(self): + if not self.success: + return f"ExecutionResult(success=False, error={self.error!r})" + preview = (self.result[:50] + "...") if self.result and len(self.result) > 50 else self.result + return f"ExecutionResult(success=True, mime_type={self.mime_type!r}, charset={self.charset!r}, result={preview!r})" + + def get_bytes(self) -> Optional[bytes]: + if not self.success or self.result is None: + return None + return base64.b64decode(self.result) + + def get_string(self) -> Optional[str]: + if not self.success or self.result is None: + return None + if self.binary: + return self.result + return self.get_bytes().decode(self.charset or "utf-8") + + +@dataclass +class StreamingResult: + """Metadata returned after a streaming execution completes.""" + success: bool + error: Optional[str] + mime_type: Optional[str] + charset: Optional[str] + binary: bool + + +class Stream: + """Wrapper around a streaming generator that captures metadata. 
+ + Iterate to consume output chunks. After iteration completes, + access ``.metadata`` for the :class:`StreamingResult`. + """ + + def __init__(self, gen: Generator[bytes, None, StreamingResult]): + self._gen = gen + self._metadata: Optional[StreamingResult] = None + + def __iter__(self): + return self + + def __next__(self) -> bytes: + try: + return next(self._gen) + except StopIteration as e: + self._metadata = e.value + raise + + @property + def metadata(self) -> Optional[StreamingResult]: + return self._metadata + + +def _parse_native_encoded_response(raw: str) -> ExecutionResult: + if raw is None: + return ExecutionResult(False, None, "Native returned null", False, None, None) + + if raw == "": + return ExecutionResult(False, None, "Native returned empty response", False, None, None) + + try: + parsed = json.loads(raw) + except Exception as e: + return ExecutionResult(False, None, f"Failed to parse native JSON response: {e}", False, None, None) + + if not isinstance(parsed, dict): + return ExecutionResult(False, None, "Native response JSON is not an object", False, None, None) + + success = bool(parsed.get("success", False)) + if not success: + return ExecutionResult(False, None, parsed.get("error"), False, None, None) + + return ExecutionResult( + success=True, + result=parsed.get("result"), + error=None, + binary=bool(parsed.get("binary", False)), + mime_type=parsed.get("mimeType"), + charset=parsed.get("charset"), + ) + + +def _parse_streaming_result(meta: dict) -> StreamingResult: + success = meta.get("success", False) + if not success: + return StreamingResult( + success=False, + error=meta.get("error"), + mime_type=None, + charset=None, + binary=False, + ) + return StreamingResult( + success=True, + error=None, + mime_type=meta.get("mimeType"), + charset=meta.get("charset"), + binary=meta.get("binary", False), + ) + + +def _candidate_library_paths() -> list[Path]: + paths: list[Path] = [] + + env_value = (__import__("os").environ.get(_ENV_NATIVE_LIB) or 
"").strip() + if env_value: + paths.append(Path(env_value)) + + pkg_dir = Path(__file__).resolve().parent + native_dir = pkg_dir / "native" + paths.append(native_dir / "dwlib.dylib") + paths.append(native_dir / "dwlib.so") + paths.append(native_dir / "dwlib.dll") + + # Dev fallback: if this package is being used from the data-weave-cli repo + # tree, locate native-lib/build/native/nativeCompile. + for parent in pkg_dir.parents: + build_dir = parent / "build" / "native" / "nativeCompile" + if build_dir.exists(): + paths.append(build_dir / "dwlib.dylib") + paths.append(build_dir / "dwlib.so") + paths.append(build_dir / "dwlib.dll") + break + + # CWD fallback + paths.append(Path("dwlib.dylib")) + paths.append(Path("dwlib.so")) + paths.append(Path("dwlib.dll")) + + return paths + + +def _find_library() -> str: + for p in _candidate_library_paths(): + if p.exists() and p.is_file(): + return str(p) + + raise DataWeaveLibraryNotFoundError( + "Could not find DataWeave native library (dwlib). " + f"Set {_ENV_NATIVE_LIB} to an absolute path or install a wheel that bundles the native library." 
+ ) + + +def _normalize_input_value(value: Any, mime_type: Optional[str] = None) -> Dict[str, Any]: + if isinstance(value, dict): + allowed_keys = {"content", "mimeType", "charset", "properties"} + extra_keys = set(value.keys()) - allowed_keys + if extra_keys: + raise DataWeaveError( + "Explicit input dict contains unsupported keys: " + ", ".join(sorted(extra_keys)) + ) + + if "content" in value or "mimeType" in value: + if "content" not in value or "mimeType" not in value: + raise DataWeaveError( + "Explicit input dict must include both 'content' and 'mimeType'" + ) + + raw_content = value.get("content") + charset = value.get("charset") or "utf-8" + if isinstance(raw_content, bytes): + encoded_content = base64.b64encode(raw_content).decode("ascii") + else: + encoded_content = base64.b64encode(str(raw_content).encode(charset)).decode("ascii") + + normalized: Dict[str, Any] = { + "content": encoded_content, + "mimeType": value.get("mimeType"), + } + if "charset" in value: + normalized["charset"] = value.get("charset") + if "properties" in value: + normalized["properties"] = value.get("properties") + return normalized + + if isinstance(value, InputValue): + out: Dict[str, Any] = { + "content": value.encode_content(), + "mimeType": value.mime_type or mime_type, + } + if value.charset is not None: + out["charset"] = value.charset + if value.properties is not None: + out["properties"] = value.properties + return out + + if isinstance(value, str): + content = value + default_mime = "text/plain" + elif isinstance(value, (int, float, bool)): + content = json.dumps(value) + default_mime = "application/json" + elif value is None: + content = "null" + default_mime = "application/json" + else: + try: + content = json.dumps(value) + default_mime = "application/json" + except (TypeError, ValueError): + content = str(value) + default_mime = "text/plain" + + charset = "utf-8" + encoded_content = base64.b64encode(content.encode(charset)).decode("ascii") + + return { + "content": 
encoded_content, + "mimeType": mime_type or default_mime, + "charset": charset, + } + + +class DataWeave: + def __init__(self, lib_path: Optional[str] = None): + self._lib_path = lib_path or _find_library() + self._lib = None + self._isolate = None + self._thread = None + self._initialized = False + + def _load_library(self): + try: + self._lib = ctypes.CDLL(self._lib_path) + except OSError as e: + raise DataWeaveError(f"Failed to load library from {self._lib_path}: {e}") + + def _setup_graal_structures(self): + class graal_isolate_t(ctypes.Structure): + pass + + class graal_isolatethread_t(ctypes.Structure): + pass + + self._graal_isolate_t_ptr = ctypes.POINTER(graal_isolate_t) + self._graal_isolatethread_t_ptr = ctypes.POINTER(graal_isolatethread_t) + + def _create_isolate(self): + self._lib.graal_create_isolate.argtypes = [ + ctypes.c_void_p, + ctypes.POINTER(self._graal_isolate_t_ptr), + ctypes.POINTER(self._graal_isolatethread_t_ptr), + ] + self._lib.graal_create_isolate.restype = ctypes.c_int + + self._isolate = self._graal_isolate_t_ptr() + self._thread = self._graal_isolatethread_t_ptr() + + result = self._lib.graal_create_isolate(None, ctypes.byref(self._isolate), ctypes.byref(self._thread)) + if result != 0: + raise DataWeaveError(f"Failed to create GraalVM isolate. 
Error code: {result}") + + def _setup_functions(self): + if not hasattr(self._lib, "run_script"): + raise DataWeaveError("Native library does not export run_script") + + self._lib.run_script.argtypes = [ + self._graal_isolatethread_t_ptr, + ctypes.c_char_p, + ctypes.c_char_p, + ] + self._lib.run_script.restype = ctypes.c_void_p + + if hasattr(self._lib, "free_cstring"): + self._lib.free_cstring.argtypes = [self._graal_isolatethread_t_ptr, ctypes.c_void_p] + self._lib.free_cstring.restype = None + + # Thread attachment for background threads + if hasattr(self._lib, "graal_attach_thread"): + self._lib.graal_attach_thread.argtypes = [self._graal_isolate_t_ptr, ctypes.POINTER(self._graal_isolatethread_t_ptr)] + self._lib.graal_attach_thread.restype = ctypes.c_int + if hasattr(self._lib, "graal_detach_thread"): + self._lib.graal_detach_thread.argtypes = [self._graal_isolatethread_t_ptr] + self._lib.graal_detach_thread.restype = ctypes.c_int + if hasattr(self._lib, "graal_tear_down_isolate"): + self._lib.graal_tear_down_isolate.argtypes = [self._graal_isolatethread_t_ptr] + self._lib.graal_tear_down_isolate.restype = ctypes.c_int + + # Callback-based Streaming API + if hasattr(self._lib, "run_script_callback"): + self._lib.run_script_callback.argtypes = [ + self._graal_isolatethread_t_ptr, + ctypes.c_char_p, + ctypes.c_char_p, + WRITE_CALLBACK, + ctypes.c_void_p, + ] + self._lib.run_script_callback.restype = ctypes.c_void_p + + self._has_callback_streaming = True + else: + self._has_callback_streaming = False + + if hasattr(self._lib, "run_script_input_output_callback"): + self._lib.run_script_input_output_callback.argtypes = [ + self._graal_isolatethread_t_ptr, + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_char_p, + READ_CALLBACK, + WRITE_CALLBACK, + ctypes.c_void_p, + ] + self._lib.run_script_input_output_callback.restype = ctypes.c_void_p + + self._has_callback_input_output = True + else: + self._has_callback_input_output = 
False + + def _decode_and_free(self, ptr: Optional[int]) -> str: + if not ptr: + return "" + + try: + result_bytes = ctypes.string_at(ptr) + return result_bytes.decode("utf-8") + finally: + if self._lib is not None and hasattr(self._lib, "free_cstring"): + self._lib.free_cstring(self._thread, ptr) + + def initialize(self): + if self._initialized: + return + + self._load_library() + self._setup_graal_structures() + self._create_isolate() + self._setup_functions() + self._initialized = True + + def cleanup(self): + if not self._initialized: + return + + if hasattr(self._lib, "graal_tear_down_isolate") and self._thread: + try: + self._lib.graal_tear_down_isolate(self._thread) + except Exception: + pass + elif hasattr(self._lib, "graal_detach_thread") and self._thread: + try: + self._lib.graal_detach_thread(self._thread) + except Exception: + pass + + self._initialized = False + self._thread = None + self._isolate = None + self._lib = None + + def run_callback( + self, + script: str, + write_callback: WriteCallback, + inputs: Optional[Dict[str, Any]] = None, + ) -> StreamingResult: + """Execute a DataWeave script and stream the output via a write callback. + + The native side reads the output internally and invokes *write_callback* + for each chunk. + + :param script: the DataWeave script source + :param write_callback: callable ``(data: bytes) -> int`` invoked with each + output chunk. Must return ``0`` on success or non-zero to abort. + :param inputs: optional input bindings (same format as :meth:`run`) + :return: a :class:`StreamingResult` with metadata + :raises DataWeaveError: if the runtime is not initialized or the callback API + is not available + """ + if not self._initialized: + raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.") + if not self._has_callback_streaming: + raise DataWeaveError( + "Native library does not support callback streaming API (run_script_callback not found)." 
+ ) + + if inputs is None: + inputs = {} + + normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()} + inputs_json = json.dumps(normalized_inputs) + + @WRITE_CALLBACK + def _write_cb(_ctx, buf, length): + try: + data = ctypes.string_at(buf, length) + return write_callback(data) + except Exception: + return -1 + + try: + result_ptr = self._lib.run_script_callback( + self._thread, + script.encode("utf-8"), + inputs_json.encode("utf-8"), + _write_cb, + None, + ) + raw = self._decode_and_free(result_ptr) + meta = json.loads(raw) if raw else {"success": False, "error": "Empty response"} + except Exception as e: + raise DataWeaveError(f"Failed to execute callback streaming: {e}") + + return _parse_streaming_result(meta) + + def run_streaming( + self, + script: str, + inputs: Optional[Dict[str, Any]] = None, + ) -> Stream: + """Execute a DataWeave script and yield output chunks as they arrive. + + Chunks are yielded in real-time as the native engine produces them, + using a background thread and queue. The caller sees data before the + script finishes executing. + + Usage:: + + with DataWeave() as dw: + stream = dw.run_streaming("output json --- {items: (1 to 100)}") + for chunk in stream: + sys.stdout.buffer.write(chunk) + metadata = stream.metadata # StreamingResult with mime_type, charset, etc. + + :param script: the DataWeave script source + :param inputs: optional input bindings (same format as :meth:`run`) + :return: a :class:`Stream` yielding ``bytes`` chunks; after iteration, + ``.metadata`` holds a :class:`StreamingResult` + :raises DataWeaveError: if the runtime is not initialized or the callback API + is not available + """ + return Stream(self._run_streaming_gen(script, inputs)) + + def _run_streaming_gen( + self, + script: str, + inputs: Optional[Dict[str, Any]] = None, + ) -> Generator[bytes, None, StreamingResult]: + if not self._initialized: + raise DataWeaveError("DataWeave runtime not initialized. 
Call initialize() first.") + if not self._has_callback_streaming: + raise DataWeaveError( + "Native library does not support callback streaming API (run_script_callback not found)." + ) + + if inputs is None: + inputs = {} + + normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()} + inputs_json = json.dumps(normalized_inputs) + + _SENTINEL = object() + q: Queue = Queue() + + @WRITE_CALLBACK + def _write_cb(_ctx, buf, length): + try: + q.put(ctypes.string_at(buf, length)) + return 0 + except Exception: + return -1 + + def _run_native(): + worker_thread = self._graal_isolatethread_t_ptr() + rc = self._lib.graal_attach_thread(self._isolate, ctypes.byref(worker_thread)) + if rc != 0: + q.put({"success": False, "error": f"Failed to attach worker thread to isolate (code {rc})"}) + q.put(_SENTINEL) + return + try: + result_ptr = self._lib.run_script_callback( + worker_thread, + script.encode("utf-8"), + inputs_json.encode("utf-8"), + _write_cb, + None, + ) + raw_ptr = result_ptr + if raw_ptr: + raw = ctypes.string_at(raw_ptr).decode("utf-8") + self._lib.free_cstring(worker_thread, raw_ptr) + else: + raw = "" + meta = json.loads(raw) if raw else {"success": False, "error": "Empty response"} + q.put(meta) + except Exception as e: + q.put({"success": False, "error": str(e)}) + finally: + self._lib.graal_detach_thread(worker_thread) + q.put(_SENTINEL) + + worker = Thread(target=_run_native, name="dw-streaming-worker", daemon=True) + worker.start() + + meta = None + while True: + item = q.get() + if item is _SENTINEL: + break + if isinstance(item, dict): + meta = item + else: + yield item + + worker.join() + + if meta is None: + meta = {"success": False, "error": "No metadata received from native call"} + + success = meta.get("success", False) + if not success: + return StreamingResult( + success=False, + error=meta.get("error"), + mime_type=None, + charset=None, + binary=False, + ) + + return StreamingResult( + success=True, + error=None, + 
mime_type=meta.get("mimeType"), + charset=meta.get("charset"), + binary=meta.get("binary", False), + ) + + def run_transform( + self, + script: str, + input_stream: Iterable[bytes], + input_name: str = "payload", + input_mime_type: str = "application/json", + input_charset: Optional[str] = None, + inputs: Optional[Dict[str, Any]] = None, + ) -> Stream: + """Execute a DataWeave script with streaming input and output. + + Input data is pulled from *input_stream* (any iterable of bytes) and + output chunks are yielded as they are produced — fully streaming in + both directions with constant memory overhead. + + Usage:: + + with DataWeave() as dw: + with open("large.json", "rb") as f: + stream = dw.run_transform( + "output application/csv --- payload", + input_stream=iter(lambda: f.read(8192), b""), + input_mime_type="application/json", + ) + for chunk in stream: + sys.stdout.buffer.write(chunk) + metadata = stream.metadata + + :param script: the DataWeave script source + :param input_stream: iterable yielding ``bytes`` chunks for the input binding + :param input_name: binding name for the streamed input (default ``"payload"``) + :param input_mime_type: MIME type of the streamed input + :param input_charset: charset of the streamed input (default UTF-8) + :param inputs: optional additional input bindings (same format as :meth:`run`) + :return: a :class:`Stream` yielding ``bytes`` output chunks; after iteration, + ``.metadata`` holds a :class:`StreamingResult` + :raises DataWeaveError: if the runtime is not initialized or the API is missing + """ + return Stream(self._run_transform_gen( + script, input_stream, input_name, input_mime_type, input_charset, inputs, + )) + + def _run_transform_gen( + self, + script: str, + input_stream: Iterable[bytes], + input_name: str = "payload", + input_mime_type: str = "application/json", + input_charset: Optional[str] = None, + inputs: Optional[Dict[str, Any]] = None, + ) -> Generator[bytes, None, StreamingResult]: + if not 
self._initialized: + raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.") + if not self._has_callback_input_output: + raise DataWeaveError( + "Native library does not support callback input/output API " + "(run_script_input_output_callback not found)." + ) + + if inputs is None: + inputs = {} + + normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()} + inputs_json = json.dumps(normalized_inputs) + + _SENTINEL = object() + q: Queue = Queue() + + @WRITE_CALLBACK + def _write_cb(_ctx, buf, length): + try: + q.put(ctypes.string_at(buf, length)) + return 0 + except Exception: + return -1 + + input_iter = iter(input_stream) + _pending = bytearray() + + @READ_CALLBACK + def _read_cb(_ctx, buf, buf_size): + # Buffer leftover bytes so chunks larger than buf_size are not truncated. + try: + if not _pending: + data = next(input_iter, b"") + if not data: + return 0 + _pending.extend(data) + n = min(len(_pending), buf_size) + ctypes.memmove(buf, bytes(_pending[:n]), n) + del _pending[:n] + return n + except Exception: + return -1 + + def _run_native(): + worker_thread = self._graal_isolatethread_t_ptr() + rc = self._lib.graal_attach_thread(self._isolate, ctypes.byref(worker_thread)) + if rc != 0: + q.put({"success": False, "error": f"Failed to attach worker thread to isolate (code {rc})"}) + q.put(_SENTINEL) + return + try: + result_ptr = self._lib.run_script_input_output_callback( + worker_thread, + script.encode("utf-8"), + inputs_json.encode("utf-8"), + input_name.encode("utf-8"), + input_mime_type.encode("utf-8"), + input_charset.encode("utf-8") if input_charset else None, + _read_cb, + _write_cb, + None, + ) + if result_ptr: + raw = ctypes.string_at(result_ptr).decode("utf-8") + self._lib.free_cstring(worker_thread, result_ptr) + else: + raw = "" + meta = json.loads(raw) if raw else {"success": False, "error": "Empty response"} + q.put(meta) + except Exception as e: + q.put({"success": False, "error": str(e)}) + finally: + self._lib.graal_detach_thread(worker_thread) + q.put(_SENTINEL) + + worker = Thread(target=_run_native, name="dw-transform-worker", daemon=True) + worker.start() + + meta 
= None + while True: + item = q.get() + if item is _SENTINEL: + break + if isinstance(item, dict): + meta = item + else: + yield item + + worker.join() + + if meta is None: + meta = {"success": False, "error": "No metadata received from native call"} + + success = meta.get("success", False) + if not success: + return StreamingResult( + success=False, + error=meta.get("error"), + mime_type=None, + charset=None, + binary=False, + ) + + return StreamingResult( + success=True, + error=None, + mime_type=meta.get("mimeType"), + charset=meta.get("charset"), + binary=meta.get("binary", False), + ) + + def run_input_output_callback( + self, + script: str, + input_name: str, + input_mime_type: str, + read_callback: ReadCallback, + write_callback: WriteCallback, + input_charset: Optional[str] = None, + inputs: Optional[Dict[str, Any]] = None, + ) -> StreamingResult: + """Execute a DataWeave script with callback-driven input *and* output streaming. + + The native side calls *read_callback* on a background thread to pull input + data for the binding named *input_name*, and calls *write_callback* on the + calling thread to push output chunks. + + :param script: the DataWeave script source + :param input_name: the binding name for the callback-supplied input + :param input_mime_type: MIME type of the callback-supplied input + :param read_callback: callable ``(buf_size: int) -> bytes`` returning the + next chunk, empty bytes ``b""`` on EOF, or raising on error + :param write_callback: callable ``(data: bytes) -> int`` returning ``0`` on + success or non-zero to abort + :param input_charset: charset of the callback-supplied input (default UTF-8) + :param inputs: optional additional input bindings + :return: a :class:`StreamingResult` with metadata + :raises DataWeaveError: if the runtime is not initialized or the API is missing + """ + if not self._initialized: + raise DataWeaveError("DataWeave runtime not initialized. 
Call initialize() first.") + if not self._has_callback_input_output: + raise DataWeaveError( + "Native library does not support callback input/output API " + "(run_script_input_output_callback not found)." + ) + + if inputs is None: + inputs = {} + + normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()} + inputs_json = json.dumps(normalized_inputs) + + @READ_CALLBACK + def _read_cb(_ctx, buf, buf_size): + try: + data = read_callback(buf_size) + if not data: + return 0 # EOF + n = min(len(data), buf_size) + ctypes.memmove(buf, data, n) + return n + except Exception: + return -1 + + @WRITE_CALLBACK + def _write_cb(_ctx, buf, length): + try: + data = ctypes.string_at(buf, length) + return write_callback(data) + except Exception: + return -1 + + try: + result_ptr = self._lib.run_script_input_output_callback( + self._thread, + script.encode("utf-8"), + inputs_json.encode("utf-8"), + input_name.encode("utf-8"), + input_mime_type.encode("utf-8"), + input_charset.encode("utf-8") if input_charset else None, + _read_cb, + _write_cb, + None, + ) + raw = self._decode_and_free(result_ptr) + meta = json.loads(raw) if raw else {"success": False, "error": "Empty response"} + except Exception as e: + raise DataWeaveError(f"Failed to execute callback input/output streaming: {e}") + + return _parse_streaming_result(meta) + + def run(self, script: str, inputs: Optional[Dict[str, Any]] = None, raise_on_error: bool = False) -> ExecutionResult: + if not self._initialized: + raise DataWeaveError("DataWeave runtime not initialized. 
Call initialize() first.") + + if inputs is None: + inputs = {} + + normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()} + inputs_json = json.dumps(normalized_inputs) + + try: + result_ptr = self._lib.run_script( + self._thread, + script.encode("utf-8"), + inputs_json.encode("utf-8"), + ) + raw = self._decode_and_free(result_ptr) + result = _parse_native_encoded_response(raw) + except Exception as e: + raise DataWeaveError(f"Failed to execute script: {e}") + + if raise_on_error and not result.success: + raise DataWeaveScriptError(result) + return result + + def __enter__(self): + self.initialize() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.cleanup() + return False + + +_global_instance: Optional[DataWeave] = None + + +def _get_global_instance() -> DataWeave: + global _global_instance + if _global_instance is None: + import atexit + _global_instance = DataWeave() + _global_instance.initialize() + atexit.register(cleanup) + return _global_instance + + +def run(script: str, inputs: Optional[Dict[str, Any]] = None, raise_on_error: bool = False) -> ExecutionResult: + return _get_global_instance().run(script, inputs, raise_on_error=raise_on_error) + + +def run_streaming( + script: str, inputs: Optional[Dict[str, Any]] = None, +) -> Stream: + """Execute a script and yield output chunks. See :meth:`DataWeave.run_streaming`.""" + return _get_global_instance().run_streaming(script, inputs) + + +def run_callback(script: str, write_callback: WriteCallback, inputs: Optional[Dict[str, Any]] = None) -> StreamingResult: + """Execute a script and stream output via a write callback. 
See :meth:`DataWeave.run_callback`.""" + return _get_global_instance().run_callback(script, write_callback, inputs) + + +def run_transform( + script: str, + input_stream: Iterable[bytes], + input_name: str = "payload", + input_mime_type: str = "application/json", + input_charset: Optional[str] = None, + inputs: Optional[Dict[str, Any]] = None, +) -> Stream: + """Execute a script with streaming input and output. See :meth:`DataWeave.run_transform`.""" + return _get_global_instance().run_transform( + script, input_stream, input_name, input_mime_type, input_charset, inputs, + ) + + +def run_input_output_callback( + script: str, + input_name: str, + input_mime_type: str, + read_callback: ReadCallback, + write_callback: WriteCallback, + input_charset: Optional[str] = None, + inputs: Optional[Dict[str, Any]] = None, +) -> StreamingResult: + """Execute a script with callback-driven input and output. See :meth:`DataWeave.run_input_output_callback`.""" + return _get_global_instance().run_input_output_callback( + script, input_name, input_mime_type, read_callback, write_callback, input_charset, inputs, + ) + + +def cleanup(): + global _global_instance + if _global_instance is not None: + _global_instance.cleanup() + _global_instance = None + + +__all__ = [ + "DataWeave", + "DataWeaveError", + "DataWeaveLibraryNotFoundError", + "DataWeaveScriptError", + "ExecutionResult", + "InputValue", + "ReadCallback", + "Stream", + "StreamingResult", + "WriteCallback", + "READ_CALLBACK", + "WRITE_CALLBACK", + "run", + "run_callback", + "run_input_output_callback", + "run_streaming", + "run_transform", + "cleanup", +] diff --git a/native-lib/python/tests/person.xml b/native-lib/python/tests/person.xml new file mode 100644 index 0000000..376a6b7 Binary files /dev/null and b/native-lib/python/tests/person.xml differ diff --git a/native-lib/python/tests/test_dataweave_module.py b/native-lib/python/tests/test_dataweave_module.py new file mode 100755 index 0000000..bbf4a32 --- /dev/null +++ 
b/native-lib/python/tests/test_dataweave_module.py @@ -0,0 +1,494 @@ +#!/usr/bin/env python3 +""" +Quick test script for the DataWeave Python module. +""" + +import sys +from pathlib import Path + +_PYTHON_SRC_DIR = Path(__file__).resolve().parents[1] / "src" +sys.path.insert(0, str(_PYTHON_SRC_DIR)) + +import dataweave + +def test_basic(): + """Test basic functionality""" + print("Testing basic script execution...") + try: + result = dataweave.run("2 + 2", {}) + assert result.get_string() == "4", f"Expected '4', got '{result.get_string()}'" + print("[OK] Basic script execution works") + return True + except Exception as e: + print(f"[FAIL] Basic script execution failed: {e}") + return False + +def test_with_inputs(): + """Test script with inputs""" + print("\nTesting script with inputs...") + try: + result = dataweave.run("num1 + num2", {"num1": 25, "num2": 17}) + assert result.get_string() == "42", f"Expected '42', got '{result.get_string()}'" + print("[OK] Script with inputs works") + return True + except Exception as e: + print(f"[FAIL] Script with inputs failed: {e}") + return False + +def test_context_manager(): + """Test context manager""" + print("\nTesting with context manager...") + try: + with dataweave.DataWeave() as dw: + + result = dw.run("sqrt(144)") + assert result.get_string() == "12", f"Expected '12', got '{result.get_string()}'" + result = dw.run("sqrt(10000)") + assert result.get_string() == "100", f"Expected '100', got '{result.get_string()}'" + print("[OK] Script execution with context manager works") + return True + except Exception as e: + print(f"[FAIL] Script execution with context manager failed: {e}") + return False + +def test_encoding(): + """Test reading UTF-16 XML input and producing CSV output""" + print("\nTesting encoding (UTF-16 XML -> CSV)...") + try: + xml_path = ( + Path(__file__).resolve().parent / "person.xml" + ) + xml_bytes = xml_path.read_bytes() + + script = """output application/csv header=true +--- +[payload.person] 
+""" + + result = dataweave.run( + script, + { + "payload": { + "content": xml_bytes, + "mimeType": "application/xml", + "charset": "UTF-16", + } + }, + ) + + out = result.get_string() or "" + print(f"out: \n{out}") + assert result.success is True, f"Expected success=true, got: {result}" + assert "name" in out and "age" in out, f"CSV header missing, got: {out!r}" + assert "Billy" in out, f"Expected name 'Billy' in CSV, got: {out!r}" + assert "31" in out, f"Expected age '31' in CSV, got: {out!r}" + + print("[OK] Encoding conversion works") + return True + except Exception as e: + print(f"[FAIL] Encoding conversion failed: {e}") + return False + +def test_auto_conversion(): + """Test auto-conversion of different types""" + print("\nTesting auto-conversion...") + try: + + # Test array + result = dataweave.run( + "numbers[0]", + {"numbers": [1, 2, 3]} + ) + assert result.get_string() == "1", f"Expected '1', got '{result.get_string()}'" + + print("[OK] Auto-conversion works") + return True + except Exception as e: + print(f"[FAIL] Auto-conversion failed: {e}") + return False + +def test_callback_output_basic(): + """Test callback-based output streaming""" + print("\nTesting callback output basic...") + try: + chunks = [] + + def on_write(data: bytes) -> int: + chunks.append(data) + return 0 + + result = dataweave.run_callback("2 + 2", on_write) + assert result.success is True, f"Expected success, got: {result}" + full = b"".join(chunks) + text = full.decode(result.charset or "utf-8") + assert text == "4", f"Expected '4', got '{text}'" + print(f"[OK] Callback output basic works (chunks={len(chunks)}, result='{text}')") + return True + except Exception as e: + print(f"[FAIL] Callback output basic failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_callback_output_with_inputs(): + """Test callback-based output streaming with inputs""" + print("\nTesting callback output with inputs...") + try: + chunks = [] + + def on_write(data: bytes) -> 
int: + chunks.append(data) + return 0 + + result = dataweave.run_callback("num1 + num2", on_write, inputs={"num1": 25, "num2": 17}) + assert result.success is True, f"Expected success, got: {result}" + full = b"".join(chunks) + text = full.decode(result.charset or "utf-8") + assert text == "42", f"Expected '42', got '{text}'" + print(f"[OK] Callback output with inputs works (result='{text}')") + return True + except Exception as e: + print(f"[FAIL] Callback output with inputs failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_callback_input_output(): + """Test callback-based input and output streaming""" + print("\nTesting callback input+output...") + try: + import io as _io + + source = _io.BytesIO(b'[10, 20, 30, 40, 50]') + output_chunks = [] + + def on_read(buf_size: int) -> bytes: + return source.read(buf_size) + + def on_write(data: bytes) -> int: + output_chunks.append(data) + return 0 + + script = "output application/json\n---\npayload map ($ * 2)" + result = dataweave.run_input_output_callback( + script, + input_name="payload", + input_mime_type="application/json", + read_callback=on_read, + write_callback=on_write, + ) + assert result.success is True, f"Expected success, got: {result}" + full = b"".join(output_chunks) + text = full.decode(result.charset or "utf-8") + assert "20" in text, f"Expected 20 in result (10*2), got: {text}" + assert "100" in text, f"Expected 100 in result (50*2), got: {text}" + print(f"[OK] Callback input+output works (chunks={len(output_chunks)}, result={text.strip()[:80]}...)") + return True + except Exception as e: + print(f"[FAIL] Callback input+output failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_run_streaming_basic(): + """Test run_streaming yields chunks and returns metadata""" + print("\nTesting run_streaming basic...") + try: + stream = dataweave.run_streaming("output application/json --- {a: 1, b: 2}") + chunks = [] + try: + while True: + 
chunks.append(next(stream)) + except StopIteration as e: + metadata = e.value + + full = b"".join(chunks) + assert len(full) > 0, "Expected non-empty output" + text = full.decode(metadata.charset or "utf-8") + assert '"a": 1' in text or '"a":1' in text, f"Expected key 'a' in JSON, got: {text}" + assert metadata.success is True, f"Expected success, got: {metadata}" + assert metadata.mime_type == "application/json", f"Expected json mime, got: {metadata.mime_type}" + print(f"[OK] run_streaming basic works (chunks={len(chunks)}, result={text.strip()[:60]})") + return True + except Exception as e: + print(f"[FAIL] run_streaming basic failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_run_streaming_large(): + """Test run_streaming with large output to verify true streaming (multiple chunks)""" + print("\nTesting run_streaming large...") + try: + script = 'output application/json --- (1 to 5000) map {id: $, name: "item_" ++ $}' + stream = dataweave.run_streaming(script) + chunks = [] + try: + while True: + chunks.append(next(stream)) + except StopIteration as e: + metadata = e.value + + full = b"".join(chunks) + text = full.decode(metadata.charset or "utf-8") + assert metadata.success is True, f"Expected success, got: {metadata}" + assert len(chunks) > 1, f"Expected multiple chunks for large output, got {len(chunks)}" + assert '"id": 5000' in text or '"id":5000' in text, f"Expected last item in output" + print(f"[OK] run_streaming large works (chunks={len(chunks)}, bytes={len(full)})") + return True + except Exception as e: + print(f"[FAIL] run_streaming large failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_run_streaming_error(): + """Test run_streaming with an invalid script returns error metadata""" + print("\nTesting run_streaming error...") + try: + stream = dataweave.run_streaming("output application/json --- invalid_var") + chunks = [] + try: + while True: + chunks.append(next(stream)) + except 
StopIteration as e: + metadata = e.value + + assert metadata.success is False, f"Expected failure, got: {metadata}" + assert metadata.error is not None, "Expected error message" + assert len(chunks) == 0, f"Expected no chunks on error, got {len(chunks)}" + print(f"[OK] run_streaming error works (error={metadata.error[:60]}...)") + return True + except Exception as e: + print(f"[FAIL] run_streaming error failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_run_streaming_with_inputs(): + """Test run_streaming with input bindings""" + print("\nTesting run_streaming with inputs...") + try: + stream = dataweave.run_streaming("num1 + num2", {"num1": 25, "num2": 17}) + chunks = [] + try: + while True: + chunks.append(next(stream)) + except StopIteration as e: + metadata = e.value + + full = b"".join(chunks) + text = full.decode(metadata.charset or "utf-8") + assert metadata.success is True, f"Expected success, got: {metadata}" + assert text.strip() == "42", f"Expected '42', got '{text.strip()}'" + print(f"[OK] run_streaming with inputs works (result='{text.strip()}')") + return True + except Exception as e: + print(f"[FAIL] run_streaming with inputs failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_callback_input_output_large(): + """Test callback-based input+output streaming with large data""" + print("\nTesting callback input+output large...") + try: + import io as _io + + # Build a large JSON array + parts = [b"["] + for i in range(1, 1001): + if i > 1: + parts.append(b",") + parts.append(f'{{"id":{i}}}'.encode()) + parts.append(b"]") + source = _io.BytesIO(b"".join(parts)) + output_chunks = [] + + def on_read(buf_size: int) -> bytes: + return source.read(buf_size) + + def on_write(data: bytes) -> int: + output_chunks.append(data) + return 0 + + result = dataweave.run_input_output_callback( + "output application/json\n---\nsizeOf(payload)", + input_name="payload", + input_mime_type="application/json", + 
read_callback=on_read, + write_callback=on_write, + ) + assert result.success is True, f"Expected success, got: {result}" + full = b"".join(output_chunks) + text = full.decode(result.charset or "utf-8") + assert text == "1000", f"Expected '1000', got '{text}'" + print(f"[OK] Callback input+output large works (result='{text}')") + return True + except Exception as e: + print(f"[FAIL] Callback input+output large failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_run_transform_basic(): + """Test run_transform with an iterable input and streaming output""" + print("\nTesting run_transform basic...") + try: + input_data = [b'[10, 20, 30, 40, 50]'] + script = "output application/json\n---\npayload map ($ * 2)" + stream = dataweave.run_transform(script, input_stream=input_data, input_mime_type="application/json") + chunks = [] + try: + while True: + chunks.append(next(stream)) + except StopIteration as e: + metadata = e.value + + full = b"".join(chunks) + text = full.decode(metadata.charset or "utf-8") + assert metadata.success is True, f"Expected success, got: {metadata}" + assert "20" in text, f"Expected 20 in result (10*2), got: {text}" + assert "100" in text, f"Expected 100 in result (50*2), got: {text}" + print(f"[OK] run_transform basic works (chunks={len(chunks)}, result={text.strip()[:60]})") + return True + except Exception as e: + print(f"[FAIL] run_transform basic failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_run_transform_large(): + """Test run_transform with large chunked input to verify streaming both directions""" + print("\nTesting run_transform large...") + try: + import io as _io + + # Build a large JSON array as chunked input + parts = [b"["] + for i in range(1, 1001): + if i > 1: + parts.append(b",") + parts.append(f'{{"id":{i}}}'.encode()) + parts.append(b"]") + full_input = b"".join(parts) + + # Feed in 4KB chunks (simulating a file/network read) + def chunked(data, size=4096): + 
for i in range(0, len(data), size): + yield data[i:i+size] + + script = "output application/json\n---\nsizeOf(payload)" + stream = dataweave.run_transform( + script, + input_stream=chunked(full_input), + input_mime_type="application/json", + ) + chunks = [] + try: + while True: + chunks.append(next(stream)) + except StopIteration as e: + metadata = e.value + + full = b"".join(chunks) + text = full.decode(metadata.charset or "utf-8") + assert metadata.success is True, f"Expected success, got: {metadata}" + assert text == "1000", f"Expected '1000', got '{text}'" + print(f"[OK] run_transform large works (result='{text}')") + return True + except Exception as e: + print(f"[FAIL] run_transform large failed: {e}") + import traceback + traceback.print_exc() + return False + +def test_run_transform_with_file(): + """Test run_transform reading from a file-like object""" + print("\nTesting run_transform with file...") + try: + from pathlib import Path + + xml_path = Path(__file__).resolve().parent / "person.xml" + + with open(xml_path, "rb") as f: + stream = dataweave.run_transform( + "output application/csv header=true\n---\n[payload.person]", + input_stream=iter(lambda: f.read(4096), b""), + input_mime_type="application/xml", + input_charset="UTF-16", + ) + chunks = [] + try: + while True: + chunks.append(next(stream)) + except StopIteration as e: + metadata = e.value + + full = b"".join(chunks) + text = full.decode(metadata.charset or "utf-8") + assert metadata.success is True, f"Expected success, got: {metadata}" + assert "Billy" in text, f"Expected 'Billy' in CSV, got: {text}" + assert "31" in text, f"Expected '31' in CSV, got: {text}" + print(f"[OK] run_transform with file works (result={text.strip()[:60]})") + return True + except Exception as e: + print(f"[FAIL] run_transform with file failed: {e}") + import traceback + traceback.print_exc() + return False + +def main(): + """Run all tests""" + print("="*70) + print("DataWeave Python Module - Test Suite") + 
print("="*70) + + try: + results = [] + results.append(test_basic()) + results.append(test_with_inputs()) + results.append(test_context_manager()) + results.append(test_encoding()) + results.append(test_auto_conversion()) + results.append(test_callback_output_basic()) + results.append(test_callback_output_with_inputs()) + results.append(test_callback_input_output()) + results.append(test_callback_input_output_large()) + results.append(test_run_streaming_basic()) + results.append(test_run_streaming_large()) + results.append(test_run_streaming_error()) + results.append(test_run_streaming_with_inputs()) + results.append(test_run_transform_basic()) + results.append(test_run_transform_large()) + results.append(test_run_transform_with_file()) + + # Cleanup + dataweave.cleanup() + + print("\n" + "="*70) + passed = sum(results) + total = len(results) + print(f"Results: {passed}/{total} tests passed") + print("="*70) + + if passed == total: + print("\n[OK] All tests passed!") + sys.exit(0) + else: + print(f"\n[FAIL] {total - passed} test(s) failed") + sys.exit(1) + + except dataweave.DataWeaveLibraryNotFoundError as e: + print(f"\n[ERROR] {e}") + print("\nPlease build the native library first:") + print(" ./gradlew nativeCompile") + sys.exit(2) + except Exception as e: + print(f"\n[ERROR] Unexpected error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/native-lib/src/main/java/org/mule/weave/lib/InputStreamSession.java b/native-lib/src/main/java/org/mule/weave/lib/InputStreamSession.java new file mode 100644 index 0000000..6d1537b --- /dev/null +++ b/native-lib/src/main/java/org/mule/weave/lib/InputStreamSession.java @@ -0,0 +1,125 @@ +package org.mule.weave.lib; + +import java.io.IOException; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Manages a {@link PipedInputStream}/{@link 
/**
 * Manages a {@link PipedInputStream}/{@link PipedOutputStream} pair that allows FFI callers
 * to stream data into the DataWeave engine as an input binding.
 *
 * <p>The caller writes bytes into the {@link PipedOutputStream} via {@link #write(byte[], int)}
 * while the DW engine reads from the paired {@link PipedInputStream} on a separate thread.</p>
 *
 * <p>Instances are stored in a static registry keyed by a monotonically increasing handle so that
 * native callers can reference them across {@code @CEntryPoint} invocations.</p>
 */
public class InputStreamSession {

    // Registry of live sessions keyed by the handle returned from register().
    private static final ConcurrentHashMap<Long, InputStreamSession> SESSIONS = new ConcurrentHashMap<>();
    // Monotonically increasing handle generator; starts at 1 so 0 can mean "no session".
    private static final AtomicLong NEXT_HANDLE = new AtomicLong(1);

    // Internal pipe buffer: large enough to decouple writer and reader bursts.
    private static final int PIPE_BUFFER_SIZE = 64 * 1024;

    private final PipedInputStream pipedInputStream;
    private final PipedOutputStream pipedOutputStream;
    private final String mimeType;
    private final String charset;

    /**
     * Creates a new input stream session with the given metadata.
     *
     * @param mimeType the MIME type of the input data
     * @param charset  the character set of the input data (may be {@code null}, defaults to UTF-8)
     */
    public InputStreamSession(String mimeType, String charset) {
        try {
            this.pipedInputStream = new PipedInputStream(PIPE_BUFFER_SIZE);
            this.pipedOutputStream = new PipedOutputStream(pipedInputStream);
        } catch (IOException e) {
            // PipedOutputStream's connecting constructor declares IOException; a failure
            // here is a programming error, so surface it unchecked with the cause kept.
            throw new RuntimeException("Failed to create piped streams", e);
        }
        this.mimeType = mimeType;
        this.charset = charset != null ? charset : "UTF-8";
    }

    /**
     * Registers this session and returns its handle.
     *
     * @return a unique handle that callers use to reference this session
     */
    public long register() {
        long handle = NEXT_HANDLE.getAndIncrement();
        SESSIONS.put(handle, this);
        return handle;
    }

    /**
     * Looks up a previously registered session.
     *
     * @param handle the handle returned by {@link #register()}
     * @return the session, or {@code null} if not found
     */
    public static InputStreamSession get(long handle) {
        return SESSIONS.get(handle);
    }

    /**
     * Removes a session from the registry and closes both ends of the pipe.
     * Safe to call multiple times; unknown handles are a no-op.
     *
     * @param handle the session handle
     */
    public static void close(long handle) {
        InputStreamSession session = SESSIONS.remove(handle);
        if (session != null) {
            try {
                session.pipedOutputStream.close();
            } catch (IOException ignored) {
                // best-effort teardown
            }
            try {
                session.pipedInputStream.close();
            } catch (IOException ignored) {
                // best-effort teardown
            }
        }
    }

    /**
     * Writes bytes into the pipe. The DW engine will read these from the paired
     * {@link PipedInputStream}.
     *
     * @param data   the byte array to write from
     * @param length the number of bytes to write
     * @throws IOException if an I/O error occurs
     */
    public void write(byte[] data, int length) throws IOException {
        pipedOutputStream.write(data, 0, length);
    }

    /**
     * Closes the write end of the pipe, signalling EOF to the reader.
     *
     * @throws IOException if an I/O error occurs
     */
    public void closeWriter() throws IOException {
        pipedOutputStream.close();
    }

    /**
     * Returns the {@link PipedInputStream} that the DW engine should read from.
     *
     * @return the read end of the pipe
     */
    public PipedInputStream getInputStream() {
        return pipedInputStream;
    }

    /** @return the MIME type supplied at construction */
    public String getMimeType() {
        return mimeType;
    }

    /** @return the charset supplied at construction, never {@code null} (defaults to UTF-8) */
    public String getCharset() {
        return charset;
    }
}

FFI callers pass C function pointers that conform to these signatures. The Java side + * invokes them via {@link InvokeCFunctionPointer} to push/pull data without requiring + * the session-based open/read|write/close round-trips.

+ */ +public final class NativeCallbacks { + + private NativeCallbacks() { + } + + /** + * Callback the native caller provides to receive output data. + * + *

Signature in C: + *

{@code int (*WriteCallback)(void *ctx, const char *buffer, int length);}
+ * + *

The Java side calls this repeatedly with chunks of the script result. + * The callback must return {@code 0} on success or a non-zero value to abort.

+ */ + public interface WriteCallback extends CFunctionPointer { + @InvokeCFunctionPointer + int invoke(PointerBase ctx, CCharPointer buffer, int length); + } + + /** + * Callback the native caller provides to supply input data. + * + *

Signature in C: + *

{@code int (*ReadCallback)(void *ctx, char *buffer, int bufferSize);}
+ * + *

The Java side calls this to pull the next chunk of input bytes. The callback must + * write up to {@code bufferSize} bytes into {@code buffer} and return the number of + * bytes written, {@code 0} on EOF, or {@code -1} on error.

+ */ + public interface ReadCallback extends CFunctionPointer { + @InvokeCFunctionPointer + int invoke(PointerBase ctx, CCharPointer buffer, int bufferSize); + } +} diff --git a/native-lib/src/main/java/org/mule/weave/lib/NativeLib.java b/native-lib/src/main/java/org/mule/weave/lib/NativeLib.java new file mode 100644 index 0000000..de776e7 --- /dev/null +++ b/native-lib/src/main/java/org/mule/weave/lib/NativeLib.java @@ -0,0 +1,333 @@ +package org.mule.weave.lib; + +import org.graalvm.nativeimage.IsolateThread; +import org.graalvm.nativeimage.UnmanagedMemory; +import org.graalvm.nativeimage.c.function.CEntryPoint; +import org.graalvm.nativeimage.c.type.CCharPointer; +import org.graalvm.nativeimage.c.type.CTypeConversion; +import org.graalvm.word.PointerBase; +import org.graalvm.word.WordFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** + * GraalVM native entry points exposed for FFI consumers. + * + *

This class provides C-callable functions to execute DataWeave scripts and to free the returned + * unmanaged strings.

+ */ +public class NativeLib { + + /** + * Native method that executes a DataWeave script with inputs and returns the result. + * Can be called from Python via FFI. + * + * @param thread the isolate thread (automatically provided by GraalVM) + * @param script the DataWeave script to execute (C string pointer) + * @param inputsJson JSON string containing the inputs map with content (base64 encoded), mimeType, properties and charset for each binding + * @return the script execution result base64 encoded (C string pointer) + */ + @CEntryPoint(name = "run_script") + public static CCharPointer runDwScriptEncoded(IsolateThread thread, CCharPointer script, CCharPointer inputsJson) { + String dwScript = CTypeConversion.toJavaString(script); + String inputs = CTypeConversion.toJavaString(inputsJson); + + ScriptRuntime runtime = ScriptRuntime.getInstance(); + String result = runtime.run(dwScript, inputs); + return toUnmanagedCString(result); + } + + /** + * Frees a C string previously returned by {@link #runDwScriptEncoded(IsolateThread, CCharPointer, CCharPointer)}. + * + * @param thread the isolate thread (automatically provided by GraalVM) + * @param pointer the pointer to the unmanaged C string to free; if null, this is a no-op + */ + @CEntryPoint(name = "free_cstring") + public static void freeCString(IsolateThread thread, CCharPointer pointer) { + if (pointer.isNull()) { + return; + } + UnmanagedMemory.free(pointer); + } + + // ── Callback-based Streaming API ───────────────────────────────────── + + private static final int CALLBACK_BUFFER_SIZE = 8 * 1024; + + /** + * Executes a DataWeave script and streams the result to a caller-supplied write callback. + * + *

Instead of the session-based open/read/close cycle, the caller passes a + * {@code WriteCallback} function pointer. The Java side reads the output stream in chunks + * and invokes the callback for each chunk until the stream is exhausted.

+ * + *

The returned C string is a JSON object with the execution metadata: + *

+ * The caller must free the returned pointer with {@link #freeCString}.

+ * + * @param thread the isolate thread + * @param script the DataWeave script (C string) + * @param inputsJson JSON-encoded inputs map (C string), may be null + * @param writeCallback function pointer invoked with each output chunk; must return 0 on success + * @param ctx opaque context pointer forwarded to every callback invocation + * @return an unmanaged C string with JSON metadata/error + */ + @CEntryPoint(name = "run_script_callback") + public static CCharPointer runScriptCallback( + IsolateThread thread, + CCharPointer script, + CCharPointer inputsJson, + NativeCallbacks.WriteCallback writeCallback, + PointerBase ctx) { + + String dwScript = CTypeConversion.toJavaString(script); + String inputs = inputsJson.isNull() ? null : CTypeConversion.toJavaString(inputsJson); + + ScriptRuntime runtime = ScriptRuntime.getInstance(); + StreamSession session = runtime.runStreaming(dwScript, inputs); + + if (session.isError()) { + return toUnmanagedCString("{\"success\":false,\"error\":\"" + + escapeJsonString(session.getError()) + "\"}"); + } + + try { + byte[] buf = new byte[CALLBACK_BUFFER_SIZE]; + CCharPointer nativeBuf = UnmanagedMemory.malloc(CALLBACK_BUFFER_SIZE); + try { + int n; + while ((n = session.read(buf, buf.length)) > 0) { + for (int i = 0; i < n; i++) { + nativeBuf.write(i, buf[i]); + } + int rc = writeCallback.invoke(ctx, nativeBuf, n); + if (rc != 0) { + return toUnmanagedCString("{\"success\":false,\"error\":\"" + + "Write callback returned error: " + rc + "\"}"); + } + } + } finally { + UnmanagedMemory.free(nativeBuf); + } + } catch (IOException e) { + return toUnmanagedCString("{\"success\":false,\"error\":\"" + + escapeJsonString(e.getMessage()) + "\"}"); + } finally { + session.closeStream(); + } + + return toUnmanagedCString("{\"success\":true" + + ",\"mimeType\":\"" + session.getMimeType() + "\"" + + ",\"charset\":\"" + session.getCharset() + "\"" + + ",\"binary\":" + session.isBinary() + + "}"); + } + + /** + * Executes a DataWeave script whose 
output is streamed via a write callback, and whose + * input named {@code inputName} is fed via a read callback. + * + *

The read callback is invoked on a background thread to pull input data while the + * output is pushed to the write callback on the calling thread. This allows fully + * callback-driven input and output streaming in a single call.

+ * + *

The returned C string follows the same JSON schema as + * {@link #runScriptCallback}.

+ * + * @param thread the isolate thread + * @param script the DataWeave script (C string) + * @param inputsJson JSON-encoded inputs map (C string), may be null; entries for + * {@code inputName} are ignored since the read callback supplies that input + * @param inputName the binding name for the callback-supplied input (C string) + * @param inputMimeType the MIME type of the callback-supplied input (C string) + * @param inputCharset the charset of the callback-supplied input (C string), may be null for UTF-8 + * @param readCallback function pointer invoked to read the next chunk; must return bytes written, + * 0 on EOF, or -1 on error + * @param writeCallback function pointer invoked with each output chunk; must return 0 on success + * @param ctx opaque context pointer forwarded to every callback invocation + * @return an unmanaged C string with JSON metadata/error + */ + @CEntryPoint(name = "run_script_input_output_callback") + public static CCharPointer runScriptInputOutputCallback( + IsolateThread thread, + CCharPointer script, + CCharPointer inputsJson, + CCharPointer inputName, + CCharPointer inputMimeType, + CCharPointer inputCharset, + NativeCallbacks.ReadCallback readCallback, + NativeCallbacks.WriteCallback writeCallback, + PointerBase ctx) { + + String dwScript = CTypeConversion.toJavaString(script); + String inputs = inputsJson.isNull() ? null : CTypeConversion.toJavaString(inputsJson); + String inName = CTypeConversion.toJavaString(inputName); + String inMime = CTypeConversion.toJavaString(inputMimeType); + String inCharset = inputCharset.isNull() ? 
null : CTypeConversion.toJavaString(inputCharset); + + // Create a piped input stream session for the callback-supplied input + InputStreamSession inputSession = new InputStreamSession(inMime, inCharset); + long inputHandle = inputSession.register(); + + // Merge the stream handle into the inputs JSON + String streamEntry = "{\"streamHandle\":\"" + inputHandle + "\",\"mimeType\":\"" + inMime + "\"" + + (inCharset != null ? ",\"charset\":\"" + inCharset + "\"" : "") + "}"; + String mergedInputs = mergeInputEntry(inputs, inName, streamEntry); + + // Start a background thread that calls the readCallback and feeds data into the pipe. + // Word types (CCharPointer, CFunctionPointer, PointerBase) cannot be captured in + // lambdas in GraalVM Native Image, so we use an explicit Runnable that stores their + // raw addresses and reconstitutes them via WordFactory. + final long readCallbackAddr = readCallback.rawValue(); + final long ctxAddr = ctx.rawValue(); + Thread feeder = new Thread(new InputCallbackFeeder( + readCallbackAddr, ctxAddr, inputSession), "dw-input-callback-feeder"); + feeder.setDaemon(true); + feeder.start(); + + // Execute the script and stream output via the writeCallback + ScriptRuntime runtime = ScriptRuntime.getInstance(); + StreamSession session = runtime.runStreaming(dwScript, mergedInputs); + + if (session.isError()) { + cleanupFeeder(feeder, inputHandle); + return toUnmanagedCString("{\"success\":false,\"error\":\"" + + escapeJsonString(session.getError()) + "\"}"); + } + + try { + byte[] buf = new byte[CALLBACK_BUFFER_SIZE]; + CCharPointer writeBuf = UnmanagedMemory.malloc(CALLBACK_BUFFER_SIZE); + try { + int n; + while ((n = session.read(buf, buf.length)) > 0) { + for (int i = 0; i < n; i++) { + writeBuf.write(i, buf[i]); + } + int rc = writeCallback.invoke(ctx, writeBuf, n); + if (rc != 0) { + cleanupFeeder(feeder, inputHandle); + return toUnmanagedCString("{\"success\":false,\"error\":\"" + + "Write callback returned error: " + rc + "\"}"); + } 
+ } + } finally { + UnmanagedMemory.free(writeBuf); + } + } catch (IOException e) { + cleanupFeeder(feeder, inputHandle); + return toUnmanagedCString("{\"success\":false,\"error\":\"" + + escapeJsonString(e.getMessage()) + "\"}"); + } finally { + session.closeStream(); + } + + cleanupFeeder(feeder, inputHandle); + + return toUnmanagedCString("{\"success\":true" + + ",\"mimeType\":\"" + session.getMimeType() + "\"" + + ",\"charset\":\"" + session.getCharset() + "\"" + + ",\"binary\":" + session.isBinary() + + "}"); + } + + /** + * Merges a single input entry into an existing JSON inputs string. + */ + private static String mergeInputEntry(String existingJson, String name, String entryJson) { + org.json.JSONObject obj = (existingJson == null || existingJson.trim().isEmpty()) + ? new org.json.JSONObject() + : new org.json.JSONObject(existingJson); + obj.put(name, new org.json.JSONObject(entryJson)); + return obj.toString(); + } + + /** + * Waits for the feeder thread to finish and closes the input session. + */ + private static void cleanupFeeder(Thread feeder, long inputHandle) { + try { + feeder.join(5000); + } catch (InterruptedException ignored) { + } + InputStreamSession.close(inputHandle); + } + + /** + * Explicit {@link Runnable} that drives the read-callback loop on a background thread. + * + *

GraalVM Native Image forbids capturing {@code Word} types (such as + * {@link CCharPointer} or {@link CFunctionPointer}) inside lambdas. This class stores + * the raw addresses as plain {@code long} values and reconstitutes the pointers via + * {@link WordFactory#pointer(long)} inside {@link #run()}.

+ * + *

The feeder allocates its own native read buffer and frees it in its {@code finally} + * block, ensuring no shared native memory between threads.

+ */ + private static final class InputCallbackFeeder implements Runnable { + private final long readCallbackAddr; + private final long ctxAddr; + private final InputStreamSession inputSession; + + InputCallbackFeeder(long readCallbackAddr, long ctxAddr, + InputStreamSession inputSession) { + this.readCallbackAddr = readCallbackAddr; + this.ctxAddr = ctxAddr; + this.inputSession = inputSession; + } + + @Override + public void run() { + NativeCallbacks.ReadCallback cb = WordFactory.pointer(readCallbackAddr); + PointerBase ctx = WordFactory.pointer(ctxAddr); + CCharPointer buf = UnmanagedMemory.malloc(CALLBACK_BUFFER_SIZE); + try { + while (true) { + int n = cb.invoke(ctx, buf, CALLBACK_BUFFER_SIZE); + if (n <= 0) { + break; // 0 = EOF, negative = error + } + byte[] tmp = new byte[n]; + for (int i = 0; i < n; i++) { + tmp[i] = buf.read(i); + } + inputSession.write(tmp, n); + } + } catch (IOException e) { + // pipe broken – DW engine will see the error + } finally { + UnmanagedMemory.free(buf); + try { + inputSession.closeWriter(); + } catch (IOException ignored) { + } + } + } + } + + private static String escapeJsonString(String input) { + if (input == null) return ""; + return input + .replace("\\", "\\\\") + .replace("\"", "\\\"") + .replace("\n", "\\n") + .replace("\r", "\\r") + .replace("\t", "\\t"); + } + + private static CCharPointer toUnmanagedCString(String value) { + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + CCharPointer ptr = UnmanagedMemory.malloc(bytes.length + 1); + for (int i = 0; i < bytes.length; i++) { + ptr.write(i, bytes[i]); + } + ptr.write(bytes.length, (byte) 0); + return ptr; + } + +} diff --git a/native-lib/src/main/java/org/mule/weave/lib/ScriptRuntime.java b/native-lib/src/main/java/org/mule/weave/lib/ScriptRuntime.java new file mode 100644 index 0000000..9cb4007 --- /dev/null +++ b/native-lib/src/main/java/org/mule/weave/lib/ScriptRuntime.java @@ -0,0 +1,234 @@ +package org.mule.weave.lib; + +import org.json.JSONObject; 
+import org.mule.weave.v2.runtime.BindingValue;
+import org.mule.weave.v2.runtime.DataWeaveResult;
+import org.mule.weave.v2.runtime.ScriptingBindings;
+import org.mule.weave.v2.runtime.api.DWResult;
+import org.mule.weave.v2.runtime.api.DWScript;
+import org.mule.weave.v2.runtime.api.DWScriptingEngine;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.immutable.Map;
+import scala.collection.immutable.Map$;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Base64;
+
+/**
+ * Singleton wrapper around a {@link DWScriptingEngine} used to compile and execute DataWeave scripts.
+ *
+ * <p>Execution results are returned as a JSON string containing a base64-encoded payload plus metadata
+ * (mime type, charset, and whether the result is binary). Errors — including malformed input-binding
+ * JSON — are returned as a JSON string with {@code success=false} and an escaped error message.</p>
+ */
+public class ScriptRuntime {
+
+    private static final ScriptRuntime INSTANCE = new ScriptRuntime();
+
+    /**
+     * Returns the singleton instance.
+     *
+     * @return the shared {@link ScriptRuntime}
+     */
+    public static ScriptRuntime getInstance() {
+        return INSTANCE;
+    }
+
+    private final DWScriptingEngine engine;
+
+    private ScriptRuntime() {
+        engine = DWScriptingEngine.builder().build();
+    }
+
+    /**
+     * Executes a DataWeave script with no input bindings.
+     *
+     * @param script the DataWeave script source
+     * @return a JSON string describing either the successful result or an error
+     */
+    public String run(String script) {
+        return run(script, null);
+    }
+
+    /**
+     * Executes a DataWeave script with optional input bindings encoded as JSON.
+     *
+     * <p>The expected JSON structure maps binding names to an object containing {@code content}
+     * (base64), {@code mimeType}, optional {@code charset}, and optional {@code properties}.</p>
+     *
+     * @param script     the DataWeave script source
+     * @param inputsJson JSON string encoding the input bindings map, or {@code null}
+     * @return a JSON string describing either the successful result or an error
+     */
+    public String run(String script, String inputsJson) {
+        try {
+            // Parse bindings inside the try block so malformed input JSON is reported
+            // as an error result instead of silently producing empty bindings.
+            ScriptingBindings bindings = parseJsonInputsToBindings(inputsJson);
+            String[] inputs = bindings.bindingNames();
+
+            DWScript compiled = engine.compileDWScript(script, inputs);
+            DWResult dwResult = compiled.writeDWResult(bindings);
+
+            Object content = dwResult.getContent();
+            if (!(content instanceof InputStream)) {
+                throw new RuntimeException("Result is not an InputStream: " + content.getClass().getName());
+            }
+            byte[] ba = ((InputStream) content).readAllBytes();
+            String encodedResult = Base64.getEncoder().encodeToString(ba);
+
+            // mimeType comes from the engine and is escaped defensively; the base64
+            // payload and charset name are alphabet-safe by construction.
+            return "{"
+                    + "\"success\":true,"
+                    + "\"result\":\"" + encodedResult + "\","
+                    + "\"mimeType\":\"" + escapeJsonString(dwResult.getMimeType()) + "\","
+                    + "\"charset\":\"" + dwResult.getCharset().name() + "\","
+                    + "\"binary\":" + ((DataWeaveResult) dwResult).isBinary()
+                    + "}";
+        } catch (Exception e) {
+            return "{"
+                    + "\"success\":false,"
+                    + "\"error\":\"" + escapeJsonString(errorMessage(e)) + "\""
+                    + "}";
+        }
+    }
+
+    /**
+     * Executes a DataWeave script and returns a {@link StreamSession} whose {@link java.io.InputStream}
+     * can be read incrementally, avoiding loading the entire result into memory.
+     *
+     * @param script     the DataWeave script source
+     * @param inputsJson JSON string encoding the input bindings map, or {@code null}
+     * @return a {@link StreamSession} with the result stream and metadata, or an error session
+     */
+    public StreamSession runStreaming(String script, String inputsJson) {
+        try {
+            ScriptingBindings bindings = parseJsonInputsToBindings(inputsJson);
+            String[] inputs = bindings.bindingNames();
+
+            DWScript compiled = engine.compileDWScript(script, inputs);
+            DWResult dwResult = compiled.writeDWResult(bindings);
+
+            Object content = dwResult.getContent();
+            if (content instanceof InputStream) {
+                return new StreamSession(
+                        (InputStream) content,
+                        dwResult.getMimeType(),
+                        dwResult.getCharset().name(),
+                        ((DataWeaveResult) dwResult).isBinary()
+                );
+            }
+            return StreamSession.ofError("Result is not an InputStream: " + content.getClass().getName());
+        } catch (Exception e) {
+            return StreamSession.ofError(errorMessage(e));
+        }
+    }
+
+    /**
+     * Best human-readable message for an exception: its message, or its
+     * {@code toString()} when the message is null or blank.
+     */
+    private static String errorMessage(Exception e) {
+        String message = e.getMessage();
+        if (message == null || message.trim().isEmpty()) {
+            return e.toString();
+        }
+        return message;
+    }
+
+    /**
+     * Parses the inputs JSON into {@link ScriptingBindings}.
+     *
+     * <p>Entries carrying a {@code streamHandle} are bound to the corresponding
+     * {@link InputStreamSession}'s stream; entries carrying {@code content} are base64-decoded.
+     * Entries with neither key are ignored.</p>
+     *
+     * @param inputsJson the inputs JSON, or {@code null}/blank for no bindings
+     * @return the populated bindings (possibly empty)
+     * @throws RuntimeException if the JSON is malformed or references an unknown stream handle;
+     *                          callers translate this into an error result
+     */
+    private ScriptingBindings parseJsonInputsToBindings(String inputsJson) {
+        ScriptingBindings bindings = new ScriptingBindings();
+
+        if (inputsJson == null || inputsJson.trim().isEmpty()) {
+            return bindings;
+        }
+
+        JSONObject root = new JSONObject(inputsJson);
+
+        for (String name : root.keySet()) {
+            JSONObject entry = root.getJSONObject(name);
+
+            if (entry.has("streamHandle")) {
+                long streamHandle = Long.parseLong(entry.getString("streamHandle"));
+                InputStreamSession inputSession = InputStreamSession.get(streamHandle);
+                if (inputSession == null) {
+                    throw new RuntimeException("Invalid streamHandle " + streamHandle + " for input '" + name + "'");
+                }
+                // Entry-level metadata wins; fall back to what the session was created with.
+                String mimeTypeRaw = entry.optString("mimeType", inputSession.getMimeType());
+                String charsetRaw = entry.optString("charset", inputSession.getCharset());
+                Charset charset = Charset.forName(charsetRaw);
+                Option<String> mimeType = Option.apply(mimeTypeRaw);
+
+                BindingValue bindingValue = new BindingValue(inputSession.getInputStream(), mimeType, Map$.MODULE$.empty(), charset);
+                bindings.addBinding(name, bindingValue);
+
+            } else if (entry.has("content")) {
+                String contentRaw = entry.getString("content");
+                String mimeTypeRaw = entry.optString("mimeType", null);
+                String charsetRaw = entry.optString("charset", "UTF-8");
+
+                Map properties = Map$.MODULE$.empty();
+                if (entry.has("properties") && !entry.isNull("properties")) {
+                    JSONObject propsObj = entry.getJSONObject("properties");
+                    properties = parseJsonProperties(propsObj);
+                }
+
+                Charset charset = Charset.forName(charsetRaw);
+                Option<String> mimeType = Option.apply(mimeTypeRaw);
+
+                byte[] content = Base64.getDecoder().decode(contentRaw);
+                BindingValue bindingValue = new BindingValue(content, mimeType, properties, charset);
+                bindings.addBinding(name, bindingValue);
+            }
+        }
+
+        return bindings;
+    }
+
+    /**
+     * Converts a JSON object of reader/writer properties into an immutable Scala map.
+     *
+     * <p>Only primitive values are accepted: strings, booleans, and numbers
+     * (floats widened to double, integral types to long).</p>
+     *
+     * @param propsObj the JSON properties object
+     * @return an immutable Scala map of the properties
+     * @throws IllegalArgumentException on null or non-primitive values
+     */
+    @SuppressWarnings("unchecked")
+    private Map parseJsonProperties(JSONObject propsObj) {
+        // Raw scala.collection.immutable.Map kept for Scala interop with BindingValue.
+        Map result = Map$.MODULE$.empty();
+
+        for (String key : propsObj.keySet()) {
+            Object val = propsObj.get(key);
+            if (val instanceof String || val instanceof Boolean) {
+                result = (Map) result.$plus(new Tuple2<>(key, val));
+            } else if (val instanceof Number) {
+                Number num = (Number) val;
+                // Normalize numeric boxing: floating point -> Double, integral -> Long.
+                Object boxed = (val instanceof Double || val instanceof Float)
+                        ? (Object) num.doubleValue()
+                        : (Object) num.longValue();
+                result = (Map) result.$plus(new Tuple2<>(key, boxed));
+            } else if (val == JSONObject.NULL) {
+                throw new IllegalArgumentException("properties values cannot be null (key '" + key + "')");
+            } else {
+                throw new IllegalArgumentException("properties values must be primitive (string/number/boolean) (key '" + key + "')");
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Escapes a string for safe embedding inside a JSON string literal.
+     *
+     * <p>Covers backslash, quote, the named escapes, and every remaining control
+     * character below U+0020 (required by RFC 8259 for valid JSON output).</p>
+     *
+     * @param input the raw string, may be {@code null}
+     * @return the escaped string, or {@code ""} for {@code null}
+     */
+    private static String escapeJsonString(String input) {
+        if (input == null) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder(input.length() + 16);
+        for (int i = 0; i < input.length(); i++) {
+            char c = input.charAt(i);
+            switch (c) {
+                case '\\': sb.append("\\\\"); break;
+                case '"':  sb.append("\\\""); break;
+                case '\n': sb.append("\\n"); break;
+                case '\r': sb.append("\\r"); break;
+                case '\t': sb.append("\\t"); break;
+                case '\b': sb.append("\\b"); break;
+                case '\f': sb.append("\\f"); break;
+                default:
+                    if (c < 0x20) {
+                        sb.append(String.format("\\u%04x", (int) c));
+                    } else {
+                        sb.append(c);
+                    }
+            }
+        }
+        return sb.toString();
+    }
+}
diff --git a/native-lib/src/main/java/org/mule/weave/lib/StreamSession.java b/native-lib/src/main/java/org/mule/weave/lib/StreamSession.java
new file mode 100644
index 0000000..b54f299
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/StreamSession.java
@@ -0,0 +1,136 @@
+package org.mule.weave.lib;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Holds an open {@link InputStream} and associated metadata for a streaming script execution.
+ *
+ * <p>

+ * Instances are stored in a static registry keyed by a monotonically increasing handle so that
+ * native callers can reference them across {@code @CEntryPoint} invocations.</p>
+ */
+public class StreamSession {
+
+    // Registry of sessions, keyed by the handle returned from register().
+    private static final ConcurrentHashMap<Long, StreamSession> SESSIONS = new ConcurrentHashMap<>();
+    private static final AtomicLong NEXT_HANDLE = new AtomicLong(1);
+
+    private final InputStream inputStream;
+    private final String mimeType;
+    private final String charset;
+    private final boolean binary;
+    private final String error;
+
+    StreamSession(InputStream inputStream, String mimeType, String charset, boolean binary) {
+        this.inputStream = inputStream;
+        this.mimeType = mimeType;
+        this.charset = charset;
+        this.binary = binary;
+        this.error = null;
+    }
+
+    private StreamSession(String error) {
+        this.inputStream = null;
+        this.mimeType = null;
+        this.charset = null;
+        this.binary = false;
+        this.error = error;
+    }
+
+    /**
+     * Creates an error session that carries only an error message.
+     *
+     * @param error the error message
+     * @return an error session
+     */
+    public static StreamSession ofError(String error) {
+        return new StreamSession(error);
+    }
+
+    /**
+     * Registers this session and returns its handle.
+     *
+     * <p>Registered sessions stay in the registry until {@link #close(long)} is called,
+     * so every handle handed to a native caller must eventually be closed.</p>
+     *
+     * @return a unique handle that callers use to reference this session
+     */
+    public long register() {
+        long handle = NEXT_HANDLE.getAndIncrement();
+        SESSIONS.put(handle, this);
+        return handle;
+    }
+
+    /**
+     * Looks up a previously registered session.
+     *
+     * @param handle the handle returned by {@link #register()}
+     * @return the session, or {@code null} if not found
+     */
+    public static StreamSession get(long handle) {
+        return SESSIONS.get(handle);
+    }
+
+    /**
+     * Removes a session from the registry and closes its stream (if any).
+     *
+     * @param handle the session handle
+     */
+    public static void close(long handle) {
+        StreamSession session = SESSIONS.remove(handle);
+        if (session != null) {
+            session.closeStream();
+        }
+    }
+
+    /**
+     * Reads up to {@code len} bytes into the provided byte array.
+     *
+     * @param buf destination buffer
+     * @param len maximum number of bytes to read
+     * @return number of bytes actually read, or {@code -1} on EOF
+     * @throws IOException if an I/O error occurs
+     * @throws IllegalStateException if this is an error session (no stream to read)
+     */
+    public int read(byte[] buf, int len) throws IOException {
+        if (inputStream == null) {
+            // Fail with a descriptive error instead of a bare NPE on error sessions.
+            throw new IllegalStateException("Cannot read from an error session: " + error);
+        }
+        return inputStream.read(buf, 0, len);
+    }
+
+    public String getMimeType() {
+        return mimeType;
+    }
+
+    public String getCharset() {
+        return charset;
+    }
+
+    public boolean isBinary() {
+        return binary;
+    }
+
+    /**
+     * Closes the underlying input stream. Safe to call on error sessions (no-op).
+     */
+    public void closeStream() {
+        if (inputStream != null) {
+            try {
+                inputStream.close();
+            } catch (IOException ignored) {
+                // Nothing actionable: the stream is being discarded either way.
+            }
+        }
+    }
+
+    /**
+     * Returns the error message if this is an error session, or {@code null} otherwise.
+     */
+    public String getError() {
+        return error;
+    }
+
+    /**
+     * Returns {@code true} if this session represents a failed execution.
+     */
+    public boolean isError() {
+        return error != null;
+    }
+}
diff --git a/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat
new file mode 100644
index 0000000..b1a22df
--- /dev/null
+++ b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat
@@ -0,0 +1,9 @@
+org.mule.weave.v2.interpreted.module.WeaveDataFormat
+org.mule.weave.v2.module.core.json.JsonDataFormat
+org.mule.weave.v2.module.core.xml.XmlDataFormat
+org.mule.weave.v2.module.core.csv.CSVDataFormat
+org.mule.weave.v2.module.core.octetstream.OctetStreamDataFormat
+org.mule.weave.v2.module.core.textplain.TextPlainDataFormat
+org.mule.weave.v2.module.core.urlencoded.UrlEncodedDataFormat
+org.mule.weave.v2.module.core.multipart.MultiPartDataFormat
+org.mule.weave.v2.module.core.properties.PropertiesDataFormat
\ No newline at end of file
diff --git a/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader
b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader
new file mode 100644
index 0000000..ef3215b
--- /dev/null
+++ b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader
@@ -0,0 +1 @@
+org.mule.weave.v2.compilation.loader.WeaveBinaryResourceModuleLoader
\ No newline at end of file
diff --git a/native-lib/src/test/java/org/mule/weave/lib/ScriptRuntimeTest.java b/native-lib/src/test/java/org/mule/weave/lib/ScriptRuntimeTest.java
new file mode 100644
index 0000000..70f8044
--- /dev/null
+++ b/native-lib/src/test/java/org/mule/weave/lib/ScriptRuntimeTest.java
@@ -0,0 +1,613 @@
+package org.mule.weave.lib;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for {@link ScriptRuntime}: batch execution (JSON envelope results) and
+ * streaming execution ({@link StreamSession}), including piped streaming inputs
+ * fed from a separate thread via {@link InputStreamSession}.
+ */
+class ScriptRuntimeTest {
+
+    @Test
+    void runSimpleScript() {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Running sqrt(144) 20 times with timing:");
+        System.out.println("=".repeat(50));
+
+        for (int i = 1; i <= 20; i++) {
+            long startTime = System.nanoTime();
+            String result = runtime.run("sqrt(144)");
+            long endTime = System.nanoTime();
+            double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+            assertEquals("12", Result.parse(result).result);
+            System.out.printf("Run %2d: %.3f ms - Result: %s%n", i, executionTimeMs, result);
+        }
+
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void runParseError() {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Running an invalid script to verify error reporting:");
+        System.out.println("=".repeat(50));
+
+        String result = runtime.run("invalid syntax here");
+
+        String error = Result.parse(result).error;
+        assertTrue(error.contains("Unable to resolve reference"));
+        System.out.printf("Error: %s%n", result);
+
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void runWithInputs() {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing runWithInputs with two integer numbers:");
+        System.out.println("=".repeat(50));
+
+        // Test 1: Sum 25 + 17
+        int num1 = 25;
+        int num2 = 17;
+        int expected = num1 + num2;
+
+        // Create inputs JSON with content and mimeType for each binding
+        String inputsJson = String.format(
+                "{\"num1\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}, "
+                        + "\"num2\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+                encode(num1), encode(num2)
+        );
+
+        String script = "num1 + num2";
+
+        System.out.printf("Test 1: %d + %d%n", num1, num2);
+        System.out.printf("Script: %s%n", script);
+        System.out.printf("Inputs: %s%n", inputsJson);
+
+        long startTime = System.nanoTime();
+        String result = Result.parse(runtime.run(script, inputsJson)).result;
+        long endTime = System.nanoTime();
+        double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+        System.out.printf("Result: %s%n", result);
+        System.out.printf("Expected: %d%n", expected);
+        System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+        assertEquals(String.valueOf(expected), result);
+        System.out.println("✓ Test 1 passed!");
+
+        System.out.println("-".repeat(50));
+
+        // Test 2: Sum 100 + 250
+        num1 = 100;
+        num2 = 250;
+        expected = num1 + num2;
+
+        inputsJson = String.format(
+                "{\"num1\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}, "
+                        + "\"num2\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+                encode(num1), encode(num2)
+        );
+
+        System.out.printf("Test 2: %d + %d%n", num1, num2);
+        System.out.printf("Script: %s%n", script);
+
+        startTime = System.nanoTime();
+        result = Result.parse(runtime.run(script, inputsJson)).result;
+        endTime = System.nanoTime();
+        executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+        System.out.printf("Result: %s%n", result);
+        System.out.printf("Expected: %d%n", expected);
+        System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+        assertEquals(String.valueOf(expected), result);
+        System.out.println("✓ Test 2 passed!");
+
+        System.out.println("=".repeat(50));
+    }
+
+    /** Base64-encodes a value: byte arrays as-is, everything else via its UTF-8 string form. */
+    private String encode(Object value) {
+        byte[] bytes = value instanceof byte[]
+                ? (byte[]) value
+                : String.valueOf(value).getBytes(StandardCharsets.UTF_8);
+        return Base64.getEncoder().encodeToString(bytes);
+    }
+
+    @Test
+    void runWithXmlInput() {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing runWithInputs with XML input to calculate average age:");
+        System.out.println("=".repeat(50));
+
+        // XML input with two people
+        String xmlInput = """
+                <people>
+                    <person>
+                        <age>19</age>
+                        <name>john</name>
+                    </person>
+                    <person>
+                        <age>25</age>
+                        <name>jane</name>
+                    </person>
+                </people>
+                """;
+
+        String inputsJson = String.format(
+                "{\"people\": {\"content\": \"%s\", \"mimeType\": \"application/xml\"}}",
+                encode(xmlInput)
+        );
+
+        // DataWeave script to calculate average age
+        String script = """
+                output application/json
+                ---
+                avg(people.people.*person.age)
+                """;
+
+        System.out.printf("XML Input:%n%s%n", xmlInput);
+        System.out.printf("Script:%n%s%n", script);
+
+        long startTime = System.nanoTime();
+        String result = runtime.run(script, inputsJson);
+        long endTime = System.nanoTime();
+        double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+        System.out.printf("Result: %s%n", result);
+        System.out.printf("Expected: 22 (average of 19 and 25)%n");
+        System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+        // The average of 19 and 25 is 22
+        assertEquals("22", Result.parse(result).result);
+        System.out.println("✓ Test passed!");
+
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void runWithJsonObjectInput() {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing runWithInputs with JSON object input:");
+        System.out.println("=".repeat(50));
+
+        String jsonInput = "{\"name\": \"John\", \"age\": 30}";
+
+        String inputsJson = String.format(
+                "{\"payload\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+                encode(jsonInput)
+        );
+
+        // DataWeave script to extract name
+        String script = "output application/json\n---\npayload.name";
+
+        System.out.printf("JSON Input: %s%n", jsonInput);
+        System.out.printf("Script: %s%n", script);
+
+        long startTime = System.nanoTime();
+        String result = Result.parse(runtime.run(script, inputsJson)).result;
+        long endTime = System.nanoTime();
+        double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+        System.out.printf("Result: %s%n", result);
+        System.out.printf("Expected: \"John\"%n");
+        System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+        assertEquals("\"John\"", result);
+        System.out.println("✓ Test passed!");
+
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void runWithBinaryResult() {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Running fromBase64 with timing:");
+        System.out.println("=".repeat(50));
+
+        for (int i = 1; i <= 1; i++) {
+            long startTime = System.nanoTime();
+            Result result = Result.parse(runtime.run("import fromBase64 from dw::core::Binaries\n"
+                    + "output application/octet-stream\n"
+                    + "---\n"
+                    + "fromBase64(\"12345678\")", ""));
+            long endTime = System.nanoTime();
+            double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+            assertEquals("12345678", result.result);
+            System.out.printf("Run %2d: %.3f ms - Result: %s%n", i, executionTimeMs, result.result);
+        }
+
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void runWithInputProperties() {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+        String encodedIn0 = Base64.getEncoder().encodeToString("1234567".getBytes(StandardCharsets.UTF_8));
+        // separator "4" splits "1234567" into columns "123" and "567".
+        Result result = Result.parse(runtime.run("in0.column_1[0] as Number",
+                "{\"in0\": "
+                        + "{\"content\": \"" + encodedIn0 + "\", "
+                        + "\"mimeType\": \"application/csv\", "
+                        + "\"properties\": {\"header\": false, \"separator\": \"4\"}}}"));
+        assertEquals("567", result.result);
+    }
+
+    /** Drains a stream session to a string using the session's charset. */
+    private static String readAll(StreamSession session) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        byte[] buf = new byte[64];
+        int n;
+        while ((n = session.read(buf, buf.length)) > 0) {
+            bos.write(buf, 0, n);
+        }
+        return bos.toString(session.getCharset());
+    }
+
+    @Test
+    void streamSimpleScript() throws IOException {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing streaming simple script:");
+        System.out.println("=".repeat(50));
+
+        StreamSession session = runtime.runStreaming("sqrt(144)", null);
+        assertFalse(session.isError(), "Expected successful session");
+        assertNull(session.getError());
+        assertNotNull(session.getMimeType());
+
+        String result = readAll(session);
+        assertEquals("12", result);
+        session.closeStream();
+
+        System.out.println("Result: " + result);
+        System.out.println("✓ Streaming simple script passed!");
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void streamWithInputs() throws IOException {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing streaming with inputs:");
+        System.out.println("=".repeat(50));
+
+        String inputsJson = String.format(
+                "{\"num1\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}, "
+                        + "\"num2\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+                encode(25), encode(17)
+        );
+
+        StreamSession session = runtime.runStreaming("num1 + num2", inputsJson);
+        assertFalse(session.isError());
+
+        String result = readAll(session);
+        assertEquals("42", result);
+        session.closeStream();
+
+        System.out.println("Result: " + result);
+        System.out.println("✓ Streaming with inputs passed!");
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void streamChunkedRead() throws IOException {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing streaming chunked read:");
+        System.out.println("=".repeat(50));
+
+        String script = "output application/json\n---\n{items: (1 to 100) map {id: $, name: \"item_\" ++ $}}";
+
+        StreamSession session = runtime.runStreaming(script, null);
+        assertFalse(session.isError());
+
+        // Deliberately small buffer so the result must arrive in several chunks.
+        byte[] smallBuf = new byte[32];
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        int n;
+        int chunkCount = 0;
+        while ((n = session.read(smallBuf, smallBuf.length)) > 0) {
+            bos.write(smallBuf, 0, n);
+            chunkCount++;
+        }
+        String result = bos.toString(session.getCharset());
+        assertTrue(chunkCount > 1, "Expected multiple chunks, got " + chunkCount);
+        assertTrue(result.contains("item_1"));
+        assertTrue(result.contains("item_100"));
+        session.closeStream();
+
+        System.out.printf("Read %d chunks, total %d bytes%n", chunkCount, bos.size());
+        System.out.println("✓ Streaming chunked read passed!");
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void streamWithStreamingInput() throws Exception {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing streaming with streaming input:");
+        System.out.println("=".repeat(50));
+
+        // Create an input stream session for JSON data
+        InputStreamSession inputSession = new InputStreamSession("application/json", "UTF-8");
+        long inputHandle = inputSession.register();
+
+        // Build inputs JSON referencing the streamHandle
+        String inputsJson = "{\"payload\": {\"streamHandle\": \"" + inputHandle + "\", \"mimeType\": \"application/json\"}}";
+
+        // The DW engine will read from the PipedInputStream on the main thread,
+        // so we must feed data from a separate thread.
+        CountDownLatch started = new CountDownLatch(1);
+        AtomicReference<Exception> feedError = new AtomicReference<>();
+
+        Thread feeder = new Thread(() -> {
+            try {
+                started.countDown();
+                String jsonData = "{\"name\": \"Alice\", \"age\": 30}";
+                byte[] bytes = jsonData.getBytes(StandardCharsets.UTF_8);
+                inputSession.write(bytes, bytes.length);
+                inputSession.closeWriter();
+            } catch (Exception e) {
+                feedError.set(e);
+            }
+        });
+        feeder.start();
+        started.await();
+
+        // Run streaming with the piped input
+        StreamSession session = runtime.runStreaming("output application/json\n---\npayload.name", inputsJson);
+        assertFalse(session.isError(), "Expected successful session but got: " + session.getError());
+
+        String result = readAll(session);
+        assertEquals("\"Alice\"", result);
+        session.closeStream();
+        InputStreamSession.close(inputHandle);
+        feeder.join(5000);
+        assertNull(feedError.get(), "Feeder thread threw: " + feedError.get());
+
+        System.out.println("Result: " + result);
+        System.out.println("✓ Streaming with streaming input passed!");
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void streamWithLargeStreamingInput() throws Exception {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing streaming with large streaming input:");
+        System.out.println("=".repeat(50));
+
+        InputStreamSession inputSession = new InputStreamSession("application/json", "UTF-8");
+        long inputHandle = inputSession.register();
+
+        String inputsJson = "{\"payload\": {\"streamHandle\": \"" + inputHandle + "\", \"mimeType\": \"application/json\"}}";
+
+        // Feed a large JSON array from a separate thread
+        AtomicReference<Exception> feedError = new AtomicReference<>();
+        Thread feeder = new Thread(() -> {
+            try {
+                StringBuilder sb = new StringBuilder("[");
+                for (int i = 1; i <= 1000; i++) {
+                    if (i > 1) sb.append(",");
+                    sb.append("{\"id\":").append(i).append(",\"val\":\"item_").append(i).append("\"}");
+                }
+                sb.append("]");
+                byte[] bytes = sb.toString().getBytes(StandardCharsets.UTF_8);
+                // Write in chunks to simulate streaming
+                int chunkSize = 4096;
+                for (int off = 0; off < bytes.length; off += chunkSize) {
+                    int len = Math.min(chunkSize, bytes.length - off);
+                    byte[] chunk = new byte[len];
+                    System.arraycopy(bytes, off, chunk, 0, len);
+                    inputSession.write(chunk, len);
+                }
+                inputSession.closeWriter();
+            } catch (Exception e) {
+                feedError.set(e);
+            }
+        });
+        feeder.start();
+
+        StreamSession session = runtime.runStreaming("output application/json\n---\nsizeOf(payload)", inputsJson);
+        assertFalse(session.isError(), "Expected successful session but got: " + session.getError());
+
+        String result = readAll(session);
+        assertEquals("1000", result);
+        session.closeStream();
+        InputStreamSession.close(inputHandle);
+        feeder.join(10000);
+        assertNull(feedError.get(), "Feeder thread threw: " + feedError.get());
+
+        System.out.println("Result: " + result);
+        System.out.println("✓ Streaming with large streaming input passed!");
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void streamErrorSession() {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing streaming error session:");
+        System.out.println("=".repeat(50));
+
+        StreamSession session = runtime.runStreaming("invalid syntax here", null);
+        assertTrue(session.isError());
+        assertNotNull(session.getError());
+        assertTrue(session.getError().contains("Unable to resolve reference"));
+
+        System.out.println("Error: " + session.getError());
+        System.out.println("✓ Streaming error session passed!");
+        System.out.println("=".repeat(50));
+    }
+
+    // ── Callback-based streaming pattern tests ──────────────────────────
+
+    @Test
+    void callbackOutputStreaming() throws IOException {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing callback-based output streaming:");
+        System.out.println("=".repeat(50));
+
+        StreamSession session = runtime.runStreaming("output application/json\n---\n{items: (1 to 50) map {id: $}}", null);
+        assertFalse(session.isError());
+
+        // Simulate the write-callback pattern: read chunks and collect them
+        ByteArrayOutputStream collected = new ByteArrayOutputStream();
+        byte[] buf = new byte[64];
+        int callbackCount = 0;
+        int n;
+        while ((n = session.read(buf, buf.length)) > 0) {
+            // This is what the write callback would receive
+            collected.write(buf, 0, n);
+            callbackCount++;
+        }
+        String result = collected.toString(session.getCharset());
+        assertTrue(result.contains("\"id\": 1"), "Expected id 1 in result");
+        assertTrue(result.contains("\"id\": 50"), "Expected id 50 in result");
+        assertTrue(callbackCount > 0, "Expected at least one callback invocation");
+        session.closeStream();
+
+        System.out.printf("Callback invoked %d times, total %d bytes%n", callbackCount, collected.size());
+        System.out.println("✓ Callback output streaming passed!");
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void callbackInputOutputStreaming() throws Exception {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing callback-based input+output streaming:");
+        System.out.println("=".repeat(50));
+
+        // Simulate the read-callback pattern: a feeder thread pulls from a data source
+        // and pushes into an InputStreamSession, while the main thread reads the output.
+        InputStreamSession inputSession = new InputStreamSession("application/json", "UTF-8");
+        long inputHandle = inputSession.register();
+
+        String inputsJson = "{\"payload\": {\"streamHandle\": \"" + inputHandle + "\", \"mimeType\": \"application/json\"}}";
+
+        // Simulate read callback: feeds a JSON array in chunks
+        byte[] sourceData = "[10, 20, 30, 40, 50]".getBytes(StandardCharsets.UTF_8);
+        AtomicReference<Exception> feedError = new AtomicReference<>();
+
+        Thread feeder = new Thread(() -> {
+            try {
+                int chunkSize = 8;
+                for (int off = 0; off < sourceData.length; off += chunkSize) {
+                    int len = Math.min(chunkSize, sourceData.length - off);
+                    byte[] chunk = new byte[len];
+                    System.arraycopy(sourceData, off, chunk, 0, len);
+                    inputSession.write(chunk, len);
+                }
+                inputSession.closeWriter();
+            } catch (Exception e) {
+                feedError.set(e);
+            }
+        }, "test-read-callback-feeder");
+        feeder.start();
+
+        StreamSession session = runtime.runStreaming("output application/json\n---\npayload map ($ * 2)", inputsJson);
+        assertFalse(session.isError(), "Expected successful session but got: " + session.getError());
+
+        // Simulate write callback: collect output chunks
+        ByteArrayOutputStream collected = new ByteArrayOutputStream();
+        byte[] buf = new byte[32];
+        int callbackCount = 0;
+        int n;
+        while ((n = session.read(buf, buf.length)) > 0) {
+            collected.write(buf, 0, n);
+            callbackCount++;
+        }
+        String result = collected.toString(session.getCharset());
+        assertTrue(result.contains("20"), "Expected 20 in result (10*2)");
+        assertTrue(result.contains("100"), "Expected 100 in result (50*2)");
+
+        session.closeStream();
+        InputStreamSession.close(inputHandle);
+        feeder.join(5000);
+        assertNull(feedError.get(), "Feeder thread threw: " + feedError.get());
+
+        System.out.printf("Read callback fed %d bytes, write callback invoked %d times, output: %s%n",
+                sourceData.length, callbackCount, result.trim());
+        System.out.println("✓ Callback input+output streaming passed!");
+        System.out.println("=".repeat(50));
+    }
+
+    @Test
+    void callbackOutputStreamingError() {
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+        System.out.println("Testing callback-based output streaming with error:");
+        System.out.println("=".repeat(50));
+
+        StreamSession session = runtime.runStreaming("invalid syntax here", null);
+        assertTrue(session.isError());
+        assertNotNull(session.getError());
+
+        System.out.println("Error correctly returned: " + session.getError());
+        System.out.println("✓ Callback output streaming error passed!");
+        System.out.println("=".repeat(50));
+    }
+
+    /**
+     * Minimal parser for the JSON envelope returned by {@link ScriptRuntime#run}:
+     * decodes the base64 payload (unless binary) and captures the metadata fields.
+     */
+    static class Result {
+        boolean success;
+        String result;
+        String error;
+        boolean binary;
+        String mimeType;
+        String charset;
+
+        static Result parse(String json) {
+            Result result = new Result();
+            org.json.JSONObject obj = new org.json.JSONObject(json);
+
+            result.success = obj.getBoolean("success");
+            if (result.success) {
+                result.binary = obj.getBoolean("binary");
+                result.mimeType = obj.getString("mimeType");
+                result.charset = obj.getString("charset");
+                String encoded = obj.getString("result");
+                if (result.binary) {
+                    // Binary payloads stay base64-encoded.
+                    result.result = encoded;
+                } else {
+                    result.result = new String(Base64.getDecoder().decode(encoded), Charset.forName(result.charset));
+                }
+            } else {
+                result.error = obj.getString("error");
+            }
+            return result;
+        }
+    }
+
+}
diff --git a/settings.gradle b/settings.gradle
index a47c02c..befbb96 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,3 +1,3 @@
 include 'native-cli'
 include 'native-cli-integration-tests'
-
+include 'native-lib'