feat: add support for distributed incremental microbatches#444
feat: add support for distributed incremental microbatches#444canbekley wants to merge 2 commits intoClickHouse:mainfrom
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR adds support for distributed incremental microbatches by introducing a new model config is_distributed and updating the underlying adapter logic accordingly.
- Updates integration tests to include distributed and non-distributed cases
- Modifies ClickHouseRelation.get_on_cluster and ClickHouseConnectionManager methods to account for the new is_distributed flag
- Adds a get_materialization function in the ClickHouse adapter to properly alias materialization types for distributed models
Reviewed Changes
Copilot reviewed 4 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| tests/integration/adapter/incremental/test_incremental_microbatch.py | Updates tests to pass is_distributed values and use dynamic materialized settings for both distributed and non-distributed scenarios |
| dbt/adapters/clickhouse/relation.py | Updates get_on_cluster signature and usage to include the is_distributed flag derived from model configs |
| dbt/adapters/clickhouse/impl.py | Propagates the is_distributed flag in the should_on_cluster method |
| dbt/adapters/clickhouse/init.py | Introduces get_materialization to transform materialized and is_distributed settings for compatibility |
Files not reviewed (2)
- dbt/include/clickhouse/macros/adapters/relation.sql: Language not supported
- dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql: Language not supported
| "input_model.sql": _input_model_sql, | ||
| "microbatch_model.sql": _microbatch_model_sql, | ||
| "input_model.sql": _input_model_sql % "table", | ||
| "microbatch_model.sql": _microbatch_model_sql % "False", # `is_distributed` param |
There was a problem hiding this comment.
The is_distributed parameter is provided as a string ('False' or 'True') in this test, but the adapter functions expect a boolean value. Consider converting the string to a boolean to avoid misinterpretation of truthy values.
| "microbatch_model.sql": _microbatch_model_sql % "False", # `is_distributed` param | |
| "microbatch_model.sql": _microbatch_model_sql % False, # `is_distributed` param |
| is_distributed = relation_config.config.get('extra', {}).get('is_distributed') | ||
| engine = relation_config.config.get('engine') or '' |
There was a problem hiding this comment.
The value for is_distributed may be passed as a string instead of a boolean, which could lead to unintended behavior in get_on_cluster. Consider adding an explicit conversion to boolean.
| is_distributed = relation_config.config.get('extra', {}).get('is_distributed') | |
| engine = relation_config.config.get('engine') or '' | |
| is_distributed = relation_config.config.get('extra', {}).get('is_distributed') | |
| # Ensure is_distributed is a boolean | |
| is_distributed = str(is_distributed).lower() == "true" if is_distributed is not None else False |
Adds support for microbatch distributed incremental materialization, by adding a new model config
is_distributed. Distributed models that run in microbatches would need to be created as following:{{ config( materialized='incremental', is_distributed=True, incremental_strategy='microbatch', ... ) }} ...related issue: #439