memory-engine-service
Core Brain & Orchestration Service
| Property | Value |
|---|---|
| Repository | memory-engine-service |
| Language | Python 3.11+ |
| Type | REST/gRPC API Service |
| Framework | FastAPI |
Purpose
The Memory Engine Service acts as the central brain of the platform. It orchestrates all reads and writes, ensuring data consistency between the Graph (JanusGraph) and Vector (Qdrant) stores.
It is the only component allowed to write directly to these low-level stores for "Memory Block" operations. The Curator Worker may also write, but only through this service's internal modules or its API.
Key Responsibilities:
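- Atomic Writes: Ensures Graph and Vector stores are updated in sync (the Vector write is mandatory; a failed Graph write is repaired asynchronously, as described under Failure Handling).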
- Tenant Enforcement: Validates and propagates tenant context to storage layers.
- Query Orchestration: Combines semantic search (vector) with structural context (graph) for retrieval.
- Memory Pack Assembly: Aggregates memory blocks into structured packs (e.g., "Profile", "Business Context").
Repository Structure
```
memory-engine-service/
├── pyproject.toml
├── README.md
├── Dockerfile
├── src/
│   └── memory_engine/
│       ├── __init__.py
│       ├── main.py              # App entry point
│       ├── config.py            # Service config
│       │
│       ├── api/                 # API Layer
│       │   ├── __init__.py
│       │   ├── dependencies.py  # DI & Auth
│       │   ├── middleware.py    # Logging/CORS
│       │   └── v1/
│       │       ├── __init__.py
│       │       ├── memory.py    # CRUD Operations
│       │       ├── query.py     # Search/Retrieval
│       │       ├── packs.py     # Memory Packs
│       │       └── graph.py     # Graph exploration
│       │
│       ├── core/                # Business Logic
│       │   ├── __init__.py
│       │   ├── orchestration.py # Write coordination
│       │   ├── ranking.py       # Result re-ranking
│       │   └── packs.py         # Pack assembly logic
│       │
│       └── services/            # Infrastructure Adapters
│           ├── __init__.py
│           ├── vector_store.py  # Qdrant wrapper
│           └── graph_store.py   # JanusGraph wrapper
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_api/
│   └── test_core/
│
└── docs/
    ├── api_reference.md
    └── architecture.md
```
Dependencies
```toml
# pyproject.toml
[project]
name = "memory-engine-service"
version = "0.1.0"
description = "Core orchestration service for Memory Platform"
requires-python = ">=3.11"
dependencies = [
    "fastapi>=0.109",
    "uvicorn[standard]>=0.27",
    "pydantic>=2.5",
    "pydantic-settings>=2.1",
    "memory-domain>=0.1.0",
    "memory-store-janusgraph>=0.1.0",
    "memory-store-qdrant>=0.1.0",
    "structlog>=23.2",
    "opendal>=0.45",  # For localized file/blob access if needed
]
```
Architecture & Flows
1. Dual-Write Strategy (The "Brain")
When a memory block is created or updated:
- Validation: `memory-domain` models validate the payload.
- Vector Store:
  - Generate embedding.
  - Upsert to Qdrant.
  - Store full payload in vector metadata for fast semantic retrieval.
- Graph Store:
  - Create/Update `MemoryBlock` vertex.
  - Create/Update relationships:
    - `ABOUT` -> Subject (User/Org)
    - `ABOUT` -> Topics (extracted from tags)
    - `DERIVED_FROM` -> Source (Document/Conversation)
Failure Handling:
- If the Vector write fails: abort the whole operation.
- If the Graph write fails: log the error and queue an asynchronous repair. A cross-store transaction is preferable where feasible, but eventual consistency is usually acceptable as long as the Vector write succeeded.
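The ordering and failure rules above can be sketched as a small coordinator: the Vector write is a hard requirement, while a Graph failure is logged and queued. Everything here is illustrative; `DualWriter`, the `upsert`/`upsert_block` method names, the `content` payload key, and the in-memory `repair_queue` (standing in for a durable async queue) are hypothetical, and validation with `memory-domain` models is assumed to have happened before `write` is called.

```python
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Protocol

log = logging.getLogger("memory_engine.orchestration")


class VectorStore(Protocol):
    def upsert(self, block_id: str, embedding: list[float], payload: dict[str, Any]) -> None: ...


class GraphStore(Protocol):
    def upsert_block(self, block_id: str, payload: dict[str, Any]) -> None: ...


@dataclass
class DualWriter:
    """Hypothetical write coordinator: Vector first (mandatory), Graph second (repairable)."""

    vector: VectorStore
    graph: GraphStore
    embed: Callable[[str], list[float]]  # embedding model is an assumption
    repair_queue: list[str] = field(default_factory=list)  # stand-in for a durable queue

    def write(self, block_id: str, payload: dict[str, Any]) -> str:
        # 1. Vector store: any exception propagates and aborts the whole write,
        #    so the Graph store is never touched for a block Qdrant does not have.
        embedding = self.embed(payload["content"])
        self.vector.upsert(block_id, embedding, payload)  # full payload kept as metadata
        # 2. Graph store: failure is logged and queued for repair (eventual consistency).
        try:
            self.graph.upsert_block(block_id, payload)
        except Exception:
            log.exception("graph write failed for %s; queued for repair", block_id)
            self.repair_queue.append(block_id)
        return block_id
```

Writing Vector first means the store that serves retrieval is always the source of truth, and a repair worker only ever has to catch the Graph up, never roll Qdrant back.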
2. Read Orchestration
When a query is received:
- Vector Search: Query Qdrant for the top-K semantically relevant blocks.
- Graph Context (Optional):
  - Fetch the immediate neighborhood of relevant blocks to find connected topics.
  - Boost scores based on graph centrality or recency.
- Ranking:
  - Apply `memory-domain` scoring logic (Salience * Stability * Confidence).
  - Apply a decay function based on access time.
- Response: Return the ranked list.
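A minimal sketch of the ranking step, assuming each factor lies in [0, 1]. The document does not specify the decay function, so an exponential half-life decay with a 7-day default is an assumption here, as is applying the optional graph boost as a `(1 + boost)` multiplier; `Candidate`, `score`, and `rank` are hypothetical names.

```python
import math
from dataclasses import dataclass


@dataclass
class Candidate:
    block_id: str
    salience: float        # each factor assumed to lie in [0, 1]
    stability: float
    confidence: float
    last_access_ts: float  # seconds since epoch
    graph_boost: float = 0.0  # optional boost from graph context (hypothetical)


def score(c: Candidate, now_ts: float, half_life_s: float = 7 * 24 * 3600) -> float:
    """Salience * Stability * Confidence, scaled by an assumed exponential decay."""
    age = max(0.0, now_ts - c.last_access_ts)
    decay = math.pow(0.5, age / half_life_s)  # 1.0 when just accessed, 0.5 after one half-life
    return c.salience * c.stability * c.confidence * decay * (1.0 + c.graph_boost)


def rank(cands: list[Candidate], now_ts: float) -> list[Candidate]:
    # Highest score first; Python's sort is stable, so ties keep vector-search order.
    return sorted(cands, key=lambda c: score(c, now_ts), reverse=True)
```

With this decay, a block not accessed for four half-lives keeps only 1/16 of its base score, so a moderately salient but fresh block can outrank a highly salient stale one.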
API Endpoints (V1)
Memory Operations
- `POST /api/v1/memory`
  - Body: `MemoryCreateRequest`
  - Action: Orchestrate dual-write.
  - Returns: `MemoryBlock` (with ID)
- `GET /api/v1/memory/{id}`
  - Returns: `MemoryBlock`
- `DELETE /api/v1/memory/{id}`
  - Action: Soft delete in both stores (or hard delete if requested).
Query & Retrieval
- `POST /api/v1/memory/query`
  - Body: `MemoryQueryRequest` (text, limits, filters)
  - Action: Semantic search + ranking.
- `POST /api/v1/memory/query/working-set`
  - Purpose: Get "active context" for an agent session.
  - Logic: High-salience queries + recent interaction history from Graph.
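One way to merge the two working-set sources is sketched below: high-salience blocks plus the session's recent history, deduplicated and capped. Putting recent interactions first, and the `min_salience`/`limit` defaults, are design assumptions; `recent_ids` stands in for the history read from the graph (most recent first), and `Block`/`working_set` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Block:
    block_id: str
    salience: float


def working_set(
    blocks: list[Block], recent_ids: list[str], min_salience: float = 0.7, limit: int = 20
) -> list[str]:
    """Merge recently touched blocks with high-salience ones, without duplicates."""
    salient = sorted(
        (b for b in blocks if b.salience >= min_salience),
        key=lambda b: b.salience,
        reverse=True,
    )
    merged: list[str] = []
    seen: set[str] = set()
    # Recent interactions go first so the agent sees what it was just working on.
    for block_id in [*recent_ids, *(b.block_id for b in salient)]:
        if block_id not in seen:
            seen.add(block_id)
            merged.append(block_id)
    return merged[:limit]
```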
Memory Packs
- `GET /api/v1/memory/packs`
  - Params: `subject_type`, `subject_id`, `pack_types` (optional)
  - Action: Assemble filtered MemoryPacks (e.g., "Basic Profile", "Preferences").
Graph Exploration
- `POST /api/v1/graph/neighbors`
  - Body: `GraphNeighborsRequest`
  - Purpose: Visualizer support or deep-context retrieval.
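The neighbors endpoint returns the vertices within a bounded radius of a start vertex. The sketch below shows those semantics as a breadth-first walk over an in-memory adjacency map (edges listed in both directions); the real service would issue an equivalent traversal against JanusGraph, and the `neighbors` helper and `max_depth` parameter are hypothetical.

```python
from collections import deque


def neighbors(adjacency: dict[str, list[str]], start: str, max_depth: int = 1) -> dict[str, int]:
    """Return each vertex reachable from `start` within `max_depth` hops, with its distance."""
    depth = {start: 0}
    queue = deque([start])
    while queue:
        v = queue.popleft()
        if depth[v] == max_depth:
            continue  # do not expand past the requested radius
        for n in adjacency.get(v, []):
            if n not in depth:  # first visit in BFS order is the shortest distance
                depth[n] = depth[v] + 1
                queue.append(n)
    del depth[start]
    return depth
```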
Configuration
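```python
# src/memory_engine/config.py
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from memory_store_janusgraph.config import JanusGraphConfig
from memory_store_qdrant.config import QdrantConfig


class ServiceConfig(BaseSettings):
    """Main Service Configuration."""

    # Lets nested store settings be overridden via env vars like QDRANT__<FIELD>
    # (the field names themselves are defined by the memory-store-* packages).
    model_config = SettingsConfigDict(env_nested_delimiter="__")

    API_KEYS: list[str] = []  # Basic auth for MCP/Workers; JSON list in env, e.g. '["k1"]'
    PORT: int = 8000
    WORKERS: int = 4

    # Nested configs for stores; default_factory builds them per instance, not at import
    qdrant: QdrantConfig = Field(default_factory=QdrantConfig)
    janus: JanusGraphConfig = Field(default_factory=JanusGraphConfig)
```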
Resilience & Observability
Circuit Breakers
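The service MUST implement circuit breakers for all downstream calls to Qdrant and JanusGraph (e.g., using pybreaker; tenacity provides retries with backoff rather than circuit breaking, but can be combined with a breaker).
- Threshold: 5 consecutive failures.
- Reset Timeout: 30 seconds.
- Fallback: Return 503 Service Unavailable while the breaker is open (i.e., when the primary stores are down).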
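To make the three parameters concrete, here is a minimal, single-threaded breaker sketch with the stated defaults (5 consecutive failures, 30-second reset). It is an illustration, not pybreaker's API: `CircuitBreaker` and `CircuitOpenError` are hypothetical names, and the injectable `clock` exists only to make the behavior testable.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised while the breaker is open; the API layer would map this to HTTP 503."""


class CircuitBreaker:
    def __init__(
        self,
        fail_max: int = 5,
        reset_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0  # consecutive failures since the last success
        self.opened_at: float | None = None

    def call(self, fn: Callable[[], T]) -> T:
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("downstream store unavailable")
            # Half-open: the reset timeout elapsed, so let a trial call through.
        try:
            result = fn()
        except Exception:
            self.failures += 1
            # A failed trial call, or hitting the threshold, (re)opens the circuit.
            if self.opened_at is not None or self.failures >= self.fail_max:
                self.opened_at = self.clock()
            raise
        self.failures = 0  # any success closes the circuit
        self.opened_at = None
        return result
```

One breaker instance per downstream store keeps a JanusGraph outage from blocking Qdrant reads, which matters because the write path above tolerates Graph failures but not Vector failures.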
Health Checks (/health)
- Liveness: Returns 200 if service is running.
- Readiness: Checks connectivity to Qdrant, JanusGraph, and NATS (the dependencies listed in the response). The response must include per-dependency details:

```json
{
  "status": "healthy",
  "dependencies": {
    "qdrant": "up",
    "janusgraph": "up",
    "nats": "up"
  }
}
```
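The readiness body can be built by running one probe per dependency. In this sketch the probes are plain callables returning `bool`; the `"unhealthy"` status string and the 503 code on failure are assumptions, since the spec above only shows the healthy case, and `readiness` is a hypothetical helper.

```python
from typing import Any, Callable


def readiness(checks: dict[str, Callable[[], bool]]) -> tuple[int, dict[str, Any]]:
    """Run each dependency probe and return (HTTP status, readiness body)."""
    deps: dict[str, str] = {}
    for name, probe in checks.items():
        try:
            deps[name] = "up" if probe() else "down"
        except Exception:
            deps[name] = "down"  # a probe that raises counts as a failed dependency
    healthy = all(state == "up" for state in deps.values())
    body = {"status": "healthy" if healthy else "unhealthy", "dependencies": deps}
    return (200 if healthy else 503), body
```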
Metrics (Prometheus)
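Expose standard metrics on `/metrics`:
- `memory_upsert_latency_seconds`: Histogram
- `memory_upsert_count`: Counter (labels: `tenant_id`, `status`)
- `query_latency_seconds`: Histogram

If these are exported with the Python `prometheus_client` library, the counter is scraped as `memory_upsert_count_total`, because that library appends a `_total` suffix to counter samples.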