Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 10 additions & 155 deletions src/app/endpoints/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

"""Handler for REST API call to provide answer using Responses API (LCORE specification)."""

import asyncio
import json
import time
from collections.abc import AsyncIterator, Sequence
Expand Down Expand Up @@ -32,6 +31,11 @@
APIStatusError as OpenAIAPIStatusError,
)

from app.endpoints.responses_telemetry import (
queue_blocked_response_event,
queue_completed_response_event,
queue_responses_error_event,
)
from authentication import get_auth_dependency
from authentication.interface import AuthTuple
from authorization.azure_token_manager import AzureEntraIDManager
Expand Down Expand Up @@ -60,7 +64,6 @@
from models.common.responses.responses_context import ResponsesContext
from models.common.turn_summary import TurnSummary
from models.config import Action
from observability import ResponsesEventData, build_responses_event, send_splunk_event
from utils.conversations import append_turn_items_to_conversation
from utils.endpoints import (
check_configuration_loaded,
Expand Down Expand Up @@ -159,96 +162,6 @@ def _get_user_agent(request: Request) -> Optional[str]:
}


# Registry of in-flight fire-and-forget Splunk telemetry tasks.
# The event loop only holds weak references to tasks, so a task with no other
# strong reference could be garbage-collected before it completes. Tasks are
# added here when created and discard themselves through a done-callback.
_background_splunk_tasks: set[asyncio.Task[None]] = set()


def _queue_responses_splunk_event(  # pylint: disable=too-many-arguments,too-many-positional-arguments
    background_tasks: Optional[BackgroundTasks],
    input_text: str,
    response_text: str,
    conversation_id: str,
    model: str,
    rh_identity_context: tuple[str, str],
    inference_time: float,
    sourcetype: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    fire_and_forget: bool = False,
    user_agent: Optional[str] = None,
) -> None:
    """Assemble a responses-endpoint Splunk event and schedule its delivery.

    Nothing is built or scheduled when ``background_tasks`` is None and
    ``fire_and_forget`` is False (Splunk telemetry disabled).

    Args:
        background_tasks: FastAPI background task manager, or None if disabled.
        input_text: Text supplied by the user.
        response_text: Text produced by the LLM or by a shield.
        conversation_id: Identifier of the conversation.
        model: Name of the model used for inference.
        rh_identity_context: ``(org_id, system_id)`` pair from RH identity.
        inference_time: Duration of request processing, in seconds.
        sourcetype: Splunk sourcetype to tag the event with.
        input_tokens: Count of prompt tokens consumed.
        output_tokens: Count of completion tokens produced.
        fire_and_forget: When True, the send coroutine is scheduled with
            ``asyncio.create_task()`` rather than through ``background_tasks``.
            Intended for error paths followed by an HTTPException, since
            FastAPI discards BackgroundTasks on non-2xx responses.
        user_agent: Sanitized User-Agent header value, or None.
    """
    telemetry_disabled = background_tasks is None and not fire_and_forget
    if telemetry_disabled:
        return

    org_id, system_id = rh_identity_context
    payload = build_responses_event(
        ResponsesEventData(
            input_text=input_text,
            response_text=response_text,
            conversation_id=conversation_id,
            model=model,
            org_id=org_id,
            system_id=system_id,
            inference_time=inference_time,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            user_agent=user_agent,
        )
    )

    if fire_and_forget:
        pending = asyncio.create_task(send_splunk_event(payload, sourcetype))
        # Hold a strong reference until the task finishes, then drop it.
        _background_splunk_tasks.add(pending)
        pending.add_done_callback(_background_splunk_tasks.discard)
        return

    # The guard above already rules out None here; the check is kept so type
    # checkers can narrow the Optional.
    if background_tasks is not None:
        background_tasks.add_task(send_splunk_event, payload, sourcetype)


def _queue_responses_error_event(
    error: Exception,
    api_params: ResponsesApiParams,
    context: ResponsesContext,
) -> None:
    """Schedule fire-and-forget Splunk telemetry describing a Responses API error.

    The event carries ``str(error)`` as its response text, the time elapsed
    since ``context.started_at`` as its inference time, and the
    ``responses_error`` sourcetype.

    Args:
        error: Backend exception that is being turned into an HTTP error.
        api_params: Responses API parameters of the failed request.
        context: Request-scoped Responses API context.
    """
    error_text = str(error)
    conversation_id = normalize_conversation_id(api_params.conversation)
    elapsed_seconds = (datetime.now(UTC) - context.started_at).total_seconds()

    _queue_responses_splunk_event(
        background_tasks=context.background_tasks,
        input_text=context.input_text,
        response_text=error_text,
        conversation_id=conversation_id,
        model=api_params.model,
        rh_identity_context=context.rh_identity_context,
        inference_time=elapsed_seconds,
        sourcetype="responses_error",
        fire_and_forget=True,
        user_agent=context.user_agent,
    )


