feat: async offline sync via Cloud Tasks + FCM #6125

Draft
mdmohsin7 wants to merge 18 commits into main from mdmohsin7/auto-async-sync

Conversation

@mdmohsin7
Member

Summary

  • Backend: New /v2/sync/upload endpoint accepts audio files, stores them in GCS, and enqueues processing via Cloud Tasks. /v2/sync/process callback handles transcription + conversation creation. /v2/sync/status for polling job state. Returns 202 immediately instead of blocking.
  • App: syncUpload() replaces the old synchronous flow for auto/storage sync. An FCM sync_completed data message triggers SyncCompletedNotificationHandler → SyncProvider.onSyncCompleted() to fetch new conversations.
  • Infra: Adds google-cloud-tasks dependency, offline_sync_bucket for GCS staging, and cleanup of orphaned sync jobs on FCM token invalidation.
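
The enqueue step described above could look roughly like the sketch below. It is a minimal illustration, not the code in this PR: the header name `X-Cloud-Tasks-Secret`, the function names, and the `project`/`location` parameters are assumptions. Only the `/v2/sync/process` path and the `job_id`/`uid` payload fields come from the description.

```python
import json


def build_process_request(base_url, job_id, uid, secret):
    """Build the HTTP request part of a Cloud Task as a plain dict.

    The header name is hypothetical; the PR only states that a shared
    secret header is sent with the callback.
    """
    return {
        "url": f"{base_url.rstrip('/')}/v2/sync/process",
        "headers": {
            "Content-Type": "application/json",
            "X-Cloud-Tasks-Secret": secret,
        },
        "body": json.dumps({"job_id": job_id, "uid": uid}).encode(),
    }


def enqueue_process_task(project, location, queue, http_request):
    """Submit the request to a Cloud Tasks queue (needs google-cloud-tasks)."""
    # Imported lazily so the builder above can be used without the dependency.
    from google.cloud import tasks_v2

    client = tasks_v2.CloudTasksClient()
    parent = client.queue_path(project, location, queue)
    task = {
        "http_request": {
            "http_method": tasks_v2.HttpMethod.POST,
            **http_request,
        }
    }
    return client.create_task(parent=parent, task=task)
```

Keeping the request builder separate from the client call makes the payload easy to unit-test without GCP credentials.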

Test plan

  • Upload multiple .bin files → verify 202 response with job_id
  • Confirm files land in GCS offline_sync_bucket/{uid}/{job_id}/
  • Verify Cloud Tasks enqueues and calls /v2/sync/process
  • Check FCM sync_completed message arrives on device after processing
  • Confirm app fetches and displays new conversations on FCM receipt
  • Test error path: invalid files, missing timestamps, restricted accounts
  • Test token invalidation cleans up pending sync jobs

🤖 Generated with Claude Code

mdmohsin7 and others added 14 commits March 28, 2026 17:10
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@greptile-apps
Contributor

greptile-apps bot commented Mar 28, 2026

Greptile Summary

This PR introduces an asynchronous offline sync pipeline: audio .bin files are uploaded to GCS via POST /v2/sync/upload, which returns 202 immediately and enqueues a Cloud Tasks job. The worker endpoint POST /v2/sync/process handles the full decode → VAD → Deepgram → conversation creation flow, then delivers results via a silent FCM offline_sync_completed data message. The Flutter side adds SyncCompletedNotificationHandler, updates SyncProvider to listen for the FCM event, and falls back to polling /v2/sync/jobs on app open for missed pushes.

Key issues found:

  • GCS path collision (P1): backend/routers/sync.py line 1007 constructs the blob path as uploads/{uid}/{filename}, omitting job_id. Concurrent uploads of the same filename overwrite each other in GCS, causing the earlier job's Cloud Task to process wrong data or fail with a missing-blob error.
  • Auth bypass when env var unset (P1): backend/utils/cloud_tasks.py line 12: os.getenv('CLOUD_TASKS_SECRET') defaults to None. The endpoint guard passes when both sides are None, meaning any request without the auth header is accepted.
  • Repeated processing of completed jobs on every app open (P1): SyncProvider._checkForMissedSyncCompletions fetches all completed jobs with no record of which have already been handled, re-processing the same jobs on every app launch for up to 7 days.
  • uid from Cloud Task payload not validated against Firestore job (P1): process_sync_job uses uid from the request body for conversation creation without verifying it matches job['uid'] in Firestore.
  • Duplicate import in local_wal_sync.dart (P2): conversation.dart is imported twice; the show SyncUploadResponse import is redundant.

Confidence Score: 3/5

Not safe to merge — two security issues (auth bypass, uid mismatch) and one data-integrity bug (GCS path collision) need to be fixed first.

Four P1 findings: a GCS path collision that can cause silent audio data loss, a security bypass when the Cloud Tasks shared-secret env var is unset, a uid-from-payload not validated against the Firestore document (privilege escalation), and a duplicate-processing loop on every app open. The overall architecture is sound and the majority of files are clean, but the three backend issues affect correctness and security on the primary upload path.

backend/routers/sync.py (GCS path + uid validation), backend/utils/cloud_tasks.py (auth bypass), app/lib/providers/sync_provider.dart (duplicate processing)

Important Files Changed

| Filename | Overview |
|---|---|
| backend/routers/sync.py | Adds /v2/sync/upload, /v2/sync/process, and /v2/sync/jobs endpoints; has GCS path collision bug (missing job_id in blob path) and uid-from-payload not cross-validated against Firestore job document |
| backend/utils/cloud_tasks.py | New Cloud Tasks helper; CLOUD_TASKS_SECRET defaults to None if env var is unset, allowing auth bypass on the /v2/sync/process callback |
| app/lib/providers/sync_provider.dart | Adds FCM sync-completed handling and _checkForMissedSyncCompletions; the missed-completion check re-processes all completed jobs on every app open with no idempotency guard |
| app/lib/services/notifications/sync_completed_notification_handler.dart | New FCM data-message handler that broadcasts SyncCompletedEvent via a static broadcast stream; follows existing MergeNotificationHandler pattern cleanly |
| app/lib/services/wals/local_wal_sync.dart | Switches WAL upload from synchronous syncLocalFiles to async syncUpload; duplicate conversation.dart import on line 12 is redundant |
| backend/utils/notifications.py | Adds send_sync_completed_message; correctly encodes list fields as comma-joined strings for FCM data payload |
| backend/utils/other/notifications.py | Adds cron-based cleanup for old and stale offline sync jobs; logic is sound and bounded with reasonable limits |
| backend/utils/other/storage.py | Adds offline_sync_bucket env var read; trivial one-line change |
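
FCM data payloads are string-to-string maps, which is why the list fields have to be flattened before sending. A minimal sketch of that encoding and its client-side inverse follows; the key names and both function names are assumptions for illustration, not taken from send_sync_completed_message.

```python
def build_sync_completed_data(job_id, new_ids, updated_ids):
    """Encode a sync-completed event as an FCM data payload (all values str)."""
    return {
        "type": "offline_sync_completed",
        "job_id": job_id,
        # Lists are not allowed in FCM data, so join the IDs with commas.
        "new_conversation_ids": ",".join(new_ids),
        "updated_conversation_ids": ",".join(updated_ids),
    }


def parse_id_list(value):
    """Inverse of the comma-join; an empty string yields an empty list."""
    return [part for part in value.split(",") if part]
```

The filter in `parse_id_list` matters because `"".split(",")` returns `[""]`, which would otherwise look like one conversation with an empty ID.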

Sequence Diagram

sequenceDiagram
    participant App
    participant Backend
    participant GCS
    participant Firestore
    participant CloudTasks
    participant FCM

    App->>Backend: POST /v2/sync/upload (multipart .bin files)
    Backend->>Backend: Validate files & metadata
    Backend->>GCS: Upload .bin files to uploads/{uid}/{job_id}/{filename}
    Backend->>Firestore: Create offline_sync_jobs/{job_id} (status: pending)
    Backend->>CloudTasks: Enqueue process task (job_id, uid)
    Backend-->>App: 202 Accepted {job_id, file_count}

    Note over CloudTasks,Backend: Async processing (up to 15 min)

    CloudTasks->>Backend: POST /v2/sync/process (shared secret header)
    Backend->>Firestore: Load job, mark status: processing
    Backend->>GCS: Download .bin files to /tmp/
    Backend->>Backend: decode → VAD → Deepgram → create conversations
    Backend->>Firestore: Update job (status: completed, conversation_ids)
    Backend->>FCM: send_sync_completed_message (silent push)
    Backend->>GCS: Delete .bin files

    FCM-->>App: offline_sync_completed data message
    App->>App: SyncCompletedNotificationHandler broadcasts SyncCompletedEvent
    App->>App: SyncProvider._onSyncCompleted → _processConversationResults
    App->>Backend: Fetch new/updated conversations by ID

Reviews (1): Last reviewed commit: "refactor(sync): rename syncUploadV2 to s..."

Comment thread backend/routers/sync.py Outdated
Comment on lines +1007 to +1010
gcs_path = f"uploads/{uid}/{meta['filename']}"
blob = bucket.blob(gcs_path)
blob.upload_from_file(f.file, content_type='application/octet-stream')
gcs_paths.append(gcs_path)

P1 GCS path missing job_id — file overwrite race condition

The GCS path does not include the job_id, so two concurrent uploads of the same filename from the same user will collide: the second upload_from_file call silently overwrites the first blob before the first job's Cloud Task has had a chance to download and process it. The first job's Cloud Task would then either process the wrong batch's audio or fail with a missing-blob error, leading to silent data loss.

The PR description says paths should be offline_sync_bucket/{uid}/{job_id}/, but the code omits the segment.
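
The collision can be reproduced without GCS. The sketch below uses a plain dict as a stand-in for the bucket, and the helper names are hypothetical. It shows two batches with the same filename overwriting each other under the current path scheme and staying separate once job_id is part of the path.

```python
import uuid


def legacy_path(uid, filename):
    # Path scheme currently in the diff: no job_id segment.
    return f"uploads/{uid}/{filename}"


def job_scoped_path(uid, job_id, filename):
    # Path scheme from the PR description: one prefix per job.
    return f"uploads/{uid}/{job_id}/{filename}"


bucket = {}  # stand-in for a GCS bucket: blob path -> bytes
job_a, job_b = uuid.uuid4().hex, uuid.uuid4().hex

# Two uploads of the same filename by the same user.
bucket[legacy_path("u1", "rec.bin")] = b"batch A"
bucket[legacy_path("u1", "rec.bin")] = b"batch B"  # silently replaces batch A

bucket[job_scoped_path("u1", job_a, "rec.bin")] = b"batch A"
bucket[job_scoped_path("u1", job_b, "rec.bin")] = b"batch B"  # separate blob
```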

Suggested change
gcs_path = f"uploads/{uid}/{meta['filename']}"
blob = bucket.blob(gcs_path)
blob.upload_from_file(f.file, content_type='application/octet-stream')
gcs_paths.append(gcs_path)
gcs_path = f"uploads/{uid}/{job_id}/{meta['filename']}"

Comment thread backend/utils/cloud_tasks.py Outdated

CLOUD_TASKS_QUEUE = os.getenv('CLOUD_TASKS_QUEUE')
BACKEND_PUBLIC_URL = os.getenv('BACKEND_PUBLIC_URL')
CLOUD_TASKS_SECRET = os.getenv('CLOUD_TASKS_SECRET')

P1 Unset env var causes auth bypass on Cloud Tasks callback

os.getenv('CLOUD_TASKS_SECRET') returns None when the environment variable is not configured. In process_sync_job (sync.py) the guard is if secret != CLOUD_TASKS_SECRET. When the env var is absent, both sides are None — the inequality evaluates to False — so any unauthenticated request to /v2/sync/process passes.

The fix is to use a non-null fallback so a missing env var never matches a missing header. Replace the plain os.getenv(...) with os.getenv(...) or '', and update the endpoint guard to also reject an empty-string match with if not secret or secret != CLOUD_TASKS_SECRET.
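
A minimal sketch of the difference between the two guards is shown below. The function names are hypothetical, and the use of hmac.compare_digest (a constant-time comparison) is an addition beyond the fix suggested above, not something the PR contains.

```python
import hmac


def naive_guard_rejects(header_secret, configured_secret):
    # Mirrors `if secret != CLOUD_TASKS_SECRET`: returns True when it rejects.
    return header_secret != configured_secret


def is_authorized(header_secret, configured_secret):
    """Fail closed: a missing or empty value on either side never matches."""
    if not configured_secret or not header_secret:
        return False
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(
        header_secret.encode(), configured_secret.encode()
    )
```

With the env var unset and no header sent, `naive_guard_rejects(None, None)` is `False`, so the request goes through, while `is_authorized(None, None)` is `False`, so it is refused.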

Comment on lines +175 to 192
void _checkForMissedSyncCompletions() async {
  try {
    await Future.delayed(const Duration(seconds: 5));
    final completedJobs = await getSyncJobs(status: 'completed');
    for (final job in completedJobs) {
      if (job.newConversationIds.isNotEmpty || job.updatedConversationIds.isNotEmpty) {
        Logger.debug('SyncProvider: Found missed sync completion job=${job.id}');
        final result = SyncLocalFilesResponse(
          newConversationIds: job.newConversationIds,
          updatedConversationIds: job.updatedConversationIds,
        );
        await _processConversationResults(result);
      }
    }
  } catch (e) {
    Logger.debug('SyncProvider: Error checking missed sync completions: $e');
  }
}

P1 _checkForMissedSyncCompletions re-processes all completed jobs on every app open

getSyncJobs(status: 'completed') returns up to 20 completed jobs. Since completed jobs live in Firestore for 7 days, every app launch triggers _processConversationResults for every one of those jobs — including ones already handled by a previous FCM delivery or prior app open. There is no client-side record of which job IDs have already been processed.

The standard fix is to persist the set of already-processed job IDs in SharedPreferences and skip any that have been seen before. The same job ID fired via FCM in _onSyncCompleted should also be recorded so the two code paths stay in sync.
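
The pattern is language-neutral, so the sketch below uses Python with a JSON file standing in for SharedPreferences; the app itself would do this in Dart. The class and function names are hypothetical. A job ID is recorded only after it has been processed, so a failure in processing leaves the job eligible for retry on the next launch.

```python
import json
import os
import tempfile


class ProcessedJobStore:
    """Persists handled job IDs across launches (JSON file as a stand-in)."""

    def __init__(self, path):
        self._path = path
        self._ids = set()
        if os.path.exists(path):
            with open(path) as fh:
                self._ids = set(json.load(fh))

    def contains(self, job_id):
        return job_id in self._ids

    def add(self, job_id):
        self._ids.add(job_id)
        with open(self._path, "w") as fh:
            json.dump(sorted(self._ids), fh)


def handle_completed_jobs(store, job_ids, process):
    """Run `process` once per job ID, skipping IDs handled earlier."""
    for job_id in job_ids:
        if store.contains(job_id):
            continue
        process(job_id)
        store.add(job_id)  # record only after processing succeeded


# Two simulated app launches sharing the same persisted file.
store_path = os.path.join(tempfile.mkdtemp(), "processed_jobs.json")
handled = []
handle_completed_jobs(ProcessedJobStore(store_path), ["a", "b"], handled.append)
handle_completed_jobs(ProcessedJobStore(store_path), ["a", "b", "c"], handled.append)
```

The FCM path would call the same `add` after handling a push, which keeps both entry points consistent.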

Comment thread backend/routers/sync.py
Comment on lines +1066 to +1083
payload = await request.json()
job_id = payload["job_id"]
uid = payload["uid"]

# 2. Load job from Firestore
job_ref = db.collection('offline_sync_jobs').document(job_id)
job_snap = job_ref.get()
if not job_snap.exists:
    logger.warning(f"Offline sync job {job_id} not found, skipping")
    return {"status": "skipped", "reason": "job not found"}

job = job_snap.to_dict()

# Already completed (idempotency — Cloud Tasks may redeliver)
if job['status'] == 'completed':
    logger.info(f"Offline sync job {job_id} already completed, skipping")
    return {"status": "skipped", "reason": "already completed"}


P1 uid from Cloud Task payload not cross-validated against Firestore job document

After loading the job from Firestore using job_id, the code proceeds to use uid from the request payload for all downstream operations (conversation creation via process_segment, speech/DG usage recording, fair-use checks). The job document itself contains the authoritative uid — but it is never compared to the one in the payload.

Any caller that possesses the shared secret can craft a Cloud Task body with an arbitrary uid, causing audio uploaded by user A to be transcribed and stored as conversations for user B.

Add a validation step right after loading the job:

job = job_snap.to_dict()

# Validate that the uid in the payload matches the job's owner
if job.get('uid') != uid:
    logger.error(f"process_sync_job uid mismatch: payload={uid} job={job.get('uid')}")
    raise HTTPException(status_code=403, detail="Forbidden")

Comment on lines +10 to +12
import 'package:omi/backend/schema/conversation.dart';
import 'package:omi/services/wals/wal.dart';
import 'package:omi/backend/schema/conversation.dart' show SyncUploadResponse;

P2 Duplicate conversation.dart import

conversation.dart is imported twice: once as a wildcard (line 10) and again with a show clause (line 12). The show SyncUploadResponse import is entirely redundant because the wildcard already exposes SyncUploadResponse.

Suggested change
import 'package:omi/backend/schema/conversation.dart';
import 'package:omi/services/wals/wal.dart';
import 'package:omi/backend/schema/conversation.dart' show SyncUploadResponse;
import 'package:omi/backend/schema/conversation.dart';
import 'package:omi/services/wals/wal.dart';
import 'package:omi/services/wals/wal_interfaces.dart';

mdmohsin7 and others added 4 commits March 28, 2026 17:40
Addresses Greptile review:
- GCS path now includes job_id to prevent file overwrites on concurrent uploads
- Move job_id generation before GCS upload loop
- Reject empty CLOUD_TASKS_SECRET (prevents None==None bypass)
- Validate payload uid matches Firestore job owner

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Prevents auth bypass when env var is unset (None == None).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Persists handled sync job IDs in SharedPreferences so
_checkForMissedSyncCompletions skips already-processed jobs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@mdmohsin7
Member Author

Paused since #6157 was already done (will be closed soon)

@mdmohsin7 mdmohsin7 marked this pull request as draft March 30, 2026 17:53