[CELEBORN-2230] SparkUtils#shouldReportShuffleFetchFailure method should retrieve the number of task failures from TaskSetManager #3650

Open

leixm wants to merge 5 commits into apache:main from leixm:CELEBORN-2230

Conversation

@leixm (Contributor) commented Apr 7, 2026

What changes were proposed in this pull request?

Reopened from #3556: retrieve the number of task failures from TaskSetManager in the SparkUtils#shouldReportShuffleFetchFailure method.

Why are the changes needed?

https://github.com/apache/celeborn/blob/main/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java#L514 currently records failure counts for task attempts in both the "UNKNOWN" and "FAILED" states, but Spark itself may not count a FAILED attempt toward the task's failure limit. This happens frequently in our production environment when task attempts fail due to container preemption; such failures should not be counted, and the existing logic makes it easy for stage rerun to be triggered prematurely. Retrieving the failure counts from the TaskSetManager is therefore more accurate.
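
For illustration, a minimal sketch of how such a reflective read might look; this is not the PR's actual code. It assumes TaskSetManager.numFailures is an Array[Int] indexed by task index (as in upstream Spark); the wrapper class name is hypothetical, and only the getTaskFailureCount name and the -1 "unknown" sentinel are taken from the review discussion below:

    import java.lang.reflect.Field;
    import org.apache.spark.scheduler.TaskSetManager;

    final class TaskFailureCounts {
      private static volatile Field NUM_FAILURES_FIELD;

      /** Returns Spark's failure count for the given task index, or -1 if unknown. */
      static int getTaskFailureCount(TaskSetManager taskSetManager, int taskIndex) {
        try {
          if (NUM_FAILURES_FIELD == null) {
            // numFailures is not public API, so open it up reflectively.
            Field field = TaskSetManager.class.getDeclaredField("numFailures");
            field.setAccessible(true);
            NUM_FAILURES_FIELD = field;
          }
          int[] numFailures = (int[]) NUM_FAILURES_FIELD.get(taskSetManager);
          if (taskIndex < 0 || taskIndex >= numFailures.length) {
            return -1; // out-of-bounds index: cannot determine
          }
          return numFailures[taskIndex];
        } catch (ReflectiveOperationException e) {
          return -1; // field missing or shaped differently on this Spark build
        }
      }
    }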

Does this PR resolve a correctness bug?

No.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UTs.

@leixm (Contributor, Author) commented Apr 7, 2026

cc @turboFei

@leixm (Contributor, Author) commented Apr 8, 2026

cc @turboFei

@RexXiong (Contributor) commented Apr 9, 2026

Overall, this change looks good to me. The approach of retrieving failure counts directly from TaskSetManager.numFailures is more accurate than manually counting failed task attempts, especially for cases like container preemption where Spark doesn't increment the failure count.

One suggestion: It would be helpful to add a test case that verifies the failure count is correctly incremented after an actual task failure (e.g., simulate a task failure and then verify that getTaskFailureCount returns the expected increased value). Currently, the test only validates boundary conditions (initial value, out-of-bounds indices), but doesn't cover the actual failure counting scenario.


by claude

@leixm (Contributor, Author) commented Apr 9, 2026

> Overall, this change looks good to me. The approach of retrieving failure counts directly from TaskSetManager.numFailures is more accurate than manually counting failed task attempts, especially for cases like container preemption where Spark doesn't increment the failure count.
>
> One suggestion: It would be helpful to add a test case that verifies the failure count is correctly incremented after an actual task failure (e.g., simulate a task failure and then verify that getTaskFailureCount returns the expected increased value). Currently, the test only validates boundary conditions (initial value, out-of-bounds indices), but doesn't cover the actual failure counting scenario.
>
> by claude

Added a UT, PTAL.

Copilot AI left a comment

Pull request overview

Updates Celeborn’s Spark integration to determine whether to report shuffle fetch failures using Spark’s internal task failure counters from TaskSetManager, improving accuracy for cases where Spark does not count certain failed attempts (e.g., preemption-related failures).

Changes:

  • Add reflective access to TaskSetManager.numFailures and a helper method to read per-task failure counts.
  • Update SparkUtils#shouldReportShuffleFetchFailure (Spark 2 / Spark 3) to use numFailures instead of inferring failures from attempt statuses (a simplified sketch follows this list).
  • Add Spark IT coverage for getTaskFailureCount, including a scenario with real task retries/failures.
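
A simplified sketch of what that pre-check could look like. The names are placeholders: the "< 0" branch mirrors the snippet quoted in the review comments below, while the final condition is one plausible formulation rather than the PR's exact logic:

    // Simplified sketch (placeholder names, not the PR's exact code).
    int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
    if (previousFailureCount < 0) {
      // Fail-safe: the count could not be read; report the fetch failure rather
      // than silently swallowing it (this branch is debated in the review below).
      return true;
    }
    // Otherwise report FetchFailed only when no other attempt of this task is
    // still running and one more failure would exhaust spark.task.maxFailures.
    return !hasRunningAttempt && previousFailureCount + 1 >= maxTaskFailures;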

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

  • tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala: Adds integration tests validating getTaskFailureCount behavior and retry scenarios.
  • client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java: Reads TaskSetManager.numFailures and uses it in fetch-failure reporting decisions (Spark 3).
  • client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java: Same as the Spark 3 changes, adapted for the Spark 2 module.


Comment on lines +559 to +564
    int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
    // Fail-safe: if failure count cannot be determined, conservatively trigger
    // FetchFailed to avoid silently swallowing the error.
    if (previousFailureCount < 0) {
      return true;
    }
Copilot AI commented Apr 10, 2026

When getTaskFailureCount() fails (returns < 0), shouldReportShuffleFetchFailure() immediately returns true. This makes the pre-check aggressively report FetchFailed even if there are other running attempts and the retry limit has not been reached, which can reintroduce premature stage reruns in exactly the scenarios this change is trying to avoid (e.g., if reflective access to TaskSetManager.numFailures breaks on some Spark builds). Consider falling back to the previous attempt/status-based counting (or at least gating on !hasRunningAttempt) instead of unconditional true, and log at WARN once to avoid error spam if the field is unavailable.

@leixm (Contributor, Author) replied:

I think this is unreasonable. It shouldn't check !hasRunningAttempt; it should directly return true. Otherwise, FetchFailed won't be triggered and the app will fail.

Comment on lines +424 to +427
    // Fail-safe: if failure count cannot be determined, conservatively trigger
    // FetchFailed to avoid silently swallowing the error.
    if (previousFailureCount < 0) {
      return true;

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When getTaskFailureCount() fails (returns < 0), shouldReportShuffleFetchFailure() immediately returns true. This makes the pre-check aggressively report FetchFailed even if there are other running attempts and the retry limit has not been reached, which can reintroduce premature stage reruns if reflective access to TaskSetManager.numFailures fails on some Spark builds. Consider falling back to the previous attempt/status-based counting (or at least gating on !hasRunningAttempt) rather than unconditional true, and avoid logging this as an error on every call if the field is unavailable.

Suggested change:

    - // Fail-safe: if failure count cannot be determined, conservatively trigger
    - // FetchFailed to avoid silently swallowing the error.
    - if (previousFailureCount < 0) {
    -   return true;
    + // If failure count cannot be determined, fall back to attempt status based
    + // behavior instead of aggressively reporting FetchFailed. This avoids
    + // premature stage reruns when reflective access to failure counts is
    + // unavailable, while still reporting the failure when no other attempt is
    + // running.
    + if (previousFailureCount < 0) {
    +   if (!hasRunningAttempt) {
    +     logger.warn(
    +         "StageId={}, index={}, taskId={}, attemptNumber={}: Unable to determine "
    +             + "previous failure count, and no other running attempt exists. "
    +             + "Reporting shuffle fetch failure.",
    +         stageId,
    +         taskInfo.index(),
    +         taskId,
    +         taskInfo.attemptNumber());
    +     return true;
    +   } else {
    +     logger.warn(
    +         "StageId={}, index={}, taskId={}, attemptNumber={}: Unable to determine "
    +             + "previous failure count, but another attempt is still running. "
    +             + "Deferring shuffle fetch failure report.",
    +         stageId,
    +         taskInfo.index(),
    +         taskId,
    +         taskInfo.attemptNumber());
    +     return false;
    +   }

@leixm (Contributor, Author) replied:

I think this is unreasonable. It shouldn't check !hasRunningAttempt; it should directly return true. Otherwise, FetchFailed won't be triggered and the app will fail.

Comment on lines +304 to +309
    val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
    eventually(timeout(10.seconds), interval(100.milliseconds)) {
      // taskId 0,1 failed and removed; taskId 2 is the surviving 3rd attempt
      val taskSetManager = SparkUtils.getTaskSetManager(taskScheduler, 2)
      assert(taskSetManager != null)
      assert(SparkUtils.getTaskFailureCount(taskSetManager, 0) == 2)
Copilot AI commented Apr 10, 2026

This test assumes the third attempt’s taskId will be exactly 2 (after two failures). Spark task IDs are globally assigned within a SparkContext and aren’t guaranteed to align with attempt count if any other tasks/stages run (including internal ones), which can make the test brittle across Spark versions/configs. Consider deriving the taskId dynamically (e.g., capturing TaskContext.taskAttemptId() via an accumulator/Promise, or scanning taskScheduler’s taskIdToTaskSetManager for the active TaskSetManager) instead of hardcoding 2.
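
For illustration, a hedged sketch of the accumulator variant of that suggestion, written in Java rather than the suite's Scala; the helper name and the toy single-partition job are hypothetical:

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.util.CollectionAccumulator;

    class TaskIdCapture {
      // Runs a single-partition job and returns the task ID of the attempt that
      // succeeded, rather than assuming Spark assigned it a particular number.
      static long survivingTaskAttemptId(JavaSparkContext jsc) {
        CollectionAccumulator<Long> taskIds = jsc.sc().collectionAccumulator("taskIds");
        jsc.parallelize(Arrays.asList(1, 2, 3), 1)
            .map(x -> {
              // Record this attempt's globally unique task ID; by default Spark
              // only keeps accumulator updates from successful tasks.
              taskIds.add(TaskContext.get().taskAttemptId());
              return x;
            })
            .count();
        // Task IDs increase monotonically, so the max is the most recent attempt.
        return Collections.max(taskIds.value());
      }
    }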

@RexXiong (Contributor) commented:

I think Copilot's suggestion makes sense here. When previousFailureCount < 0 (reflection fails), checking !hasRunningAttempt before returning true would avoid premature stage reruns while still ensuring the failure is eventually reported.

Could you elaborate on why "app will fail" if we check !hasRunningAttempt? If there's no running attempt, the method would still return true and trigger FetchFailed. The only difference is that when another attempt is running, we defer the failure report to give that attempt a chance to succeed.

This seems consistent with the original logic where !hasRunningAttempt is one of the conditions to trigger FetchFailed.

@leixm (Contributor, Author) commented Apr 20, 2026

> I think Copilot's suggestion makes sense here. When previousFailureCount < 0 (reflection fails), checking !hasRunningAttempt before returning true would avoid premature stage reruns while still ensuring the failure is eventually reported.
>
> Could you elaborate on why "app will fail" if we check !hasRunningAttempt? If there's no running attempt, the method would still return true and trigger FetchFailed. The only difference is that when another attempt is running, we defer the failure report to give that attempt a chance to succeed.
>
> This seems consistent with the original logic where !hasRunningAttempt is one of the conditions to trigger FetchFailed.

I made a mistake, you're right.

@github-actions bot commented:

This PR is stale because it has been open 20 days with no activity. Remove stale label or comment or this will be closed in 10 days.

The github-actions bot added the stale label on May 11, 2026.