Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 62 additions & 3 deletions inc/Abilities/Engine/RunFlowAbility.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

namespace DataMachine\Abilities\Engine;

use DataMachine\Abilities\Flow\QueueAbility;
use DataMachine\Core\Agents\AgentIdentityResolver;
use DataMachine\Engine\ExecutionPlan;

Expand Down Expand Up @@ -67,8 +68,10 @@ private function registerAbility(): void {
'properties' => array(
'success' => array( 'type' => 'boolean' ),
'flow_id' => array( 'type' => 'integer' ),
'job_id' => array( 'type' => 'integer' ),
'job_id' => array( 'type' => array( 'integer', 'null' ) ),
'first_step' => array( 'type' => 'string' ),
'skipped' => array( 'type' => 'boolean' ),
'reason' => array( 'type' => 'string' ),
'error' => array( 'type' => 'string' ),
),
),
Expand Down Expand Up @@ -131,6 +134,30 @@ public function execute( array $input ): array {
}

$pipeline_id = (int) $flow['pipeline_id'];
$flow_config = datamachine_normalize_engine_config( $flow['flow_config'] ?? array() );

$empty_drain_skip = $this->getEmptyDrainQueueSkip( $flow_config );
if ( null !== $empty_drain_skip ) {
do_action(
'datamachine_log',
'info',
'Flow execution skipped - drain queue is empty',
array(
'flow_id' => $flow_id,
'pipeline_id' => $pipeline_id,
'flow_step_id' => $empty_drain_skip['flow_step_id'],
)
);

return array(
'success' => true,
'flow_id' => $flow_id,
'job_id' => null,
'first_step' => $empty_drain_skip['flow_step_id'],
'skipped' => true,
'reason' => 'empty_drain_queue',
);
}

$agent_identity = null;
if ( ! empty( $flow['agent_id'] ) ) {
Expand Down Expand Up @@ -187,15 +214,13 @@ public function execute( array $input ): array {
// Transition job from pending to processing.
$this->db_jobs->start_job( $job_id );

$flow_config = $flow['flow_config'] ?? array();
$scheduling_config = $flow['scheduling_config'] ?? array();
$run_artifact_policy = \DataMachine\Engine\Bundle\BundleSchema::normalize_run_artifact_egress_policy( $scheduling_config['run_artifacts'] ?? array() );

// Load pipeline config.
$pipeline = $this->db_pipelines->get_pipeline( $pipeline_id );
$pipeline_config = $pipeline['pipeline_config'] ?? array();

$flow_config = datamachine_normalize_engine_config( $flow_config );
$pipeline_config = datamachine_normalize_engine_config( $pipeline_config );

$job_snapshot = array(
Expand Down Expand Up @@ -327,4 +352,38 @@ public function execute( array $input ): array {
'first_step' => $first_flow_step_id,
);
}

/**
* Return first-step drain queue details when a scheduled flow has no work.
*
* @param array $flow_config Normalized flow config.
* @return array{flow_step_id:string}|null
*/
private function getEmptyDrainQueueSkip( array $flow_config ): ?array {
try {
$first_flow_step_id = ExecutionPlan::from_flow_config( $flow_config )->first_step_id();
} catch ( \InvalidArgumentException $e ) {
return null;
}

if ( ! $first_flow_step_id || ! isset( $flow_config[ $first_flow_step_id ] ) || ! is_array( $flow_config[ $first_flow_step_id ] ) ) {
return null;
}

$first_step = $flow_config[ $first_flow_step_id ];
if ( 'fetch' !== (string) ( $first_step['step_type'] ?? '' ) ) {
return null;
}

if ( 'drain' !== (string) ( $first_step['queue_mode'] ?? 'static' ) ) {
return null;
}

$queue = $first_step[ QueueAbility::SLOT_CONFIG_PATCH_QUEUE ] ?? array();
if ( is_array( $queue ) && ! empty( $queue ) ) {
return null;
}

return array( 'flow_step_id' => (string) $first_flow_step_id );
}
}
39 changes: 39 additions & 0 deletions tests/run-flow-empty-drain-queue-smoke.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php
/**
* Pure-PHP smoke test for empty drain queue run-flow short-circuiting.
*
* Run with: php tests/run-flow-empty-drain-queue-smoke.php
*
* @package DataMachine\Tests
*/

$run_flow_file = __DIR__ . '/../inc/Abilities/Engine/RunFlowAbility.php';
$run_flow_src = file_get_contents( $run_flow_file ) ?: '';

$assertions = 0;

function assert_run_flow_empty_drain_true( bool $condition, string $message ): void {
global $assertions;
++$assertions;

if ( ! $condition ) {
fwrite( STDERR, "FAIL: {$message}\n" );
exit( 1 );
}
}

function assert_run_flow_empty_drain_contains( string $needle, string $haystack, string $message ): void {
assert_run_flow_empty_drain_true( false !== strpos( $haystack, $needle ), $message );
}

assert_run_flow_empty_drain_contains( 'use DataMachine\\Abilities\\Flow\\QueueAbility;', $run_flow_src, 'run-flow ability imports queue slot constants' );
assert_run_flow_empty_drain_contains( '$empty_drain_skip = $this->getEmptyDrainQueueSkip( $flow_config );', $run_flow_src, 'run-flow checks empty drain queues' );
assert_run_flow_empty_drain_contains( "'reason' => 'empty_drain_queue'", $run_flow_src, 'empty drain skip returns explicit reason' );
assert_run_flow_empty_drain_contains( '\'drain\' !== (string) ( $first_step[\'queue_mode\'] ?? \'static\' )', $run_flow_src, 'skip is scoped to drain mode' );
assert_run_flow_empty_drain_contains( 'QueueAbility::SLOT_CONFIG_PATCH_QUEUE', $run_flow_src, 'skip checks the config patch queue slot' );

$skip_pos = strpos( $run_flow_src, '$empty_drain_skip = $this->getEmptyDrainQueueSkip( $flow_config );' );
$create_job_pos = strpos( $run_flow_src, '$job_id = $this->db_jobs->create_job( $job_data );' );
assert_run_flow_empty_drain_true( false !== $skip_pos && false !== $create_job_pos && $skip_pos < $create_job_pos, 'empty drain queue is skipped before job creation' );

echo "OK ({$assertions} assertions)\n";
Loading