feat: async offline sync via Cloud Tasks + FCM #6125
Conversation
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Greptile Summary

This PR introduces an asynchronous offline sync pipeline: audio uploaded by the app is staged in GCS and processed asynchronously via Cloud Tasks, with FCM notifying the app when processing completes.

Key issues found (four P1 findings):
- A GCS path collision that can cause silent audio data loss
- A security bypass when the Cloud Tasks shared-secret env var is unset
- A uid from the payload that is not validated against the Firestore job document (privilege escalation)
- A duplicate-processing loop on every app open

Confidence Score: 3/5. Not safe to merge — two security issues (auth bypass, uid mismatch) and one data-integrity bug (GCS path collision) need to be fixed first. The overall architecture is sound and the majority of files are clean, but the three backend issues affect correctness and security on the primary upload path.

Files needing fixes: backend/routers/sync.py (GCS path + uid validation), backend/utils/cloud_tasks.py (auth bypass), app/lib/providers/sync_provider.dart (duplicate processing)

Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant App
    participant Backend
    participant GCS
    participant Firestore
    participant CloudTasks
    participant FCM
    App->>Backend: POST /v2/sync/upload (multipart .bin files)
    Backend->>Backend: Validate files & metadata
    Backend->>GCS: Upload .bin files to uploads/{uid}/{job_id}/(unknown)
    Backend->>Firestore: Create offline_sync_jobs/{job_id} (status: pending)
    Backend->>CloudTasks: Enqueue process task (job_id, uid)
    Backend-->>App: 202 Accepted {job_id, file_count}
    Note over CloudTasks,Backend: Async processing (up to 15 min)
    CloudTasks->>Backend: POST /v2/sync/process (shared secret header)
    Backend->>Firestore: Load job, mark status: processing
    Backend->>GCS: Download .bin files to /tmp/
    Backend->>Backend: decode → VAD → Deepgram → create conversations
    Backend->>Firestore: Update job (status: completed, conversation_ids)
    Backend->>FCM: send_sync_completed_message (silent push)
    Backend->>GCS: Delete .bin files
    FCM-->>App: offline_sync_completed data message
    App->>App: SyncCompletedNotificationHandler broadcasts SyncCompletedEvent
    App->>App: SyncProvider._onSyncCompleted → _processConversationResults
    App->>Backend: Fetch new/updated conversations by ID
```
Reviews (1): Last reviewed commit: "refactor(sync): rename syncUploadV2 to s..."
```python
gcs_path = f"uploads/{uid}/{meta['filename']}"
blob = bucket.blob(gcs_path)
blob.upload_from_file(f.file, content_type='application/octet-stream')
gcs_paths.append(gcs_path)
```
GCS path missing job_id — file overwrite race condition
The GCS path does not include the job_id, so two concurrent uploads of the same filename from the same user will collide: the second upload_from_file call silently overwrites the first blob before the first job's Cloud Task has had a chance to download and process it. The first job's Cloud Task would then either process the wrong batch's audio or fail with a missing-blob error, leading to silent data loss.
The PR description says paths should be offline_sync_bucket/{uid}/{job_id}/, but the code omits the {job_id} segment.
Suggested change:

```diff
-gcs_path = f"uploads/{uid}/{meta['filename']}"
+gcs_path = f"uploads/{uid}/{job_id}/{meta['filename']}"
 blob = bucket.blob(gcs_path)
 blob.upload_from_file(f.file, content_type='application/octet-stream')
 gcs_paths.append(gcs_path)
```
```python
CLOUD_TASKS_QUEUE = os.getenv('CLOUD_TASKS_QUEUE')
BACKEND_PUBLIC_URL = os.getenv('BACKEND_PUBLIC_URL')
CLOUD_TASKS_SECRET = os.getenv('CLOUD_TASKS_SECRET')
```
Unset env var causes auth bypass on Cloud Tasks callback
os.getenv('CLOUD_TASKS_SECRET') returns None when the environment variable is not configured. In process_sync_job (sync.py) the guard is if secret != CLOUD_TASKS_SECRET. When the env var is absent, both sides are None — the inequality evaluates to False — so any unauthenticated request to /v2/sync/process passes.
The fix is to use a non-null fallback so a missing env var never matches a missing header. Replace the plain os.getenv(...) with os.getenv(...) or '', and update the endpoint guard to also reject an empty-string match with if not secret or secret != CLOUD_TASKS_SECRET.
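A minimal sketch of the fail-closed guard described above. The env var name comes from this PR; the helper name `verify_cloud_tasks_secret` and its signature are illustrative, not from the codebase:

```python
import os

def verify_cloud_tasks_secret(header_value, configured=None):
    """Fail closed: a missing env var or missing header never authenticates.

    `configured` defaults to the CLOUD_TASKS_SECRET env var with a non-null
    fallback, so None can never match None.
    """
    if configured is None:
        configured = os.getenv('CLOUD_TASKS_SECRET') or ''
    if not configured:
        # Secret not configured on the server: reject every request
        # rather than silently accepting all of them.
        return False
    return header_value == configured
```

The key design point is that misconfiguration degrades to "reject everything", which is noisy and gets fixed, rather than "accept everything", which is silent and gets exploited.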
```dart
void _checkForMissedSyncCompletions() async {
  try {
    await Future.delayed(const Duration(seconds: 5));
    final completedJobs = await getSyncJobs(status: 'completed');
    for (final job in completedJobs) {
      if (job.newConversationIds.isNotEmpty || job.updatedConversationIds.isNotEmpty) {
        Logger.debug('SyncProvider: Found missed sync completion job=${job.id}');
        final result = SyncLocalFilesResponse(
          newConversationIds: job.newConversationIds,
          updatedConversationIds: job.updatedConversationIds,
        );
        await _processConversationResults(result);
      }
    }
  } catch (e) {
    Logger.debug('SyncProvider: Error checking missed sync completions: $e');
  }
}
```
_checkForMissedSyncCompletions re-processes all completed jobs on every app open
getSyncJobs(status: 'completed') returns up to 20 completed jobs. Since completed jobs live in Firestore for 7 days, every app launch triggers _processConversationResults for every one of those jobs — including ones already handled by a previous FCM delivery or prior app open. There is no client-side record of which job IDs have already been processed.
The standard fix is to persist the set of already-processed job IDs in SharedPreferences and skip any that have been seen before. The same job ID fired via FCM in _onSyncCompleted should also be recorded so the two code paths stay in sync.
```python
payload = await request.json()
job_id = payload["job_id"]
uid = payload["uid"]

# 2. Load job from Firestore
job_ref = db.collection('offline_sync_jobs').document(job_id)
job_snap = job_ref.get()
if not job_snap.exists:
    logger.warning(f"Offline sync job {job_id} not found, skipping")
    return {"status": "skipped", "reason": "job not found"}

job = job_snap.to_dict()

# Already completed (idempotency — Cloud Tasks may redeliver)
if job['status'] == 'completed':
    logger.info(f"Offline sync job {job_id} already completed, skipping")
    return {"status": "skipped", "reason": "already completed"}
```
uid from Cloud Task payload not cross-validated against Firestore job document
After loading the job from Firestore using job_id, the code proceeds to use uid from the request payload for all downstream operations (conversation creation via process_segment, speech/DG usage recording, fair-use checks). The job document itself contains the authoritative uid — but it is never compared to the one in the payload.
Any caller that possesses the shared secret can craft a Cloud Task body with an arbitrary uid, causing audio uploaded by user A to be transcribed and stored as conversations for user B.
Add a validation step right after loading the job:
```python
job = job_snap.to_dict()

# Validate that the uid in the payload matches the job's owner
if job.get('uid') != uid:
    logger.error(f"process_sync_job uid mismatch: payload={uid} job={job.get('uid')}")
    raise HTTPException(status_code=403, detail="Forbidden")
```

```dart
import 'package:omi/backend/schema/conversation.dart';
import 'package:omi/services/wals/wal.dart';
import 'package:omi/backend/schema/conversation.dart' show SyncUploadResponse;
```
Duplicate conversation.dart import

conversation.dart is imported twice: once as a wildcard (line 10) and again with a show clause (line 12). The show SyncUploadResponse import is entirely redundant because the wildcard already exposes SyncUploadResponse.
Suggested change:

```diff
 import 'package:omi/backend/schema/conversation.dart';
 import 'package:omi/services/wals/wal.dart';
-import 'package:omi/backend/schema/conversation.dart' show SyncUploadResponse;
+import 'package:omi/services/wals/wal_interfaces.dart';
```
Addresses Greptile review:
- GCS path now includes job_id to prevent file overwrites on concurrent uploads
- Move job_id generation before GCS upload loop
- Reject empty CLOUD_TASKS_SECRET (prevents None==None bypass)
- Validate payload uid matches Firestore job owner

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Prevents auth bypass when env var is unset (None == None). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Persists handled sync job IDs in SharedPreferences so _checkForMissedSyncCompletions skips already-processed jobs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Paused since #6157 was already done (will be closed soon)
Summary
- /v2/sync/upload endpoint accepts audio files, stores them in GCS, and enqueues processing via Cloud Tasks; returns 202 immediately instead of blocking.
- /v2/sync/process callback handles transcription + conversation creation.
- /v2/sync/status for polling job state.
- syncUpload() replaces the old synchronous flow for auto/storage sync.
- FCM sync_completed data message triggers SyncCompletedNotificationHandler → SyncProvider.onSyncCompleted() to fetch new conversations.
- google-cloud-tasks dependency, offline_sync_bucket for GCS staging, and cleanup of orphaned sync jobs on FCM token invalidation.

Test plan

- Files are staged under offline_sync_bucket/{uid}/{job_id}/
- Cloud Tasks callback hits /v2/sync/process
- sync_completed message arrives on device after processing

🤖 Generated with Claude Code
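The polling half of the flow can be sketched as below. This is a hypothetical client helper, not code from the PR: `fetch_status` stands in for an authenticated GET to /v2/sync/status, and the terminal-state set is an assumption based on the job states shown in this PR (pending / processing / completed):

```python
import time

def poll_until_done(fetch_status, poll_interval=5, max_polls=180):
    """Poll a job-status callable until the job reaches a terminal state.

    fetch_status: zero-argument callable returning the job-status dict,
    e.g. a wrapper around GET /v2/sync/status?job_id=... (assumed shape).
    """
    for _ in range(max_polls):
        status = fetch_status()
        if status.get('status') in ('completed', 'failed'):
            return status
        time.sleep(poll_interval)
    raise TimeoutError('sync job did not reach a terminal state')
```

In practice the FCM sync_completed push makes polling unnecessary on the happy path; a loop like this is only the fallback when the push is not delivered.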