Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
c0a0557
Add conversationId + retry metadata to WAL model (#6318)
beastoin Apr 4, 2026
64a4ea7
Add finalizeCurrentSession, stampConversationId, orphan recovery to L…
beastoin Apr 4, 2026
751a8f3
Add durable auto-sync: finalize+stamp on session end, fallback timer,…
beastoin Apr 4, 2026
26dc724
Include in-flight audio seconds in WAL indicator (#6318)
beastoin Apr 4, 2026
3e90e6c
Add 12 unit tests for WAL recovery: model, stamp, orphan, in-flight, …
beastoin Apr 4, 2026
be4bf7c
Fix review findings: shouldStored gate, const list mutability, typing
beastoin Apr 4, 2026
1bb8771
Fix capture_provider: typed phoneSync, delayed orphan recovery
beastoin Apr 4, 2026
85863d7
Update tests for shouldStored gate and synced-frames skip
beastoin Apr 4, 2026
94a7230
Add walReady Completer so recovery waits for WAL initialization (#6318)
beastoin Apr 4, 2026
2d60423
Fix race, offline retry, and WAL readiness in capture provider (#6318)
beastoin Apr 4, 2026
f889b2d
Fix offline recovery retry and persist retry metadata per attempt (#6…
beastoin Apr 4, 2026
2278b2c
Handle partial sync failure in _syncSingleWal recovery path (#6318)
beastoin Apr 4, 2026
9403a54
Fix retry budget durability and mark unrecoverable WALs corrupted (#6…
beastoin Apr 4, 2026
1abb63b
Re-arm orphan recovery when WALs remain after transient failures (#6318)
beastoin Apr 4, 2026
1292f91
Add boundary tests: threshold edges, stamp window, walReady (#6318)
beastoin Apr 4, 2026
77c4c10
Polish WAL sync status widget: green dot, cloud icon, brighter text (…
beastoin Apr 5, 2026
ab08bdb
Add WAL widget polish screenshot evidence (#6318)
beastoin Apr 5, 2026
ae2a152
Remove 'will sync automatically' text, add loading spinner when synci…
beastoin Apr 5, 2026
98a397f
Remove .pr-evidence directory, use GCS for screenshot hosting (#6318)
beastoin Apr 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 48 additions & 31 deletions app/lib/pages/conversation_capturing/page.dart
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ class _ConversationCapturingPageState extends State<ConversationCapturingPage> w
},
),
),
_buildUnsyncedWalIndicator(provider.unsyncedSessionWals),
_buildUnsyncedWalIndicator(provider.unsyncedSessionWals, provider.inFlightAudioSeconds),
],
),
// Summary Tab
Expand Down Expand Up @@ -667,36 +667,53 @@ class _ConversationCapturingPageState extends State<ConversationCapturingPage> w
);
}

Widget _buildUnsyncedWalIndicator(List<Wal> unsyncedWals) {
if (unsyncedWals.isEmpty) return const SizedBox.shrink();
final totalSeconds = unsyncedWals.fold<int>(0, (sum, w) => sum + w.seconds);
final label = totalSeconds >= 60 ? '${totalSeconds ~/ 60}m ${totalSeconds % 60}s' : '${totalSeconds}s';
return Padding(
padding: const EdgeInsets.symmetric(horizontal: 32, vertical: 6),
child: Container(
padding: const EdgeInsets.symmetric(horizontal: 14, vertical: 8),
decoration: BoxDecoration(
color: const Color(0xFF1C1C24),
borderRadius: BorderRadius.circular(12),
border: Border.all(color: const Color(0xFF3A3A4A), width: 0.5),
),
child: Row(
mainAxisSize: MainAxisSize.min,
children: [
const Icon(Icons.lock_outline, size: 14, color: Color(0xFF8B8B9E)),
const SizedBox(width: 6),
Text(
context.l10n.audioSavedLocally(label),
style: const TextStyle(color: Color(0xFF8B8B9E), fontSize: 12, height: 1.3),
),
const SizedBox(width: 6),
const Text('·', style: TextStyle(color: Color(0xFF8B8B9E), fontSize: 12)),
const SizedBox(width: 6),
Text(
context.l10n.willSyncAutomatically,
style: const TextStyle(color: Color(0xFF6B6B7E), fontSize: 11, height: 1.3),
),
],
Widget _buildUnsyncedWalIndicator(List<Wal> unsyncedWals, int inFlightSeconds) {
final totalSeconds = unsyncedWals.fold<int>(0, (sum, w) => sum + w.seconds) + inFlightSeconds;
if (totalSeconds <= 0) return const SizedBox.shrink();
final minutes = totalSeconds ~/ 60;
final seconds = totalSeconds % 60;
final label = minutes > 0 ? '${minutes}m ${seconds}s' : '${seconds}s';
return SafeArea(
top: false,
child: Padding(
padding: const EdgeInsets.symmetric(horizontal: 24, vertical: 8),
child: Container(
padding: const EdgeInsets.symmetric(horizontal: 16, vertical: 10),
decoration: BoxDecoration(
color: const Color(0xFF1A1A24),
borderRadius: BorderRadius.circular(14),
border: Border.all(color: const Color(0xFF2E2E3E), width: 0.5),
),
child: Row(
mainAxisAlignment: MainAxisAlignment.center,
mainAxisSize: MainAxisSize.min,
children: [
Container(
width: 7,
height: 7,
decoration: const BoxDecoration(
color: Color(0xFF4CAF50),
shape: BoxShape.circle,
),
),
const SizedBox(width: 8),
Text(
context.l10n.audioSavedLocally(label),
style: const TextStyle(color: Color(0xFFE0E0E8), fontSize: 12.5, fontWeight: FontWeight.w500),
),
if (inFlightSeconds > 0) ...[
const SizedBox(width: 8),
const SizedBox(
width: 12,
height: 12,
child: CircularProgressIndicator(
strokeWidth: 1.5,
color: Color(0xFF6C6C80),
),
),
],
],
),
),
),
);
Expand Down
168 changes: 159 additions & 9 deletions app/lib/providers/capture_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ class CaptureProvider extends ChangeNotifier
peopleProvider = pp;
usageProvider = up;

// Run orphan recovery once after providers are wired up and WAL service is initialized.
// Uses Future.delayed to let the WAL service finish loading wals.json from disk.
if (!_orphanRecoveryDone) {
_orphanRecoveryDone = true;
Future.delayed(const Duration(seconds: 5), () => recoverOrphanedWals());
}

notifyListeners();
}

Expand Down Expand Up @@ -209,17 +216,32 @@ class CaptureProvider extends ChangeNotifier
@visibleForTesting
set testSessionStartSeconds(int v) => _sessionStartSeconds = v;

bool _orphanRecoveryDone = false;

/// Preserved session start for auto-sync after socket-driven conversation completion.
/// Set before _resetStateVariables() clears _sessionStartSeconds, consumed on ConversationEvent.
int _pendingAutoSyncSessionStart = 0;

/// Fallback timer that fires if ConversationEvent doesn't arrive within 30s.
Timer? _autoSyncFallbackTimer;

/// The conversation ID from ConversationProcessingStartedEvent, kept for fallback sync.
String? _pendingAutoSyncConversationId;

/// Future tracking the in-progress _finalizeAndStampSession(), so _autoSyncSessionWals()
/// can await it before querying disk WALs. Prevents race when backend responds fast.
Future<void>? _pendingFinalizeAndStamp;

/// Returns unsynced WALs belonging to the current capture session.
/// Empty when all frames have been streamed successfully (clean UI).
List<Wal> get unsyncedSessionWals {
if (_sessionStartSeconds == 0) return [];
return _wal.getSyncs().phone.getSessionUnsyncedWals(_sessionStartSeconds);
}

/// Seconds of audio still in memory buffer (not yet chunked/flushed to disk).
int get inFlightAudioSeconds => _wal.getSyncs().phone.getInFlightSeconds();

// Version counter for segments/photos content changes. Incremented on in-place mutations
// (e.g., translation updates, photo description changes) to signal UI rebuilds when
// list length and last-text remain unchanged.
Expand Down Expand Up @@ -869,6 +891,7 @@ class CaptureProvider extends ChangeNotifier
_keepAliveTimer?.cancel();
_connectionStateListener?.cancel();
_metricsTimer?.cancel();
_autoSyncFallbackTimer?.cancel();
_peopleRefreshFuture = null; // Clear in-flight tracker

super.dispose();
Expand Down Expand Up @@ -1108,17 +1131,38 @@ class CaptureProvider extends ChangeNotifier
if (event is ConversationProcessingStartedEvent) {
conversationProvider!.addProcessingConversation(event.memory);
_pendingAutoSyncSessionStart = _sessionStartSeconds;
_pendingAutoSyncConversationId = event.memory.id;

// Force-drain tail buffer, stamp WALs with conversation ID, then clear state.
// Store the future so _autoSyncSessionWals() can await it before querying disk WALs.
_pendingFinalizeAndStamp = _finalizeAndStampSession(_sessionStartSeconds, event.memory.id);

_resetStateVariables();

// Start 30s fallback timer in case ConversationEvent never arrives (WS disconnect)
_autoSyncFallbackTimer?.cancel();
_autoSyncFallbackTimer = Timer(const Duration(seconds: 30), () {
if (_pendingAutoSyncSessionStart > 0 && _pendingAutoSyncConversationId != null) {
final sessionStart = _pendingAutoSyncSessionStart;
final convId = _pendingAutoSyncConversationId!;
_pendingAutoSyncSessionStart = 0;
_pendingAutoSyncConversationId = null;
Logger.debug('Auto-sync fallback timer fired — syncing WALs to conversation $convId');
_autoSyncSessionWals(sessionStart, convId);
}
});
return;
}

if (event is ConversationEvent) {
event.memory.isNew = true;
conversationProvider!.removeProcessingConversation(event.memory.id);
_processConversationCreated(event.memory, event.messages.cast<ServerMessage>());
_autoSyncFallbackTimer?.cancel();
if (_pendingAutoSyncSessionStart > 0) {
final sessionStart = _pendingAutoSyncSessionStart;
_pendingAutoSyncSessionStart = 0;
_pendingAutoSyncConversationId = null;
_autoSyncSessionWals(sessionStart, event.memory.id);
}
return;
Comment on lines 1157 to 1168
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 _finalizeAndStampSession race with ConversationEvent

_finalizeAndStampSession is fired-and-forgotten at line 1127, then ConversationEvent (which often follows in seconds) cancels the fallback timer and calls _autoSyncSessionWals. If _flush() inside finalizeCurrentSession hasn't completed yet when ConversationEvent arrives, the freshly drained WALs are still WalStorage.mem and getSessionUnsyncedWals (which filters for WalStorage.disk) silently returns empty — with the fallback timer already cancelled, the WALs won't auto-sync until the next app startup. _finalizeAndStampSession should be awaited before starting the fallback timer, or _autoSyncSessionWals should be chained to its completion rather than invoked independently in the ConversationEvent branch.

Expand Down Expand Up @@ -1193,6 +1237,11 @@ class CaptureProvider extends ChangeNotifier

Future<void> forceProcessingCurrentConversation() async {
final sessionStart = _sessionStartSeconds;

// Force-drain tail buffer before clearing state
final phoneSync = _wal.getSyncs().phone;
await phoneSync.finalizeCurrentSession();

_resetStateVariables();
conversationProvider!.addProcessingConversation(
ServerConversation(
Expand All @@ -1202,7 +1251,7 @@ class CaptureProvider extends ChangeNotifier
status: ConversationStatus.processing,
),
);
processInProgressConversation().then((result) {
processInProgressConversation().then((result) async {
if (result == null || result.conversation == null) {
conversationProvider!.removeProcessingConversation('0');
return;
Expand All @@ -1211,36 +1260,132 @@ class CaptureProvider extends ChangeNotifier
result.conversation!.isNew = true;
_processConversationCreated(result.conversation, result.messages);

// Auto-sync any missed WALs from this session to the new conversation
// Stamp WALs with conversation ID and auto-sync
if (sessionStart > 0 && result.conversation != null) {
await phoneSync.stampConversationId(sessionStart, result.conversation!.id);
_autoSyncSessionWals(sessionStart, result.conversation!.id);
}
});

return;
}

/// Force-drain tail buffer and stamp all session WALs with conversation ID.
/// Called from synchronous onMessageEventReceived — fire-and-forget async.
Future<void> _finalizeAndStampSession(int sessionStartSeconds, String conversationId) async {
try {
final phoneSync = _wal.getSyncs().phone;
await phoneSync.finalizeCurrentSession();
if (sessionStartSeconds > 0) {
await phoneSync.stampConversationId(sessionStartSeconds, conversationId);
}
} catch (e) {
Logger.debug('_finalizeAndStampSession error: $e');
}
}

Future<void> _autoSyncSessionWals(int sessionStartSeconds, String conversationId) async {
// Wait for finalize+stamp to complete so tail buffer WALs are on disk before querying.
if (_pendingFinalizeAndStamp != null) {
await _pendingFinalizeAndStamp;
_pendingFinalizeAndStamp = null;
}
final phoneSync = _wal.getSyncs().phone;
final unsyncedWals = phoneSync.getSessionUnsyncedWals(sessionStartSeconds);
if (unsyncedWals.isEmpty) return;

Logger.debug('Auto-syncing ${unsyncedWals.length} session WALs to conversation $conversationId');
for (final wal in unsyncedWals) {
if (wal.filePath == null) continue;
await _syncSingleWal(wal, conversationId, phoneSync);
}
}

/// Sync a single WAL to a conversation with retry and backoff.
/// Retries up to 3 times with exponential delays (5s, 10s, 20s).
/// Network/transient errors (SocketException, no connectivity) do NOT increment retryCount.
Future<void> _syncSingleWal(Wal wal, String conversationId, LocalWalSyncImpl phoneSync) async {
if (wal.filePath == null) {
Logger.debug('Auto-sync WAL ${wal.id}: no filePath, marking corrupted');
wal.status = WalStatus.corrupted;
await phoneSync.persistRetryMetadata(wal);
return;
}
final fullPath = await Wal.getFilePath(wal.filePath);
if (fullPath == null) {
Logger.debug('Auto-sync WAL ${wal.id}: path resolution failed, marking corrupted');
wal.status = WalStatus.corrupted;
await phoneSync.persistRetryMetadata(wal);
return;
}
final file = File(fullPath);
if (!file.existsSync()) {
Logger.debug('Auto-sync WAL ${wal.id}: file missing, marking corrupted');
wal.status = WalStatus.corrupted;
await phoneSync.persistRetryMetadata(wal);
return;
}

// Remaining attempts = maxRetries minus already-persisted retryCount
const maxRetries = 3;
const baseDelay = 5;
final startAttempt = wal.retryCount;

for (int attempt = startAttempt; attempt < maxRetries; attempt++) {
if (!_isConnected) {
Logger.debug('Auto-sync WAL ${wal.id}: offline, aborting without incrementing retryCount');
return;
}
try {
final fullPath = await Wal.getFilePath(wal.filePath);
if (fullPath == null) continue;
final file = File(fullPath);
if (!file.existsSync()) continue;
await syncLocalFilesV2([file], conversationId: conversationId);
final result = await syncLocalFilesV2([file], conversationId: conversationId);
if (result.hasPartialFailure) {
throw Exception('Partial sync failure: ${result.failedSegments}/${result.totalSegments} segments failed');
}
await phoneSync.markWalSyncedAndPersist(wal);
return;
} on SocketException {
Logger.debug('Auto-sync WAL ${wal.id}: network error, aborting without incrementing retryCount');
return;
} catch (e) {
Logger.debug('Auto-sync WAL ${wal.id} failed: $e');
wal.retryCount = attempt + 1;
wal.lastRetryAt = DateTime.now().millisecondsSinceEpoch ~/ 1000;
await phoneSync.persistRetryMetadata(wal);
if (attempt < maxRetries - 1) {
final delay = baseDelay * (1 << attempt); // 5s, 10s, 20s
Logger.debug('Auto-sync WAL ${wal.id} attempt ${attempt + 1} failed, retrying in ${delay}s: $e');
await Future.delayed(Duration(seconds: delay));
} else {
Logger.debug('Auto-sync WAL ${wal.id} failed after $maxRetries attempts: $e');
}
}
}
}

/// Recover orphaned WALs on startup. Called once after providers are initialized.
/// Finds WALs with conversationId set but status still miss, and syncs them.
/// Skips recovery if offline — retryCount is not incremented for transient failures.
Future<void> recoverOrphanedWals() async {
if (!_isConnected) {
Logger.debug('Startup recovery: offline, skipping orphan WAL sync');
_orphanRecoveryDone = false; // Allow retry on next updateProviderInstances
return;
}
final phoneSync = _wal.getSyncs().phone;
await phoneSync.walReady; // Wait for WALs to be loaded from disk
final orphaned = phoneSync.getOrphanedWals();
if (orphaned.isEmpty) return;

Logger.debug('Startup recovery: found ${orphaned.length} orphaned WALs to sync');
for (final wal in orphaned) {
await _syncSingleWal(wal, wal.conversationId!, phoneSync);
}
// Check if any orphaned WALs remain (e.g., transient SocketException while "online").
// If so, allow onConnectionStateChanged to re-trigger recovery on next transition.
final remaining = phoneSync.getOrphanedWals();
if (remaining.isNotEmpty) {
_orphanRecoveryDone = false;
}
}

Future<void> _processConversationCreated(ServerConversation? conversation, List<ServerMessage> messages) async {
if (conversation == null) return;

Expand Down Expand Up @@ -1443,6 +1588,11 @@ class CaptureProvider extends ChangeNotifier

void onConnectionStateChanged(bool isConnected) {
_isConnected = isConnected;
// When coming back online, retry orphan recovery if it was skipped due to being offline
if (isConnected && !_orphanRecoveryDone) {
_orphanRecoveryDone = true;
recoverOrphanedWals();
}
notifyListeners();
}

Expand Down
Loading
Loading