def _http_exception_for_response_api_error(
error: Exception,
api_params: ResponsesApiParams,
Expand Down Expand Up @@ -297,7 +210,7 @@ def _raise_response_api_http_exception(
http_exception = _http_exception_for_response_api_error(error, api_params)
if http_exception is None:
raise error
_queue_responses_error_event(error, api_params, context)
queue_responses_error_event(error, api_params, context)
raise http_exception from error


Expand All @@ -321,31 +234,6 @@ async def _persist_blocked_response_turn(
)


def _queue_blocked_response_event(
    api_params: ResponsesApiParams,
    context: ResponsesContext,
    response_text: str,
) -> None:
    """Schedule Splunk telemetry for a Responses API request blocked by a shield.

    The event uses the ``responses_shield_blocked`` sourcetype and reports the
    time elapsed since ``context.started_at`` as its inference time.

    Args:
        api_params: Responses API parameters of the blocked request.
        context: Request-scoped Responses API context.
        response_text: Refusal text returned to the client.
    """
    conversation_id = normalize_conversation_id(api_params.conversation)
    elapsed_seconds = (datetime.now(UTC) - context.started_at).total_seconds()

    _queue_responses_splunk_event(
        background_tasks=context.background_tasks,
        input_text=context.input_text,
        response_text=response_text,
        conversation_id=conversation_id,
        model=api_params.model,
        rh_identity_context=context.rh_identity_context,
        inference_time=elapsed_seconds,
        sourcetype="responses_shield_blocked",
        user_agent=context.user_agent,
    )


async def _append_previous_response_turn(
api_params: ResponsesApiParams,
context: ResponsesContext,
Expand Down Expand Up @@ -419,39 +307,6 @@ def _store_response_query_results(
)


def _queue_completed_response_event(
    api_params: ResponsesApiParams,
    context: ResponsesContext,
    turn_summary: TurnSummary,
    completed_at: datetime,
    response_text: str,
) -> None:
    """Schedule Splunk telemetry for a Responses API request that completed.

    Nothing is queued unless the moderation decision on ``context`` is
    ``"passed"``. The event uses the ``responses_completed`` sourcetype.

    Args:
        api_params: Responses API parameters of the completed request.
        context: Request-scoped Responses API context.
        turn_summary: Summary whose token usage is reported in the event.
        completed_at: Moment at which response handling finished.
        response_text: Final text returned to the client.
    """
    decision = context.moderation_result.decision
    if decision != "passed":
        return

    conversation_id = normalize_conversation_id(api_params.conversation)
    duration_seconds = (completed_at - context.started_at).total_seconds()
    usage = turn_summary.token_usage

    _queue_responses_splunk_event(
        background_tasks=context.background_tasks,
        input_text=context.input_text,
        response_text=response_text,
        conversation_id=conversation_id,
        model=api_params.model,
        rh_identity_context=context.rh_identity_context,
        inference_time=duration_seconds,
        sourcetype="responses_completed",
        input_tokens=usage.input_tokens,
        output_tokens=usage.output_tokens,
        user_agent=context.user_agent,
    )


@router.post(
"/responses",
responses=responses_response,
Expand Down Expand Up @@ -682,7 +537,7 @@ async def handle_streaming_response(
turn_summary.llm_response = context.moderation_result.message
generator = shield_violation_generator(api_params, context)
await _persist_blocked_response_turn(api_params, context)
_queue_blocked_response_event(
queue_blocked_response_event(
api_params,
context,
context.moderation_result.message,
Expand Down Expand Up @@ -1141,7 +996,7 @@ async def generate_response(
completed_at,
topic_summary,
)
_queue_completed_response_event(
queue_completed_response_event(
api_params,
context,
turn_summary,
Expand Down Expand Up @@ -1178,7 +1033,7 @@ async def handle_non_streaming_response(
**api_params.echoed_params(configuration.rag_id_mapping),
)
await _persist_blocked_response_turn(api_params, context)
_queue_blocked_response_event(api_params, context, output_text)
queue_blocked_response_event(api_params, context, output_text)
else:
inference_start_time = time.monotonic()
inference_metric_recorded = False
Expand Down Expand Up @@ -1258,7 +1113,7 @@ async def handle_non_streaming_response(
completed_at,
topic_summary,
)
_queue_completed_response_event(
queue_completed_response_event(
api_params,
context,
turn_summary,
Expand Down
Loading
Loading