Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 189 additions & 14 deletions src/storage/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,22 @@
import sqlite3
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
from typing import Any, Dict, List, Optional, Sequence

from src.config import settings
from src.storage.base import BaseVectorStore, IndexStats, SearchResult
from src.storage.memory_lifecycle import (
CONTENT_HASH_KEY,
FORGET_REASON_KEY,
FORGOTTEN_AT_KEY,
IS_CURRENT_KEY,
PARENT_MEMORY_ID_KEY,
VERSION_KEY,
build_lifecycle_metadata,
compute_memory_hash,
is_retrievable_memory,
utc_now_iso,
)
from src.utils.exceptions import VectorStoreValidationError


Expand All @@ -43,6 +55,9 @@ def _cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
return max(0.0, min(1.0, (dot / (norm_a * norm_b) + 1.0) / 2.0))


# Metadata keys consulted by SQLiteVectorStore._find_current_by_hash to scope
# duplicate lookups (two records only count as duplicates within a scope).
_DEDUP_SCOPE_KEYS = ("user_id", "tenant_id", "org_id", "workspace_id", "project_id")


class SQLiteVectorStore(BaseVectorStore):
"""Small embedded vector store for single-user local testing.

Expand Down Expand Up @@ -101,16 +116,29 @@ def add(

ids = ids or [str(uuid.uuid4()) for _ in texts]
metadata = metadata or [{} for _ in texts]
rows = [
(
self._namespace,
vec_id,
text,
json.dumps([float(v) for v in embedding]),
json.dumps(meta or {}),
output_ids: List[str] = []
rows = []
for text, embedding, vec_id, meta in zip(texts, embeddings, ids, metadata):
lifecycle_meta = build_lifecycle_metadata(text, meta)
existing_id = self._find_current_by_hash(
lifecycle_meta[CONTENT_HASH_KEY],
lifecycle_meta,
)
for text, embedding, vec_id, meta in zip(texts, embeddings, ids, metadata)
]
if existing_id:
output_ids.append(existing_id)
continue
output_ids.append(vec_id)
rows.append(
(
self._namespace,
vec_id,
text,
json.dumps([float(v) for v in embedding]),
json.dumps(lifecycle_meta),
)
)
if not rows:
return output_ids
self._conn.executemany(
"""
INSERT INTO xmem_vectors(namespace, id, content, embedding, metadata)
Expand All @@ -124,7 +152,7 @@ def add(
rows,
)
self._conn.commit()
return ids
return output_ids

