diff --git a/inc/Abilities/Engine/RunFlowAbility.php b/inc/Abilities/Engine/RunFlowAbility.php index 75a83badc..fab3d29d8 100644 --- a/inc/Abilities/Engine/RunFlowAbility.php +++ b/inc/Abilities/Engine/RunFlowAbility.php @@ -14,6 +14,7 @@ namespace DataMachine\Abilities\Engine; +use DataMachine\Abilities\Flow\QueueAbility; use DataMachine\Core\Agents\AgentIdentityResolver; use DataMachine\Engine\ExecutionPlan; @@ -67,8 +68,10 @@ private function registerAbility(): void { 'properties' => array( 'success' => array( 'type' => 'boolean' ), 'flow_id' => array( 'type' => 'integer' ), - 'job_id' => array( 'type' => 'integer' ), + 'job_id' => array( 'type' => array( 'integer', 'null' ) ), 'first_step' => array( 'type' => 'string' ), + 'skipped' => array( 'type' => 'boolean' ), + 'reason' => array( 'type' => 'string' ), 'error' => array( 'type' => 'string' ), ), ), @@ -131,6 +134,30 @@ public function execute( array $input ): array { } $pipeline_id = (int) $flow['pipeline_id']; + $flow_config = datamachine_normalize_engine_config( $flow['flow_config'] ?? array() ); + + $empty_drain_skip = $this->getEmptyDrainQueueSkip( $flow_config ); + if ( null !== $empty_drain_skip ) { + do_action( + 'datamachine_log', + 'info', + 'Flow execution skipped - drain queue is empty', + array( + 'flow_id' => $flow_id, + 'pipeline_id' => $pipeline_id, + 'flow_step_id' => $empty_drain_skip['flow_step_id'], + ) + ); + + return array( + 'success' => true, + 'flow_id' => $flow_id, + 'job_id' => null, + 'first_step' => $empty_drain_skip['flow_step_id'], + 'skipped' => true, + 'reason' => 'empty_drain_queue', + ); + } $agent_identity = null; if ( ! empty( $flow['agent_id'] ) ) { @@ -187,7 +214,6 @@ public function execute( array $input ): array { // Transition job from pending to processing. $this->db_jobs->start_job( $job_id ); - $flow_config = $flow['flow_config'] ?? array(); $scheduling_config = $flow['scheduling_config'] ?? array(); $run_artifact_policy = \DataMachine\Engine\Bundle\BundleSchema::normalize_run_artifact_egress_policy( $scheduling_config['run_artifacts'] ?? array() ); @@ -195,7 +221,6 @@ public function execute( array $input ): array { $pipeline = $this->db_pipelines->get_pipeline( $pipeline_id ); $pipeline_config = $pipeline['pipeline_config'] ?? array(); - $flow_config = datamachine_normalize_engine_config( $flow_config ); $pipeline_config = datamachine_normalize_engine_config( $pipeline_config ); $job_snapshot = array( @@ -327,4 +352,38 @@ public function execute( array $input ): array { 'first_step' => $first_flow_step_id, ); } + + /** + * Return first-step drain queue details when a scheduled flow has no work. + * + * @param array $flow_config Normalized flow config. + * @return array{flow_step_id:string}|null + */ + private function getEmptyDrainQueueSkip( array $flow_config ): ?array { + try { + $first_flow_step_id = ExecutionPlan::from_flow_config( $flow_config )->first_step_id(); + } catch ( \InvalidArgumentException $e ) { + return null; + } + + if ( ! $first_flow_step_id || ! isset( $flow_config[ $first_flow_step_id ] ) || ! is_array( $flow_config[ $first_flow_step_id ] ) ) { + return null; + } + + $first_step = $flow_config[ $first_flow_step_id ]; + if ( 'fetch' !== (string) ( $first_step['step_type'] ?? '' ) ) { + return null; + } + + if ( 'drain' !== (string) ( $first_step['queue_mode'] ?? 'static' ) ) { + return null; + } + + $queue = $first_step[ QueueAbility::SLOT_CONFIG_PATCH_QUEUE ] ?? array(); + if ( is_array( $queue ) && ! empty( $queue ) ) { + return null; + } + + return array( 'flow_step_id' => (string) $first_flow_step_id ); + } } diff --git a/tests/run-flow-empty-drain-queue-smoke.php b/tests/run-flow-empty-drain-queue-smoke.php new file mode 100644 index 000000000..119c9ddd3 --- /dev/null +++ b/tests/run-flow-empty-drain-queue-smoke.php @@ -0,0 +1,39 @@ +getEmptyDrainQueueSkip( $flow_config );', $run_flow_src, 'run-flow checks empty drain queues' ); +assert_run_flow_empty_drain_contains( "'reason' => 'empty_drain_queue'", $run_flow_src, 'empty drain skip returns explicit reason' ); +assert_run_flow_empty_drain_contains( '\'drain\' !== (string) ( $first_step[\'queue_mode\'] ?? \'static\' )', $run_flow_src, 'skip is scoped to drain mode' ); +assert_run_flow_empty_drain_contains( 'QueueAbility::SLOT_CONFIG_PATCH_QUEUE', $run_flow_src, 'skip checks the config patch queue slot' ); + +$skip_pos = strpos( $run_flow_src, '$empty_drain_skip = $this->getEmptyDrainQueueSkip( $flow_config );' ); +$create_job_pos = strpos( $run_flow_src, '$job_id = $this->db_jobs->create_job( $job_data );' ); +assert_run_flow_empty_drain_true( false !== $skip_pos && false !== $create_job_pos && $skip_pos < $create_job_pos, 'empty drain queue is skipped before job creation' ); + +echo "OK ({$assertions} assertions)\n";