feat(proactive): server-side Gemini gRPC service for desktop task extraction#6291
Conversation
…ction Defines the ProactiveAI service contract with bidi streaming Session RPC. Includes ClientEvent/ServerEvent oneof messages, ToolCallRequest/ToolResult for desktop search delegation, and SessionContext for task state prefetch. Refs #6153 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Auto-generated from proto/proactive/v1/proactive.proto using grpc_tools.protoc. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extracts and verifies Firebase UID from gRPC 'authorization' metadata. Uses contextvars for request-scoped UID propagation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
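The contextvars-based, request-scoped UID propagation described above can be sketched roughly as follows (a minimal illustration using only the standard library; the names `current_uid`, `set_request_uid`, and `get_request_uid` are hypothetical, not the actual module's):

```python
import contextvars

# Request-scoped variable: each gRPC request context sees its own value,
# so the UID does not have to be threaded through every function call.
current_uid: contextvars.ContextVar[str] = contextvars.ContextVar("current_uid", default="")

def set_request_uid(uid: str) -> contextvars.Token:
    """Called once after verifying the Firebase ID token at stream open."""
    return current_uid.set(uid)

def get_request_uid() -> str:
    """Read the UID anywhere downstream within the same request context."""
    return current_uid.get()

# Simulated request lifecycle: set at stream open, reset at stream close.
token = set_request_uid("firebase-uid-123")
assert get_request_uid() == "firebase-uid-123"
current_uid.reset(token)
```

Because `ContextVar` values are isolated per asyncio task, concurrent sessions on the same event loop cannot observe each other's UID.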
Drives the Gemini generateContent API for task extraction from screenshots. 5 tool declarations (search_similar, search_keywords, extract_task, reject_task, no_task_found). Search tools yield ToolCallRequest for desktop round-trip; terminal tools yield AnalysisOutcome directly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
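For context, one of the five tool declarations passed to generateContent might look roughly like this. The outer `tools`/`functionDeclarations` field names follow the public Gemini REST API; the parameter schema shown here is illustrative, not the PR's actual schema:

```python
# Illustrative shape of one of the five declarations (the real schemas
# live in task_assistant.py; this parameter list is an assumption).
SEARCH_SIMILAR_DECL = {
    "name": "search_similar",
    "description": "Search the desktop task store for semantically similar tasks.",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Natural-language query."},
            "limit": {"type": "integer", "description": "Max results to return."},
        },
        "required": ["query"],
    },
}

def build_tools_payload(declarations):
    """Wrap function declarations in the `tools` field of a generateContent body."""
    return [{"functionDeclarations": list(declarations)}]

tools = build_tools_payload([SEARCH_SIMILAR_DECL])
```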
Handles ClientHello handshake, context caching, FrameEvent dispatch to ServerTaskAssistant, and heartbeat keepalive. Auth verified once at stream open. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Async gRPC server with Firebase init, keepalive tuning, and 10MB message size limit for screenshot payloads. Port 50051 (configurable via GRPC_PORT). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
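The message-size and keepalive tuning mentioned above is passed as channel arguments to the server. A minimal sketch of such an options list (the channel-argument keys are standard gRPC; the specific keepalive values here are illustrative assumptions, not the PR's actual tuning):

```python
import os

MAX_MESSAGE_BYTES = 10 * 1024 * 1024  # 10MB, sized for JPEG screenshot payloads
GRPC_PORT = int(os.environ.get("GRPC_PORT", "50051"))

# Passed as grpc.aio.server(options=SERVER_OPTIONS); keepalive numbers
# below are placeholders for illustration.
SERVER_OPTIONS = [
    ("grpc.max_receive_message_length", MAX_MESSAGE_BYTES),
    ("grpc.max_send_message_length", MAX_MESSAGE_BYTES),
    ("grpc.keepalive_time_ms", 30_000),     # ping idle clients every 30s
    ("grpc.keepalive_timeout_ms", 10_000),  # drop the stream if no ack within 10s
]
```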
Python 3.11-slim, installs proactive-specific requirements, exposes port 50051. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
grpcio, grpcio-tools, protobuf, firebase-admin, httpx. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Regenerates Python gRPC stubs from proto/proactive/v1/proactive.proto into backend/proactive/v1/. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
5 tests: ClientHello handshake, frame-before-hello error, heartbeat silence, context refresh on frame, auth failure abort. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
14 tests: prompt building (4), function call parsing (3), priority mapping (1), terminal decisions (3), search delegation (1), error handling (1), no-function-call fallback (1). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Required by the proactive AI gRPC service. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Greptile Summary

This PR introduces a new server-side ProactiveAI gRPC service. Key issues found:
Confidence Score: 1/5

Not safe to merge — the server will not start due to an invalid gRPC API call, the multi-turn tool loop is architecturally incomplete, and the Gemini API key is exposed in logs and client error messages. Three blocking issues: (1) `grpc.method_handlers_generic_handler` does not exist in grpc Python, causing an immediate AttributeError at startup; (2) the central feature — the search tool round-trip that drives the cost reduction — is unimplemented (both callbacks are stubs, `analyze_frame` returns after the first ToolCallRequest with no resume path); (3) the Gemini API key is embedded as a URL query parameter and propagated into logs and client error messages. The proto design and single-turn paths are solid, but the PR cannot be deployed as-is.

Important Files Changed
- backend/proactive/task_assistant.py (broken tool loop + API key leak)
- backend/proactive/service.py (no-op/NotImplementedError callbacks)
- backend/proactive/v1/proactive_pb2_grpc.py (invalid grpc API — server will not start)
Sequence Diagram

```mermaid
sequenceDiagram
    participant D as Desktop Client
    participant S as ProactiveAI Server
    participant G as Gemini API
    D->>S: ClientEvent(ClientHello + SessionContext)
    S-->>D: ServerEvent(SessionReady)
    D->>S: ClientEvent(FrameEvent + jpeg_bytes)
    Note over S: analyze_frame() called
    S->>G: generateContent(prompt + image + tools)
    G-->>S: FunctionCall(search_similar | search_keywords)
    Note over S,D: CURRENTLY BROKEN — returns here
    S-->>D: ServerEvent(ToolCallRequest)
    D->>S: ClientEvent(ToolResult)
    Note over S: receive_tool_result raises NotImplementedError
    Note over S: WORKS — terminal decisions
    S->>G: generateContent(prompt + image + tools)
    G-->>S: FunctionCall(extract_task | reject_task | no_task_found)
    S-->>D: ServerEvent(AnalysisOutcome)
    D->>S: ClientEvent(Heartbeat)
    Note over S: silent — no response
```
Reviews (1): Last reviewed commit: "docs: add proactive service to CLAUDE.md..."
```python
                confidence=func_args.get('confidence', 0.0),
            )
            yield pb2.ServerEvent(
                analysis_outcome=pb2.AnalysisOutcome(
                    outcome_kind=pb2.EXTRACT_TASK,
                    task=task,
                    context_summary=func_args.get('context_summary', ''),
                    current_activity=func_args.get('current_activity', ''),
                    frame_id=frame_id,
                )
            )
            return

        # Search tools: delegate to desktop via gRPC stream
        if func_name in ('search_similar', 'search_keywords'):
```
Multi-turn search loop is broken — analyze_frame always returns after one Gemini call
After yielding a ToolCallRequest, analyze_frame sets self._pending_request_id / self._pending_func_name and immediately returns. There is no code path anywhere that reads these instance variables or resumes the iteration with a ToolResult. Additionally, the receive_tool_result callback passed from service.py unconditionally raises NotImplementedError (see _make_tool_receiver).
This means any frame where Gemini wants to call search_similar or search_keywords results in only the ToolCallRequest being sent — the desktop will receive it, execute the search, send back a ToolResult, and the server will silently discard it as an "Unexpected standalone tool_result". The analysis never advances past the first Gemini call, the loop's MAX_ITERATIONS guard (line 210) is never exercised in practice, and the stated cost reduction from collapsing 12 calls per trigger into server-controlled loops is not realized.
The architecture requires one of:
- Converting `analyze_frame` to a true async generator that awaits a tool-result future before continuing the `for iteration` loop, with the service layer fulfilling that future when the client `tool_result` event arrives, or
- Materialising the entire bidi conversation in the service layer with an `asyncio.Queue` per in-flight frame so `analyze_frame` can `await queue.get()` for each search turn.
Until this is resolved the service correctly handles only no_task_found, extract_task, and reject_task on the very first Gemini response.
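The first option above (a future fulfilled by the service layer) can be sketched with a small bridge object. This is a minimal illustration, not the PR's code; `ToolBridge`, `expect`, and `fulfill` are hypothetical names, and the dict events stand in for the protobuf messages:

```python
import asyncio

class ToolBridge:
    """Maps request_id -> future so the session loop can resume analyze_frame."""

    def __init__(self):
        self._pending: dict[str, asyncio.Future] = {}

    def expect(self, request_id: str) -> asyncio.Future:
        # Register BEFORE the ToolCallRequest is yielded, so a fast client
        # reply cannot arrive while no future exists.
        fut = asyncio.get_running_loop().create_future()
        self._pending[request_id] = fut
        return fut

    def fulfill(self, request_id: str, result) -> bool:
        fut = self._pending.pop(request_id, None)
        if fut is None or fut.done():
            return False  # unexpected/duplicate tool_result — discard
        fut.set_result(result)
        return True

async def analyze_frame(bridge: ToolBridge):
    # ...first Gemini call decides to search (elided)...
    fut = bridge.expect("r1")
    yield {"tool_call_request": {"request_id": "r1"}}   # sent to desktop
    result = await asyncio.wait_for(fut, timeout=10.0)  # wait for ToolResult
    # ...feed result into the next Gemini turn (elided), then conclude...
    yield {"analysis_outcome": {"used": result}}

async def demo():
    bridge = ToolBridge()
    gen = analyze_frame(bridge)
    events = [await gen.__anext__()]       # ToolCallRequest goes out
    bridge.fulfill("r1", {"hits": 2})      # client's ToolResult arrives
    events.append(await gen.__anext__())   # analysis resumes past the search
    return events

events = asyncio.run(demo())
```

The key property is that `analyze_frame` stays a single generator driving the whole multi-turn loop, while the session loop only shuttles messages and fulfills futures.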
backend/proactive/task_assistant.py
```python
    and feed the ToolResult back by sending it on the bidi stream. The next
    client message after a ToolCallRequest must be a ToolResult.
    """
    prompt = _build_prompt(session_context, frame.app_name)

    # Build initial Gemini contents with image
```
API key embedded in URL — will be leaked in logs and error messages
The Gemini API key is appended as a plain query parameter. When httpx raises an HTTPStatusError or ConnectError, the exception message includes the full URL, meaning the key will appear in:
- `logger.error(... error=%s ...)` on line 222 — written to server logs.
- The `ServerError.message` field sent to the desktop client (`Gemini API error: {e}`).
This violates the project's logging-security rule ("Never log raw sensitive data").
Use the `x-goog-api-key` request header instead:

```python
async with httpx.AsyncClient(timeout=30.0) as client:
    resp = await client.post(
        f'{GEMINI_API_URL}/{GEMINI_MODEL}:generateContent',
        json=body,
        headers={'x-goog-api-key': GEMINI_API_KEY},
    )
```

backend/proactive/v1/proactive_pb2_grpc.py

```python
            request_deserializer=proactive_dot_v1_dot_proactive__pb2.ClientEvent.FromString,
            response_serializer=proactive_dot_v1_dot_proactive__pb2.ServerEvent.SerializeToString,
        ),
    }
```
grpc.method_handlers_generic_handler does not exist — server will fail to start
grpc.method_handlers_generic_handler is not part of the public grpc Python API. Calling it will raise AttributeError: module 'grpc' has no attribute 'method_handlers_generic_handler' at server startup, before any request is handled.
Standard grpc-tools generated code uses grpc.method_service_handler (grpc ≥ 1.49). For grpc ≥ 1.62 (as pinned in requirements.txt):
```python
    }
    generic_handler = grpc.method_service_handler('proactive.v1.ProactiveAI', rpc_method_handlers)
```
If regenerating the stubs with grpc_tools.protoc produces different output, use whatever protoc emits — do not hand-edit the generated file.
backend/proactive/service.py
```python
        except asyncio.CancelledError:
            logger.info('Session cancelled: uid=%s session=%s', uid, session_id)
        except Exception as e:
            logger.exception('Session error: uid=%s session=%s', uid, session_id)
            yield pb2.ServerEvent(
                server_error=pb2.ServerError(
                    code='INTERNAL',
                    message='Internal server error',
                    retryable=False,
                )
            )
        finally:
            logger.info('Session closed: uid=%s session=%s', uid, session_id)


def _make_tool_sender(context):
    """Create a callback that sends ToolCallRequest to the client stream."""

    async def send_tool_request(tool_request: pb2.ToolCallRequest):
        # In bidi streaming, we yield from the generator — but since the service
        # method is the generator, we return events from analyze_frame instead.
        # This is a no-op; tool requests are yielded inline from analyze_frame.
        pass

    return send_tool_request


def _make_tool_receiver(request_iterator, expected_frame_id):
    """Create a callback that waits for a ToolResult from the client."""

    async def receive_tool_result(request_id: str, timeout_ms: int = 10000) -> pb2.ToolResult:
        # In the bidi stream, the next message from the client should be the ToolResult.
        # This is handled by the task_assistant's analyze_frame loop which reads
        # directly from a queue. For PR1, we use a simple inline approach.
        raise NotImplementedError('Tool result reception is handled inline in analyze_frame')

    return receive_tool_result
```
_make_tool_sender is a no-op and _make_tool_receiver always raises
Both factory functions produce callbacks that are never usable:
- `_make_tool_sender`'s `send_tool_request` just does `pass` — it is passed into `analyze_frame`, but `analyze_frame` never calls it; it yields `ToolCallRequest` events directly.
- `_make_tool_receiver`'s `receive_tool_result` unconditionally raises `NotImplementedError`. Any future iteration that calls `await receive_tool_result(...)` will immediately throw, surfacing as an unhandled exception inside the `async for` in `Session`, terminating the session.
These stubs create a false impression that the round-trip plumbing exists. They should either be replaced with a real implementation (e.g., an asyncio.Queue per frame populated by the tool_result branch of the main event loop) or removed entirely until the feature is ready.
```python
    context.abort.assert_called_once()
    args = context.abort.call_args
```
In-function import violates project import rules
import grpc is placed inside the test function body. Per the project's backend import rules, all imports must be at module top level. Move import grpc to the top of the file alongside the other imports.
Context Used: Backend Python import rules - no in-function impor...
```python
GRPC_PORT = int(os.environ.get('GRPC_PORT', '50051'))
```
Missing guard for empty API key at startup
The API key defaults to '' if the environment variable is absent. The server will start and accept connections, but every _call_gemini call will fail with a 400, returning a retryable error to every client. Add a fast-fail check inside serve() before _init_firebase():
```python
if not GEMINI_API_KEY:
    raise RuntimeError('GEMINI_API_KEY environment variable is required but not set')
```

…estore schema fields

Addresses 3 review findings:
1. Error messages no longer leak API key — logs error_type only, not full URL
2. Search tools now await receive_tool_result() and inject results back into the Gemini conversation for multi-turn extract/reject/no_task decisions
3. extract_task tool declaration and ExtractedTask construction now include source_category, source_subcategory, and relevance_score for schema parity

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Service layer now runs analyze_frame in a background task and shuttles ToolCallRequest/ToolResult between the generator and the bidi stream. Removes placeholder _make_tool_sender/_make_tool_receiver stubs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… sanitization tests 5 new tests: search→extract full loop, search→reject full loop, tool result timeout, source_category/relevance_score parity, API key not leaked in error messages. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Removes stale send_tool_request parameter from mock_analyze_frame. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Use asyncio.wait with FIRST_COMPLETED for concurrent output/client reads during tool waits (fixes timeout race where stream blocks)
- Enforce request_id matching on tool results (discard mismatches)
- Accept heartbeats during tool wait periods

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Prevents key exposure in httpx error messages and server logs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
_run_generator now yields a retryable ServerError when analyze_frame raises unexpectedly, instead of silently dropping the frame. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Expand required fields to include description, priority, confidence
- Add _safe_int helper to handle non-integer model output gracefully

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
All 12 fields now required, matching desktop TaskAssistant.swift:749. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fail fast on missing key instead of booting and failing every request. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Flow Diagram & Sequence Catalog (CP8.2)

Sequence Catalog
Changed Path IDs
by AI for @beastoin
Replace direct __anext__() calls on request_iterator (which conflicts with async-for iteration) with a dedicated _pump_client task that reads into a queue. The concurrent read pattern now uses client_queue instead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
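The single-reader pump pattern this commit describes can be sketched as follows (names such as `_pump_client` follow the commit's wording, but the code itself is an illustrative assumption; dict events stand in for protobuf messages):

```python
import asyncio

# Only this one task ever reads the request iterator; the session loop and
# any tool-wait code read from client_queue instead, avoiding the conflict
# between direct __anext__() calls and async-for iteration.

_SENTINEL = object()  # marks that the client closed its side of the stream

async def _pump_client(request_iterator, client_queue: asyncio.Queue):
    async for event in request_iterator:
        await client_queue.put(event)
    await client_queue.put(_SENTINEL)

async def demo():
    async def fake_requests():
        # Stand-in for the gRPC bidi request iterator.
        for e in ({"hello": True}, {"tool_result": {"request_id": "r1"}}):
            yield e

    q: asyncio.Queue = asyncio.Queue()
    pump = asyncio.create_task(_pump_client(fake_requests(), q))
    received = []
    while (event := await q.get()) is not _SENTINEL:
        received.append(event)
    await pump
    return received

received = asyncio.run(demo())
```

With every client event flowing through the queue, the session loop can `asyncio.wait` on generator output and `client_queue.get()` concurrently without two readers racing on the stream.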
- P1: auth extraction (success, missing header, no bearer, missing uid)
- P6: session-level bidi tool result routing through client_queue
- P15: GEMINI_API_KEY startup guard
- P16: generator error surfacing as ServerError

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Verifies API key is sent in x-goog-api-key header, not URL query param. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
CP9 Evidence Synthesis

L1 Synthesis

All 17 changed paths (P1-P17) proven via 35 unit tests. Server boots successfully with GEMINI_API_KEY=test-dummy-key on port 10140. Startup guard (P15) correctly rejects missing key with RuntimeError. Session handshake (P2) returns SessionReady with protocol_version=1.0, max_iterations=5, supported tools=[SEARCH_SIMILAR, SEARCH_KEYWORDS]. Heartbeat (P14) handled silently. Gemini API error (P11) returns sanitized GEMINI_ERROR without API key in message. Auth failure (P1/S5) returns UNAUTHENTICATED. Generator error (P16) surfaces as retryable ServerError. Non-happy paths: startup guard, auth failure, Gemini error, tool result timeout, bad model output — all covered.

L2 Synthesis

gRPC server accepts client connections over the network (port 10142), correctly processes the gRPC bidi stream protocol, and rejects unauthenticated requests with the proper UNAUTHENTICATED status code. Firebase auth integration works correctly. Full desktop client integration (Swift side) deferred to a follow-up PR per issue #6153 scope — this PR is server-only.

Changed-Path Coverage Checklist
L2 paths marked UNTESTED require a real Gemini API key + Firebase credentials. Deferred to production deployment verification. The gRPC transport layer, auth, and error handling are proven at L2.

by AI for @beastoin
L2 Live Test Evidence — Real Firebase Auth + Gemini E2E

Setup
Test Results — 7/7 PASS
Server Logs (key excerpts)

Changed-Path Coverage (L2)
L2 Synthesis

All changed paths P1-P16 proven with real Firebase auth (custom token → ID token → verify_id_token on server) and real Gemini API calls (200 OK responses). Non-happy paths proven: bad auth rejected (UNAUTHENTICATED), missing context (NO_CONTEXT error), Gemini rate limit (GEMINI_ERROR surfaced correctly). The service correctly initializes Firebase from SERVICE_ACCOUNT_JSON, verifies real ID tokens, runs the Gemini tool loop, and handles all error conditions gracefully.

by AI for @beastoin
- Add grpc-swift and swift-protobuf dependencies to Package.swift
- Generate Swift proto stubs from proactive.proto (pb + grpc)
- Implement ProactiveGRPCClient actor: bidi session stream, frame analysis with server-driven tool loop, heartbeat, reconnection
- Update TaskAssistant with dual-path dispatch: gRPC server-side when connected, local Gemini proxy as fallback
- Add gRPC lifecycle management to ProactiveAssistantsPlugin: connect on monitoring start, build SessionContext from local task store + goals, disconnect on stop

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Use .priorityHigh/.priorityMedium/.priorityLow (not .high/.medium/.low)
- Replace has* property checks with switch on event oneof
- Add .unspecified case to outcomeKind switch for exhaustiveness
- Use String(describing:) for ToolKind in logger calls

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
processFrame() now requires a connected gRPC client. Skips analysis with a log message when not connected instead of falling back to the local Gemini proxy path. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve test.sh conflict — include both proactive and desktop_transcribe tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
L2 End-to-End Test Evidence — Desktop App ↔ gRPC Backend (8+ min soak)

Setup:
Results (PASS):
App log evidence (…)

Backend: gRPC server (PID 160934) ran continuously on VPS port 10140.

Test performed by: @ren (Mac Mini operator) with @kai (backend + coordination)

by AI for @beastoin
Summary
Implements server-side ProactiveAI gRPC service (#6153) — moves Gemini API calls from the desktop client to a backend gRPC service with bidirectional streaming. No local fallback — desktop requires gRPC connection for task extraction.
Backend (Python)
- gRPC service (`backend/proactive/`) with bidirectional `Session` stream
- `ToolCallRequest`/`ToolResult` round-trips
- Proto contract (`proto/proactive/v1/proactive.proto`): ClientHello, FrameEvent, ToolCallRequest, AnalysisOutcome, etc.

Desktop (Swift/macOS)
- Swift stubs generated from the `.proto` (grpc-swift 1.x compatible with macOS 14.0)
- `OMI_GRPC_HOST`/`OMI_GRPC_PORT` env vars for server endpoint

Architecture
Test plan
Closes #6153
by AI for @beastoin