From dde47be16b2431e040e2136305b8040a49bd1a7c Mon Sep 17 00:00:00 2001 From: Manan Bhatt Date: Wed, 18 Mar 2026 20:48:27 +0530 Subject: [PATCH 1/2] Fix backward compat: fall back to v1 update endpoint on Orkes Conductor < v5 The /api/tasks/update-v2 endpoint introduced in SDK 1.3.5 only exists on Orkes Conductor v5+. Customers on older instances (e.g. v4.x) receive a 404, causing all task updates to fail. Changes: - On HTTP 404 from update_task_v2, automatically fall back to the v1 update_task endpoint and set _use_update_v2=False for the runner lifetime - Non-404 API errors continue to retry with v2 (no change in behaviour) - Tight execute loop preserved for v1: after each v1 update, immediately poll for the next task rather than waiting for the polling interval - Same fix applied to both TaskRunner (sync) and AsyncTaskRunner - 7 new unit tests covering fallback, flag behaviour, and tight loop Fixes: task updates failing with HTTP 404 when SDK 1.3.5+ is used against Orkes Conductor instances older than major version 5. 
Co-Authored-By: Claude Sonnet 4.6 --- .../client/automator/async_task_runner.py | 58 ++++++- src/conductor/client/automator/task_runner.py | 58 ++++++- .../automator/test_task_runner_coverage.py | 146 +++++++++++++++++- 3 files changed, 249 insertions(+), 13 deletions(-) diff --git a/src/conductor/client/automator/async_task_runner.py b/src/conductor/client/automator/async_task_runner.py index 97c2f1ac..ab54128d 100644 --- a/src/conductor/client/automator/async_task_runner.py +++ b/src/conductor/client/automator/async_task_runner.py @@ -24,7 +24,7 @@ from conductor.client.http.models.task_result import TaskResult from conductor.client.http.models.task_result_status import TaskResultStatus from conductor.client.http.models.schema_def import SchemaDef, SchemaType -from conductor.client.http.rest import AuthorizationException +from conductor.client.http.rest import AuthorizationException, ApiException from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient from conductor.client.orkes.orkes_schema_client import OrkesSchemaClient from conductor.client.telemetry.metrics_collector import MetricsCollector @@ -111,6 +111,7 @@ def __init__( # Semaphore will be created in run() within the event loop self._semaphore = None self._shutdown = False # Flag to indicate graceful shutdown + self._use_update_v2 = True # Will be set to False if server doesn't support v2 endpoint async def run(self) -> None: """Main async loop - runs continuously in single event loop.""" @@ -583,6 +584,11 @@ async def __async_execute_and_update_task(self, task: Task) -> None: return # Update task and get next task from v2 response task = await self.__async_update_task(task_result) + # v2 returns the next task; if v1 was used (returns None), immediately + # poll for the next task to preserve tight-loop behaviour on older servers + if task is None and not self._use_update_v2 and not self._shutdown: + tasks = await self.__async_batch_poll(1) + task = tasks[0] if tasks else None except 
Exception as e: logger.error( "Error executing/updating task %s: %s", @@ -815,15 +821,55 @@ async def __async_update_task(self, task_result: TaskResult): # Exponential backoff: [10s, 20s, 30s] before retry await asyncio.sleep(attempt * 10) try: - next_task = await self.async_task_client.update_task_v2(body=task_result) - logger.debug( - "Updated async task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s", + if self._use_update_v2: + next_task = await self.async_task_client.update_task_v2(body=task_result) + logger.debug( + "Updated async task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s", + task_result.task_id, + task_result.workflow_instance_id, + task_definition_name, + next_task.task_id if next_task else None + ) + return next_task + else: + await self.async_task_client.update_task(body=task_result) + logger.debug( + "Updated async task (v1), id: %s, workflow_instance_id: %s, task_definition_name: %s", + task_result.task_id, + task_result.workflow_instance_id, + task_definition_name, + ) + return None + except ApiException as e: + if e.status == 404 and self._use_update_v2: + logger.warning( + "Server does not support update-task-v2 endpoint (HTTP 404). " + "Falling back to v1 update endpoint. " + "Upgrade your Orkes Conductor instance to v5+ to enable the v2 endpoint." 
+ ) + self._use_update_v2 = False + # Retry immediately with v1 + try: + await self.async_task_client.update_task(body=task_result) + return None + except Exception as fallback_e: + last_exception = fallback_e + continue + last_exception = e + if self.metrics_collector is not None: + self.metrics_collector.increment_task_update_error( + task_definition_name, type(e) + ) + logger.error( + "Failed to update async task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s", + attempt + 1, + retry_count, task_result.task_id, task_result.workflow_instance_id, task_definition_name, - next_task.task_id if next_task else None + traceback.format_exc() ) - return next_task + continue except Exception as e: last_exception = e if self.metrics_collector is not None: diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index 16c8f432..553c9dca 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -25,7 +25,7 @@ from conductor.client.http.models.task_result import TaskResult from conductor.client.http.models.task_result_status import TaskResultStatus from conductor.client.http.models.schema_def import SchemaDef, SchemaType -from conductor.client.http.rest import AuthorizationException +from conductor.client.http.rest import AuthorizationException, ApiException from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient from conductor.client.orkes.orkes_schema_client import OrkesSchemaClient from conductor.client.telemetry.metrics_collector import MetricsCollector @@ -92,6 +92,7 @@ def __init__( self._last_poll_time = 0 # Track last poll to avoid excessive polling when queue is empty self._consecutive_empty_polls = 0 # Track empty polls to implement backoff self._shutdown = False # Flag to indicate graceful shutdown + self._use_update_v2 = True # Will be set to False if server doesn't support v2 endpoint def run(self) 
-> None: if self.configuration is not None: @@ -523,6 +524,11 @@ def __execute_and_update_task(self, task: Task) -> None: return # Update task and get next task from v2 response task = self.__update_task(task_result) + # v2 returns the next task; if v1 was used (returns None), immediately + # poll for the next task to preserve tight-loop behaviour on older servers + if task is None and not self._use_update_v2 and not self._shutdown: + tasks = self.__batch_poll_tasks(1) + task = tasks[0] if tasks else None except Exception as e: logger.error( "Error executing/updating task %s: %s", @@ -845,15 +851,55 @@ def __update_task(self, task_result: TaskResult): # Exponential backoff: [10s, 20s, 30s] before retry time.sleep(attempt * 10) try: - next_task = self.task_client.update_task_v2(body=task_result) - logger.debug( - "Updated task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s", + if self._use_update_v2: + next_task = self.task_client.update_task_v2(body=task_result) + logger.debug( + "Updated task (v2), id: %s, workflow_instance_id: %s, task_definition_name: %s, next_task: %s", + task_result.task_id, + task_result.workflow_instance_id, + task_definition_name, + next_task.task_id if next_task else None + ) + return next_task + else: + self.task_client.update_task(body=task_result) + logger.debug( + "Updated task (v1), id: %s, workflow_instance_id: %s, task_definition_name: %s", + task_result.task_id, + task_result.workflow_instance_id, + task_definition_name, + ) + return None + except ApiException as e: + if e.status == 404 and self._use_update_v2: + logger.warning( + "Server does not support update-task-v2 endpoint (HTTP 404). " + "Falling back to v1 update endpoint. " + "Upgrade your Orkes Conductor instance to v5+ to enable the v2 endpoint." 
+ ) + self._use_update_v2 = False + # Retry immediately with v1 + try: + self.task_client.update_task(body=task_result) + return None + except Exception as fallback_e: + last_exception = fallback_e + continue + last_exception = e + if self.metrics_collector is not None: + self.metrics_collector.increment_task_update_error( + task_definition_name, type(e) + ) + logger.error( + "Failed to update task (attempt %d/%d), id: %s, workflow_instance_id: %s, task_definition_name: %s, reason: %s", + attempt + 1, + retry_count, task_result.task_id, task_result.workflow_instance_id, task_definition_name, - next_task.task_id if next_task else None + traceback.format_exc() ) - return next_task + continue except Exception as e: last_exception = e if self.metrics_collector is not None: diff --git a/tests/unit/automator/test_task_runner_coverage.py b/tests/unit/automator/test_task_runner_coverage.py index 1686d12f..d7b0ba91 100644 --- a/tests/unit/automator/test_task_runner_coverage.py +++ b/tests/unit/automator/test_task_runner_coverage.py @@ -23,7 +23,7 @@ from conductor.client.http.models.task import Task from conductor.client.http.models.task_result import TaskResult from conductor.client.http.models.task_result_status import TaskResultStatus -from conductor.client.http.rest import AuthorizationException +from conductor.client.http.rest import AuthorizationException, ApiException from conductor.client.worker.worker_interface import WorkerInterface @@ -803,6 +803,150 @@ def test_update_task_with_metrics_on_error(self): 4 ) + # ======================================== + # v1 Fallback Tests (backward compat with Orkes Conductor < v5) + # ======================================== + + @patch('time.sleep', Mock(return_value=None)) + def test_update_task_v2_404_falls_back_to_v1(self): + """When server returns 404 for v2 endpoint, should fall back to v1 and return None.""" + worker = MockWorker('test_task') + task_runner = TaskRunner(worker=worker) + + task_result = TaskResult( + 
task_id='test_id', + workflow_instance_id='wf_id', + worker_id=worker.get_identity(), + status=TaskResultStatus.COMPLETED + ) + + with patch.object(TaskResourceApi, 'update_task_v2', + side_effect=ApiException(status=404)) as mock_v2, \ + patch.object(TaskResourceApi, 'update_task', return_value='ok') as mock_v1: + result = task_runner._TaskRunner__update_task(task_result) + + mock_v2.assert_called_once() + mock_v1.assert_called_once() + self.assertIsNone(result) + + @patch('time.sleep', Mock(return_value=None)) + def test_update_task_v2_404_sets_v1_flag(self): + """After a 404 on v2, _use_update_v2 flag must be False.""" + worker = MockWorker('test_task') + task_runner = TaskRunner(worker=worker) + self.assertTrue(task_runner._use_update_v2) + + task_result = TaskResult( + task_id='test_id', + workflow_instance_id='wf_id', + worker_id=worker.get_identity(), + status=TaskResultStatus.COMPLETED + ) + + with patch.object(TaskResourceApi, 'update_task_v2', + side_effect=ApiException(status=404)), \ + patch.object(TaskResourceApi, 'update_task', return_value='ok'): + task_runner._TaskRunner__update_task(task_result) + + self.assertFalse(task_runner._use_update_v2) + + @patch('time.sleep', Mock(return_value=None)) + def test_update_task_uses_v1_only_after_flag_set(self): + """Once _use_update_v2 is False, v2 is never called again.""" + worker = MockWorker('test_task') + task_runner = TaskRunner(worker=worker) + task_runner._use_update_v2 = False # pre-set as if fallback already happened + + task_result = TaskResult( + task_id='test_id', + workflow_instance_id='wf_id', + worker_id=worker.get_identity(), + status=TaskResultStatus.COMPLETED + ) + + with patch.object(TaskResourceApi, 'update_task_v2') as mock_v2, \ + patch.object(TaskResourceApi, 'update_task', return_value='ok') as mock_v1: + result = task_runner._TaskRunner__update_task(task_result) + + mock_v2.assert_not_called() + mock_v1.assert_called_once() + self.assertIsNone(result) + + @patch('time.sleep', 
Mock(return_value=None)) + def test_update_task_non_404_api_exception_does_not_fallback(self): + """A non-404 ApiException (e.g. 500) should not trigger v1 fallback.""" + worker = MockWorker('test_task') + task_runner = TaskRunner(worker=worker) + + task_result = TaskResult( + task_id='test_id', + workflow_instance_id='wf_id', + worker_id=worker.get_identity(), + status=TaskResultStatus.COMPLETED + ) + + with patch.object(TaskResourceApi, 'update_task_v2', + side_effect=ApiException(status=500)) as mock_v2, \ + patch.object(TaskResourceApi, 'update_task') as mock_v1: + result = task_runner._TaskRunner__update_task(task_result) + + # v2 called 4 times (all retries), v1 never called, flag unchanged + self.assertEqual(mock_v2.call_count, 4) + mock_v1.assert_not_called() + self.assertTrue(task_runner._use_update_v2) + self.assertIsNone(result) + + @patch('time.sleep', Mock(return_value=None)) + def test_execute_and_update_task_tight_loop_with_v1_polls_for_next(self): + """When v1 is used, the tight loop should poll immediately for the next task.""" + worker = MockWorker('test_task') + task_runner = TaskRunner(worker=worker) + task_runner._use_update_v2 = False # simulate post-fallback state + + first_task = Task(task_id='task_1', workflow_instance_id='wf_1') + second_task = Task(task_id='task_2', workflow_instance_id='wf_1') + + # Execute returns a result, update v1 returns None, poll returns second task then empty + with patch.object(TaskResourceApi, 'update_task', return_value='ok') as mock_v1, \ + patch.object(TaskResourceApi, 'batch_poll', + side_effect=[[second_task], []]) as mock_poll: + task_runner._TaskRunner__execute_and_update_task(first_task) + + # update_task called twice (once per task), poll called twice (second_task then empty) + self.assertEqual(mock_v1.call_count, 2) + self.assertEqual(mock_poll.call_count, 2) + + @patch('time.sleep', Mock(return_value=None)) + def test_execute_and_update_task_tight_loop_stops_when_queue_empty_on_v1(self): + """With 
v1, if poll returns nothing the tight loop exits cleanly.""" + worker = MockWorker('test_task') + task_runner = TaskRunner(worker=worker) + task_runner._use_update_v2 = False + + task = Task(task_id='task_1', workflow_instance_id='wf_1') + + with patch.object(TaskResourceApi, 'update_task', return_value='ok') as mock_v1, \ + patch.object(TaskResourceApi, 'batch_poll', return_value=[]) as mock_poll: + task_runner._TaskRunner__execute_and_update_task(task) + + mock_v1.assert_called_once() + mock_poll.assert_called_once() + + @patch('time.sleep', Mock(return_value=None)) + def test_execute_and_update_task_tight_loop_not_pollled_when_v2(self): + """With v2, poll is NOT called inside the tight loop (v2 returns next task directly).""" + worker = MockWorker('test_task') + task_runner = TaskRunner(worker=worker) + + first_task = Task(task_id='task_1', workflow_instance_id='wf_1') + + with patch.object(TaskResourceApi, 'update_task_v2', return_value=None) as mock_v2, \ + patch.object(TaskResourceApi, 'batch_poll') as mock_poll: + task_runner._TaskRunner__execute_and_update_task(first_task) + + mock_v2.assert_called_once() + mock_poll.assert_not_called() + # ======================================== # Property and Environment Tests # ======================================== From 02ae2da5cc50009a6a1c81738f63c328fca1900b Mon Sep 17 00:00:00 2001 From: Manan Bhatt Date: Wed, 18 Mar 2026 22:21:01 +0530 Subject: [PATCH 2/2] Reword upgrade hint in update-v2 fallback warning --- src/conductor/client/automator/async_task_runner.py | 2 +- src/conductor/client/automator/task_runner.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/conductor/client/automator/async_task_runner.py b/src/conductor/client/automator/async_task_runner.py index ab54128d..41e7ab56 100644 --- a/src/conductor/client/automator/async_task_runner.py +++ b/src/conductor/client/automator/async_task_runner.py @@ -845,7 +845,7 @@ async def __async_update_task(self, task_result: TaskResult): logger.warning( "Server does not support 
update-task-v2 endpoint (HTTP 404). " "Falling back to v1 update endpoint. " - "Upgrade your Orkes Conductor instance to v5+ to enable the v2 endpoint." + "Upgrade your Conductor instance to v5+ to enable the v2 endpoint." ) self._use_update_v2 = False # Retry immediately with v1 diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index 553c9dca..7168edf5 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -875,7 +875,7 @@ def __update_task(self, task_result: TaskResult): logger.warning( "Server does not support update-task-v2 endpoint (HTTP 404). " "Falling back to v1 update endpoint. " - "Upgrade your Orkes Conductor instance to v5+ to enable the v2 endpoint." + "Upgrade your Orkes instance to v5+ to enable the v2 endpoint." ) self._use_update_v2 = False # Retry immediately with v1