Memory Platform – Complete Technical Specification
Enterprise-Grade Memory System for AI Agents
Version: 2.0
Last Updated: December 24, 2025
Status: Ready for Implementation
📋 Executive Summary
This document defines the complete architecture, implementation requirements, and operational guidelines for the Memory Engine Platform – an enterprise-grade, multi-tenant memory system for AI agents composed of:
- Graph Memory Layer (JanusGraph)
- Semantic Memory Layer (Qdrant)
- Document Intelligence Layer (memory-store-documents) [NEW]
- Structured Memory Model
- LLM Curator Pipelines
- MCP Integration
- Visualization Tools
- Frontend User Memory Explorer
Target Audience: Software engineers, DevOps engineers, ML engineers, and autonomous development agents.
Goal: Any developer or autonomous agent with this document can bootstrap, implement, deploy, extend, and maintain the entire platform.
🎯 Core Objectives
Business Goals
- Provide a commercially safe enterprise memory platform built entirely from open-source-compatible components
- Enable AI agents to maintain coherent long-term knowledge about users, organizations, projects, and contexts
- Support multi-channel memory aggregation (chat, documents, tools, events)
- Deliver sub-second query performance for agent context retrieval
- Ensure data consistency, auditability, and compliance readiness
🔥 Repository Architecture Overview
The platform consists of 11 repositories:
1️⃣ memory-domain
2️⃣ memory-store-janusgraph
3️⃣ memory-store-qdrant
4️⃣ memory-store-documents [NEW]
5️⃣ memory-engine-service
6️⃣ memory-mcp-server
7️⃣ memory-curator-worker
8️⃣ memory-graph-visualizer
9️⃣ memory-sdk-js
🔟 memory-sdk-python
1️⃣1️⃣ memory-infra
Each one is defined below.
Technical Goals
- Multi-tenant isolation at every layer
- Read performance: p95 < 500ms for context queries
- Write consistency: Graph + Vector atomic operations
- Auditability: Complete provenance tracking
- Scalability: Support 1M+ memory blocks per tenant
- Extensibility: Plugin architecture for new memory types and sources
🏗️ System Architecture
High-Level Component Diagram
┌─────────────────────────────────────────────────────────────────┐
│ Frontend Layer │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Web UI │ │ Mobile │ │ Desktop │ │
│ │ (React) │ │ App │ │ App │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼──────────────────┼──────────────────┼──────────────────┘
│ │ │
└──────────────────┴──────────────────┘
│
┌──────────────────▼──────────────────┐
│ MCP Gateway (Node/TS) │
│ (Existing infrastructure) │
└──────────────────┬──────────────────┘
│
┌──────────────────▼──────────────────┐
│ memory-mcp-server (Node/TS) │
│ ┌────────────────────────────────┐ │
│ │ Tools: memory.* operations │ │
│ │ Resources: memory:// URIs │ │
│ └────────────────────────────────┘ │
└──────────────────┬──────────────────┘
│
┌──────────────────▼──────────────────┐
│ memory-engine-service (Python) │
│ ┌────────────────────────────────┐ │
│ │ FastAPI HTTP/gRPC API │ │
│ │ Business Logic Layer │ │
│ │ Multi-tenant Access Control │ │
│ └────────┬────────┬───────┬──────┘ │
└───────────┼────────┼───────┼────────┘
│ │ │
┌─────────────────▼────────┴┐ │ ┌──────────────────────┐
│ JanusGraph (Graph DB) │ ├─────▶ memory-store-documents │
│ - Extensible Nodes │ │ │ - Coordinate Logic │
│ - Relationships │ │ │ - Source Provenance │
│ - Provenance │ │ └──────────────────────┘
└───────────────────────────┘ │
│
┌─────────▼──────────┐
│ Qdrant │
│ (Vector Store) │
│ - Embeddings │
│ - Semantic Search │
└────────────────────┘
│
┌─────────▼────────────────────┐
│ JetStream (NATS) │
│ Event Bus for Async Jobs │
└─────────┬────────────────────┘
│
┌─────────▼────────────────────┐
│ memory-curator-worker │
│ (Python) │
│ ┌────────────────────────┐ │
│ │ LLM Extraction │ │
│ │ Document Processing │ │
│ │ Conversation Curation │ │
│ │ Decay Management │ │
│ └────────────────────────┘ │
└──────────────────────────────┘
Data Flow Examples
1. Agent Context Loading (Pre-conversation)
Agent → MCP memory.query → Engine Service → Qdrant (semantic) + JanusGraph (graph)
→ Ranked MemoryBlocks → Agent prompt context
2. Memory Creation (Post-conversation)
Conversation End → JetStream event → Curator Worker → LLM extraction
→ Engine Service → Atomic write to Graph + Vector → Success
3. Document Ingestion
User uploads doc → Frontend → JetStream event → Curator Worker
→ Download from Drive → Chunk + Extract → Engine Service → Memory blocks
📊 Core Concepts & Data Model
1. Memory Block
The fundamental unit of memory. Represents a single piece of knowledge.
from pydantic import BaseModel, Field
from typing import Optional, Literal, Dict, Any, List
from datetime import datetime
from enum import Enum
class MemoryKind(str, Enum):
FACT = "fact" # Verifiable, stable information
PREFERENCE = "preference" # Stated likes/dislikes, requirements
INSIGHT = "insight" # Agent-discovered patterns
SUMMARY = "summary" # Consolidated information
PROFILE = "profile" # Identity/demographic data
TOOL_RESULT = "tool_result" # Output from tool execution
NOTE = "note" # User-created annotations
INTERACTION = "interaction" # Record of specific interaction
class MemorySource(str, Enum):
CHAT = "chat"
TOOL = "tool"
DOCUMENT = "document"
EVENT = "event"
SYSTEM = "system"
USER_INPUT = "user_input"
class MemoryScores(BaseModel):
"""
Scoring system for memory relevance and quality.
All scores are float [0.0 - 1.0]
"""
salience: float = Field(
ge=0.0, le=1.0,
description="Current relevance to subject. Decays over time without access."
)
stability: float = Field(
ge=0.0, le=1.0,
description="Resistance to invalidation. Facts > Preferences > Insights."
)
confidence: float = Field(
ge=0.0, le=1.0,
description="Source reliability + verification count."
)
class MemoryContent(BaseModel):
"""Content container supporting both natural language and structured data."""
text: str = Field(
description="Natural language summary suitable for LLM context"
)
structured: Optional[Dict[str, Any]] = Field(
default=None,
description="Optional structured representation (JSON-serializable)"
)
class SourceMetadata(BaseModel):
"""Provenance information for the memory."""
origin: MemorySource
tool_name: Optional[str] = None
conversation_id: Optional[str] = None
document_id: Optional[str] = None
timestamp: datetime = Field(default_factory=datetime.utcnow)
source_reference: Optional[Dict[str, Any]] = None # NEW: SourceLocation mapping
class Subject(BaseModel):
"""Entity that the memory is about. Extensible in Platform 2.0."""
type: str # user, org, project, tool, etc.
id: str
class MemoryBlock(BaseModel):
"""
Core memory unit.
Represents a single piece of knowledge with metadata, scores, and provenance.
"""
id: str = Field(description="Unique identifier, format: mem_{uuid}")
tenant_id: str = Field(description="Tenant isolation key")
subject: Subject = Field(description="Primary entity this memory is about")
kind: MemoryKind
source: SourceMetadata
content: MemoryContent
scores: MemoryScores
tags: List[str] = Field(
default_factory=list,
description="Concepts/topics for filtering and grouping"
)
embedding_id: Optional[str] = Field(
default=None,
description="Reference to vector in Qdrant"
)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
accessed_at: datetime = Field(
default_factory=datetime.utcnow,
description="Last time this memory was retrieved"
)
supersedes: Optional[List[str]] = Field(
default=None,
description="IDs of memory blocks this one replaces"
)
version: int = Field(default=1, description="Version number for updates")
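As a usage sketch, the helper below assembles the minimal payload a `MemoryBlock(**payload)` call would accept, including the `mem_{uuid}` identifier format the spec requires. It is plain-dict illustration only; the helper names are not part of the domain package.

```python
import uuid
from datetime import datetime, timezone

def new_memory_id() -> str:
    """Mint an identifier in the required mem_{uuid} format."""
    return f"mem_{uuid.uuid4()}"

def new_block_payload(tenant_id: str, subject_type: str, subject_id: str,
                      kind: str, text: str) -> dict:
    """Assemble the minimal dict that MemoryBlock(**payload) would accept."""
    return {
        "id": new_memory_id(),
        "tenant_id": tenant_id,
        "subject": {"type": subject_type, "id": subject_id},
        "kind": kind,
        "source": {"origin": "chat",
                   "timestamp": datetime.now(timezone.utc).isoformat()},
        "content": {"text": text},
        "scores": {"salience": 0.8, "stability": 0.9, "confidence": 0.7},
    }

payload = new_block_payload("tenant_a", "org", "org_123",
                            "fact", "Primary contact is John Doe")
```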
2. Memory Packs
Logical groupings of related memory blocks for structured presentation.
from typing import List
from enum import Enum
class PackType(str, Enum):
"""Categories of memory packs for organizational structure."""
BASIC_PROFILE = "basic_profile" # Identity, contact, core attributes
BUSINESS_CONTEXT = "business_context" # Products, services, ICP, market
OPERATIONAL = "operational" # Processes, playbooks, SLAs
FINANCIAL = "financial" # Billing, pricing, contracts
INSIGHTS = "insights" # Patterns, opportunities, risks
PREFERENCES = "preferences" # Communication, tools, style
RELATIONSHIPS = "relationships" # Connections to other entities
HISTORICAL = "historical" # Past interactions, outcomes
class ConsolidationStrategy(str, Enum):
"""How to handle multiple blocks in a pack."""
LATEST = "latest" # Show only most recent
MERGE = "merge" # Combine into single view
VERSIONED = "versioned" # Show history/changelog
ALL = "all" # Display all blocks
class MemoryPack(BaseModel):
"""
Grouped collection of related memory blocks.
Packs provide structured views of knowledge for specific contexts.
"""
pack_id: str = Field(description="Unique pack identifier")
pack_type: PackType
label: str = Field(description="Human-readable name")
description: Optional[str] = Field(
default=None,
description="What this pack represents"
)
subject: Subject = Field(description="Primary entity this pack is about")
blocks: List[MemoryBlock] = Field(description="Ordered memory blocks")
consolidation_strategy: ConsolidationStrategy = Field(
default=ConsolidationStrategy.MERGE
)
last_updated: datetime = Field(default_factory=datetime.utcnow)
total_salience: float = Field(
description="Aggregate salience score across all blocks"
)
Pack Type Definitions
| Pack Type | Purpose | Example Contents |
|---|---|---|
| basic_profile | Identity & core attributes | Name, email, location, timezone, language, role |
| business_context | Market positioning | Products, services, ICP, sectors, value props |
| operational | Internal processes | Playbooks, workflows, SLAs, escalation paths |
| financial | Money matters | Billing cycle, payment terms, pricing tier, discounts |
| insights | Discovered knowledge | Patterns, anomalies, risks, opportunities, recommendations |
| preferences | Behavioral choices | Communication style, tool preferences, notification settings |
| relationships | Entity connections | Reports-to, partners, vendors, customers, projects |
| historical | Interaction record | Past conversations, outcomes, decisions, issues |
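Pack assembly needs a routing rule from `MemoryKind` to `PackType`. The mapping below is a hypothetical policy choice, not something the spec fixes; the sketch shows how blocks would be bucketed before a `MemoryPack` is built per bucket.

```python
# Hypothetical kind -> pack_type routing; the exact mapping is a policy
# decision for the engine, not mandated by the data model.
KIND_TO_PACK = {
    "profile": "basic_profile",
    "preference": "preferences",
    "insight": "insights",
    "interaction": "historical",
    "fact": "business_context",
}

def group_blocks_into_packs(blocks: list[dict]) -> dict[str, list[dict]]:
    """Bucket memory blocks by pack type, preserving input order."""
    packs: dict[str, list[dict]] = {}
    for block in blocks:
        pack_type = KIND_TO_PACK.get(block["kind"], "historical")
        packs.setdefault(pack_type, []).append(block)
    return packs
```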
3. Graph Schema
JanusGraph stores relationships and structure.
class GraphNodeType(str, Enum):
"""Types of vertices in the knowledge graph."""
USER = "User"
ORG = "Org"
PROJECT = "Project"
TOOL = "Tool"
CHANNEL = "Channel"
DOCUMENT = "Document"
TOPIC = "Topic"
MEMORY_BLOCK = "MemoryBlock"
class GraphEdgeType(str, Enum):
"""Types of relationships between graph nodes."""
ABOUT = "ABOUT"
PREFERS = "PREFERS"
DERIVED_FROM = "DERIVED_FROM"
MENTIONS = "MENTIONS"
BELONGS_TO = "BELONGS_TO"
SUPERSEDES = "SUPERSEDES"
class GraphNode(BaseModel):
"""Vertex in the knowledge graph."""
vertex_id: str = Field(description="JanusGraph vertex ID")
type: GraphNodeType
tenant_id: str = Field(description="Tenant isolation")
attributes: Dict[str, Any] = Field(
description="Type-specific properties"
)
created_at: datetime = Field(default_factory=datetime.utcnow)
class GraphEdge(BaseModel):
"""Edge connecting two vertices."""
edge_id: Optional[str] = Field(default=None, description="JanusGraph edge ID")
from_vertex: str = Field(description="Source vertex ID")
to_vertex: str = Field(description="Target vertex ID")
edge_type: GraphEdgeType
metadata: Dict[str, Any] = Field(
default_factory=dict,
description="Edge-specific properties (weight, timestamp, etc.)"
)
created_at: datetime = Field(default_factory=datetime.utcnow)
Graph Node Property Schemas
# Node-specific attribute schemas
USER_SCHEMA = {
"id": "string (external user ID)",
"tenant_id": "string",
"name": "string",
"email": "string",
"role": "string (optional)",
"created_at": "datetime"
}
ORG_SCHEMA = {
"id": "string (external org ID)",
"tenant_id": "string",
"name": "string",
"domain": "string (optional)",
"industry": "string (optional)",
"created_at": "datetime"
}
MEMORY_BLOCK_SCHEMA = {
"id": "string (mem_*)",
"tenant_id": "string",
"kind": "string (MemoryKind)",
"salience_score": "float",
"created_at": "datetime",
"updated_at": "datetime"
}
TOPIC_SCHEMA = {
"id": "string (topic_*)",
"tenant_id": "string",
"name": "string",
"category": "string (optional)",
"created_at": "datetime"
}
DOCUMENT_SCHEMA = {
"id": "string (doc_*)",
"tenant_id": "string",
"source_tool": "string (google_drive, notion, etc.)",
"external_id": "string (source system ID)",
"title": "string",
"created_at": "datetime"
}
4. Memory Scoring System
Scores guide retrieval relevance and lifecycle management.
Score Definitions
Salience [0.0 - 1.0]
- Purpose: Current relevance to the subject
- Factors: Recency, access frequency, topic alignment
- Decay: Exponential over time without access
- Boost: Increases on retrieval
Stability [0.0 - 1.0]
- Purpose: Resistance to invalidation
- Range:
- Facts: 0.8 - 1.0 (highly stable)
- Preferences: 0.5 - 0.8 (moderately stable)
- Insights: 0.3 - 0.6 (may evolve)
- Summaries: 0.2 - 0.5 (frequently updated)
Confidence [0.0 - 1.0]
- Purpose: Trust in accuracy
- Factors:
- Source reliability (document > conversation > inference)
- Verification count (multiple sources boost confidence)
- LLM extraction confidence score
Decay Algorithm
import math
from datetime import datetime, timedelta
def calculate_salience_decay(
base_salience: float,
last_accessed: datetime,
memory_kind: MemoryKind
) -> float:
"""
Apply time-based decay to salience score.
Decay rates (λ) by kind:
- facts: 0.01 (slow decay)
- preferences: 0.05 (medium decay)
- insights: 0.1 (fast decay)
- summaries: 0.15 (very fast decay)
"""
decay_rates = {
MemoryKind.FACT: 0.01,
MemoryKind.PROFILE: 0.01,
MemoryKind.PREFERENCE: 0.05,
MemoryKind.TOOL_RESULT: 0.07,
MemoryKind.INSIGHT: 0.1,
MemoryKind.SUMMARY: 0.15,
MemoryKind.NOTE: 0.03,
MemoryKind.INTERACTION: 0.12
}
lambda_decay = decay_rates.get(memory_kind, 0.05)
days_since_access = (datetime.utcnow() - last_accessed).days
# Exponential decay: salience(t) = base * e^(-λ * t)
decayed_salience = base_salience * math.exp(-lambda_decay * days_since_access)
return max(0.0, min(1.0, decayed_salience))
def boost_salience_on_access(current_salience: float) -> float:
"""Increase salience when memory is accessed."""
boost = 0.1
return min(1.0, current_salience + boost)
def reduce_confidence_on_contradiction(
current_confidence: float,
contradiction_severity: float # 0.0 - 1.0
) -> float:
"""Reduce confidence when contradicting information appears."""
penalty = 0.3 * contradiction_severity
return max(0.0, current_confidence - penalty)
def increase_confidence_on_verification(current_confidence: float) -> float:
"""Boost confidence when memory is verified by another source."""
boost = 0.2
return min(1.0, current_confidence + boost)
Recalculation Triggers
| Trigger | Action | Component |
|---|---|---|
| Memory accessed | Boost salience by +0.1 | Engine Service (on read) |
| Daily decay | Apply exponential decay | Curator Worker (cron job) |
| Contradiction detected | Reduce confidence by 0.3 | Curator Worker (LLM analysis) |
| Verification from new source | Increase confidence by +0.2 | Curator Worker (cross-reference) |
| Memory superseded | Set salience to 0.0 on old block | Engine Service (on write) |
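The daily decay trigger can be sketched as a single batch pass over a tenant's blocks, applying the exponential formula above. The in-place dict mutation is illustration only; the real worker would page through blocks and persist updated scores.

```python
import math
from datetime import datetime, timedelta, timezone

# Decay rates (lambda) per kind, matching the table in calculate_salience_decay
DECAY_RATES = {"fact": 0.01, "preference": 0.05, "insight": 0.1, "summary": 0.15}

def run_daily_decay(blocks: list[dict], now: datetime) -> list[dict]:
    """One pass of the curator's daily cron: recompute salience for every block."""
    for block in blocks:
        lam = DECAY_RATES.get(block["kind"], 0.05)
        days = (now - block["accessed_at"]).days
        # salience(t) = base * e^(-lambda * t), clamped to [0, 1]
        block["salience"] = max(0.0, min(1.0,
            block["salience"] * math.exp(-lam * days)))
    return blocks

now = datetime.now(timezone.utc)
blocks = [{"kind": "fact", "salience": 0.8,
           "accessed_at": now - timedelta(days=10)}]
run_daily_decay(blocks, now)
```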
🗄️ Storage Layer Architecture
JanusGraph Configuration
Schema Definition
// JanusGraph schema setup (Gremlin script)
// Vertex labels
mgmt = graph.openManagement()
// Define vertex labels
user = mgmt.makeVertexLabel('User').make()
org = mgmt.makeVertexLabel('Org').make()
project = mgmt.makeVertexLabel('Project').make()
tool = mgmt.makeVertexLabel('Tool').make()
channel = mgmt.makeVertexLabel('Channel').make()
document = mgmt.makeVertexLabel('Document').make()
topic = mgmt.makeVertexLabel('Topic').make()
memoryBlock = mgmt.makeVertexLabel('MemoryBlock').make()
conversation = mgmt.makeVertexLabel('Conversation').make()
event = mgmt.makeVertexLabel('Event').make()
// Property keys
tenantId = mgmt.makePropertyKey('tenant_id').dataType(String.class).make()
externalId = mgmt.makePropertyKey('id').dataType(String.class).make()
name = mgmt.makePropertyKey('name').dataType(String.class).make()
email = mgmt.makePropertyKey('email').dataType(String.class).make()
kind = mgmt.makePropertyKey('kind').dataType(String.class).make()
salienceScore = mgmt.makePropertyKey('salience_score').dataType(Float.class).make()
createdAt = mgmt.makePropertyKey('created_at').dataType(Long.class).make()
updatedAt = mgmt.makePropertyKey('updated_at').dataType(Long.class).make()
// Composite indices for tenant isolation (CRITICAL)
mgmt.buildIndex('byTenantAndType', Vertex.class)
.addKey(tenantId)
.buildCompositeIndex()
mgmt.buildIndex('userByTenantAndId', Vertex.class)
.addKey(tenantId)
.addKey(externalId)
.indexOnly(user)
.unique()
.buildCompositeIndex()
mgmt.buildIndex('orgByTenantAndId', Vertex.class)
.addKey(tenantId)
.addKey(externalId)
.indexOnly(org)
.unique()
.buildCompositeIndex()
mgmt.buildIndex('memoryBlockByTenant', Vertex.class)
.addKey(tenantId)
.indexOnly(memoryBlock)
.buildCompositeIndex()
// Mixed index for text search
mgmt.buildIndex('memoryBlockFullText', Vertex.class)
.addKey(name, Mapping.TEXT.asParameter())
.indexOnly(memoryBlock)
.buildMixedIndex("search")
// Edge labels (must exist before vertex-centric indices can reference them)
about = mgmt.makeEdgeLabel('ABOUT').multiplicity(MULTI).make()
derivedFrom = mgmt.makeEdgeLabel('DERIVED_FROM').multiplicity(MULTI).make()
// Vertex-centric indices for fast edge traversal
mgmt.buildEdgeIndex(about, 'aboutByTimestamp', Direction.OUT, Order.desc, createdAt)
mgmt.buildEdgeIndex(derivedFrom, 'derivedFromByTimestamp', Direction.OUT, Order.desc, createdAt)
mgmt.commit()
Required Indices
| Index Type | Name | Keys | Purpose |
|---|---|---|---|
| Composite | byTenantAndType | tenant_id | Fast tenant filtering |
| Composite | userByTenantAndId | tenant_id, id | Unique user lookup |
| Composite | orgByTenantAndId | tenant_id, id | Unique org lookup |
| Composite | memoryBlockByTenant | tenant_id | List all blocks for tenant |
| Mixed | memoryBlockFullText | name (TEXT) | Keyword search in content |
| Vertex-centric | aboutByTimestamp | OUT edges, timestamp DESC | Recent topic associations |
| Vertex-centric | derivedFromByTimestamp | OUT edges, timestamp DESC | Source provenance |
Standard Traversal Patterns
# 1. Subject Context (get all memories about an entity)
# Gremlin: g.V().has('User', 'tenant_id', tenant_id).has('id', user_id).in('ABOUT').limit(100)
# 2. Topic Exploration (find high-salience memories about a topic)
# Gremlin: g.V().has('Topic', 'tenant_id', tenant_id).has('name', 'pricing').in('ABOUT').has('salience_score', gt(0.7))
# 3. Source Provenance (trace where a memory came from)
# Gremlin: g.V(memory_block_id).out('DERIVED_FROM').path()
# 4. Related Topics (find topics connected to a memory)
# Gremlin: g.V(memory_block_id).out('ABOUT').values('name')
# 5. Entity Relationships (map connections; tenant filter is mandatory here too)
# Gremlin: g.V().has('Org', 'tenant_id', tenant_id).has('id', org_id).both().path().by('name')
# 6. Neighborhood Query (2-hop context)
# Gremlin: g.V(subject_id).both().dedup().both().dedup().limit(50)
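The patterns above can be rendered as submission-ready Gremlin strings. The helper below does this for pattern 1; note that production code should pass values through the Gremlin server's parameter bindings rather than f-string interpolation, which is open to query injection — this is illustration only.

```python
def subject_context_query(tenant_id: str, user_id: str, limit: int = 100) -> str:
    """Render traversal pattern 1 (Subject Context) as a Gremlin string.
    Illustration only: real code should use server-side parameter bindings
    instead of string interpolation to avoid query injection."""
    return (
        f"g.V().has('User','tenant_id','{tenant_id}')"
        f".has('id','{user_id}').in('ABOUT').limit({limit})"
    )

query = subject_context_query("tenant_a", "user_123")
```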
Qdrant Configuration & Embedding Strategy
Collection Schema
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PayloadSchemaType
# Collection naming: single collection with tenant filtering (recommended)
COLLECTION_NAME = "memories"
# Vector configuration
VECTOR_DIMENSIONS = 1536 # OpenAI text-embedding-3-small
# Alternative: 384 for all-MiniLM-L6-v2
def setup_qdrant_collection(client: QdrantClient):
"""Initialize Qdrant collection with proper schema."""
client.create_collection(
collection_name=COLLECTION_NAME,
vectors_config=VectorParams(
size=VECTOR_DIMENSIONS,
distance=Distance.COSINE
)
)
# Create payload indices for fast filtering
client.create_payload_index(
collection_name=COLLECTION_NAME,
field_name="tenant_id",
field_schema=PayloadSchemaType.KEYWORD
)
client.create_payload_index(
collection_name=COLLECTION_NAME,
field_name="subject.type",
field_schema=PayloadSchemaType.KEYWORD
)
client.create_payload_index(
collection_name=COLLECTION_NAME,
field_name="subject.id",
field_schema=PayloadSchemaType.KEYWORD
)
client.create_payload_index(
collection_name=COLLECTION_NAME,
field_name="kind",
field_schema=PayloadSchemaType.KEYWORD
)
client.create_payload_index(
collection_name=COLLECTION_NAME,
field_name="tags",
field_schema=PayloadSchemaType.KEYWORD
)
client.create_payload_index(
collection_name=COLLECTION_NAME,
field_name="created_at",
field_schema=PayloadSchemaType.DATETIME
)
Embedding Model Selection
Recommended Production Setup:
- Primary: OpenAI `text-embedding-3-small` (1536 dimensions)
  - Cost-effective: $0.02 per 1M tokens
  - High quality
  - Fast inference
- Fallback/Local: `all-MiniLM-L6-v2` (384 dimensions)
  - Open source (Apache 2.0)
  - Runs locally
  - Good performance for English
from enum import Enum
class EmbeddingModel(str, Enum):
OPENAI_SMALL = "text-embedding-3-small"
OPENAI_LARGE = "text-embedding-3-large"
MINILM = "all-MiniLM-L6-v2"
MPNET = "all-mpnet-base-v2"
EMBEDDING_CONFIG = {
EmbeddingModel.OPENAI_SMALL: {
"dimensions": 1536,
"provider": "openai",
"max_tokens": 8191
},
EmbeddingModel.OPENAI_LARGE: {
"dimensions": 3072,
"provider": "openai",
"max_tokens": 8191
},
EmbeddingModel.MINILM: {
"dimensions": 384,
"provider": "sentence-transformers",
"max_tokens": 256
},
EmbeddingModel.MPNET: {
"dimensions": 768,
"provider": "sentence-transformers",
"max_tokens": 384
}
}
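Because the collection is created with a fixed vector size, switching embedding models silently breaks upserts. A cheap guard, sketched below against the config table above, is to fail fast whenever the model's output dimension disagrees with the collection's.

```python
def assert_dims_match(model: str, collection_dims: int, config: dict) -> None:
    """Fail fast if the embedding model's output size disagrees with the
    collection's vector size; a mismatch would reject every subsequent upsert."""
    expected = config[model]["dimensions"]
    if expected != collection_dims:
        raise ValueError(
            f"{model} produces {expected}-d vectors; "
            f"collection expects {collection_dims}"
        )

# Trimmed copy of EMBEDDING_CONFIG for illustration
CONFIG = {"text-embedding-3-small": {"dimensions": 1536},
          "all-MiniLM-L6-v2": {"dimensions": 384}}
```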
🏢 Multi-Tenancy & Security
Tenant Isolation Strategy
Critical Principle: Every query, write, and read operation MUST enforce tenant boundaries at the platform level.
JanusGraph Tenant Isolation
class TenantIsolatedGraphRepository:
"""
All graph operations automatically filter by tenant_id.
"""
def __init__(self, tenant_id: str):
self.tenant_id = tenant_id
self.client = get_gremlin_client()
async def create_vertex(
self,
vertex_type: GraphNodeType,
attributes: Dict[str, Any]
) -> str:
"""Create vertex with automatic tenant_id injection."""
# CRITICAL: Always add tenant_id
attributes["tenant_id"] = self.tenant_id
# Build Gremlin traversal; gremlinpython has no property_list helper,
# so chain one .property() call per attribute
traversal = self.client.g.addV(vertex_type.value)
for key, value in attributes.items():
traversal = traversal.property(key, value)
vertex = await traversal.next()
return vertex.id
async def create_edge(
self,
from_vertex_id: str,
to_vertex_id: str,
edge_type: GraphEdgeType,
metadata: Dict[str, Any] = None
) -> str:
"""
Create edge with tenant validation.
CRITICAL: Verify both vertices belong to same tenant.
"""
g = self.client.g
# Validate tenant ownership of both vertices (gremlinpython exposes
# hasNext() rather than a next_or_none() helper)
from_ok = await g.V(from_vertex_id).has("tenant_id", self.tenant_id).hasNext()
to_ok = await g.V(to_vertex_id).has("tenant_id", self.tenant_id).hasNext()
if not from_ok or not to_ok:
raise TenantIsolationViolation(
f"Cannot create edge: vertices must belong to tenant {self.tenant_id}"
)
# Create edge, chaining metadata properties one at a time
# (__ comes from gremlin_python.process.graph_traversal)
traversal = g.V(from_vertex_id).addE(edge_type.value).to(__.V(to_vertex_id))
for key, value in (metadata or {}).items():
traversal = traversal.property(key, value)
edge = await traversal.next()
return edge.id
Qdrant Tenant Isolation
from qdrant_client.models import Filter, FieldCondition, MatchValue
class TenantIsolatedVectorRepository:
"""
All vector operations automatically filter by tenant_id.
"""
def __init__(self, tenant_id: str):
self.tenant_id = tenant_id
self.client = get_qdrant_client()
def _build_tenant_filter(
self,
additional_filters: Optional[List[FieldCondition]] = None
) -> Filter:
"""Build Qdrant filter that always includes tenant_id."""
conditions = [
FieldCondition(
key="tenant_id",
match=MatchValue(value=self.tenant_id)
)
]
if additional_filters:
conditions.extend(additional_filters)
return Filter(must=conditions)
async def search(
self,
query_vector: List[float],
filters: Optional[Dict[str, Any]] = None,
limit: int = 10
) -> List[ScoredPoint]:
"""Search with automatic tenant isolation."""
# CRITICAL: Always include tenant filter
tenant_filter = self._build_tenant_filter(filter_conditions)
results = await self.client.search(
collection_name=COLLECTION_NAME,
query_vector=query_vector,
query_filter=tenant_filter,
limit=limit
)
return results
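Raw vector similarity alone ignores the scoring system above. One possible blending scheme (an assumption, not mandated by the spec) is a weighted sum of similarity and stored salience, which produces the "Ranked MemoryBlocks" mentioned in the data flow examples.

```python
def rank_results(hits: list[dict], alpha: float = 0.7) -> list[dict]:
    """Blend vector similarity with stored salience:
    rank_score = alpha * similarity + (1 - alpha) * salience.
    The 0.7 weighting is an illustrative default, not a spec requirement."""
    for hit in hits:
        hit["rank_score"] = (alpha * hit["similarity"]
                             + (1 - alpha) * hit["salience"])
    return sorted(hits, key=lambda h: h["rank_score"], reverse=True)

ranked = rank_results([
    {"id": "mem_1", "similarity": 0.9, "salience": 0.1},
    {"id": "mem_2", "similarity": 0.7, "salience": 0.9},
])
```

Note how a highly salient memory (`mem_2`) can outrank a closer semantic match when its salience is high enough.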
🔧 Repository Definitions
Language Decision: Python vs Node
Recommendation:
- Python: Core memory engine, curator workers, data stores
- Node/TypeScript: MCP server, SDKs, frontend integration
Rationale:
- Python has superior ML/NLP ecosystem (transformers, langchain)
- Gremlin and Qdrant Python SDKs are most feature-complete
- LLM orchestration tools are Python-native
- MCP SDK is TypeScript-first
- Keeps MCP gateway in your existing Node infrastructure
1️⃣ memory-domain
Purpose: Canonical data contracts and business rules.
Language: Python 3.11+
Structure:
memory-domain/
├── pyproject.toml
├── README.md
├── src/
│ └── memory_domain/
│ ├── __init__.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── memory_block.py # MemoryBlock, MemoryKind, etc.
│ │ ├── memory_pack.py # MemoryPack, PackType
│ │ ├── subject.py # Subject model
│ │ ├── graph.py # GraphNode, GraphEdge
│ │ └── scores.py # Scoring algorithms
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── json_schemas.py # JSON Schema exports
│ │ └── openapi_schemas.py # OpenAPI spec generation
│ ├── validation/
│ │ ├── __init__.py
│ │ └── validators.py # Cross-model validation
│ └── errors/
│ ├── __init__.py
│ └── exceptions.py # Custom exceptions
├── tests/
│ ├── test_models.py
│ ├── test_validation.py
│ └── test_scores.py
└── docs/
└── models.md
Key Dependencies:
[tool.poetry.dependencies]
python = "^3.11"
pydantic = "^2.5"
pydantic-settings = "^2.1"
2️⃣ memory-store-janusgraph
Purpose: Graph persistence layer implementation.
Language: Python 3.11+
Key Dependencies:
[tool.poetry.dependencies]
python = "^3.11"
gremlinpython = "^3.7"
memory-domain = {path = "../memory-domain", develop = true}
asyncio = "^3.4"
Critical Security Tests:
# tests/test_tenant_isolation.py
async def test_cannot_access_other_tenant_vertices():
"""CRITICAL: Verify tenant boundaries are enforced."""
# Create vertex in tenant A
repo_a = TenantIsolatedGraphRepository("tenant_a")
vertex_a = await repo_a.create_vertex(
GraphNodeType.USER,
{"id": "user_123", "name": "Alice"}
)
# Attempt to access from tenant B
repo_b = TenantIsolatedGraphRepository("tenant_b")
vertex_b = await repo_b.find_vertex(GraphNodeType.USER, "user_123")
# Should not find vertex from other tenant
assert vertex_b is None
async def test_cannot_create_cross_tenant_edge():
"""CRITICAL: Verify edges cannot span tenants."""
repo_a = TenantIsolatedGraphRepository("tenant_a")
repo_b = TenantIsolatedGraphRepository("tenant_b")
vertex_a = await repo_a.create_vertex(GraphNodeType.USER, {"id": "user_a"})
vertex_b = await repo_b.create_vertex(GraphNodeType.USER, {"id": "user_b"})
# Attempt to create edge from A to B using tenant A repo
with pytest.raises(TenantIsolationViolation):
await repo_a.create_edge(
from_vertex_id=vertex_a,
to_vertex_id=vertex_b,
edge_type=GraphEdgeType.ABOUT  # any valid type; the call must fail on tenant grounds
)
3️⃣ memory-store-qdrant
Purpose: Semantic storage & retrieval via Qdrant.
Language: Python 3.11+
Key Dependencies:
[tool.poetry.dependencies]
python = "^3.11"
qdrant-client = "^1.7"
memory-domain = {path = "../memory-domain", develop = true}
openai = "^1.6"
sentence-transformers = "^2.3"
4️⃣ memory-engine-service
Purpose: Core HTTP API and business logic.
Language: Python 3.11+ (FastAPI)
Key Endpoints:
# POST /memory - Create/update memory blocks
@app.post("/memory", response_model=MemoryCreateResponse)
async def create_memory(
request: MemoryCreateRequest,
tenant_id: str = Depends(extract_tenant_from_request)
):
"""
Create or update memory blocks with automatic graph and vector sync.
"""
# POST /memory/query - Semantic search
@app.post("/memory/query", response_model=MemoryQueryResponse)
async def query_memory(
request: MemoryQueryRequest,
tenant_id: str = Depends(extract_tenant_from_request)
):
"""
Hybrid semantic + graph query.
Request body:
{
"subject": {"type": "org", "id": "org_123"},
"query_text": "What are our payment terms?",
"filters": {
"kind": ["fact", "insight"],
"tags_any": ["billing", "payment"],
"time_range": {"from_days_ago": 365}
},
"limit": 20,
"include_graph_context": true
}
"""
# GET /memory/packs - Retrieve structured packs
@app.get("/memory/packs", response_model=List[MemoryPack])
async def get_packs(
subject_type: str,
subject_id: str,
pack_types: Optional[List[PackType]] = Query(None),
tenant_id: str = Depends(extract_tenant_from_request)
):
"""Get memory packs for a subject."""
# POST /memory/graph/neighbors - Graph exploration
@app.post("/memory/graph/neighbors", response_model=GraphNeighborsResponse)
async def get_graph_neighbors(
request: GraphNeighborsRequest,
tenant_id: str = Depends(extract_tenant_from_request)
):
"""Explore graph neighborhood around a node."""
# GET /health
@app.get("/health")
async def health_check():
"""Health check endpoint for load balancers."""
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat()
}
5️⃣ memory-mcp-server
Purpose: Expose memory engine through MCP protocol.
Language: Node/TypeScript (MCP SDK is TypeScript-first)
Key Dependencies:
{
"dependencies": {
"@modelcontextprotocol/sdk": "^0.5.0",
"axios": "^1.6.0",
"zod": "^3.22.0"
}
}
MCP Tools:
- memory.upsert_block
- memory.query
- memory.link_blocks
- memory.graph_neighbors
MCP Resources:
- memory://subjects/{subject}/snapshot
- memory://blocks/{id}
- memory://packs/{subject}
6️⃣ memory-curator-worker
Purpose: LLM-powered knowledge extraction and curation.
Language: Python 3.11+
Event Handlers:
- conversation_curation - Extract from chat transcripts
- document_ingestion - Process Google Drive/Notion documents
- tool_event - Handle tool result memorization
- salience_decay - Daily maintenance of score decay
Curation Prompt Template:
You are a memory curator for an AI assistant. Your job is to extract structured, memorable knowledge from conversations.
**Subject:** {subject}
**Existing Knowledge About This Subject:**
{context_memories}
**New Conversation:**
{transcript}
**Your Task:**
Extract memories that are:
1. **Facts** - Verifiable, stable information that doesn't change often
2. **Preferences** - Stated likes, dislikes, requirements, or behavioral patterns
3. **Insights** - Patterns, opportunities, risks, or discoveries you infer
**Output Format (JSON):**
{
"blocks": [
{
"kind": "fact|preference|insight",
"content": {
"text": "Natural language summary suitable for LLM context",
"structured": {}
},
"tags": ["list", "of", "relevant", "concepts"],
"scores": {
"salience": 0.0-1.0,
"stability": 0.0-1.0,
"confidence": 0.0-1.0
},
"reasoning": "Why this is worth remembering"
}
]
}
**Guidelines:**
- Extract 3-10 blocks per conversation (focus on quality over quantity)
- Avoid duplicating existing knowledge unless it's an update
- Facts should be specific and verifiable
- Use clear, concise language that an LLM can easily understand
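The curator must validate the LLM's JSON before any block reaches the engine. A minimal sketch, assuming the output format above: malformed or out-of-range blocks are dropped rather than failing the whole batch.

```python
import json

REQUIRED_KEYS = {"kind", "content", "tags", "scores"}
VALID_KINDS = {"fact", "preference", "insight"}

def parse_curation_output(raw: str) -> list[dict]:
    """Validate the curator LLM's JSON output; drop malformed blocks
    instead of failing the entire extraction batch."""
    accepted = []
    for block in json.loads(raw).get("blocks", []):
        if not REQUIRED_KEYS.issubset(block):
            continue
        if block["kind"] not in VALID_KINDS:
            continue
        if not all(0.0 <= block["scores"].get(k, -1) <= 1.0
                   for k in ("salience", "stability", "confidence")):
            continue
        accepted.append(block)
    return accepted

raw = ('{"blocks": [{"kind": "fact", "content": {"text": "Net-30 terms"}, '
       '"tags": ["billing"], "scores": {"salience": 0.8, "stability": 0.9, '
       '"confidence": 0.7}}, {"kind": "rumor", "content": {"text": "x"}, '
       '"tags": [], "scores": {"salience": 0.5, "stability": 0.5, '
       '"confidence": 0.5}}]}')
blocks = parse_curation_output(raw)
```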
7️⃣ memory-graph-visualizer
Purpose: Admin/developer graph exploration UI.
Approach: Wrapper around JanusGraph-Visualizer or Graphexp
Implementation: Docker + nginx with basic auth
8️⃣ memory-sdk-js
Purpose: TypeScript/Node client for developers and frontend.
API Design:
import { MemoryClient } from '@your-org/memory-sdk';
const client = new MemoryClient({
apiUrl: 'http://localhost:8000',
apiKey: process.env.MEMORY_API_KEY
});
// Write a fact
await client.writeFact({
subject: { type: 'org', id: 'org_acme' },
text: 'Primary contact is John Doe (john@acme.com)',
tags: ['contact', 'primary']
});
// Search memories
const results = await client.searchMemories({
subject: { type: 'org', id: 'org_acme' },
query: 'Who is the primary contact?',
limit: 10
});
// Get working set for agent
const workingSet = await client.getWorkingSet({
subject: { type: 'user', id: 'user_123' },
context: 'customer support conversation',
maxBlocks: 30
});
9️⃣ memory-sdk-python
Purpose: Python client (mirrors JS SDK functionality).
Language: Python 3.11+
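A minimal sketch of the Python client, mirroring the JS SDK above. Method names, endpoint paths, and the bearer-token header are assumptions until the SDK is implemented; the real client would likely use `httpx` rather than stdlib `urllib`.

```python
import json
import urllib.request

class MemoryClient:
    """Sketch of the Python SDK client; mirrors the JS SDK surface."""

    def __init__(self, api_url: str, api_key: str):
        self.api_url = api_url.rstrip("/")
        self.api_key = api_key

    def _post(self, path: str, body: dict) -> dict:
        # Assumed auth scheme: bearer token in the Authorization header
        req = urllib.request.Request(
            self.api_url + path,
            data=json.dumps(body).encode("utf-8"),
            headers={"Content-Type": "application/json",
                     "Authorization": f"Bearer {self.api_key}"},
        )
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)

    def write_fact(self, subject: dict, text: str, tags: list[str]) -> dict:
        return self._post("/memory", {"subject": subject, "kind": "fact",
                                      "content": {"text": text}, "tags": tags})

    def search_memories(self, subject: dict, query: str,
                        limit: int = 10) -> dict:
        return self._post("/memory/query", {"subject": subject,
                                            "query_text": query,
                                            "limit": limit})

client = MemoryClient("http://localhost:8000/", "test-key")
```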
🔟 memory-infra
Purpose: Complete stack deployment orchestration.
Contents:
- Docker Compose for local development
- Kubernetes manifests for production
- Helm charts
- Monitoring configs (Prometheus, Grafana)
- Quick setup scripts
Quick Start Script:
#!/bin/bash
# scripts/setup-local.sh
set -e
echo "🚀 Setting up Memory Platform local environment..."
# Start infrastructure
docker-compose up -d cassandra elasticsearch qdrant nats redis postgres
# Wait for services
sleep 30
# Start JanusGraph
docker-compose up -d janusgraph
sleep 20
# Initialize schemas
docker-compose exec janusgraph /opt/janusgraph/bin/gremlin.sh < ../memory-store-janusgraph/schema/setup.groovy
# Start application services
docker-compose up -d memory-engine memory-mcp-server memory-curator-worker memory-visualizer
# Start monitoring
docker-compose up -d prometheus grafana
echo "✅ Memory Platform is running!"
echo " Memory Engine API: http://localhost:8000"
echo " MCP Server: http://localhost:3000"
echo " Graph Visualizer: http://localhost:3001"
echo " Grafana: http://localhost:3002"
📊 Monitoring & Observability
Key Metrics
from prometheus_client import Counter, Histogram, Gauge
# Memory operations
memory_blocks_created = Counter(
'memory_blocks_created_total',
'Total number of memory blocks created',
['tenant_id', 'kind']
)
memory_query_duration = Histogram(
'memory_query_duration_seconds',
'Time spent executing memory queries',
['operation', 'tenant_id'],
buckets=[0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
)
curator_events_processed = Counter(
'curator_events_processed_total',
'Total events processed by curator',
['event_type', 'status']
)
memory_sync_lag = Gauge(
'memory_sync_lag_seconds',
'Time lag between graph and vector writes'
)
Critical Alerts
# prometheus-alerts.yml
groups:
- name: memory_platform
rules:
- alert: HighQueryLatency
expr: histogram_quantile(0.99, rate(memory_query_duration_seconds_bucket[5m])) > 2
for: 5m
labels:
severity: warning
- alert: GraphVectorInconsistency
expr: memory_sync_lag_seconds > 60
for: 2m
labels:
severity: critical
- alert: CuratorWorkerLag
expr: nats_consumer_num_pending{consumer="memory-curator-worker"} > 1000
for: 10m
labels:
severity: warning
✅ Implementation Priority
Phase 1: Foundation (Weeks 1-3)
- memory-domain - Complete models
- memory-store-janusgraph - Graph CRUD + schema
- memory-store-qdrant - Vector CRUD + embedding
- memory-engine-service - Basic write/query API
Phase 2: Intelligence (Weeks 4-5)
- memory-curator-worker - Conversation curation
- memory-mcp-server - MCP tools
- memory-infra - Docker Compose setup
Phase 3: Production (Weeks 6-8)
- memory-sdk-js - Client library
- Document ingestion in curator
- Frontend integration
Phase 4: Polish (Weeks 9-10)
- memory-graph-visualizer - Admin UI
- Advanced queries (packs, graph)
- Performance optimization
- memory-sdk-python - Full parity
📝 Final Checklist
Before Starting Development
- Review complete specification
- Clarify any ambiguities
- Set up development environment
- Create all 11 repository scaffolds
- Configure CI/CD pipelines
Per Repository
- Implement core functionality
- Write comprehensive tests
- Add README with setup instructions
- Add Dockerfile
- Configure linting
- Implement logging & metrics
Integration
- Test full stack in Docker Compose
- Verify tenant isolation (CRITICAL)
- Run end-to-end scenarios
- Load test
- Security audit
Production Readiness
- Kubernetes manifests
- Monitoring dashboards
- Alert rules
- Backup procedures
- Disaster recovery plan
📄 Document Control
Version: 2.0
Status: Ready for Implementation
Last Updated: December 24, 2025
Change Log:
- v2.0 (2025-12-24): Complete specification with all gaps filled
- v1.0 (2025-12-20): Initial draft
END OF SPECIFICATION