Skip to content

fix: Keep stream open on interrupted state changes#756

Open
kabir wants to merge 3 commits intoa2aproject:mainfrom
kabir:issue-754
Open

fix: Keep stream open on interrupted state changes#756
kabir wants to merge 3 commits intoa2aproject:mainfrom
kabir:issue-754

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Mar 17, 2026

Fixes #754

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves an issue where task event streams were prematurely closed when a task entered an interrupted state, such as INPUT_REQUIRED. By refining the TaskState classification and updating the stream termination logic to align with the A2A Protocol Specification, streams will now correctly persist through interrupted states, allowing for subsequent updates, and will only terminate when a task reaches a definitive final state. This ensures a more robust and compliant event streaming experience.

Highlights

  • Stream Termination Logic Refined: The logic for terminating task event streams was updated to strictly adhere to the A2A Protocol Specification, ensuring streams only close when a task reaches a truly terminal state.
  • TaskState Classification Enhanced: The TaskState enum was extended with an isInterrupted property and corresponding method, allowing for explicit identification of states where a task is paused awaiting external action.
  • INPUT_REQUIRED State Handling Corrected: The isStreamTerminatingTask method in EventConsumer no longer treats TASK_STATE_INPUT_REQUIRED as a stream-terminating condition, preventing premature stream closure.
  • New Test Coverage Added: A comprehensive test case was introduced to validate that event streams correctly remain open for interrupted states and only terminate upon reaching a final task state.
Changelog
  • server-common/src/main/java/io/a2a/server/events/EventConsumer.java
    • Modified isStreamTerminatingTask to remove TASK_STATE_INPUT_REQUIRED from the conditions that trigger stream termination, now relying solely on isFinal().
    • Updated Javadoc for isStreamTerminatingTask to reflect the A2A Protocol Specification regarding stream termination.
  • spec/src/main/java/io/a2a/spec/TaskState.java
    • Introduced a new isInterrupted boolean field to the TaskState enum constructor.
    • Updated all TaskState enum instances to initialize the new isInterrupted field.
    • Added a new public method isInterrupted() to determine if a state is interrupted.
    • Revised Javadoc to categorize states into "Active Transitional States" and "Interrupted States" and to explain the purpose of isInterrupted().
  • tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
    • Added testSubscribeToTaskWithInterruptedStateKeepsStreamOpen, a new integration test to verify that event streams correctly remain open for INPUT_REQUIRED and WORKING states, and only terminate when a task reaches a COMPLETED state.
Activity
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request correctly resolves an issue where event streams were prematurely closing for tasks in an interrupted state, such as INPUT_REQUIRED. The fix aligns the stream termination logic in EventConsumer with the A2A protocol specification, ensuring streams only terminate when a task reaches a final state. The refactoring of the TaskState enum to explicitly define and track isInterrupted states is a solid design improvement that enhances code clarity. Additionally, the new integration test, testSubscribeToTaskWithInterruptedStateKeepsStreamOpen, provides excellent coverage and validation for the corrected behavior. The changes are well-implemented and I have no further suggestions.

@jmesnil
Copy link
Collaborator

jmesnil commented Mar 17, 2026

@kabir The JSON-RPC reproducer still fails:

 # Step 1: Create a non-terminal task (input_required state)
   RESPONSE=$(curl -s -X POST http://localhost:9999 \
     -H "Content-Type: application/json" \
     -d '{
       "jsonrpc":"2.0","id":1,"method":"SendMessage",
       "params":{"message":{"role":"ROLE_USER","parts":[{"text":"test"}],"messageId":"tck-input-required-repro"}}
     }')
   TASK_ID=$(echo "$RESPONSE" | python3 -c "import sys,json; r=json.load(sys.stdin); print(r['result']['task']['id'])")
   echo "Task ID: $TASK_ID  State: TASK_STATE_INPUT_REQUIRED"

At this point, the task is in TASK_STATE_INPUT_REQUIRED.
But when I subscribe to the task, the HTTP POST request returns immediately:

$ curl -s -N -X POST http://localhost:9999 \
     -H "Content-Type: application/json" \
     -H "Accept: text/event-stream" \
     -d "{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"SubscribeToTask\",\"params\":{\"id\":\"$TASK_ID\"}}"

data: {"jsonrpc":"2.0","id":2,"result":{"task":{"id":"938089f1-9ce5-4bff-8cc8-8d092f3e5c84","contextId":"ba5beadd-cd73-4188-ad1f-f011dec726a9","status":{"state":"TASK_STATE_INPUT_REQUIRED","timestamp":"2026-03-17T14:40:49.748656Z"},"artifacts":[],"history":[],"metadata":{}}}}
id: 0

BTW, the id: 0 looks very suspicious in the event stream...

At this point, I was expecting the request to stay open until I complete the task with another message such as:

 curl -s -X POST http://localhost:9999 \
     -H "Content-Type: application/json" \
     -d "{
       \"jsonrpc\":\"2.0\",\"id\":3,\"method\":\"SendMessage\",
       \"params\":{\"message\":{\"role\":\"ROLE_USER\",\"parts\":[{\"text\":\"done\"}],\"messageId\":\"tck-complete-task-repro\",\"taskId\":\"$TASK_ID\"}}
     }"

kabir and others added 2 commits March 17, 2026 15:06
The EventConsumer grace period logic was incorrectly closing the queue
after agent completion, even when the task was in an interrupted state
(INPUT_REQUIRED or AUTH_REQUIRED).

Per A2A Protocol Specification 3.1.6, streams must remain open for
interrupted states to receive future state updates, regardless of
whether the agent has completed.

Changes:
- Track last seen task state in EventConsumer
- Prevent grace period queue close if task is in interrupted state
- Add logging for interrupted state detection

This fixes the issue where SubscribeToTask would close immediately
after delivering the initial Task snapshot for INPUT_REQUIRED tasks.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@kabir
Copy link
Collaborator Author

kabir commented Mar 17, 2026

@jmesnil I've tried to make the test do what the reproducer does

@kabir
Copy link
Collaborator Author

kabir commented Mar 17, 2026

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request provides a critical fix to ensure that event streams remain open for tasks in an interrupted state (like INPUT_REQUIRED), adhering to the A2A specification. The changes are well-implemented across several files. In EventConsumer, the logic for closing the stream is corrected to check for interrupted states. The TaskState enum is refactored to explicitly include the concept of an "interrupted" state, which is a great design improvement that enhances clarity and maintainability. The isStreamTerminatingTask method is also corrected. The changes are supported by excellent tests, including an updated unit test and a new, comprehensive integration test that thoroughly validates the end-to-end behavior. The code quality is high, and the fix appears robust and correct.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: SubscribeToTask stream closes immediately instead of staying open for state updates

2 participants