
feat(condo) : DOMA-12967 add ai streaming#7443

Open
tolmachev21 wants to merge 6 commits into main from
feat/condo/DOMA-12967/add-ai-streaming

Conversation


@tolmachev21 tolmachev21 commented Apr 9, 2026

Summary by CodeRabbit

  • New Features

    • Real-time AI streaming for live, incremental responses (feature-flag gated).
    • Per-flow authentication for AI integrations (flow-specific API keys).
    • Streamed AI output surfaces immediately in comment generation and flow UI.
  • Bug Fixes

    • Better event propagation and structured error reporting.
    • Reliable fallback to polling when streaming or connection is unavailable.


coderabbitai bot commented Apr 9, 2026

📝 Walkthrough

Adapters now accept a per-flowType API key and an event callback; N8N adapter parses chunked, newline-delimited JSON streams and emits events. New streaming event/chunk constants and an AI_STREAMING feature flag were added. Task execution publishes lifecycle and flow events to messaging; the hook subscribes and exposes streaming text state.

Changes

Cohort / File(s) Summary
Adapter Core
apps/condo/domains/ai/adapters/AbstractAdapter.js, apps/condo/domains/ai/adapters/N8NAdapter.js
Updated execute signature to (predictUrl, context, flowType, onEvent). JSDoc return shape changed (result now object, _response renamed). N8NAdapter uses per-flowType API keys (Authorization: Bearer), validates key presence, and supports chunked newline-delimited JSON streams: decodes events, calls onEvent for start/item/end/error, accumulates answer, and returns stream-aware _response.
Streaming Constants
apps/condo/domains/ai/constants.js
Added EVENT_TYPES, CHUNK_TYPES, and CHUNK_TYPES_LIST exports for event and chunk identification.
Feature Flags
apps/condo/domains/common/constants/featureflags.js
Added AI_STREAMING feature flag constant.
Hook & UI Integration
apps/condo/domains/ai/hooks/useAIFlow.tsx, apps/condo/domains/common/components/Comments/index.tsx
Hook gains streaming mode gated by AI_STREAMING, subscribes to messaging, exposes streamDataText in return shape, appends FLOW_ITEM chunks to it, and skips polling when streaming connected; Comments now consumes streamDataText as fallback for answers.
Task Execution & Messaging
apps/condo/domains/ai/tasks/executeAIFlow.js
Publishes TASK_START/TASK_END/TASK_ERROR, maps adapter EVENT_TYPES to CHUNK_TYPES (publishing FLOW_* events), constructs a per-user publish topic, and publishes streaming item/error events. Also publishes validation errors and structured error payloads on failures.
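The chunked, newline-delimited JSON handling summarized above can be sketched as a standalone parser (a hypothetical illustration, not the adapter's actual code; `createNdjsonParser` and the event shapes are assumptions): buffer incoming chunks, split complete lines off the buffer, parse each line as JSON, and keep the trailing partial line for the next chunk.

```javascript
// Hypothetical sketch of newline-delimited JSON (NDJSON) stream parsing,
// similar in spirit to what the adapter does; names are illustrative.
function createNdjsonParser (onEvent) {
    let buffer = ''
    return {
        // Feed one decoded chunk; complete lines are parsed and emitted.
        push (chunk) {
            buffer += chunk
            const lines = buffer.split('\n')
            buffer = lines.pop() // keep the trailing partial line buffered
            for (const line of lines) {
                if (!line.trim()) continue
                try {
                    onEvent(JSON.parse(line))
                } catch (err) {
                    onEvent({ type: 'error', error: 'Failed to process chunk' })
                }
            }
        },
        // Flush any remaining buffered line at end of stream.
        end () {
            if (buffer.trim()) {
                try {
                    onEvent(JSON.parse(buffer))
                } catch (err) {
                    onEvent({ type: 'error', error: 'Failed to process chunk' })
                }
            }
            buffer = ''
        },
    }
}
```

The key detail is `lines.pop()`: the last element after the split is either an empty string (the chunk ended exactly on a newline) or an incomplete line, so it must stay buffered rather than be parsed.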

Sequence Diagram

sequenceDiagram
    participant Client as Client Component
    participant Hook as useAIFlow Hook
    participant Task as executeAIFlow Task
    participant Adapter as N8NAdapter
    participant N8N as N8N Backend
    participant Messaging as Messaging System
    participant WS as WebSocket Stream

    Client->>Hook: execute(taskData)
    Hook->>Task: create/resume task (sets currentTaskId)
    Task->>Messaging: publish CHUNK_TYPES.TASK_START
    Task->>Adapter: execute(predictUrl, context, flowType, onEvent)

    Adapter->>N8N: POST (Authorization: Bearer apiKey for flowType)
    N8N-->>Adapter: chunked newline-delimited JSON stream

    Adapter->>Adapter: parse lines -> EVENT_TYPES (start/item/end/error)
    Adapter->>Task: onEvent(ITEM with content)
    Task->>Messaging: publish CHUNK_TYPES.FLOW_ITEM (with item)
    Messaging-->>WS: deliver FLOW_ITEM
    WS-->>Hook: receive FLOW_ITEM
    Hook->>Hook: append item.content to streamDataText

    Adapter-->>Task: onEvent(END)
    Task->>Messaging: publish CHUNK_TYPES.TASK_END
    Messaging-->>WS: deliver TASK_END
    WS-->>Hook: receive TASK_END
    Hook->>Hook: set loading false
    Hook-->>Client: return { data, streamDataText, loading }

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🐰 I hopped through streams of JSON bright,

nibbling events in morning light,
items tumble, answers grow,
messages carry the steady flow,
a streaming rabbit cheers: "Nice code!" ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: the title 'feat(condo) : DOMA-12967 add ai streaming' accurately summarizes the main change: adding AI streaming functionality across multiple adapter and hook files in the condo application.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@tolmachev21 tolmachev21 marked this pull request as ready for review April 10, 2026 10:34

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 309b15a2de

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

@tolmachev21 tolmachev21 force-pushed the feat/condo/DOMA-12967/add-ai-streaming branch from 309b15a to 4ea5bfd on April 10, 2026 10:43

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
apps/condo/domains/ai/adapters/N8NAdapter.js (1)

32-42: ⚠️ Potential issue | 🟠 Major

Validate the requested flow's API key before calling n8n.

isConfigured does not guarantee that AI_ADAPTERS_CONFIG.n8n[flowType] exists. If that entry is missing, Line 41 sends Authorization: Bearer undefined to the webhook instead of surfacing a local config error.

Suggested fix
     async execute (predictUrl, context, flowType, onEvent) {
         if (!this.isConfigured) {
             throw new Error('N8NAdapter not configured!')
         }
+
+        const apiKey = AI_ADAPTERS_CONFIG?.n8n?.[flowType]?.apiKey
+        if (!apiKey) {
+            throw new Error(`N8NAdapter apiKey is not configured for flow "${flowType}"`)
+        }
 
         const response = await fetch(
             predictUrl,
             {
                 headers: {
-                    Authorization: `Bearer ${AI_ADAPTERS_CONFIG?.n8n?.[flowType]?.apiKey}`,
+                    Authorization: `Bearer ${apiKey}`,
                     'Content-Type': 'application/json',
                 },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/adapters/N8NAdapter.js` around lines 32 - 42, In
execute, isConfigured alone isn't enough—before calling fetch, verify
AI_ADAPTERS_CONFIG.n8n and AI_ADAPTERS_CONFIG.n8n[flowType] and that its apiKey
is present; if missing, throw a clear configuration Error (e.g., "N8NAdapter
missing API key for flowType: <flowType>") instead of allowing Authorization:
Bearer undefined; update the check near the start of execute (in the same
function that currently uses isConfigured and builds Authorization header) to
validate and fail fast when AI_ADAPTERS_CONFIG.n8n[flowType].apiKey is falsy.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/condo/domains/ai/adapters/N8NAdapter.js`:
- Around line 71-83: In _pushEvent, if JSON.parse(line) throws the catch path
calls onEvent with an ERROR but then continues to the switch(event.type) which
will throw because event is undefined; update _pushEvent so after calling await
onEvent({type: EVENT_TYPES.ERROR, ...}) the function returns early (or skips the
switch) to avoid executing switch(event.type) when event is not set; locate the
event variable, the catch block around JSON.parse(line), the onEvent call, and
the switch (event.type) to apply the early return/skip.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx`:
- Around line 108-113: The memoized topic computation in useAIFlow.tsx can throw
because userChannel may be undefined before channels populate; update the
useMemo for topic to guard the lookup by checking that userChannel exists (and
that channels.length > 0 or userChannel !== undefined) before accessing
userChannel.topicPrefix, and return undefined (or null) when the channel is
missing so the subscription code using topic only runs when a valid topic string
is returned; reference the const topic created by useMemo, the channels array,
the userChannel variable, and the topicPrefix property when applying this guard.
- Around line 115-123: The streaming branch never sets the hook's final
data/result: update the onMessage handler (function onMessage) so that when
message.type === CHUNK_TYPES.TASK_END you populate the hook's data with the
accumulated stream text (the state updated via setSteamDataText) before clearing
loading; likewise ensure TASK_ERROR sets data appropriately or clears it. Also
adjust the execute()/resume() resolution logic so they return the final { data:
{ result: <finalText> } } (not { data: null }) when streaming completes. Apply
the same fixes to the other onMessage/finish handlers referenced around lines
268-272 and 303-307 so callers awaiting data.result receive the final payload.
- Line 279: In useAIFlow, the polling fallback path using
startPollingForResult() can finish without clearing loading when
aiStreamingEnabled is true but the websocket is disconnected; update the
finally/cleanup logic so that loading is set to false whenever
startPollingForResult() completes or errors while !isConnected, not only when
!aiStreamingEnabled && !isConnected — ensure both occurrences referenced around
lines with startPollingForResult(), aiStreamingEnabled, isConnected, and loading
are changed so the polling completion always clears loading.

In `@apps/condo/domains/ai/tasks/executeAIFlow.js`:
- Around line 75-80: The publish() calls (e.g., the one sending {type:
CHUNK_TYPES.TASK_START}) must be best-effort so messaging outages don't break
the main task; wrap each publish call in a non-blocking try/catch or a helper
like safePublish that swallows errors and does not await long-running sends
(e.g., fire-and-forget Promise.catch or Promise.race with a short timeout) so
the adapter call and DB update can succeed regardless; update all publish
invocations in executeAIFlow (including the TASK_START publish and the publishes
around lines 93–134, 164–171, 191–196) to use this safe wrapper.
- Around line 216-223: The catch block calls buildUserTopic(task.user.id, ...)
without ensuring task exists, which can throw and hide the original error;
update the publish call in the catch to guard access to task (e.g., use
task?.user?.id or a fallback like a generic admin/error topic) before calling
buildUserTopic, so replace buildUserTopic(task.user.id,
`executionAIFlowTask.${task.id}`) with a safe expression that uses a fallback
topic when task is null and keep the rest of the payload (type:
CHUNK_TYPES.TASK_ERROR, error, errorMessage: i18n(...)) unchanged.

In `@apps/condo/domains/common/constants/featureflags.js`:
- Line 44: The AI_STREAMING feature flag constant has a typo ('ai-streamig') so
feature checks won't match; update the AI_STREAMING constant definition to the
correct slug 'ai-streaming' (change the string value assigned to the
AI_STREAMING constant) so all references using AI_STREAMING resolve to the
expected flag name.

---

Outside diff comments:
In `@apps/condo/domains/ai/adapters/N8NAdapter.js`:
- Around line 32-42: In execute, isConfigured alone isn't enough—before calling
fetch, verify AI_ADAPTERS_CONFIG.n8n and AI_ADAPTERS_CONFIG.n8n[flowType] and
that its apiKey is present; if missing, throw a clear configuration Error (e.g.,
"N8NAdapter missing API key for flowType: <flowType>") instead of allowing
Authorization: Bearer undefined; update the check near the start of execute (in
the same function that currently uses isConfigured and builds Authorization
header) to validate and fail fast when AI_ADAPTERS_CONFIG.n8n[flowType].apiKey
is falsy.
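The best-effort publishing suggested for executeAIFlow above could be wrapped in a small helper along these lines (a hypothetical sketch; `makeSafePublish`, `publish`, and `logger` are illustrative names, not the repo's actual API):

```javascript
// Hypothetical fire-and-forget wrapper: messaging failures are logged,
// never awaited, and never propagate into the main task flow.
function makeSafePublish (publish, logger = console) {
    return function safePublish (topic, payload) {
        try {
            // Do not await: a slow or down broker must not block the task.
            Promise.resolve(publish(topic, payload))
                .catch(err => logger.warn('publish failed', err))
        } catch (err) {
            // Also swallow synchronous throws from the publisher.
            logger.warn('publish failed', err)
        }
    }
}
```

With this shape, every lifecycle publish (TASK_START, FLOW_ITEM, TASK_END, TASK_ERROR) degrades to a logged warning when messaging is unavailable, while the adapter call and DB update proceed unaffected.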

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d2cd7372-31dc-4dde-b79c-6c35a6c4f068

📥 Commits

Reviewing files that changed from the base of the PR and between cbda9ca and 309b15a.

📒 Files selected for processing (7)
  • apps/condo/domains/ai/adapters/AbstractAdapter.js
  • apps/condo/domains/ai/adapters/N8NAdapter.js
  • apps/condo/domains/ai/constants.js
  • apps/condo/domains/ai/hooks/useAIFlow.tsx
  • apps/condo/domains/ai/tasks/executeAIFlow.js
  • apps/condo/domains/common/components/Comments/index.tsx
  • apps/condo/domains/common/constants/featureflags.js

Comment on lines +115 to +123
const onMessage = useCallback((message: StreamMessageType) => {
if (message.type === CHUNK_TYPES.FLOW_ITEM && message?.item) {
setSteamDataText(prev => prev += message.item)
} else if (message.type === CHUNK_TYPES.TASK_ERROR) {
setLoading(false)
setError(message.error)
} else if (message.type === CHUNK_TYPES.TASK_END) {
setLoading(false)
}

⚠️ Potential issue | 🟠 Major

Streaming mode drops the hook's final result.

In the streaming branch, execute()/resume() resolve immediately with { data: null }, and on TASK_END you only clear loading. data is never populated, so existing callers that await the hook or read data.result lose the final payload as soon as this flag is enabled.

Also applies to: 268-272, 303-307

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx` around lines 115 - 123, The
streaming branch never sets the hook's final data/result: update the onMessage
handler (function onMessage) so that when message.type === CHUNK_TYPES.TASK_END
you populate the hook's data with the accumulated stream text (the state updated
via setSteamDataText) before clearing loading; likewise ensure TASK_ERROR sets
data appropriately or clears it. Also adjust the execute()/resume() resolution
logic so they return the final { data: { result: <finalText> } } (not { data:
null }) when streaming completes. Apply the same fixes to the other
onMessage/finish handlers referenced around lines 268-272 and 303-307 so callers
awaiting data.result receive the final payload.
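The fix described above — promoting the accumulated stream text to the hook's final data on TASK_END — can be modeled framework-free as a pure reducer (a hypothetical sketch; the CHUNK_TYPES values and state shape here are illustrative, not the repo's actual constants):

```javascript
// Illustrative chunk-type names mirroring the constants described in this PR.
const CHUNK_TYPES = { FLOW_ITEM: 'flow_item', TASK_END: 'task_end', TASK_ERROR: 'task_error' }

// Pure reducer over stream messages; the hook's useState updates would follow the same shape.
function reduceStream (state, message) {
    switch (message.type) {
        case CHUNK_TYPES.FLOW_ITEM:
            // Append each streamed item to the accumulated text.
            return { ...state, streamDataText: state.streamDataText + (message.item || '') }
        case CHUNK_TYPES.TASK_END:
            // Promote the accumulated text to the final result so awaiting callers get a payload.
            return { ...state, loading: false, data: { result: state.streamDataText } }
        case CHUNK_TYPES.TASK_ERROR:
            return { ...state, loading: false, error: message.error }
        default:
            return state
    }
}
```

Expressed this way, the bug is visible as a missing `data` assignment in the TASK_END branch; the FLOW_ITEM and TASK_ERROR branches match what the hook already does.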


@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
apps/condo/domains/ai/adapters/N8NAdapter.js (1)

32-41: ⚠️ Potential issue | 🟠 Major

Verify flowType is configured before making request.

If flowType is not present in the config, AI_ADAPTERS_CONFIG?.n8n?.[flowType]?.apiKey evaluates to undefined, resulting in an Authorization: Bearer undefined header. Consider validating the flowType early.

Proposed fix
     async execute (predictUrl, context, flowType, onEvent) {
         if (!this.isConfigured) {
             throw new Error('N8NAdapter not configured!')
         }
+
+        const apiKey = AI_ADAPTERS_CONFIG?.n8n?.[flowType]?.apiKey
+        if (!apiKey) {
+            throw new Error(`N8NAdapter not configured for flowType: ${flowType}`)
+        }

         const response = await fetch(
             predictUrl,
             {
                 headers: {
-                    Authorization: `Bearer ${AI_ADAPTERS_CONFIG?.n8n?.[flowType]?.apiKey}`,
+                    Authorization: `Bearer ${apiKey}`,
                     'Content-Type': 'application/json',
                 },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/adapters/N8NAdapter.js` around lines 32 - 41, In
execute, verify that the requested flowType exists in AI_ADAPTERS_CONFIG.n8n and
has an apiKey before making the fetch; if missing, throw a clear error or return
early instead of allowing Authorization: Bearer undefined. Update the validation
in the async execute method (referencing AI_ADAPTERS_CONFIG, this.isConfigured,
and the predictUrl/fetch call) to check AI_ADAPTERS_CONFIG?.n8n?.[flowType] and
AI_ADAPTERS_CONFIG?.n8n?.[flowType]?.apiKey and handle the missing configuration
by throwing an Error indicating the flowType is not configured.
♻️ Duplicate comments (4)
apps/condo/domains/ai/hooks/useAIFlow.tsx (3)

108-113: ⚠️ Potential issue | 🟠 Major

Guard against undefined userChannel before accessing topicPrefix.

When channels is empty or doesn't contain a 'user' channel, userChannel is undefined and accessing userChannel.topicPrefix throws a TypeError during render.

Proposed fix
     const topic = useMemo(() => {
         if (aiStreamingEnabled && currentTaskId && user?.id === userId) {
             const userChannel = channels.find((channel) => channel.name === 'user')
+            if (!userChannel?.topicPrefix) return undefined
             return `${userChannel.topicPrefix}.executionAIFlowTask.${currentTaskId}`
         }
     }, [aiStreamingEnabled, currentTaskId, user?.id, userId, channels])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx` around lines 108 - 113, The
useMemo that computes topic can throw when userChannel is undefined; update the
topic computation in useAIFlow's useMemo to guard that userChannel exists before
accessing userChannel.topicPrefix (e.g., only return
`${userChannel.topicPrefix}....` if userChannel is truthy), otherwise return
undefined/null; ensure the dependency list (aiStreamingEnabled, currentTaskId,
user?.id, userId, channels) remains unchanged and reference the same local
symbols (topic, userChannel, channels, topicPrefix, aiStreamingEnabled,
currentTaskId, user?.id, userId).

279-279: ⚠️ Potential issue | 🟠 Major

Polling fallback leaves loading stuck when streaming is enabled but disconnected.

When aiStreamingEnabled is true but isConnected is false, the code falls back to startPollingForResult(). However, the finally condition !aiStreamingEnabled && !isConnected evaluates to false, so loading is never cleared after polling completes.

Proposed fix (also applies to line 314)
         } finally {
-            if (!aiStreamingEnabled && !isConnected) setLoading(false)
+            if (!(aiStreamingEnabled && isConnected)) setLoading(false)
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx` at line 279, In useAIFlow, the
finally blocks that call setLoading(false) use a conjunctive condition
(!aiStreamingEnabled && !isConnected) which prevents clearing loading when
streaming is enabled but the socket is disconnected (the polling fallback path);
change the condition to a disjunction (!aiStreamingEnabled || !isConnected) or
simply call setLoading(false) unconditionally after startPollingForResult
completes so that loading is cleared for the polling fallback; update the
finally blocks around startPollingForResult (and the similar block at the other
occurrence) to reference aiStreamingEnabled, isConnected, and setLoading
accordingly.

115-124: ⚠️ Potential issue | 🟠 Major

Streaming mode never populates data with the final result.

When streaming is enabled, execute()/resume() return { data: null } immediately, and onMessage only accumulates text into streamDataText. On TASK_END, the hook clears loading but never sets data, so callers awaiting data.result receive nothing.

Consider populating data with the accumulated streamDataText when TASK_END is received, or document that callers must use streamDataText instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx` around lines 115 - 124, The
streaming handler onMessage currently appends chunks to streamDataText via
setSteamDataText but never populates the hook's data when CHUNK_TYPES.TASK_END
arrives, so callers waiting on execute()/resume() get { data: null }; update the
onMessage TASK_END branch to set the hook's data (e.g., call setData({ result:
streamDataText || accumulated value })) using the accumulated streamDataText
(and clear/reset streamDataText as appropriate), ensuring CHUNK_TYPES.FLOW_ITEM
continues to append via setSteamDataText and CHUNK_TYPES.TASK_ERROR still sets
setError and setLoading(false).
apps/condo/domains/ai/adapters/N8NAdapter.js (1)

71-83: ⚠️ Potential issue | 🔴 Critical

Critical: Missing early return after parse failure causes TypeError.

When JSON.parse(line) throws, event remains undefined. The subsequent switch (event.type) then throws TypeError: Cannot read properties of undefined, masking the actual parse error. Also, typo: "proccess" → "process".

Proposed fix
             const _pushEvent = async (line) => {
                 let event
                 try {
                     if (!line) return
                     event = JSON.parse(line)
                     events.push(event)
                 } catch (err) {
                     await onEvent({
                         type: EVENT_TYPES.ERROR,
-                        error: 'Failed to proccess chunk',
+                        error: 'Failed to process chunk',
                     })
+                    return
                 }
                 switch (event.type) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/adapters/N8NAdapter.js` around lines 71 - 83, The
_pushEvent function currently continues after a JSON.parse failure causing
switch(event.type) to throw; modify the catch block in _pushEvent to call
onEvent with EVENT_TYPES.ERROR including the actual error message, correct the
typo "proccess"→"process", and immediately return (or otherwise skip the switch)
so execution does not reach switch(event.type) when event is undefined; ensure
you still push parsed events into events only on success.
🧹 Nitpick comments (3)
apps/condo/domains/ai/adapters/N8NAdapter.js (1)

136-138: Property name lastEventsType is misleading.

lastEventsType stores the entire event object, not just the type. Consider renaming to lastEvent or extracting just the type.

Proposed fix
                 _response: {
                     stream: true,
                     events,
                     totalevents: eventsLength,
-                    lastEventsType: events[eventsLength - 1],
+                    lastEvent: events[eventsLength - 1],
                 },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/adapters/N8NAdapter.js` around lines 136 - 138, The
property name lastEventsType is misleading because it holds the full event
object from the events array; update the code in the block that constructs the
returned object (references: lastEventsType, events, eventsLength) to either
rename the property to lastEvent to reflect it contains the entire event object
or keep lastEventsType but set its value to events[eventsLength - 1]?.type to
extract only the type; ensure any callers reading lastEventsType are updated to
the new name or continue to expect a string type accordingly.
apps/condo/domains/ai/hooks/useAIFlow.tsx (2)

92-92: Typo in state setter name: setSteamDataText → setStreamDataText.

Minor naming inconsistency—"Steam" should be "Stream".

Proposed fix (update all occurrences)
-    const [streamDataText, setSteamDataText] = useState('')
+    const [streamDataText, setStreamDataText] = useState('')

Then update lines 117, 243, and 299 accordingly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx` at line 92, Rename the typo'd
state setter setSteamDataText to setStreamDataText to match the state variable
streamDataText; locate the useState declaration in useAIFlow (const
[streamDataText, setSteamDataText] = useState('')) and rename the setter symbol
to setStreamDataText, then update every usage of setSteamDataText (e.g., in
handlers and effects that update streamDataText) to the corrected
setStreamDataText so references remain consistent.

117-117: Use prev + message.item instead of compound assignment.

The expression prev += message.item works but is confusing in a functional update callback since it looks like mutation. Prefer explicit concatenation.

Proposed fix
         if (message.type === CHUNK_TYPES.FLOW_ITEM && message?.item) {
-            setSteamDataText(prev => prev += message.item)
+            setSteamDataText(prev => prev + message.item)
         } else if (message.type === CHUNK_TYPES.TASK_ERROR) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx` at line 117, The functional update
for setSteamDataText uses a compound assignment which suggests mutation; change
the updater to return an explicit concatenation (use prev + message.item) so the
callback is pure and clearer—update the call to setSteamDataText in
useAIFlow.tsx where setSteamDataText(prev => prev += message.item) is used to
setSteamDataText(prev => prev + message.item).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@apps/condo/domains/ai/adapters/N8NAdapter.js`:
- Around line 32-41: In execute, verify that the requested flowType exists in
AI_ADAPTERS_CONFIG.n8n and has an apiKey before making the fetch; if missing,
throw a clear error or return early instead of allowing Authorization: Bearer
undefined. Update the validation in the async execute method (referencing
AI_ADAPTERS_CONFIG, this.isConfigured, and the predictUrl/fetch call) to check
AI_ADAPTERS_CONFIG?.n8n?.[flowType] and
AI_ADAPTERS_CONFIG?.n8n?.[flowType]?.apiKey and handle the missing configuration
by throwing an Error indicating the flowType is not configured.

---

Duplicate comments:
In `@apps/condo/domains/ai/adapters/N8NAdapter.js`:
- Around line 71-83: The _pushEvent function currently continues after a
JSON.parse failure causing switch(event.type) to throw; modify the catch block
in _pushEvent to call onEvent with EVENT_TYPES.ERROR including the actual error
message, correct the typo "proccess"→"process", and immediately return (or
otherwise skip the switch) so execution does not reach switch(event.type) when
event is undefined; ensure you still push parsed events into events only on
success.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx`:
- Around line 108-113: The useMemo that computes topic can throw when
userChannel is undefined; update the topic computation in useAIFlow's useMemo to
guard that userChannel exists before accessing userChannel.topicPrefix (e.g.,
only return `${userChannel.topicPrefix}....` if userChannel is truthy),
otherwise return undefined/null; ensure the dependency list (aiStreamingEnabled,
currentTaskId, user?.id, userId, channels) remains unchanged and reference the
same local symbols (topic, userChannel, channels, topicPrefix,
aiStreamingEnabled, currentTaskId, user?.id, userId).
- Line 279: In useAIFlow, the finally blocks that call setLoading(false) use a
conjunctive condition (!aiStreamingEnabled && !isConnected) which prevents
clearing loading when streaming is enabled but the socket is disconnected (the
polling fallback path); change the condition to a disjunction
(!aiStreamingEnabled || !isConnected) or simply call setLoading(false)
unconditionally after startPollingForResult completes so that loading is cleared
for the polling fallback; update the finally blocks around startPollingForResult
(and the similar block at the other occurrence) to reference aiStreamingEnabled,
isConnected, and setLoading accordingly.
- Around line 115-124: The streaming handler onMessage currently appends chunks
to streamDataText via setSteamDataText but never populates the hook's data when
CHUNK_TYPES.TASK_END arrives, so callers waiting on execute()/resume() get {
data: null }; update the onMessage TASK_END branch to set the hook's data (e.g.,
call setData({ result: streamDataText || accumulated value })) using the
accumulated streamDataText (and clear/reset streamDataText as appropriate),
ensuring CHUNK_TYPES.FLOW_ITEM continues to append via setSteamDataText and
CHUNK_TYPES.TASK_ERROR still sets setError and setLoading(false).

---

Nitpick comments:
In `@apps/condo/domains/ai/adapters/N8NAdapter.js`:
- Around line 136-138: The property name lastEventsType is misleading because it
holds the full event object from the events array; update the code in the block
that constructs the returned object (references: lastEventsType, events,
eventsLength) to either rename the property to lastEvent to reflect it contains
the entire event object or keep lastEventsType but set its value to
events[eventsLength - 1]?.type to extract only the type; ensure any callers
reading lastEventsType are updated to the new name or continue to expect a
string type accordingly.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx`:
- Line 92: Rename the typo'd state setter setSteamDataText to setStreamDataText
to match the state variable streamDataText; locate the useState declaration in
useAIFlow (const [streamDataText, setSteamDataText] = useState('')) and rename
the setter symbol to setStreamDataText, then update every usage of
setSteamDataText (e.g., in handlers and effects that update streamDataText) to
the corrected setStreamDataText so references remain consistent.
- Line 117: The functional update for setSteamDataText uses a compound
assignment which suggests mutation; change the updater to return an explicit
concatenation (use prev + message.item) so the callback is pure and
clearer—update the call to setSteamDataText in useAIFlow.tsx where
setSteamDataText(prev => prev += message.item) is used to setSteamDataText(prev
=> prev + message.item).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e159aff1-a3a8-4fd9-b7e3-440fbfaf6cd4

📥 Commits

Reviewing files that changed from the base of the PR and between 309b15a and 4ea5bfd.

📒 Files selected for processing (7)
  • apps/condo/domains/ai/adapters/AbstractAdapter.js
  • apps/condo/domains/ai/adapters/N8NAdapter.js
  • apps/condo/domains/ai/constants.js
  • apps/condo/domains/ai/hooks/useAIFlow.tsx
  • apps/condo/domains/ai/tasks/executeAIFlow.js
  • apps/condo/domains/common/components/Comments/index.tsx
  • apps/condo/domains/common/constants/featureflags.js
✅ Files skipped from review due to trivial changes (1)
  • apps/condo/domains/common/constants/featureflags.js
🚧 Files skipped from review as they are similar to previous changes (4)
  • apps/condo/domains/common/components/Comments/index.tsx
  • apps/condo/domains/ai/constants.js
  • apps/condo/domains/ai/adapters/AbstractAdapter.js
  • apps/condo/domains/ai/tasks/executeAIFlow.js

}
}

let buffer = ''
Member

maybe create an array and push tokens into it? string concatenation creates a new string in memory for each token

Member

  1. so
  2. so it
  3. so it wou
  4. so it would go
  5. so it would go lik
  6. ... and so on
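The allocation pattern the two comments describe — `answer += token` re-copies the whole string on every chunk, producing exactly the "so / so it / so it wou / …" series of intermediate strings — versus the suggested array-and-join approach, as a minimal standalone sketch (names are illustrative, not the adapter's actual internals):

```javascript
// Sketch of the reviewer's suggestion: collect stream tokens in an
// array and join once at the end, instead of growing an intermediate
// string on every chunk.
function collectAnswer (tokens) {
    const parts = []
    for (const token of tokens) {
        parts.push(token) // amortized O(1); no intermediate strings
    }
    return parts.join('') // a single concatenation at the end
}
```

With `+=`, n tokens cost O(n²) character copies in the worst case; with push-and-join the copying happens once.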

@sonarqubecloud
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
apps/condo/domains/ai/hooks/useAIFlow.tsx (1)

259-280: ⚠️ Potential issue | 🟠 Major

Don't skip cleanup until streaming has actually started.

The finally condition only checks aiStreamingEnabled && isConnected. If the mutation throws, or Line 264 returns without a task.id, this branch still preserves loading=true even though no stream will ever arrive, and the missing-taskId path also never calls setError.

Suggested fix
+        let waitForStream = false
         try {
             const data = {
                 dv: 1,
@@
             const createdTaskId = createResult.data?.task?.id
-            if (!createdTaskId) { return { data: null, error: new Error('Failed to create a task'), localizedErrorText: null } }
+            if (!createdTaskId) {
+                const err = new Error('Failed to create a task')
+                setError(err)
+                return { data: null, error: err, localizedErrorText: null }
+            }

             setCurrentTaskId(createdTaskId)

-            if (aiStreamingEnabled && isConnected) {
+            waitForStream = aiStreamingEnabled && isConnected
+            if (waitForStream) {
                 return { data: null, error: null, localizedErrorText: null }
             } else {
                 return await startPollingForResult(createdTaskId)
             }
@@
         } finally {
-            if (!(aiStreamingEnabled && isConnected)) setLoading(false)
+            if (!waitForStream) setLoading(false)
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx` around lines 259 - 280, The
finally block can leave loading=true when no stream will start; after calling
createExecutionAIFlowMutation ensure you handle the missing task id and error
path by calling setError and setLoading(false) before returning, and only
preserve loading when streaming is actually started. Concretely: in the block
after createResult, if createdTaskId is falsy call setError(new Error('Failed to
create a task')), setCurrentTaskId(null), setLoading(false) and return the error
result; and when taking the aiStreamingEnabled && isConnected branch set a local
flag (e.g. streamingStarted) or setLoading(true) only after confirming the
stream is initiated so the finally() can clear loading unless streamingStarted
is true; update references to createExecutionAIFlowMutation, setCurrentTaskId,
startPollingForResult, setLoading, and setError accordingly.
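The suggested `waitForStream` flag can be exercised outside React. A runnable sketch under assumed names — `createTask`, `setLoading`, and `setError` stand in for the hook's mutation and state setters, not its real API:

```javascript
// Only skip the cleanup in `finally` once streaming is actually expected.
async function runFlow ({ createTask, streamingEnabled, setLoading, setError }) {
    let waitForStream = false
    setLoading(true)
    try {
        const taskId = await createTask()
        if (!taskId) {
            // error path: report and fall through to finally, which clears loading
            const err = new Error('Failed to create a task')
            setError(err)
            return { data: null, error: err }
        }
        // from here on, a stream will deliver the terminal state
        waitForStream = streamingEnabled
        return { data: null, error: null }
    } finally {
        // loading stays true only while a stream is genuinely expected
        if (!waitForStream) setLoading(false)
    }
}
```

Because the flag is set only after the task id is confirmed, a thrown mutation or a missing id can never leave `loading=true` behind.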
♻️ Duplicate comments (2)
apps/condo/domains/ai/hooks/useAIFlow.tsx (2)

108-113: ⚠️ Potential issue | 🟠 Major

Guard the user channel lookup before reading topicPrefix.

After setCurrentTaskId, channels can still be empty for a render. In that window channels.find(...) returns undefined, so Line 111 throws before the subscription is established.

Suggested fix
     const topic = useMemo(() => {
         if (aiStreamingEnabled && currentTaskId && user?.id === userId) {
             const userChannel = channels.find((channel) => channel.name === 'user')
+            if (!userChannel?.topicPrefix) return undefined
             return `${userChannel.topicPrefix}.executionAIFlowTask.${currentTaskId}`
         }
     }, [aiStreamingEnabled, currentTaskId, user?.id, userId, channels])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx` around lines 108 - 113, In
useAIFlow's useMemo that computes topic, guard the channels.find result before
reading topicPrefix: when aiStreamingEnabled && currentTaskId && user?.id ===
userId, check that const userChannel = channels.find(...) is not undefined (or
use optional chaining like userChannel?.topicPrefix) and only return
`${...}.executionAIFlowTask.${currentTaskId}` when userChannel exists; otherwise
return undefined so the memo won't throw during the render window after
setCurrentTaskId.
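For illustration, the guarded lookup as a standalone helper — `buildTopic` and its arguments are hypothetical names mirroring the memo's logic, not the hook's exports:

```javascript
// Returns undefined until the user channel (and its topicPrefix) is
// actually available, instead of throwing on an empty channels array.
function buildTopic (channels, taskId) {
    if (!taskId) return undefined
    const userChannel = channels.find((channel) => channel.name === 'user')
    if (!userChannel?.topicPrefix) return undefined
    return `${userChannel.topicPrefix}.executionAIFlowTask.${taskId}`
}
```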

115-123: ⚠️ Potential issue | 🟠 Major

Streaming mode never truly finishes.

execute()/resume() return immediately with { data: null }, but on TASK_END/TASK_ERROR the hook only flips loading/error. data stays null, currentTaskId stays set, and timeout is no longer applied once startPollingForResult() is skipped. One lost terminal message leaves the hook in an incomplete state.

Also applies to: 268-272, 303-307

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/hooks/useAIFlow.tsx` around lines 115 - 123, The
streaming handler onMessage in useAIFlow currently only flips loading/error on
CHUNK_TYPES.TASK_END and TASK_ERROR, leaving data null, currentTaskId set, and
polling/timeout not cleared; update the onMessage handler (look for
CHUNK_TYPES.FLOW_ITEM, CHUNK_TYPES.TASK_ERROR, CHUNK_TYPES.TASK_END and the
state setters setStreamDataText, setLoading, setError) so that on TASK_END and
TASK_ERROR you also setData to the final combined value (or an empty result for
errors), clear currentTaskId (setCurrentTaskId(null)), stop any polling or
timeout (clearTimeout or call the internal stopPolling/startPollingForResult
cancel path), and ensure any streaming flags are reset so execute()/resume()
consumers see a terminal state; mirror this fix for the other occurrences around
lines 268-272 and 303-307 where the same chunk handling logic appears.
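The terminal-state requirement can be sketched as a pure reducer over incoming chunks — the `CHUNK_TYPES` values and the state shape below are assumptions for the sketch, not the hook's actual constants:

```javascript
const CHUNK_TYPES = { FLOW_ITEM: 'flowItem', TASK_END: 'taskEnd', TASK_ERROR: 'taskError' } // assumed values

// TASK_END/TASK_ERROR must produce a finished state (data set, taskId
// cleared), not merely flip the loading/error flags.
function reduceChunk (state, message) {
    switch (message.type) {
        case CHUNK_TYPES.FLOW_ITEM:
            return { ...state, streamText: state.streamText + message.item }
        case CHUNK_TYPES.TASK_END:
            return { ...state, data: { answer: state.streamText }, loading: false, taskId: null }
        case CHUNK_TYPES.TASK_ERROR:
            return { ...state, error: new Error(message.error), loading: false, taskId: null }
        default:
            return state
    }
}
```

Driving the hook's setters from a single reducer like this also keeps the three duplicated chunk-handling sites in sync.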

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ea8fdc09-789f-4854-a5e2-a03db78b70b9

📥 Commits

Reviewing files that changed from the base of the PR and between 4ea5bfd and e70f8d3.

📒 Files selected for processing (3)
  • apps/condo/domains/ai/adapters/N8NAdapter.js
  • apps/condo/domains/ai/hooks/useAIFlow.tsx
  • apps/condo/domains/common/constants/featureflags.js
🚧 Files skipped from review as they are similar to previous changes (1)
  • apps/condo/domains/common/constants/featureflags.js

Comment on lines +23 to +25
const flowTypes = Object.keys(AI_ADAPTERS_CONFIG?.n8n || {})
this.#isConfigured = flowTypes.length > 0 ? flowTypes.every(flowType => !!AI_ADAPTERS_CONFIG?.n8n?.[flowType]?.apiKey) : false
if (!this.#isConfigured) console.warn('N8NAdapter not configured for any flow type!')
Contributor

⚠️ Potential issue | 🟠 Major

Don't gate the whole adapter on every flow having a key.

every(...) makes isConfigured false as soon as one flow entry is missing apiKey, so Line 33 blocks even the flows that are configured. That defeats the per-flow check at Lines 37-40.

Suggested fix
-        const flowTypes = Object.keys(AI_ADAPTERS_CONFIG?.n8n || {})
-        this.#isConfigured = flowTypes.length > 0 ? flowTypes.every(flowType => !!AI_ADAPTERS_CONFIG?.n8n?.[flowType]?.apiKey) : false
+        const flowConfigs = Object.values(AI_ADAPTERS_CONFIG?.n8n || {})
+        this.#isConfigured = flowConfigs.some(flowConfig => !!flowConfig?.apiKey)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/adapters/N8NAdapter.js` around lines 23 - 25, The
adapter currently sets this.#isConfigured using flowTypes.every(...), which
makes the adapter unconfigured if any single flow lacks an apiKey and thereby
blocks flows that do have keys; change the logic to mark the adapter configured
if any flow has an apiKey (use some(...) or equivalent) so configured flows
remain usable, keep the per-flow checks that reference
AI_ADAPTERS_CONFIG?.n8n?.[flowType]?.apiKey intact, and only log the "N8NAdapter
not configured for any flow type!" warning when no flows have an apiKey.

Comment on lines +139 to +143
_response: {
stream: true,
events,
totalevents: eventsLength,
lastEventsType: events[eventsLength - 1],
Contributor

⚠️ Potential issue | 🟡 Minor

Return the last event type, not the whole event.

lastEventsType currently gets events[eventsLength - 1], so callers receive an object instead of the type string the field name implies.

Suggested fix
-                    lastEventsType: events[eventsLength - 1],
+                    lastEventsType: events[eventsLength - 1]?.type,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/condo/domains/ai/adapters/N8NAdapter.js` around lines 139 - 143, The
_response currently sets lastEventsType to the entire last event object
(events[eventsLength - 1]) instead of its type string; change the assignment for
lastEventsType in the _response object to use the last event's type property
(e.g., events[eventsLength - 1].type or similar) and guard against empty arrays
(fallback to null or empty string) so callers receive the actual type string;
update the code around the _response construction that references events,
eventsLength, and lastEventsType.
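A guarded version of that extraction, as a standalone sketch:

```javascript
// Last event's type string, with a null fallback for an empty stream
// (optional chaining already handles the out-of-range index).
function lastEventType (events) {
    return events[events.length - 1]?.type ?? null
}
```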