def search(
self,
Expand All @@ -145,7 +173,7 @@ def search(
results: List[SearchResult] = []
for row in rows:
meta = json.loads(row["metadata"] or "{}")
if not _metadata_matches(meta, filters):
if not is_retrievable_memory(meta) or not _metadata_matches(meta, filters):
continue
embedding = json.loads(row["embedding"])
results.append(
Expand Down Expand Up @@ -175,6 +203,11 @@ def update(
return False
current_meta = json.loads(row["metadata"] or "{}")
current_meta.update(metadata or {})
new_text = text if text is not None else row["content"]
current_meta[CONTENT_HASH_KEY] = compute_memory_hash(new_text)
existing_id = self._find_current_by_hash(current_meta[CONTENT_HASH_KEY], current_meta)
if existing_id and existing_id != id:
return False
new_embedding = embedding if embedding is not None else json.loads(row["embedding"])
if len(new_embedding) != self._dimension:
raise VectorStoreValidationError(
Expand All @@ -188,7 +221,7 @@ def update(
WHERE namespace = ? AND id = ?
""",
(
text if text is not None else row["content"],
new_text,
json.dumps([float(v) for v in new_embedding]),
json.dumps(current_meta),
self._namespace,
Expand All @@ -198,6 +231,117 @@ def update(
self._conn.commit()
return True

def add_version(
    self,
    parent_id: str,
    text: str,
    embedding: List[float],
    id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
    """Supersede ``parent_id`` with a new current version, keeping the parent row.

    The parent record is never deleted; its metadata is rewritten with the
    "is current" flag cleared so it remains available as history.

    Args:
        parent_id: Id of the memory being superseded.
        text: Content of the new version.
        embedding: Vector for the new version; must match the store dimension.
        id: Optional explicit id for the new row (a UUID4 is generated otherwise).
        metadata: Caller metadata merged into the new version's lifecycle metadata.

    Returns:
        ``None`` if the parent does not exist or is no longer retrievable.
        If a current record with the same content hash already exists, that
        record's id (the parent is demoted unless it *is* that record).
        Otherwise the id of the newly inserted version.

    Raises:
        VectorStoreValidationError: If ``embedding`` has the wrong dimension.
    """
    found = self.get([parent_id])
    if not found:
        return None

    stored_meta = found[0]["metadata"] or {}
    if not is_retrievable_memory(stored_meta):
        return None

    actual_dim = len(embedding)
    if actual_dim != self._dimension:
        raise VectorStoreValidationError(
            f"Embedding dimension {actual_dim} doesn't match {self._dimension}",
            operation="add_version",
        )

    parent_meta = dict(stored_meta)
    # Versions always point at the root of the lineage, not the direct parent.
    lineage_root = parent_meta.get(PARENT_MEMORY_ID_KEY) or parent_id
    following_version = int(parent_meta.get(VERSION_KEY) or 1) + 1
    new_id = id or str(uuid.uuid4())
    new_meta = build_lifecycle_metadata(
        text,
        metadata,
        parent_memory_id=lineage_root,
        version=following_version,
        is_current=True,
    )

    duplicate_id = self._find_current_by_hash(new_meta[CONTENT_HASH_KEY], new_meta)
    if duplicate_id and duplicate_id == parent_id:
        # The "new" content is what the parent already holds: nothing to write.
        return duplicate_id

    parent_meta[IS_CURRENT_KEY] = False
    # One transaction: demoting the parent and inserting the successor either
    # both land or both roll back.
    with self._conn:
        self._conn.execute(
            """
            UPDATE xmem_vectors
            SET metadata = ?, updated_at = CURRENT_TIMESTAMP
            WHERE namespace = ? AND id = ?
            """,
            (json.dumps(parent_meta), self._namespace, parent_id),
        )
        if not duplicate_id:
            self._conn.execute(
                """
                INSERT INTO xmem_vectors(namespace, id, content, embedding, metadata)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    self._namespace,
                    new_id,
                    text,
                    json.dumps([float(component) for component in embedding]),
                    json.dumps(new_meta),
                ),
            )
    # When another current record already carries this content, it becomes
    # the effective successor and no new row is created.
    return duplicate_id or new_id

def forget(
    self,
    ids: List[str],
    reason: Optional[str] = None,
    hard_delete: bool = False,
) -> bool:
    """Soft-forget memories by default, preserving audit history.

    A soft forget keeps each row but rewrites its metadata: the "is current"
    flag is cleared and the forgotten-at timestamp and forget reason are
    recorded. Ids that do not exist in this namespace are ignored.

    Args:
        ids: Ids of the memories to forget.
        reason: Optional free-form reason stored alongside the timestamp.
        hard_delete: When true, delegate to ``delete`` and remove the rows.

    Returns:
        The result of ``delete`` for a hard delete; otherwise ``True``
        (including when ``ids`` is empty or none of the ids matched).
    """
    if hard_delete:
        return self.delete(ids)
    if not ids:
        return True

    placeholders = ",".join("?" for _ in ids)
    rows = self._conn.execute(
        f"SELECT id, metadata FROM xmem_vectors "
        f"WHERE namespace = ? AND id IN ({placeholders})",
        [self._namespace, *ids],
    ).fetchall()

    # One timestamp for the whole batch so records forgotten together agree.
    now = utc_now_iso()
    updates = []
    for row in rows:
        meta = json.loads(row["metadata"] or "{}")
        meta[IS_CURRENT_KEY] = False
        meta[FORGOTTEN_AT_KEY] = now
        meta[FORGET_REASON_KEY] = reason
        updates.append((json.dumps(meta), self._namespace, row["id"]))

    if not updates:
        return True

    # Use the connection as a transaction context (as add_version does): a
    # failure part-way through the batch rolls back instead of leaving a
    # half-applied transaction for a later, unrelated commit() to persist.
    with self._conn:
        self._conn.executemany(
            "UPDATE xmem_vectors SET metadata = ?, updated_at = CURRENT_TIMESTAMP "
            "WHERE namespace = ? AND id = ?",
            updates,
        )
    return True

def delete(self, ids: List[str]) -> bool:
if not ids:
return True
Expand Down Expand Up @@ -239,10 +383,41 @@ def search_by_metadata(
results: List[SearchResult] = []
for row in rows:
meta = json.loads(row["metadata"] or "{}")
if _metadata_matches(meta, filters):
if is_retrievable_memory(meta) and _metadata_matches(meta, filters):
results.append(SearchResult(id=row["id"], content=row["content"], score=1.0, metadata=meta))
return results[:top_k]

def _find_current_by_hash(
    self,
    content_hash: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
    """Return the id of a current, non-forgotten record with ``content_hash``.

    The lookup is restricted to this store's namespace and to records whose
    dedup scope matches ``metadata`` exactly: for every key in
    ``_DEDUP_SCOPE_KEYS`` the stored value must equal the supplied value, or
    be absent/null when the caller supplied none. This prevents a partially
    scoped lookup from matching a record that lives in a narrower scope.

    Args:
        content_hash: Content hash to look for.
        metadata: Metadata of the candidate record; only scope keys are read.

    Returns:
        The id of one matching record, or ``None`` when there is none.
    """
    scope_source = metadata or {}
    clauses = [
        "namespace = ?",
        f"json_extract(metadata, '$.{CONTENT_HASH_KEY}') = ?",
        f"json_extract(metadata, '$.{IS_CURRENT_KEY}') = 1",
        f"json_extract(metadata, '$.{FORGOTTEN_AT_KEY}') IS NULL",
    ]
    params: List[Any] = [self._namespace, content_hash]

    for key in _DEDUP_SCOPE_KEYS:
        value = scope_source.get(key)
        if value is None:
            # json_extract yields SQL NULL for both a missing key and a stored
            # JSON null, so records saved with an explicit None scope value
            # still match (json_type would report 'null' and miss them).
            clauses.append(f"json_extract(metadata, '$.{key}') IS NULL")
        else:
            clauses.append(f"json_extract(metadata, '$.{key}') = ?")
            params.append(value)

    row = self._conn.execute(
        f"SELECT id FROM xmem_vectors WHERE {' AND '.join(clauses)} LIMIT 1",
        params,
    ).fetchone()
    return row["id"] if row else None

async def search_by_text(
self,
query_text: str,
Expand Down
63 changes: 63 additions & 0 deletions src/storage/memory_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Memory lifecycle metadata helpers.

These helpers keep duplicate detection, version lineage, and soft-forget
metadata consistent across vector-store implementations.
"""

from __future__ import annotations

import hashlib
import re
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Metadata keys managed by the lifecycle helpers in this module.
# build_lifecycle_metadata writes all six on every record it produces.
CONTENT_HASH_KEY = "content_hash"  # SHA-256 hex digest of the normalized content
PARENT_MEMORY_ID_KEY = "parent_memory_id"  # parent/lineage id, None unless supplied
VERSION_KEY = "version"  # integer version number, 1 by default
IS_CURRENT_KEY = "is_current"  # an explicit False makes the record non-retrievable
FORGOTTEN_AT_KEY = "forgotten_at"  # initialized to None; any truthy value hides the record
FORGET_REASON_KEY = "forget_reason"  # initialized to None


def normalize_memory_content(content: str) -> str:
    """Return ``content`` in the canonical form used for duplicate hashing.

    Leading and trailing whitespace is removed, every internal run of
    whitespace collapses to a single space, and the result is case-folded,
    so texts differing only in spacing or letter case normalize identically.
    """

    # split() with no separator both trims the ends and splits on whitespace runs.
    collapsed = " ".join(content.split())
    return collapsed.casefold()


def compute_memory_hash(content: str) -> str:
    """Hash memory text after normalization.

    Returns the hexadecimal SHA-256 digest of the UTF-8 encoded output of
    ``normalize_memory_content``.
    """

    payload = normalize_memory_content(content).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def utc_now_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""

    current = datetime.now(tz=timezone.utc)
    return current.isoformat()


def build_lifecycle_metadata(
    content: str,
    metadata: Optional[Dict[str, Any]] = None,
    *,
    parent_memory_id: Optional[str] = None,
    version: int = 1,
    is_current: bool = True,
) -> Dict[str, Any]:
    """Return a copy of ``metadata`` with the lifecycle fields applied.

    Custom caller keys are preserved. The six lifecycle keys are always
    overwritten: the content hash is recomputed from ``content``, lineage
    fields come from the keyword arguments, and the forgotten-at / forget
    reason fields are reset to ``None``. The input mapping is not mutated.
    """

    merged = dict(metadata or {})
    merged.update(
        {
            CONTENT_HASH_KEY: compute_memory_hash(content),
            PARENT_MEMORY_ID_KEY: parent_memory_id,
            VERSION_KEY: version,
            IS_CURRENT_KEY: is_current,
            FORGOTTEN_AT_KEY: None,
            FORGET_REASON_KEY: None,
        }
    )
    return merged
Comment thread
greptile-apps[bot] marked this conversation as resolved.


def is_retrievable_memory(metadata: Optional[Dict[str, Any]]) -> bool:
    """Tell whether a record should be returned by retrieval.

    A record is hidden when its "is current" flag is explicitly ``False``
    (superseded) or when it carries a truthy forgotten-at value
    (soft-forgotten). Missing or empty metadata counts as retrievable.
    """

    if not metadata:
        return True
    # Only an explicit False hides the record; a missing flag means current.
    if metadata.get(IS_CURRENT_KEY, True) is False:
        return False
    return not metadata.get(FORGOTTEN_AT_KEY)
Loading
Loading