
synthefy plugin #39

Open
ajanbekzat wants to merge 2 commits into influxdata:main from ajanbekzat:main

Conversation


@ajanbekzat ajanbekzat commented Jan 12, 2026

Synthefy Forecasting Plugin for InfluxDB 3

This PR adds a new HTTP trigger plugin that integrates the Synthefy Forecasting API with InfluxDB 3 to enable on-demand time-series forecasting.

Features

  • On-demand forecasting via HTTP requests
  • Multiple model support: Works with Synthefy models (sfm-tabular, Migas-latest, Chronos2, Moirai2, TimesFM, etc.)
  • Multivariate forecasting: Supports metadata fields as covariates
  • Tag filtering: Filter time series by tags (location, device, etc.)
  • Automatic data writing: Forecasts written back to InfluxDB using Line Protocol

How It Works

  1. Queries historical time series data from InfluxDB based on measurement, field, tags, and time range
  2. Transforms data to Synthefy API v2 request format
  3. Calls Synthefy Forecasting API to generate forecasts
  4. Writes forecast results (with quantiles) back to InfluxDB
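The transformation in step 2 can be sketched as a pure function over the queried rows. Note the payload keys below (`model`, `horizon`, `series`, `timestamps`, `values`) are illustrative assumptions, not the documented Synthefy API v2 schema:

```python
# Sketch of step 2: shape InfluxDB query rows into a forecast request body.
def build_forecast_request(rows, model, horizon):
    """rows: list of dicts with 'time' and 'value' keys from the history query.
    Returns a request body in an ASSUMED Synthefy-v2-like shape."""
    return {
        "model": model,
        "horizon": horizon,
        "series": {
            "timestamps": [r["time"] for r in rows],
            "values": [r["value"] for r in rows],
        },
    }

req = build_forecast_request(
    [{"time": "2026-01-01T00:00:00Z", "value": 21.5},
     {"time": "2026-01-02T00:00:00Z", "value": 22.1}],
    model="sfm-tabular",
    horizon="7d",
)
```

Keeping this step pure makes it easy to unit-test without a running InfluxDB or a Synthefy account.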

Requirements

  • Python 3.7+
  • pandas and httpx (or requests as a fallback) packages
  • Synthefy API key (create one at console.synthefy.com/api-keys)

Usage Example

```bash
curl -X POST http://localhost:8181/api/v3/engine/forecast \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "measurement": "temperature",
    "field": "value",
    "tags": "location=NYC",
    "time_range": "30d",
    "forecast_horizon": "7d",
    "model": "sfm-tabular",
    "api_key": "your-api-key",
    "metadata_fields": "humidity,pressure"
  }'
```

@garylfowler
Collaborator

Thank you for this submission; we will be reviewing soon.

```
"required": false
},
{
"name": "api_key",
```
Collaborator

For new plugins, it is advisable to pass sensitive data like `api_key` only through request headers or environment variables.
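A hedged sketch of that approach, resolving the key from a request header first and then the environment (the header name `X-Synthefy-Api-Key` is an assumption; `SYNTHEFY_API_KEY` is the variable named in the plugin's docs):

```python
import os

def resolve_api_key(headers, env=os.environ):
    """Prefer a request header so the key never appears in logged request
    bodies; fall back to the SYNTHEFY_API_KEY environment variable."""
    header = headers.get("X-Synthefy-Api-Key", "")  # header name is an assumption
    if header:
        return header
    key = env.get("SYNTHEFY_API_KEY", "")
    if not key:
        raise ValueError(
            "No API key: set the X-Synthefy-Api-Key header or SYNTHEFY_API_KEY"
        )
    return key
```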

```
| `field` | string | "value" | Field name to forecast |
| `tags` | string | "" | Tag filters, comma-separated (e.g., "location=NYC,device=sensor1") |
| `time_range` | string | "30d" | Historical data window. Format: `<number><unit>` (e.g., "30d") |
| `forecast_horizon` | string | "7d" | Forecast duration. Format: `<number><unit>` or "<number> points" |
```
Collaborator

For better understanding, it is worth specifying the supported units (e.g., d, h) in the documentation.
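One way to make the supported units explicit is to validate them at parse time; a minimal sketch, assuming the unit set is s/m/h/d (the actual set supported by the plugin should be confirmed against its code):

```python
import re
from datetime import timedelta

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}  # assumed set

def parse_duration(text):
    """Parse strings like '30d' or '7h' into a timedelta; reject unknown units
    with an error that lists what is supported."""
    m = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if not m:
        raise ValueError(
            f"Bad duration {text!r}; expected <number><unit> with unit in {sorted(_UNITS)}"
        )
    return timedelta(**{_UNITS[m.group(2)]: int(m.group(1))})
```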

```
| `api_key` | string | required | Synthefy API key (create at [console.synthefy.com/api-keys](https://console.synthefy.com/api-keys) or set SYNTHEFY_API_KEY environment variable) |
| `output_measurement` | string | "{measurement}_forecast" | Destination measurement for forecast results |
| `metadata_fields` | string | "" | Comma-separated list of metadata field names to use as covariates |
| `database` | string | "" | Database name for reading and writing data (optional) |
```
Collaborator

The documentation states that this parameter is also used to specify the database to read data from. However, the plugin's code doesn't support this logic, and triggers can only read data from the database in which they were created.

```python
        f"Forecasts written successfully to InfluxDB (attempt {attempt + 1})"
    )
    return
except Exception as e:
```
Collaborator

Currently, the processing engine's `influxdb3_local.write_to_db` and `influxdb3_local.write` functions do not throw errors when writing during trigger execution. The current architecture works as follows: during trigger execution, all records are accumulated in a buffer, and the write itself occurs after the trigger completes. If there are write errors (mismatched field types, etc.), they are raised at the InfluxDB level. This means the trigger is unaware of whether the write completed successfully and will never receive an error for the write operation.

To address this issue, support for the `influxdb3_local.write_sync` and `write_sync_to_db` functions was added in InfluxDB 3.8.2. The write occurs immediately, which allows for error detection. You can see usage examples in other plugins (amqp_subscriber, kafka_subscriber, mqtt_subscriber).
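The synchronous-write-with-retry pattern the comment describes can be sketched with the write call injected, so the same loop works with `influxdb3_local.write_sync` (available since InfluxDB 3.8.2 per the comment above) or any stand-in; the retry parameters here are illustrative:

```python
import time

def write_with_retries(write_fn, lines, attempts=3, delay=0.0):
    """Call a synchronous write function (e.g. influxdb3_local.write_sync)
    and retry on failure; re-raise after the last failed attempt.
    Returns the number of attempts used on success."""
    for attempt in range(attempts):
        try:
            write_fn(lines)
            return attempt + 1
        except Exception:
            if attempt == attempts - 1:
                raise  # exhausted retries; surface the write error
            time.sleep(delay)
```

Because `write_sync` raises immediately on failure, the plugin can log the error or return it in the HTTP response instead of silently losing forecasts.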

```python
try:
    body_dict = json.loads(request_body)
except json.JSONDecodeError as e:
    logger.warning(f"Failed to parse JSON string: {e}")
```
Collaborator

I think it's best to throw an error here and terminate the plugin. If there's an error in the request body, the plugin will otherwise continue executing implicitly with the settings in `args`, even though the user expects the behavior they specified in the request body.
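A fail-fast variant of that parse might look like the sketch below: raise instead of warning and falling back, so the trigger stops rather than running with defaults the caller did not ask for.

```python
import json

def parse_request_body(request_body):
    """Return the parsed request body as a dict, or raise so the trigger
    terminates instead of silently continuing with trigger-arg defaults."""
    try:
        return json.loads(request_body)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON request body: {e}") from e
```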

```
|----------------------|--------|----------|-------------------------------------------------------------------|
| `measurement` | string | required | Source measurement containing historical data |
| `field` | string | "value" | Field name to forecast |
| `tags` | string | "" | Tag filters, comma-separated (e.g., "location=NYC,device=sensor1") |
```
Collaborator

When passing parameters via `--trigger-arguments`, avoid commas in the values themselves, as parsing will fail. For `--trigger-arguments tags=room=Bedroom,room2=Bedroom,field=temp`, parsing will return `{"tags": "room=Bedroom", "room2": "Bedroom", "field": "temp"}`, which is incorrect.
It is recommended to use a space instead: `tags="room=Bedroom room2=Bedroom",field=temp`.
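The space-separated convention can be handled by a two-stage parse: split arguments on commas, then split multi-valued fields like `tags` on whitespace. A sketch (the argument format is inferred from the example above):

```python
def parse_trigger_arguments(raw):
    """Parse 'k1=v1,k2=v2' into a dict; values must not contain commas.
    Multi-valued fields use spaces inside the value."""
    args = {}
    for part in raw.split(","):
        key, _, value = part.partition("=")  # split on the FIRST '=' only
        args[key.strip()] = value.strip()
    return args

def parse_tags(value):
    """Split 'room=Bedroom room2=Bedroom' into a tag dict."""
    return dict(item.split("=", 1) for item in value.split() if "=" in item)
```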

```
| `model` | string | "sfm-tabular"| Synthefy model to use (e.g., "sfm-tabular", "Migas-latest") |
| `api_key` | string | required | Synthefy API key (create at [console.synthefy.com/api-keys](https://console.synthefy.com/api-keys) or set SYNTHEFY_API_KEY environment variable) |
| `output_measurement` | string | "{measurement}_forecast" | Destination measurement for forecast results |
| `metadata_fields` | string | "" | Comma-separated list of metadata field names to use as covariates |
```
Collaborator

The same issue as for the `tags` field applies here.

```python
logger.debug(f"Request data: {json.dumps(request_data, indent=2)}")

try:
    if hasattr(httpx, "post"):
```
Collaborator

The httpx library always has a `post` method, so the `else` branch is dead code.


```bash
influxdb3 install package pandas
influxdb3 install package httpx
```
Collaborator

The plugin code tries to import `requests` if `httpx` is missing, but the documentation doesn't mention installing `requests`.
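If the fallback is kept, a guarded import with the install commands documented next to it makes the dependency explicit; a minimal sketch (both libraries expose a top-level `post` call with a compatible shape for simple requests):

```python
# Prefer httpx; fall back to requests. Document both install paths:
#   influxdb3 install package httpx    (or: influxdb3 install package requests)
try:
    import httpx as http_client
except ImportError:
    try:
        import requests as http_client
    except ImportError:
        http_client = None  # neither available; the plugin should fail loudly

if http_client is None:
    raise ImportError("Install httpx (preferred) or requests to use this plugin")
```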


```python
where_clause = " AND ".join(where_parts)

query = f"""
```
Collaborator

When composing SQL, it is recommended to wrap field and table names in double quotes (single quotes delimit string literals), which will allow such names to be parsed correctly.
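A quoting helper is a small sketch of this (note that in standard SQL, identifiers take double quotes while string literals take single quotes; the column and measurement names below are illustrative):

```python
def quote_ident(name):
    """Quote an SQL identifier with double quotes, escaping embedded quotes."""
    return '"' + name.replace('"', '""') + '"'

def quote_literal(value):
    """Quote an SQL string literal with single quotes, escaping embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"

# Hyphenated measurement names now parse correctly instead of breaking the query.
query = (
    f"SELECT {quote_ident('value')} FROM {quote_ident('my-measurement')} "
    f"WHERE {quote_ident('location')} = {quote_literal('NYC')}"
)
```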
