
Memory Platform – Complete Technical Specification

Enterprise-Grade Memory System for AI Agents

Version: 2.0
Last Updated: December 24, 2025
Status: Ready for Implementation


📋 Executive Summary

This document defines the complete architecture, implementation requirements, and operational guidelines for the Memory Engine Platform – an enterprise-grade, multi-tenant memory system for AI agents composed of:

  • Graph Memory Layer (JanusGraph)
  • Semantic Memory Layer (Qdrant)
  • Document Intelligence Layer (memory-store-documents) [NEW]
  • Structured Memory Model
  • LLM Curator Pipelines
  • MCP Integration
  • Visualization Tools
  • Frontend User Memory Explorer

Target Audience: Software engineers, DevOps engineers, ML engineers, and autonomous development agents.

Goal: Any developer or autonomous agent with this document can bootstrap, implement, deploy, extend, and maintain the entire platform.


🎯 Core Objectives

Business Goals

  • Provide a commercially safe, fully open-source-compatible enterprise memory platform
  • Enable AI agents to maintain coherent long-term knowledge about users, organizations, projects, and contexts
  • Support multi-channel memory aggregation (chat, documents, tools, events)
  • Deliver sub-second query performance for agent context retrieval
  • Ensure data consistency, auditability, and compliance readiness

🔥 Repository Architecture Overview

The platform consists of 11 repositories:

1️⃣ memory-domain
2️⃣ memory-store-janusgraph
3️⃣ memory-store-qdrant
4️⃣ memory-store-documents [NEW]
5️⃣ memory-engine-service
6️⃣ memory-mcp-server
7️⃣ memory-curator-worker
8️⃣ memory-graph-visualizer
9️⃣ memory-sdk-js
🔟 memory-sdk-python
1️⃣1️⃣ memory-infra

Each one is defined below.


Technical Goals

  • Multi-tenant isolation at every layer
  • Read performance: p95 < 500ms for context queries
  • Write consistency: Graph + Vector atomic operations
  • Auditability: Complete provenance tracking
  • Scalability: Support 1M+ memory blocks per tenant
  • Extensibility: Plugin architecture for new memory types and sources

🏗️ System Architecture

High-Level Component Diagram

┌──────────────────────────────────────────────────────┐
│                    Frontend Layer                    │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐  │
│  │    Web UI    │ │    Mobile    │ │   Desktop    │  │
│  │   (React)    │ │     App      │ │     App      │  │
│  └──────┬───────┘ └──────┬───────┘ └──────┬───────┘  │
└─────────┼────────────────┼────────────────┼──────────┘
          │                │                │
          └────────────────┼────────────────┘
                           │
        ┌──────────────────▼──────────────────┐
        │        MCP Gateway (Node/TS)        │
        │      (Existing infrastructure)      │
        └──────────────────┬──────────────────┘
                           │
        ┌──────────────────▼──────────────────┐
        │     memory-mcp-server (Node/TS)     │
        │ ┌────────────────────────────────┐  │
        │ │  Tools: memory.* operations    │  │
        │ │  Resources: memory:// URIs     │  │
        │ └────────────────────────────────┘  │
        └──────────────────┬──────────────────┘
                           │
        ┌──────────────────▼──────────────────┐
        │   memory-engine-service (Python)    │
        │ ┌────────────────────────────────┐  │
        │ │  FastAPI HTTP/gRPC API         │  │
        │ │  Business Logic Layer          │  │
        │ │  Multi-tenant Access Control   │  │
        │ └────────┬────────┬───────┬──────┘  │
        └──────────┼────────┼───────┼─────────┘
                   │        │       │
        ┌──────────┘        │       └───────────────┐
        │                   │                       │
 ┌──────▼─────────────┐ ┌───▼────────────────┐ ┌────▼───────────────────┐
 │ JanusGraph         │ │ Qdrant             │ │ memory-store-documents │
 │ (Graph DB)         │ │ (Vector Store)     │ │ - Coordinate Logic     │
 │ - Extensible Nodes │ │ - Embeddings       │ │ - Source Provenance    │
 │ - Relationships    │ │ - Semantic Search  │ │                        │
 │ - Provenance       │ │                    │ │                        │
 └────────────────────┘ └────────────────────┘ └────────────────────────┘

             ┌──────────────────────────────┐
             │       JetStream (NATS)       │
             │   Event Bus for Async Jobs   │
             └──────────────┬───────────────┘
                            │
             ┌──────────────▼───────────────┐
             │    memory-curator-worker     │
             │           (Python)           │
             │  ┌────────────────────────┐  │
             │  │ LLM Extraction         │  │
             │  │ Document Processing    │  │
             │  │ Conversation Curation  │  │
             │  │ Decay Management       │  │
             │  └────────────────────────┘  │
             └──────────────────────────────┘

Data Flow Examples

1. Agent Context Loading (Pre-conversation)

Agent → MCP memory.query → Engine Service → Qdrant (semantic) + JanusGraph (graph)
→ Ranked MemoryBlocks → Agent prompt context

2. Memory Creation (Post-conversation)

Conversation End → JetStream event → Curator Worker → LLM extraction
→ Engine Service → Atomic write to Graph + Vector → Success

3. Document Ingestion

User uploads doc → Frontend → JetStream event → Curator Worker
→ Download from Drive → Chunk + Extract → Engine Service → Memory blocks
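
To illustrate the event hand-off in flows 2 and 3, here is a minimal sketch of publishing a curation event to JetStream with the nats-py client; the subject name, event-type value, and payload fields are assumptions for illustration, not part of the specification.

import asyncio
import json

import nats  # nats-py client


async def publish_conversation_completed(conversation_id: str, tenant_id: str) -> None:
    """Emit an event that the curator worker can pick up asynchronously."""
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    payload = {
        "event_type": "conversation_curation",  # hypothetical event-type value
        "tenant_id": tenant_id,
        "conversation_id": conversation_id,
    }
    # Subject name is illustrative; the spec does not fix a naming scheme.
    await js.publish("memory.events.conversation", json.dumps(payload).encode())
    await nc.close()


if __name__ == "__main__":
    asyncio.run(publish_conversation_completed("conv_123", "tenant_a"))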

📊 Core Concepts & Data Model

1. Memory Block

The fundamental unit of memory. Represents a single piece of knowledge.

from pydantic import BaseModel, Field
from typing import Optional, Literal, Dict, Any, List
from datetime import datetime
from enum import Enum

class MemoryKind(str, Enum):
    FACT = "fact"                # Verifiable, stable information
    PREFERENCE = "preference"    # Stated likes/dislikes, requirements
    INSIGHT = "insight"          # Agent-discovered patterns
    SUMMARY = "summary"          # Consolidated information
    PROFILE = "profile"          # Identity/demographic data
    TOOL_RESULT = "tool_result"  # Output from tool execution
    NOTE = "note"                # User-created annotations
    INTERACTION = "interaction"  # Record of specific interaction

class MemorySource(str, Enum):
    CHAT = "chat"
    TOOL = "tool"
    DOCUMENT = "document"
    EVENT = "event"
    SYSTEM = "system"
    USER_INPUT = "user_input"

class MemoryScores(BaseModel):
    """
    Scoring system for memory relevance and quality.

    All scores are float [0.0 - 1.0]
    """
    salience: float = Field(
        ge=0.0, le=1.0,
        description="Current relevance to subject. Decays over time without access."
    )
    stability: float = Field(
        ge=0.0, le=1.0,
        description="Resistance to invalidation. Facts > Preferences > Insights."
    )
    confidence: float = Field(
        ge=0.0, le=1.0,
        description="Source reliability + verification count."
    )

class MemoryContent(BaseModel):
    """Content container supporting both natural language and structured data."""
    text: str = Field(
        description="Natural language summary suitable for LLM context"
    )
    structured: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Optional structured representation (JSON-serializable)"
    )

class SourceMetadata(BaseModel):
    """Provenance information for the memory."""
    origin: MemorySource
    tool_name: Optional[str] = None
    conversation_id: Optional[str] = None
    document_id: Optional[str] = None
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    source_reference: Optional[Dict[str, Any]] = None  # NEW: SourceLocation mapping

class Subject(BaseModel):
    """Entity that the memory is about. Extensible in Platform 2.0."""
    type: str  # user, org, project, tool, etc.
    id: str

class MemoryBlock(BaseModel):
    """
    Core memory unit.

    Represents a single piece of knowledge with metadata, scores, and provenance.
    """
    id: str = Field(description="Unique identifier, format: mem_{uuid}")
    tenant_id: str = Field(description="Tenant isolation key")
    subject: Subject = Field(description="Primary entity this memory is about")
    kind: MemoryKind
    source: SourceMetadata
    content: MemoryContent
    scores: MemoryScores
    tags: List[str] = Field(
        default_factory=list,
        description="Concepts/topics for filtering and grouping"
    )
    embedding_id: Optional[str] = Field(
        default=None,
        description="Reference to vector in Qdrant"
    )
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    accessed_at: datetime = Field(
        default_factory=datetime.utcnow,
        description="Last time this memory was retrieved"
    )
    supersedes: Optional[List[str]] = Field(
        default=None,
        description="IDs of memory blocks this one replaces"
    )
    version: int = Field(default=1, description="Version number for updates")

2. Memory Packs

Logical groupings of related memory blocks for structured presentation.

from typing import List
from enum import Enum

class PackType(str, Enum):
    """Categories of memory packs for organizational structure."""
    BASIC_PROFILE = "basic_profile"        # Identity, contact, core attributes
    BUSINESS_CONTEXT = "business_context"  # Products, services, ICP, market
    OPERATIONAL = "operational"            # Processes, playbooks, SLAs
    FINANCIAL = "financial"                # Billing, pricing, contracts
    INSIGHTS = "insights"                  # Patterns, opportunities, risks
    PREFERENCES = "preferences"            # Communication, tools, style
    RELATIONSHIPS = "relationships"        # Connections to other entities
    HISTORICAL = "historical"              # Past interactions, outcomes

class ConsolidationStrategy(str, Enum):
    """How to handle multiple blocks in a pack."""
    LATEST = "latest"        # Show only most recent
    MERGE = "merge"          # Combine into single view
    VERSIONED = "versioned"  # Show history/changelog
    ALL = "all"              # Display all blocks

class MemoryPack(BaseModel):
    """
    Grouped collection of related memory blocks.

    Packs provide structured views of knowledge for specific contexts.
    """
    pack_id: str = Field(description="Unique pack identifier")
    pack_type: PackType
    label: str = Field(description="Human-readable name")
    description: Optional[str] = Field(
        default=None,
        description="What this pack represents"
    )
    subject: Subject = Field(description="Primary entity this pack is about")
    blocks: List[MemoryBlock] = Field(description="Ordered memory blocks")
    consolidation_strategy: ConsolidationStrategy = Field(
        default=ConsolidationStrategy.MERGE
    )
    last_updated: datetime = Field(default_factory=datetime.utcnow)
    total_salience: float = Field(
        description="Aggregate salience score across all blocks"
    )

Pack Type Definitions

Pack Type        | Purpose                    | Example Contents
-----------------|----------------------------|--------------------------------------------------------------
basic_profile    | Identity & core attributes | Name, email, location, timezone, language, role
business_context | Market positioning         | Products, services, ICP, sectors, value props
operational      | Internal processes         | Playbooks, workflows, SLAs, escalation paths
financial        | Money matters              | Billing cycle, payment terms, pricing tier, discounts
insights         | Discovered knowledge       | Patterns, anomalies, risks, opportunities, recommendations
preferences      | Behavioral choices         | Communication style, tool preferences, notification settings
relationships    | Entity connections         | Reports-to, partners, vendors, customers, projects
historical       | Interaction record         | Past conversations, outcomes, decisions, issues
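
To make pack assembly concrete, a minimal sketch (illustrative only, not a normative part of the spec) that groups already-filtered blocks into a pack and derives the aggregate salience:

from datetime import datetime
from typing import List

def build_pack(
    pack_id: str,
    pack_type: PackType,
    subject: Subject,
    blocks: List[MemoryBlock],
) -> MemoryPack:
    """Assemble a MemoryPack from blocks that already belong to one subject."""
    # Order blocks by salience so consumers see the most relevant items first.
    ordered = sorted(blocks, key=lambda b: b.scores.salience, reverse=True)
    return MemoryPack(
        pack_id=pack_id,
        pack_type=pack_type,
        label=pack_type.value.replace("_", " ").title(),
        subject=subject,
        blocks=ordered,
        consolidation_strategy=ConsolidationStrategy.MERGE,
        last_updated=datetime.utcnow(),
        total_salience=sum(b.scores.salience for b in ordered),
    )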

3. Graph Schema

JanusGraph stores relationships and structure.

class GraphNodeType(str):
    """Types of vertices in the knowledge graph (Aliases)."""
    # Built-in types: USER, ORG, PROJECT, TOOL, CHANNEL, DOCUMENT, TOPIC, MEMORY_BLOCK

class GraphEdgeType(str):
    """Types of relationships between graph nodes (Aliases)."""
    # Built-in types: ABOUT, PREFERS, DERIVED_FROM, MENTIONS, BELONGS_TO, SUPERSEDES

class GraphNode(BaseModel):
    """Vertex in the knowledge graph."""
    vertex_id: str = Field(description="JanusGraph vertex ID")
    type: GraphNodeType
    tenant_id: str = Field(description="Tenant isolation")
    attributes: Dict[str, Any] = Field(
        description="Type-specific properties"
    )
    created_at: datetime = Field(default_factory=datetime.utcnow)

class GraphEdge(BaseModel):
    """Edge connecting two vertices."""
    edge_id: Optional[str] = Field(default=None, description="JanusGraph edge ID")
    from_vertex: str = Field(description="Source vertex ID")
    to_vertex: str = Field(description="Target vertex ID")
    edge_type: GraphEdgeType
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Edge-specific properties (weight, timestamp, etc.)"
    )
    created_at: datetime = Field(default_factory=datetime.utcnow)

Graph Node Property Schemas

# Node-specific attribute schemas

USER_SCHEMA = {
    "id": "string (external user ID)",
    "tenant_id": "string",
    "name": "string",
    "email": "string",
    "role": "string (optional)",
    "created_at": "datetime"
}

ORG_SCHEMA = {
    "id": "string (external org ID)",
    "tenant_id": "string",
    "name": "string",
    "domain": "string (optional)",
    "industry": "string (optional)",
    "created_at": "datetime"
}

MEMORY_BLOCK_SCHEMA = {
    "id": "string (mem_*)",
    "tenant_id": "string",
    "kind": "string (MemoryKind)",
    "salience_score": "float",
    "created_at": "datetime",
    "updated_at": "datetime"
}

TOPIC_SCHEMA = {
    "id": "string (topic_*)",
    "tenant_id": "string",
    "name": "string",
    "category": "string (optional)",
    "created_at": "datetime"
}

DOCUMENT_SCHEMA = {
    "id": "string (doc_*)",
    "tenant_id": "string",
    "source_tool": "string (google_drive, notion, etc.)",
    "external_id": "string (source system ID)",
    "title": "string",
    "created_at": "datetime"
}

4. Memory Scoring System

Scores guide retrieval relevance and lifecycle management.

Score Definitions

Salience [0.0 - 1.0]

  • Purpose: Current relevance to the subject
  • Factors: Recency, access frequency, topic alignment
  • Decay: Exponential over time without access
  • Boost: Increases on retrieval

Stability [0.0 - 1.0]

  • Purpose: Resistance to invalidation
  • Range:
    • Facts: 0.8 - 1.0 (highly stable)
    • Preferences: 0.5 - 0.8 (moderately stable)
    • Insights: 0.3 - 0.6 (may evolve)
    • Summaries: 0.2 - 0.5 (frequently updated)

Confidence [0.0 - 1.0]

  • Purpose: Trust in accuracy
  • Factors:
    • Source reliability (document > conversation > inference)
    • Verification count (multiple sources boost confidence)
    • LLM extraction confidence score

Decay Algorithm

import math
from datetime import datetime, timedelta

def calculate_salience_decay(
    base_salience: float,
    last_accessed: datetime,
    memory_kind: MemoryKind
) -> float:
    """
    Apply time-based decay to salience score.

    Decay rates (λ) by kind:
    - facts: 0.01 (slow decay)
    - preferences: 0.05 (medium decay)
    - insights: 0.1 (fast decay)
    - summaries: 0.15 (very fast decay)
    """
    decay_rates = {
        MemoryKind.FACT: 0.01,
        MemoryKind.PROFILE: 0.01,
        MemoryKind.PREFERENCE: 0.05,
        MemoryKind.TOOL_RESULT: 0.07,
        MemoryKind.INSIGHT: 0.1,
        MemoryKind.SUMMARY: 0.15,
        MemoryKind.NOTE: 0.03,
        MemoryKind.INTERACTION: 0.12
    }

    lambda_decay = decay_rates.get(memory_kind, 0.05)
    days_since_access = (datetime.utcnow() - last_accessed).days

    # Exponential decay: salience(t) = base * e^(-λ * t)
    decayed_salience = base_salience * math.exp(-lambda_decay * days_since_access)

    return max(0.0, min(1.0, decayed_salience))

def boost_salience_on_access(current_salience: float) -> float:
    """Increase salience when memory is accessed."""
    boost = 0.1
    return min(1.0, current_salience + boost)

def reduce_confidence_on_contradiction(
    current_confidence: float,
    contradiction_severity: float  # 0.0 - 1.0
) -> float:
    """Reduce confidence when contradicting information appears."""
    penalty = 0.3 * contradiction_severity
    return max(0.0, current_confidence - penalty)

def increase_confidence_on_verification(current_confidence: float) -> float:
    """Boost confidence when memory is verified by another source."""
    boost = 0.2
    return min(1.0, current_confidence + boost)

Recalculation Triggers

Trigger                      | Action                           | Component
-----------------------------|----------------------------------|---------------------------------
Memory accessed              | Boost salience by +0.1           | Engine Service (on read)
Daily decay                  | Apply exponential decay          | Curator Worker (cron job)
Contradiction detected       | Reduce confidence by 0.3         | Curator Worker (LLM analysis)
Verification from new source | Increase confidence by +0.2      | Curator Worker (cross-reference)
Memory superseded            | Set salience to 0.0 on old block | Engine Service (on write)
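
A minimal sketch of the daily decay job in the curator worker, assuming hypothetical engine-side helpers (iter_blocks, update_scores) for paging through and updating a tenant's blocks; the helper names are not part of the specified API:

async def run_daily_decay(engine, tenant_id: str) -> None:
    """Apply time-based salience decay to every block of one tenant."""
    # engine.iter_blocks / engine.update_scores are illustrative helpers,
    # not part of the specified API surface.
    async for block in engine.iter_blocks(tenant_id=tenant_id):
        decayed = calculate_salience_decay(
            base_salience=block.scores.salience,
            last_accessed=block.accessed_at,
            memory_kind=block.kind,
        )
        # Only write back when the change is meaningful.
        if abs(decayed - block.scores.salience) > 1e-3:
            await engine.update_scores(block.id, salience=decayed)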

🗄️ Storage Layer Architecture

JanusGraph Configuration

Schema Definition

// JanusGraph schema setup (Gremlin script)

// Vertex labels
mgmt = graph.openManagement()

// Define vertex labels
user = mgmt.makeVertexLabel('User').make()
org = mgmt.makeVertexLabel('Org').make()
project = mgmt.makeVertexLabel('Project').make()
tool = mgmt.makeVertexLabel('Tool').make()
channel = mgmt.makeVertexLabel('Channel').make()
document = mgmt.makeVertexLabel('Document').make()
topic = mgmt.makeVertexLabel('Topic').make()
memoryBlock = mgmt.makeVertexLabel('MemoryBlock').make()
conversation = mgmt.makeVertexLabel('Conversation').make()
event = mgmt.makeVertexLabel('Event').make()

// Property keys
tenantId = mgmt.makePropertyKey('tenant_id').dataType(String.class).make()
externalId = mgmt.makePropertyKey('id').dataType(String.class).make()
name = mgmt.makePropertyKey('name').dataType(String.class).make()
email = mgmt.makePropertyKey('email').dataType(String.class).make()
kind = mgmt.makePropertyKey('kind').dataType(String.class).make()
salienceScore = mgmt.makePropertyKey('salience_score').dataType(Float.class).make()
createdAt = mgmt.makePropertyKey('created_at').dataType(Long.class).make()
updatedAt = mgmt.makePropertyKey('updated_at').dataType(Long.class).make()

// Composite indices for tenant isolation (CRITICAL)
mgmt.buildIndex('byTenantAndType', Vertex.class)
.addKey(tenantId)
.buildCompositeIndex()

mgmt.buildIndex('userByTenantAndId', Vertex.class)
.addKey(tenantId)
.addKey(externalId)
.indexOnly(user)
.unique()
.buildCompositeIndex()

mgmt.buildIndex('orgByTenantAndId', Vertex.class)
.addKey(tenantId)
.addKey(externalId)
.indexOnly(org)
.unique()
.buildCompositeIndex()

mgmt.buildIndex('memoryBlockByTenant', Vertex.class)
.addKey(tenantId)
.indexOnly(memoryBlock)
.buildCompositeIndex()

// Mixed index for text search
mgmt.buildIndex('memoryBlockFullText', Vertex.class)
.addKey(name, Mapping.TEXT.asParameter())
.indexOnly(memoryBlock)
.buildMixedIndex("search")

// Edge labels (must exist before building edge indices)
about = mgmt.makeEdgeLabel('ABOUT').make()
prefers = mgmt.makeEdgeLabel('PREFERS').make()
derivedFrom = mgmt.makeEdgeLabel('DERIVED_FROM').make()
mentions = mgmt.makeEdgeLabel('MENTIONS').make()
belongsTo = mgmt.makeEdgeLabel('BELONGS_TO').make()
supersedes = mgmt.makeEdgeLabel('SUPERSEDES').make()

// Vertex-centric indices for fast edge traversal
mgmt.buildEdgeIndex(about, 'aboutByTimestamp', Direction.OUT, Order.desc, createdAt)
mgmt.buildEdgeIndex(derivedFrom, 'derivedFromByTimestamp', Direction.OUT, Order.desc, createdAt)

mgmt.commit()

Required Indices

Index Type     | Name                   | Keys                      | Purpose
---------------|------------------------|---------------------------|----------------------------
Composite      | byTenantAndType        | tenant_id                 | Fast tenant filtering
Composite      | userByTenantAndId      | tenant_id, id             | Unique user lookup
Composite      | orgByTenantAndId       | tenant_id, id             | Unique org lookup
Composite      | memoryBlockByTenant    | tenant_id                 | List all blocks for tenant
Mixed          | memoryBlockFullText    | name (TEXT)               | Keyword search in content
Vertex-centric | aboutByTimestamp       | OUT edges, timestamp DESC | Recent topic associations
Vertex-centric | derivedFromByTimestamp | OUT edges, timestamp DESC | Source provenance

Standard Traversal Patterns

# 1. Subject Context (get all memories about an entity)
# Gremlin: g.V().has('User', 'tenant_id', tenant_id).has('id', user_id).in('ABOUT').limit(100)

# 2. Topic Exploration (find high-salience memories about a topic)
# Gremlin: g.V().has('Topic', 'tenant_id', tenant_id).has('name', 'pricing').in('ABOUT').has('salience_score', gt(0.7))

# 3. Source Provenance (trace where a memory came from)
# Gremlin: g.V(memory_block_id).out('DERIVED_FROM').path()

# 4. Related Topics (find topics connected to a memory)
# Gremlin: g.V(memory_block_id).out('ABOUT').values('name')

# 5. Entity Relationships (map connections)
# Gremlin: g.V().has('Org', 'id', org_id).both().path().by('name')

# 6. Neighborhood Query (2-hop context)
# Gremlin: g.V(subject_id).both().dedup().both().dedup().limit(50)
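
As a sketch of how pattern 1 could be executed with gremlinpython (the connection URL and function name are assumptions; the traversal mirrors the Gremlin shown above):

from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal

def fetch_subject_memories(tenant_id: str, user_id: str, limit: int = 100):
    """Pattern 1: memory blocks attached to a user via incoming ABOUT edges."""
    conn = DriverRemoteConnection("ws://localhost:8182/gremlin", "g")
    g = traversal().withRemote(conn)
    try:
        return (
            g.V()
            .has("User", "tenant_id", tenant_id)  # tenant filter first (indexed)
            .has("id", user_id)
            .in_("ABOUT")                          # MemoryBlock --ABOUT--> User
            .limit(limit)
            .valueMap(True)
            .toList()
        )
    finally:
        conn.close()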

Qdrant Configuration & Embedding Strategy

Collection Schema

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PayloadSchemaType

# Collection naming: single collection with tenant filtering (recommended)
COLLECTION_NAME = "memories"

# Vector configuration
VECTOR_DIMENSIONS = 1536 # OpenAI text-embedding-3-small
# Alternative: 384 for all-MiniLM-L6-v2

def setup_qdrant_collection(client: QdrantClient):
    """Initialize Qdrant collection with proper schema."""

    client.create_collection(
        collection_name=COLLECTION_NAME,
        vectors_config=VectorParams(
            size=VECTOR_DIMENSIONS,
            distance=Distance.COSINE
        )
    )

    # Create payload indices for fast filtering
    client.create_payload_index(
        collection_name=COLLECTION_NAME,
        field_name="tenant_id",
        field_schema=PayloadSchemaType.KEYWORD
    )

    client.create_payload_index(
        collection_name=COLLECTION_NAME,
        field_name="subject.type",
        field_schema=PayloadSchemaType.KEYWORD
    )

    client.create_payload_index(
        collection_name=COLLECTION_NAME,
        field_name="subject.id",
        field_schema=PayloadSchemaType.KEYWORD
    )

    client.create_payload_index(
        collection_name=COLLECTION_NAME,
        field_name="kind",
        field_schema=PayloadSchemaType.KEYWORD
    )

    client.create_payload_index(
        collection_name=COLLECTION_NAME,
        field_name="tags",
        field_schema=PayloadSchemaType.KEYWORD
    )

    client.create_payload_index(
        collection_name=COLLECTION_NAME,
        field_name="created_at",
        field_schema=PayloadSchemaType.DATETIME
    )

Embedding Model Selection

Recommended Production Setup:

  • Primary: OpenAI text-embedding-3-small (1536 dimensions)
    • Cost-effective: $0.02 per 1M tokens
    • High quality
    • Fast inference
  • Fallback/Local: all-MiniLM-L6-v2 (384 dimensions)
    • Open source (Apache 2.0)
    • Runs locally
    • Good performance for English

from enum import Enum

class EmbeddingModel(str, Enum):
    OPENAI_SMALL = "text-embedding-3-small"
    OPENAI_LARGE = "text-embedding-3-large"
    MINILM = "all-MiniLM-L6-v2"
    MPNET = "all-mpnet-base-v2"

EMBEDDING_CONFIG = {
    EmbeddingModel.OPENAI_SMALL: {
        "dimensions": 1536,
        "provider": "openai",
        "max_tokens": 8191
    },
    EmbeddingModel.OPENAI_LARGE: {
        "dimensions": 3072,
        "provider": "openai",
        "max_tokens": 8191
    },
    EmbeddingModel.MINILM: {
        "dimensions": 384,
        "provider": "sentence-transformers",
        "max_tokens": 256
    },
    EmbeddingModel.MPNET: {
        "dimensions": 768,
        "provider": "sentence-transformers",
        "max_tokens": 384
    }
}
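
A minimal sketch of a primary/fallback embedding helper built on this configuration; the function name, the fallback-on-exception policy, and reading OPENAI_API_KEY from the environment are assumptions, not specified behavior:

from openai import OpenAI
from sentence_transformers import SentenceTransformer

_openai = OpenAI()  # reads OPENAI_API_KEY from the environment
_local = SentenceTransformer("all-MiniLM-L6-v2")

def embed_text(text: str, prefer_local: bool = False) -> list[float]:
    """Return an embedding, preferring OpenAI and falling back to the local model."""
    if not prefer_local:
        try:
            response = _openai.embeddings.create(
                model=EmbeddingModel.OPENAI_SMALL.value,
                input=text,
            )
            return response.data[0].embedding
        except Exception:
            pass  # fall through to the local model below
    # Note: the local model produces 384-dimensional vectors, so it must target
    # a collection configured for that dimensionality.
    return _local.encode(text).tolist()

Mixing models in a single collection is not possible because the vector size is fixed per collection, so the fallback path is only useful if a matching 384-dimension collection exists.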

🏢 Multi-Tenancy & Security

Tenant Isolation Strategy

Critical Principle: Every query, write, and read operation MUST enforce tenant boundaries at the platform level.

JanusGraph Tenant Isolation

class TenantIsolatedGraphRepository:
    """
    All graph operations automatically filter by tenant_id.

    Note: property_list() and next_or_none() below are assumed convenience
    helpers defined in this repository; gremlinpython itself does not
    provide them.
    """

    def __init__(self, tenant_id: str):
        self.tenant_id = tenant_id
        self.client = get_gremlin_client()

    async def create_vertex(
        self,
        vertex_type: GraphNodeType,
        attributes: Dict[str, Any]
    ) -> str:
        """Create vertex with automatic tenant_id injection."""

        # CRITICAL: Always add tenant_id
        attributes["tenant_id"] = self.tenant_id

        # Build Gremlin query
        g = self.client.g
        vertex = await g.addV(vertex_type.value).property_list(attributes).next()

        return vertex.id

    async def create_edge(
        self,
        from_vertex_id: str,
        to_vertex_id: str,
        edge_type: GraphEdgeType,
        metadata: Dict[str, Any] = None
    ) -> str:
        """
        Create edge with tenant validation.

        CRITICAL: Verify both vertices belong to same tenant.
        """
        g = self.client.g

        # Validate tenant ownership of both vertices
        from_vertex = await g.V(from_vertex_id).has("tenant_id", self.tenant_id).next_or_none()
        to_vertex = await g.V(to_vertex_id).has("tenant_id", self.tenant_id).next_or_none()

        if not from_vertex or not to_vertex:
            raise TenantIsolationViolation(
                f"Cannot create edge: vertices must belong to tenant {self.tenant_id}"
            )

        # Create edge
        edge = await g.V(from_vertex_id) \
            .addE(edge_type.value) \
            .to(g.V(to_vertex_id)) \
            .property_list(metadata or {}) \
            .next()

        return edge.id

Qdrant Tenant Isolation

from qdrant_client.models import Filter, FieldCondition, MatchValue, ScoredPoint

class TenantIsolatedVectorRepository:
    """
    All vector operations automatically filter by tenant_id.
    """

    def __init__(self, tenant_id: str):
        self.tenant_id = tenant_id
        self.client = get_qdrant_client()

    def _build_tenant_filter(
        self,
        additional_filters: Optional[List[FieldCondition]] = None
    ) -> Filter:
        """Build Qdrant filter that always includes tenant_id."""

        conditions = [
            FieldCondition(
                key="tenant_id",
                match=MatchValue(value=self.tenant_id)
            )
        ]

        if additional_filters:
            conditions.extend(additional_filters)

        return Filter(must=conditions)

    async def search(
        self,
        query_vector: List[float],
        filter_conditions: Optional[List[FieldCondition]] = None,
        limit: int = 10
    ) -> List[ScoredPoint]:
        """Search with automatic tenant isolation."""

        # CRITICAL: Always include tenant filter
        tenant_filter = self._build_tenant_filter(filter_conditions)

        results = await self.client.search(
            collection_name=COLLECTION_NAME,
            query_vector=query_vector,
            query_filter=tenant_filter,
            limit=limit
        )

        return results
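
For completeness, a hedged sketch of the corresponding write path: upserting one block's vector with the tenant-scoped payload fields the indices above expect. The function shape, the UUID point-ID scheme, and storing the scores in the payload are illustrative choices, not specified behavior.

import uuid

from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct

async def upsert_block(
    client: AsyncQdrantClient,
    tenant_id: str,
    block: MemoryBlock,
    vector: List[float],
) -> str:
    """Store one block's embedding with a payload matching the indexed fields."""
    # Qdrant point IDs must be integers or UUIDs, so a UUID is generated here
    # and should be persisted back on the block as embedding_id.
    point_id = block.embedding_id or str(uuid.uuid4())
    await client.upsert(
        collection_name=COLLECTION_NAME,
        points=[
            PointStruct(
                id=point_id,
                vector=vector,
                payload={
                    "tenant_id": tenant_id,  # CRITICAL: enables tenant filtering
                    "subject": {"type": block.subject.type, "id": block.subject.id},
                    "kind": block.kind.value,
                    "tags": block.tags,
                    "scores": block.scores.model_dump(),
                    "created_at": block.created_at.isoformat(),
                },
            )
        ],
    )
    return point_id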

🔧 Repository Definitions

Language Decision: Python vs Node

Recommendation:

  • Python: Core memory engine, curator workers, data stores
  • Node/TypeScript: MCP server, SDKs, frontend integration

Rationale:

  • Python has superior ML/NLP ecosystem (transformers, langchain)
  • Gremlin and Qdrant Python SDKs are most feature-complete
  • LLM orchestration tools are Python-native
  • MCP SDK is TypeScript-first
  • Keeps MCP gateway in your existing Node infrastructure

1️⃣ memory-domain

Purpose: Canonical data contracts and business rules.

Language: Python 3.11+

Structure:

memory-domain/
├── pyproject.toml
├── README.md
├── src/
│ └── memory_domain/
│ ├── __init__.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── memory_block.py # MemoryBlock, MemoryKind, etc.
│ │ ├── memory_pack.py # MemoryPack, PackType
│ │ ├── subject.py # Subject model
│ │ ├── graph.py # GraphNode, GraphEdge
│ │ └── scores.py # Scoring algorithms
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── json_schemas.py # JSON Schema exports
│ │ └── openapi_schemas.py # OpenAPI spec generation
│ ├── validation/
│ │ ├── __init__.py
│ │ └── validators.py # Cross-model validation
│ └── errors/
│ ├── __init__.py
│ └── exceptions.py # Custom exceptions
├── tests/
│ ├── test_models.py
│ ├── test_validation.py
│ └── test_scores.py
└── docs/
└── models.md

Key Dependencies:

[tool.poetry.dependencies]
python = "^3.11"
pydantic = "^2.5"
pydantic-settings = "^2.1"

2️⃣ memory-store-janusgraph

Purpose: Graph persistence layer implementation.

Language: Python 3.11+

Key Dependencies:

[tool.poetry.dependencies]
python = "^3.11"
gremlinpython = "^3.7"
memory-domain = {path = "../memory-domain", develop = true}

Critical Security Tests:

# tests/test_tenant_isolation.py

import pytest

async def test_cannot_access_other_tenant_vertices():
    """CRITICAL: Verify tenant boundaries are enforced."""

    # Create vertex in tenant A
    repo_a = TenantIsolatedGraphRepository("tenant_a")
    vertex_a = await repo_a.create_vertex(
        GraphNodeType.USER,
        {"id": "user_123", "name": "Alice"}
    )

    # Attempt to access from tenant B
    repo_b = TenantIsolatedGraphRepository("tenant_b")
    vertex_b = await repo_b.find_vertex(GraphNodeType.USER, "user_123")

    # Should not find vertex from other tenant
    assert vertex_b is None

async def test_cannot_create_cross_tenant_edge():
    """CRITICAL: Verify edges cannot span tenants."""

    repo_a = TenantIsolatedGraphRepository("tenant_a")
    repo_b = TenantIsolatedGraphRepository("tenant_b")

    vertex_a = await repo_a.create_vertex(GraphNodeType.USER, {"id": "user_a"})
    vertex_b = await repo_b.create_vertex(GraphNodeType.USER, {"id": "user_b"})

    # Attempt to create edge from A to B using tenant A repo
    with pytest.raises(TenantIsolationViolation):
        await repo_a.create_edge(
            from_vertex_id=vertex_a,
            to_vertex_id=vertex_b,
            edge_type=GraphEdgeType.RELATED_TO
        )

3️⃣ memory-store-qdrant

Purpose: Semantic storage & retrieval via Qdrant.

Language: Python 3.11+

Key Dependencies:

[tool.poetry.dependencies]
python = "^3.11"
qdrant-client = "^1.7"
memory-domain = {path = "../memory-domain", develop = true}
openai = "^1.6"
sentence-transformers = "^2.3"

5️⃣ memory-engine-service

Purpose: Core HTTP API and business logic.

Language: Python 3.11+ (FastAPI)

Key Endpoints:

# POST /memory - Create/update memory blocks
@app.post("/memory", response_model=MemoryCreateResponse)
async def create_memory(
request: MemoryCreateRequest,
tenant_id: str = Depends(extract_tenant_from_request)
):
"""
Create or update memory blocks with automatic graph and vector sync.
"""

# POST /memory/query - Semantic search
@app.post("/memory/query", response_model=MemoryQueryResponse)
async def query_memory(
request: MemoryQueryRequest,
tenant_id: str = Depends(extract_tenant_from_request)
):
"""
Hybrid semantic + graph query.

Request body:
{
"subject": {"type": "org", "id": "org_123"},
"query_text": "What are our payment terms?",
"filters": {
"kind": ["fact", "insight"],
"tags_any": ["billing", "payment"],
"time_range": {"from_days_ago": 365}
},
"limit": 20,
"include_graph_context": true
}
"""

# GET /memory/packs - Retrieve structured packs
@app.get("/memory/packs", response_model=List[MemoryPack])
async def get_packs(
subject_type: str,
subject_id: str,
pack_types: Optional[List[PackType]] = Query(None),
tenant_id: str = Depends(extract_tenant_from_request)
):
"""Get memory packs for a subject."""

# POST /memory/graph/neighbors - Graph exploration
@app.post("/memory/graph/neighbors", response_model=GraphNeighborsResponse)
async def get_graph_neighbors(
request: GraphNeighborsRequest,
tenant_id: str = Depends(extract_tenant_from_request)
):
"""Explore graph neighborhood around a node."""

# GET /health
@app.get("/health")
async def health_check():
"""Health check endpoint for load balancers."""
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat()
}
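
A minimal sketch of how the hybrid ranking behind POST /memory/query could blend vector similarity with stored scores; the weights and the blending formula are assumptions for illustration, not a specified ranking algorithm:

def rank_results(
    hits,
    weight_similarity: float = 0.6,
    weight_salience: float = 0.3,
    weight_confidence: float = 0.1,
):
    """Order Qdrant hits by a blended score; weights here are illustrative."""
    def blended(hit) -> float:
        scores = (hit.payload or {}).get("scores", {})
        return (
            weight_similarity * hit.score                      # cosine similarity
            + weight_salience * scores.get("salience", 0.0)    # stored salience
            + weight_confidence * scores.get("confidence", 0.0)
        )
    return sorted(hits, key=blended, reverse=True)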

6️⃣ memory-mcp-server

Purpose: Expose memory engine through MCP protocol.

Language: Node/TypeScript (MCP SDK is TypeScript-first)

Key Dependencies:

{
  "dependencies": {
    "@modelcontextprotocol/sdk": "^0.5.0",
    "axios": "^1.6.0",
    "zod": "^3.22.0"
  }
}

MCP Tools:

  • memory.upsert_block
  • memory.query
  • memory.link_blocks
  • memory.graph_neighbors

MCP Resources:

  • memory://subjects/{subject}/snapshot
  • memory://blocks/{id}
  • memory://packs/{subject}

7️⃣ memory-curator-worker

Purpose: LLM-powered knowledge extraction and curation.

Language: Python 3.11+

Event Handlers:

  • conversation_curation - Extract from chat transcripts
  • document_ingestion - Process Google Drive/Notion documents
  • tool_event - Handle tool result memorization
  • salience_decay - Daily maintenance of score decay
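
A minimal sketch of the worker's dispatch loop, assuming a durable JetStream pull consumer; the stream subject, durable name, and event-type dispatch are assumptions for illustration:

import asyncio
import json

import nats
from nats.errors import TimeoutError as NatsTimeoutError

async def handle_event(event: dict) -> None:
    """Route an event to the matching handler; handler bodies omitted here."""
    event_type = event.get("event_type")
    if event_type == "conversation_curation":
        ...  # extract memories from the transcript
    elif event_type == "document_ingestion":
        ...  # download, chunk, and extract the document
    elif event_type == "tool_event":
        ...  # memorize the tool result
    elif event_type == "salience_decay":
        ...  # run the daily decay pass

async def run_worker() -> None:
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    # Durable pull consumer; subject and durable names are illustrative.
    sub = await js.pull_subscribe("memory.events.>", durable="memory-curator-worker")
    while True:
        try:
            msgs = await sub.fetch(batch=10, timeout=5)
        except NatsTimeoutError:
            continue  # no pending events; poll again
        for msg in msgs:
            await handle_event(json.loads(msg.data))
            await msg.ack()

if __name__ == "__main__":
    asyncio.run(run_worker())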

Curation Prompt Template:

You are a memory curator for an AI assistant. Your job is to extract structured, memorable knowledge from conversations.

**Subject:** {subject}

**Existing Knowledge About This Subject:**
{context_memories}

**New Conversation:**
{transcript}

**Your Task:**
Extract memories that are:
1. **Facts** - Verifiable, stable information that doesn't change often
2. **Preferences** - Stated likes, dislikes, requirements, or behavioral patterns
3. **Insights** - Patterns, opportunities, risks, or discoveries you infer

**Output Format (JSON):**
{
  "blocks": [
    {
      "kind": "fact|preference|insight",
      "content": {
        "text": "Natural language summary suitable for LLM context",
        "structured": {}
      },
      "tags": ["list", "of", "relevant", "concepts"],
      "scores": {
        "salience": 0.0-1.0,
        "stability": 0.0-1.0,
        "confidence": 0.0-1.0
      },
      "reasoning": "Why this is worth remembering"
    }
  ]
}

**Guidelines:**
- Extract 3-10 blocks per conversation (focus on quality over quantity)
- Avoid duplicating existing knowledge unless it's an update
- Facts should be specific and verifiable
- Use clear, concise language that an LLM can easily understand

8️⃣ memory-graph-visualizer

Purpose: Admin/developer graph exploration UI.

Approach: Wrapper around JanusGraph-Visualizer or Graphexp

Implementation: Docker + nginx with basic auth

9️⃣ memory-sdk-js

Purpose: TypeScript/Node client for developers and frontend.

API Design:

import { MemoryClient } from '@your-org/memory-sdk';

const client = new MemoryClient({
  apiUrl: 'http://localhost:8000',
  apiKey: process.env.MEMORY_API_KEY
});

// Write a fact
await client.writeFact({
  subject: { type: 'org', id: 'org_acme' },
  text: 'Primary contact is John Doe (john@acme.com)',
  tags: ['contact', 'primary']
});

// Search memories
const results = await client.searchMemories({
  subject: { type: 'org', id: 'org_acme' },
  query: 'Who is the primary contact?',
  limit: 10
});

// Get working set for agent
const workingSet = await client.getWorkingSet({
  subject: { type: 'user', id: 'user_123' },
  context: 'customer support conversation',
  maxBlocks: 30
});

🔟 memory-sdk-python

Purpose: Python client (mirrors JS SDK functionality).

Language: Python 3.11+
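
A minimal sketch of the intended surface, mirroring the JS example above; the httpx dependency, the bearer-token header, and the HTTP paths are assumptions for illustration:

import httpx

class MemoryClient:
    """Thin async HTTP wrapper over the memory-engine-service API."""

    def __init__(self, api_url: str, api_key: str):
        self._http = httpx.AsyncClient(
            base_url=api_url,
            headers={"Authorization": f"Bearer {api_key}"},
        )

    async def write_fact(self, subject: dict, text: str, tags: list[str]) -> dict:
        """Create a fact-kind memory block for the given subject."""
        response = await self._http.post("/memory", json={
            "subject": subject,
            "kind": "fact",
            "content": {"text": text},
            "tags": tags,
        })
        response.raise_for_status()
        return response.json()

    async def search_memories(self, subject: dict, query: str, limit: int = 10) -> dict:
        """Run a hybrid semantic + graph query scoped to one subject."""
        response = await self._http.post("/memory/query", json={
            "subject": subject,
            "query_text": query,
            "limit": limit,
        })
        response.raise_for_status()
        return response.json()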

1️⃣1️⃣ memory-infra

Purpose: Complete stack deployment orchestration.

Contents:

  • Docker Compose for local development
  • Kubernetes manifests for production
  • Helm charts
  • Monitoring configs (Prometheus, Grafana)
  • Quick setup scripts

Quick Start Script:

#!/bin/bash
# scripts/setup-local.sh

set -e

echo "🚀 Setting up Memory Platform local environment..."

# Start infrastructure
docker-compose up -d cassandra elasticsearch qdrant nats redis postgres

# Wait for services
sleep 30

# Start JanusGraph
docker-compose up -d janusgraph
sleep 20

# Initialize schemas
docker-compose exec janusgraph /opt/janusgraph/bin/gremlin.sh < ../memory-store-janusgraph/schema/setup.groovy

# Start application services
docker-compose up -d memory-engine memory-mcp-server memory-curator-worker memory-visualizer

# Start monitoring
docker-compose up -d prometheus grafana

echo "✅ Memory Platform is running!"
echo " Memory Engine API: http://localhost:8000"
echo " MCP Server: http://localhost:3000"
echo " Graph Visualizer: http://localhost:3001"
echo " Grafana: http://localhost:3002"

📊 Monitoring & Observability

Key Metrics

from prometheus_client import Counter, Histogram, Gauge

# Memory operations
memory_blocks_created = Counter(
    'memory_blocks_created_total',
    'Total number of memory blocks created',
    ['tenant_id', 'kind']
)

memory_query_duration = Histogram(
    'memory_query_duration_seconds',
    'Time spent executing memory queries',
    ['operation', 'tenant_id'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
)

curator_events_processed = Counter(
    'curator_events_processed_total',
    'Total events processed by curator',
    ['event_type', 'status']
)

memory_sync_lag = Gauge(
    'memory_sync_lag_seconds',
    'Time lag between graph and vector writes'
)

Critical Alerts

# prometheus-alerts.yml

groups:
  - name: memory_platform
    rules:
      - alert: HighQueryLatency
        expr: histogram_quantile(0.99, sum by (le) (rate(memory_query_duration_seconds_bucket[5m]))) > 2
        for: 5m
        labels:
          severity: warning

      - alert: GraphVectorInconsistency
        expr: memory_sync_lag_seconds > 60
        for: 2m
        labels:
          severity: critical

      - alert: CuratorWorkerLag
        expr: nats_consumer_num_pending{consumer="memory-curator-worker"} > 1000
        for: 10m
        labels:
          severity: warning

✅ Implementation Priority

Phase 1: Foundation (Weeks 1-3)

  1. memory-domain - Complete models
  2. memory-store-janusgraph - Graph CRUD + schema
  3. memory-store-qdrant - Vector CRUD + embedding
  4. memory-engine-service - Basic write/query API

Phase 2: Intelligence (Weeks 4-5)

  1. memory-curator-worker - Conversation curation
  2. memory-mcp-server - MCP tools
  3. memory-infra - Docker Compose setup

Phase 3: Production (Weeks 6-8)

  1. memory-sdk-js - Client library
  2. Document ingestion in curator
  3. Frontend integration

Phase 4: Polish (Weeks 9-10)

  1. memory-graph-visualizer - Admin UI
  2. Advanced queries (packs, graph)
  3. Performance optimization
  4. memory-sdk-python - Full parity

📝 Final Checklist

Before Starting Development

  • Review complete specification
  • Clarify any ambiguities
  • Set up development environment
  • Create all 11 repository scaffolds
  • Configure CI/CD pipelines

Per Repository

  • Implement core functionality
  • Write comprehensive tests
  • Add README with setup instructions
  • Add Dockerfile
  • Configure linting
  • Implement logging & metrics

Integration

  • Test full stack in Docker Compose
  • Verify tenant isolation (CRITICAL)
  • Run end-to-end scenarios
  • Load test
  • Security audit

Production Readiness

  • Kubernetes manifests
  • Monitoring dashboards
  • Alert rules
  • Backup procedures
  • Disaster recovery plan

📄 Document Control

Version: 2.0
Status: Ready for Implementation
Last Updated: December 24, 2025

Change Log:

  • v2.0 (2025-12-24): Complete specification with all gaps filled
  • v1.0 (2025-12-20): Initial draft

END OF SPECIFICATION