Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion inc/Engine/Actions/Engine.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ function datamachine_get_file_context( int|string|null $flow_id ): array {
return \DataMachine\Api\FlowFiles::get_file_context( $flow_id );
}

/**
* Check if a flow exists by ID.
*
* Lightweight existence check to avoid loading full flow data.
*
* @param int $flow_id Flow ID to check.
* @return bool True if flow exists, false otherwise.
*/
function datamachine_flow_exists( int $flow_id ): bool {
global $wpdb;
$table_name = $wpdb->prefix . 'datamachine_flows';
// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.PreparedSQL.NotPrepared
$exists = $wpdb->get_var( $wpdb->prepare( 'SELECT 1 FROM %i WHERE flow_id = %d LIMIT 1', $table_name, $flow_id ) );
return null !== $exists;
}

/**
* Register execution engine action hooks as thin bridges to abilities.
*
Expand All @@ -50,15 +66,37 @@ function datamachine_register_execution_engine() {

/**
* Bridge: datamachine_run_flow_now → datamachine/run-flow ability.
*
* Includes defensive check for orphaned scheduled actions. If the flow
* no longer exists (e.g., was deleted without cleanup), cancels all
* scheduled actions for that flow to prevent recurring errors.
*/
add_action(
'datamachine_run_flow_now',
function ( $flow_id, $job_id = null ) {
$flow_id = (int) $flow_id;

// Defensive: Check if flow exists before executing.
// If flow was deleted without cleaning up scheduled actions,
// cancel the orphaned actions to prevent recurring errors.
if ( ! datamachine_flow_exists( $flow_id ) ) {
if ( function_exists( 'as_unschedule_all_actions' ) ) {
as_unschedule_all_actions( 'datamachine_run_flow_now', array( $flow_id ), 'data-machine' );
}
do_action(
'datamachine_log',
'warning',
'Orphaned scheduled action cleaned up for deleted flow',
array( 'flow_id' => $flow_id )
);
return;
}

$ability = wp_get_ability( 'datamachine/run-flow' );
if ( $ability ) {
$ability->execute(
array(
'flow_id' => (int) $flow_id,
'flow_id' => $flow_id,
'job_id' => $job_id ? (int) $job_id : null,
)
);
Expand Down
Loading