refactor(memory): canonicalize memory imports (#50)

This commit is contained in:
Nicholai
2026-06-03 22:31:15 -06:00
committed by GitHub
parent a2e691da2b
commit 4dc11cfe6b
5 changed files with 122 additions and 589 deletions

View File

@@ -43,6 +43,16 @@ class MemoryService:
os.path.join(data_dir, "memory_vectors")
) else None
@staticmethod
def _to_memory(entry: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None) -> Memory:
return Memory(
id=entry.get("id", ""),
text=entry.get("text", ""),
timestamp=entry.get("timestamp", 0),
session_id=entry.get("session_id"),
metadata=metadata or {},
)
async def remember(self, text: str, session_id: Optional[str] = None) -> Memory:
"""
Store a new memory.
@@ -54,31 +64,19 @@ class MemoryService:
Returns:
Created Memory object
"""
import uuid
import time
entry = self.manager.add_entry(text)
if session_id:
entry["session_id"] = session_id
memory_id = str(uuid.uuid4())[:8]
timestamp = int(time.time())
entry = {
"id": memory_id,
"text": text,
"timestamp": timestamp,
"session_id": session_id,
}
self.manager.add_memory(entry)
memories = self.manager.load_all()
memories.append(entry)
self.manager.save(memories)
# Also add to vector store if available
if self.vector_store:
self.vector_store.add(text, {"id": memory_id, "session_id": session_id})
if self.vector_store and self.vector_store.healthy:
self.vector_store.add(entry["id"], entry["text"])
return Memory(
id=memory_id,
text=text,
timestamp=timestamp,
session_id=session_id,
)
return self._to_memory(entry)
async def recall(self, query: str, top_k: int = 5) -> MemorySearchResult:
"""
@@ -92,47 +90,36 @@ class MemoryService:
MemorySearchResult with matching memories
"""
# Try vector search first
if self.vector_store:
all_memories = self.manager.load_all()
by_id = {m.get("id"): m for m in all_memories}
if self.vector_store and self.vector_store.healthy:
results = self.vector_store.search(query, k=top_k)
memories = [
Memory(
id=r.get("id", ""),
text=r.get("text", ""),
timestamp=r.get("timestamp", 0),
session_id=r.get("session_id"),
metadata=r.get("metadata", {}),
)
for r in results
if isinstance(r, dict)
]
return MemorySearchResult(memories=memories, query=query, total=len(memories))
found = []
for result in results:
entry = by_id.get(result.get("memory_id"))
if entry:
found.append(self._to_memory(entry, metadata={"score": result.get("score")}))
if found:
return MemorySearchResult(memories=found, query=query, total=len(found))
# Fallback to keyword search
results = self.manager.search_memories(query, limit=top_k)
memories = [
Memory(
id=m.get("id", ""),
text=m.get("text", ""),
timestamp=m.get("timestamp", 0),
session_id=m.get("session_id"),
)
for m in results
]
results = self.manager.get_relevant_memories(query, all_memories, max_items=top_k)
memories = [self._to_memory(m) for m in results]
return MemorySearchResult(memories=memories, query=query, total=len(memories))
def get_all(self, limit: int = 100) -> List[Memory]:
"""Get all memories."""
memories = self.manager.get_memories(limit=limit)
return [
Memory(
id=m.get("id", ""),
text=m.get("text", ""),
timestamp=m.get("timestamp", 0),
session_id=m.get("session_id"),
)
for m in memories
]
memories = self.manager.load_all()[:limit]
return [self._to_memory(m) for m in memories]
def delete(self, memory_id: str) -> bool:
"""Delete a memory by ID."""
return self.manager.delete_memory(memory_id)
memories = self.manager.load_all()
remaining = [m for m in memories if m.get("id") != memory_id]
if len(remaining) == len(memories):
return False
self.manager.save(remaining)
if self.vector_store and self.vector_store.healthy:
self.vector_store.remove(memory_id)
return True