memory-store-qdrant

Vector/Semantic Storage Layer

Property     Value
Repository   memory-store-qdrant
Language     Python 3.11+
Type         Library (pip installable)
Database     Qdrant 1.7+

Purpose

Provides semantic storage and retrieval capabilities for the Memory Platform:

  • Vector storage with tenant isolation
  • Semantic similarity search
  • Embedding generation (OpenAI + local fallback)
  • Hybrid search with metadata filtering
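
A minimal end-to-end sketch of the intended wiring, using the configuration, factory, and repository classes defined in the sections below (the client is constructed directly here for brevity; in the library it would come from client/connection.py):

# Illustrative usage sketch (assumes a running Qdrant and the classes below).
import asyncio

from qdrant_client import AsyncQdrantClient

from memory_store_qdrant.config import EmbeddingConfig, QdrantConfig
from memory_store_qdrant.embedding.factory import create_embedding_provider
from memory_store_qdrant.repository.search import SearchRepository


async def main() -> None:
    qdrant_cfg = QdrantConfig()    # reads QDRANT_* environment variables
    embed_cfg = EmbeddingConfig()  # reads EMBEDDING_* environment variables

    client = AsyncQdrantClient(host=qdrant_cfg.host, port=qdrant_cfg.port)
    provider = create_embedding_provider(embed_cfg)

    search = SearchRepository(
        tenant_id="tenant_a",  # every repository instance is bound to one tenant
        client=client,
        collection_name=qdrant_cfg.collection_name,
        embedding_provider=provider,
    )

    results = await search.semantic_search("What does this user prefer?", limit=5)
    for r in results:
        print(r.memory_id, r.score)


asyncio.run(main())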

Repository Structure

memory-store-qdrant/
├── pyproject.toml
├── README.md
├── CHANGELOG.md
├── src/
│   └── memory_store_qdrant/
│       ├── __init__.py
│       ├── py.typed
│       │
│       ├── client/
│       │   ├── __init__.py
│       │   ├── connection.py        # Qdrant client management
│       │   ├── health.py            # Health checks
│       │   └── collection.py        # Collection management
│       │
│       ├── embedding/
│       │   ├── __init__.py
│       │   ├── base.py              # Abstract embedding interface
│       │   ├── openai.py            # OpenAI embeddings
│       │   ├── local.py             # Sentence-transformers (fallback)
│       │   ├── cache.py             # Embedding cache (optional)
│       │   └── factory.py           # Embedding provider factory
│       │
│       ├── repository/
│       │   ├── __init__.py
│       │   ├── base.py              # Base with tenant isolation
│       │   ├── vector.py            # Vector CRUD operations
│       │   └── search.py            # Search operations
│       │
│       └── config.py                # Configuration
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_embedding/
│   │   ├── test_openai.py
│   │   └── test_local.py
│   ├── test_repository/
│   │   ├── test_vector.py
│   │   ├── test_search.py
│   │   └── test_tenant_isolation.py
│   └── test_integration/
│       └── test_full_flow.py
│
├── docker/
│   ├── Dockerfile
│   └── docker-compose.test.yml
│
└── docs/
    ├── embedding.md
    └── search.md

Dependencies

# pyproject.toml
[project]
name = "memory-store-qdrant"
version = "0.1.0"
description = "Qdrant vector storage for Memory Platform"
requires-python = ">=3.11"
dependencies = [
    "qdrant-client>=1.7,<2.0",
    "memory-domain>=0.1.0",
    "pydantic-settings>=2.1",
    "openai>=1.6",
    "structlog>=23.2",
    "tenacity>=8.2",  # Retry logic
]

[project.optional-dependencies]
local = [
    "sentence-transformers>=2.3",
    "torch>=2.1",
]
dev = [
    "pytest>=7.4",
    "pytest-asyncio>=0.21",
    "pytest-cov>=4.1",
    "testcontainers>=3.7",
    "ruff>=0.1",
    "mypy>=1.7",
]

Configuration

# src/memory_store_qdrant/config.py
from enum import Enum

from pydantic import Field
from pydantic_settings import BaseSettings


class EmbeddingProvider(str, Enum):
    OPENAI = "openai"
    LOCAL = "local"


class EmbeddingModel(str, Enum):
    # OpenAI models
    OPENAI_SMALL = "text-embedding-3-small"
    OPENAI_LARGE = "text-embedding-3-large"
    # Local models
    MINILM = "all-MiniLM-L6-v2"
    MPNET = "all-mpnet-base-v2"


EMBEDDING_DIMENSIONS = {
    EmbeddingModel.OPENAI_SMALL: 1536,
    EmbeddingModel.OPENAI_LARGE: 3072,
    EmbeddingModel.MINILM: 384,
    EmbeddingModel.MPNET: 768,
}


class QdrantConfig(BaseSettings):
    """Qdrant connection configuration."""

    model_config = {"env_prefix": "QDRANT_"}

    host: str = Field(default="localhost")
    port: int = Field(default=6333)
    grpc_port: int = Field(default=6334)

    # Use gRPC for better performance
    prefer_grpc: bool = Field(default=True)

    # Collection settings
    collection_name: str = Field(default="memories")

    # TLS
    https: bool = Field(default=False)

    # Authentication
    api_key: str | None = Field(default=None)

    # Timeout
    timeout_seconds: int = Field(default=30)


class EmbeddingConfig(BaseSettings):
    """Embedding generation configuration."""

    model_config = {"env_prefix": "EMBEDDING_"}

    provider: EmbeddingProvider = Field(default=EmbeddingProvider.OPENAI)
    model: EmbeddingModel = Field(default=EmbeddingModel.OPENAI_SMALL)

    # OpenAI specific
    openai_api_key: str | None = Field(default=None, alias="OPENAI_API_KEY")
    openai_org_id: str | None = Field(default=None)

    # Retry settings
    max_retries: int = Field(default=3)
    retry_delay_seconds: float = Field(default=1.0)

    # Batching
    batch_size: int = Field(default=100)

    # Caching
    cache_enabled: bool = Field(default=True)
    cache_ttl_seconds: int = Field(default=3600)

    @property
    def dimensions(self) -> int:
        return EMBEDDING_DIMENSIONS[self.model]
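
The `dimensions` property is what ties the embedding model to the collection schema: it drives `VectorParams(size=...)` at collection setup. A quick illustrative check of the mapping:

# Illustrative: model choice determines the vector size used everywhere else.
from memory_store_qdrant.config import EmbeddingConfig, EmbeddingModel

config = EmbeddingConfig(model=EmbeddingModel.OPENAI_LARGE)
assert config.dimensions == 3072

config = EmbeddingConfig(model=EmbeddingModel.MINILM)
assert config.dimensions == 384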

Embedding Generation

# src/memory_store_qdrant/embedding/base.py
from abc import ABC, abstractmethod
from typing import List


class EmbeddingProvider(ABC):
    """Abstract base class for embedding providers."""

    @property
    @abstractmethod
    def dimensions(self) -> int:
        """Return the dimension of generated embeddings."""
        pass

    @abstractmethod
    async def embed_text(self, text: str) -> List[float]:
        """Generate embedding for a single text."""
        pass

    @abstractmethod
    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for multiple texts."""
        pass


# src/memory_store_qdrant/embedding/openai.py
from typing import List

import structlog
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

from memory_domain import EmbeddingGenerationError
from memory_store_qdrant.config import EmbeddingConfig
from memory_store_qdrant.embedding.base import EmbeddingProvider as BaseProvider

logger = structlog.get_logger()


class OpenAIEmbeddingProvider(BaseProvider):
    """OpenAI embedding provider."""

    def __init__(self, config: EmbeddingConfig):
        self.config = config
        self.client = AsyncOpenAI(
            api_key=config.openai_api_key,
            organization=config.openai_org_id,
        )
        self._model = config.model.value

    @property
    def dimensions(self) -> int:
        return self.config.dimensions

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        reraise=True,
    )
    async def embed_text(self, text: str) -> List[float]:
        """Generate embedding for a single text."""
        try:
            response = await self.client.embeddings.create(
                model=self._model,
                input=text,
                encoding_format="float",
            )
            return response.data[0].embedding
        except Exception as e:
            logger.error("openai_embedding_failed", error=str(e))
            raise EmbeddingGenerationError(
                f"Failed to generate embedding: {e}",
                original_error=e,
            )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        reraise=True,
    )
    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for multiple texts."""
        if not texts:
            return []

        try:
            # Process in batches
            all_embeddings = []
            batch_size = self.config.batch_size

            for i in range(0, len(texts), batch_size):
                batch = texts[i:i + batch_size]
                response = await self.client.embeddings.create(
                    model=self._model,
                    input=batch,
                    encoding_format="float",
                )

                # Sort by index to maintain order
                sorted_data = sorted(response.data, key=lambda x: x.index)
                all_embeddings.extend([d.embedding for d in sorted_data])

            return all_embeddings

        except Exception as e:
            logger.error("openai_batch_embedding_failed", error=str(e), count=len(texts))
            raise EmbeddingGenerationError(
                f"Failed to generate batch embeddings: {e}",
                original_error=e,
            )


# src/memory_store_qdrant/embedding/local.py
from typing import List

import structlog

from memory_store_qdrant.config import EmbeddingConfig
from memory_store_qdrant.embedding.base import EmbeddingProvider as BaseProvider

logger = structlog.get_logger()


class LocalEmbeddingProvider(BaseProvider):
    """
    Local embedding provider using sentence-transformers.

    Requires: pip install memory-store-qdrant[local]
    """

    def __init__(self, config: EmbeddingConfig):
        self.config = config
        self._model = None

    @property
    def model(self):
        """Lazy-load the model on first use."""
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.config.model.value)
            logger.info("local_model_loaded", model=self.config.model.value)
        return self._model

    @property
    def dimensions(self) -> int:
        return self.config.dimensions

    async def embed_text(self, text: str) -> List[float]:
        """Generate embedding for a single text."""
        embedding = self.model.encode(text, convert_to_numpy=True)
        return embedding.tolist()

    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for multiple texts."""
        if not texts:
            return []

        embeddings = self.model.encode(
            texts,
            convert_to_numpy=True,
            batch_size=self.config.batch_size,
        )
        return embeddings.tolist()


# src/memory_store_qdrant/embedding/factory.py
from memory_store_qdrant.config import EmbeddingConfig, EmbeddingProvider as ProviderEnum
from memory_store_qdrant.embedding.base import EmbeddingProvider
from memory_store_qdrant.embedding.local import LocalEmbeddingProvider
from memory_store_qdrant.embedding.openai import OpenAIEmbeddingProvider


def create_embedding_provider(config: EmbeddingConfig | None = None) -> EmbeddingProvider:
    """Factory function to create an embedding provider from config."""
    if config is None:
        config = EmbeddingConfig()

    if config.provider == ProviderEnum.OPENAI:
        return OpenAIEmbeddingProvider(config)
    elif config.provider == ProviderEnum.LOCAL:
        return LocalEmbeddingProvider(config)
    else:
        raise ValueError(f"Unknown embedding provider: {config.provider}")
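
A short sketch of selecting the local fallback explicitly (sentence-transformers must be installed via the `local` extra):

# Illustrative: force the local provider instead of OpenAI.
from memory_store_qdrant.config import (
    EmbeddingConfig,
    EmbeddingModel,
    EmbeddingProvider as ProviderEnum,
)
from memory_store_qdrant.embedding.factory import create_embedding_provider

config = EmbeddingConfig(
    provider=ProviderEnum.LOCAL,
    model=EmbeddingModel.MINILM,
)
provider = create_embedding_provider(config)
assert provider.dimensions == 384  # all-MiniLM-L6-v2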

Collection Management

# src/memory_store_qdrant/client/collection.py
import structlog
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Distance, PayloadSchemaType, VectorParams

from memory_store_qdrant.config import EmbeddingConfig, QdrantConfig

logger = structlog.get_logger()


async def setup_collection(
    client: AsyncQdrantClient,
    qdrant_config: QdrantConfig,
    embedding_config: EmbeddingConfig,
) -> None:
    """
    Initialize the Qdrant collection with the proper schema.

    Creates the collection and all required payload indices. Idempotent:
    if the collection already exists, it is left untouched.
    """
    collection_name = qdrant_config.collection_name

    # Check if the collection exists
    collections = await client.get_collections()
    exists = any(c.name == collection_name for c in collections.collections)

    if exists:
        logger.info("collection_exists", name=collection_name)
        return

    # Create the collection
    await client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(
            size=embedding_config.dimensions,
            distance=Distance.COSINE,
        ),
        # Enable on-disk payload storage for large collections
        on_disk_payload=True,
    )

    logger.info("collection_created", name=collection_name)

    # Create payload indices for filtering
    await _create_payload_indices(client, collection_name)


async def _create_payload_indices(client: AsyncQdrantClient, collection_name: str) -> None:
    """Create indices on payload fields for fast filtering."""

    indices = [
        # CRITICAL: tenant isolation index
        ("tenant_id", PayloadSchemaType.KEYWORD),

        # Subject filtering
        ("subject.type", PayloadSchemaType.KEYWORD),
        ("subject.id", PayloadSchemaType.KEYWORD),

        # Memory metadata
        ("kind", PayloadSchemaType.KEYWORD),
        ("tags", PayloadSchemaType.KEYWORD),

        # Timestamps for range queries
        ("created_at", PayloadSchemaType.DATETIME),
        ("updated_at", PayloadSchemaType.DATETIME),
        ("accessed_at", PayloadSchemaType.DATETIME),

        # Scores for filtering
        ("scores.salience", PayloadSchemaType.FLOAT),
        ("scores.confidence", PayloadSchemaType.FLOAT),
    ]

    for field_name, field_type in indices:
        await client.create_payload_index(
            collection_name=collection_name,
            field_name=field_name,
            field_schema=field_type,
        )
        logger.debug("payload_index_created", field=field_name, type=field_type)

    logger.info("payload_indices_created", count=len(indices))

Tenant-Isolated Vector Repository

# src/memory_store_qdrant/repository/base.py
from typing import List, Optional

import structlog
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import FieldCondition, Filter, MatchValue

logger = structlog.get_logger()


class TenantIsolatedVectorRepository:
    """
    Base repository with automatic tenant isolation for Qdrant operations.

    CRITICAL: All queries automatically include the tenant_id filter.
    """

    def __init__(
        self,
        tenant_id: str,
        client: AsyncQdrantClient,
        collection_name: str,
    ):
        if not tenant_id or not tenant_id.strip():
            raise ValueError("tenant_id is required")

        self.tenant_id = tenant_id
        self.client = client
        self.collection_name = collection_name
        self._log = logger.bind(tenant_id=tenant_id)

    def _build_tenant_filter(
        self,
        additional_conditions: Optional[List[FieldCondition]] = None,
    ) -> Filter:
        """
        Build a filter that always includes tenant_id.

        CRITICAL: This ensures tenant isolation on every query.
        """
        conditions = [
            FieldCondition(
                key="tenant_id",
                match=MatchValue(value=self.tenant_id),
            )
        ]

        if additional_conditions:
            conditions.extend(additional_conditions)

        return Filter(must=conditions)

    def _inject_tenant(self, payload: dict) -> dict:
        """Inject tenant_id into the payload. Called on every write."""
        return {**payload, "tenant_id": self.tenant_id}
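
For intuition, here is what the base class guarantees: every filter it builds carries the tenant condition as the first `must` clause, so subclasses cannot accidentally omit it. An illustrative check:

# Illustrative only: build a filter without touching Qdrant.
from qdrant_client.models import FieldCondition, MatchValue

repo = TenantIsolatedVectorRepository(
    tenant_id="tenant_a",
    client=None,  # a real AsyncQdrantClient is not needed just to build filters
    collection_name="memories",
)

f = repo._build_tenant_filter([
    FieldCondition(key="kind", match=MatchValue(value="fact")),
])

# The tenant condition always leads the must-clauses:
assert f.must[0].key == "tenant_id"
assert f.must[0].match.value == "tenant_a"

Because the filter and the payload injection both live in the base class, tenant scoping is enforced structurally rather than by convention in each subclass.
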
# src/memory_store_qdrant/repository/vector.py
import uuid
from datetime import datetime
from typing import List, Optional

from qdrant_client import AsyncQdrantClient
from qdrant_client.models import (
    FieldCondition,
    FilterSelector,
    MatchValue,
    PointIdsList,
    PointStruct,
)

from memory_domain import MemoryBlock
from memory_store_qdrant.embedding.base import EmbeddingProvider
from memory_store_qdrant.repository.base import TenantIsolatedVectorRepository


class VectorRepository(TenantIsolatedVectorRepository):
    """CRUD operations for memory vectors with tenant isolation."""

    def __init__(
        self,
        tenant_id: str,
        client: AsyncQdrantClient,
        collection_name: str,
        embedding_provider: EmbeddingProvider,
    ):
        super().__init__(tenant_id, client, collection_name)
        self.embedding_provider = embedding_provider

    def _build_payload(self, memory: MemoryBlock) -> dict:
        """Build the point payload for a memory block (with tenant injection)."""
        return self._inject_tenant({
            "memory_id": memory.id,
            "subject": {
                "type": memory.subject.type,
                "id": memory.subject.id,
            },
            "kind": memory.kind.value,
            "tags": memory.tags,
            "scores": {
                "salience": memory.scores.salience,
                "stability": memory.scores.stability,
                "confidence": memory.scores.confidence,
            },
            "content_preview": memory.content.text[:500],  # First 500 chars
            "created_at": memory.created_at.isoformat(),
            "updated_at": memory.updated_at.isoformat(),
            "accessed_at": memory.accessed_at.isoformat(),
        })

    async def upsert(self, memory: MemoryBlock) -> str:
        """
        Insert or update a memory block's vector.

        Returns the point ID (same as embedding_id).
        """
        # Generate the embedding from content
        embedding = await self.embedding_provider.embed_text(memory.content.text)

        # Generate a point ID if not set
        point_id = memory.embedding_id or str(uuid.uuid4())

        await self.client.upsert(
            collection_name=self.collection_name,
            points=[
                PointStruct(
                    id=point_id,
                    vector=embedding,
                    payload=self._build_payload(memory),
                )
            ],
        )

        self._log.info(
            "vector_upserted",
            point_id=point_id,
            memory_id=memory.id,
        )

        return point_id

    async def upsert_batch(self, memories: List[MemoryBlock]) -> List[str]:
        """
        Batch insert/update memory vectors.

        More efficient for bulk operations.
        """
        if not memories:
            return []

        # Generate embeddings in batch
        texts = [m.content.text for m in memories]
        embeddings = await self.embedding_provider.embed_batch(texts)

        # Build points
        points = []
        point_ids = []

        for memory, embedding in zip(memories, embeddings):
            point_id = memory.embedding_id or str(uuid.uuid4())
            point_ids.append(point_id)
            points.append(PointStruct(
                id=point_id,
                vector=embedding,
                payload=self._build_payload(memory),
            ))

        # Upsert all points in one request
        await self.client.upsert(
            collection_name=self.collection_name,
            points=points,
        )

        self._log.info("vectors_batch_upserted", count=len(points))

        return point_ids

    async def delete(self, point_id: str) -> bool:
        """
        Delete a vector point by ID.

        Note: Qdrant's delete-by-ID does not accept a filter, so this method
        cannot enforce the tenant filter itself. It relies on point IDs being
        unique and created by this tenant; prefer delete_by_memory_id when
        tenant enforcement is required.
        """
        await self.client.delete(
            collection_name=self.collection_name,
            points_selector=PointIdsList(points=[point_id]),
        )

        self._log.info("vector_deleted", point_id=point_id)
        return True

    async def delete_by_memory_id(self, memory_id: str) -> bool:
        """Delete a vector by memory ID with tenant isolation."""
        filter_conditions = self._build_tenant_filter([
            FieldCondition(
                key="memory_id",
                match=MatchValue(value=memory_id),
            )
        ])

        await self.client.delete(
            collection_name=self.collection_name,
            points_selector=FilterSelector(filter=filter_conditions),
        )

        self._log.info("vector_deleted_by_memory_id", memory_id=memory_id)
        return True

    async def update_scores(
        self,
        point_id: str,
        salience: Optional[float] = None,
        stability: Optional[float] = None,
        confidence: Optional[float] = None,
        accessed_at: Optional[datetime] = None,
    ) -> bool:
        """
        Update score fields without re-embedding.

        Used for decay updates and access tracking.
        """
        payload_update = {}

        if salience is not None:
            payload_update["scores.salience"] = salience
        if stability is not None:
            payload_update["scores.stability"] = stability
        if confidence is not None:
            payload_update["scores.confidence"] = confidence
        if accessed_at is not None:
            payload_update["accessed_at"] = accessed_at.isoformat()

        if not payload_update:
            return False

        # NOTE: depending on the Qdrant server version, dotted keys may be
        # stored literally rather than interpreted as nested paths; verify
        # against your deployment (newer servers expose a nested `key`
        # parameter on set_payload for this purpose).
        await self.client.set_payload(
            collection_name=self.collection_name,
            payload=payload_update,
            points=[point_id],
        )

        return True
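
An illustrative flow inside an async context, assuming `client` and `provider` were built as in the earlier sketches and `memory` is a MemoryBlock from memory-domain:

# Illustrative: store a memory, then decay its salience on access.
from datetime import datetime, timezone

repo = VectorRepository(
    tenant_id="tenant_a",
    client=client,
    collection_name="memories",
    embedding_provider=provider,
)

point_id = await repo.upsert(memory)

# Later: decay salience and record an access without re-embedding
await repo.update_scores(
    point_id,
    salience=0.55,
    accessed_at=datetime.now(timezone.utc),
)

Because update_scores only touches payload fields, no embedding call is made on the hot path.
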
# src/memory_store_qdrant/repository/search.py
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

from qdrant_client import AsyncQdrantClient
from qdrant_client.models import (
    DatetimeRange,
    FieldCondition,
    MatchAny,
    MatchValue,
    Range,
)

from memory_domain import MemoryKind
from memory_store_qdrant.embedding.base import EmbeddingProvider
from memory_store_qdrant.repository.base import TenantIsolatedVectorRepository


@dataclass
class SearchResult:
    """Single search result with relevance score."""
    point_id: str
    memory_id: str
    score: float  # Similarity score in [0, 1]
    payload: dict


@dataclass
class SearchFilters:
    """Filters for semantic search."""
    subject_type: Optional[str] = None
    subject_id: Optional[str] = None
    kinds: Optional[List[MemoryKind]] = None
    tags_any: Optional[List[str]] = None
    tags_all: Optional[List[str]] = None
    min_salience: Optional[float] = None
    min_confidence: Optional[float] = None
    from_date: Optional[datetime] = None
    to_date: Optional[datetime] = None


class SearchRepository(TenantIsolatedVectorRepository):
    """Semantic search operations with tenant isolation."""

    def __init__(
        self,
        tenant_id: str,
        client: AsyncQdrantClient,
        collection_name: str,
        embedding_provider: EmbeddingProvider,
    ):
        super().__init__(tenant_id, client, collection_name)
        self.embedding_provider = embedding_provider

    async def semantic_search(
        self,
        query_text: str,
        filters: Optional[SearchFilters] = None,
        limit: int = 20,
        score_threshold: float = 0.0,
    ) -> List[SearchResult]:
        """
        Perform semantic similarity search.

        Always applies the tenant isolation filter.
        """
        # Generate the query embedding
        query_vector = await self.embedding_provider.embed_text(query_text)

        # Build filter conditions
        filter_conditions = self._build_filter_conditions(filters)
        search_filter = self._build_tenant_filter(filter_conditions)

        # Execute the search
        results = await self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=search_filter,
            limit=limit,
            score_threshold=score_threshold,
            with_payload=True,
        )

        self._log.info(
            "semantic_search_executed",
            query_length=len(query_text),
            result_count=len(results),
        )

        return [
            SearchResult(
                point_id=str(r.id),
                memory_id=r.payload.get("memory_id", ""),
                score=r.score,
                payload=r.payload,
            )
            for r in results
        ]

    async def search_by_subject(
        self,
        subject_type: str,
        subject_id: str,
        query_text: Optional[str] = None,
        limit: int = 50,
        min_salience: float = 0.0,
    ) -> List[SearchResult]:
        """
        Get memories for a specific subject.

        If query_text is provided, ranks by semantic similarity;
        otherwise, returns results ordered by salience score.
        """
        filters = SearchFilters(
            subject_type=subject_type,
            subject_id=subject_id,
            min_salience=min_salience,
        )

        if query_text:
            return await self.semantic_search(query_text, filters, limit)

        # No query text: scroll through matching points
        filter_conditions = self._build_filter_conditions(filters)
        search_filter = self._build_tenant_filter(filter_conditions)

        results, _ = await self.client.scroll(
            collection_name=self.collection_name,
            scroll_filter=search_filter,
            limit=limit,
            with_payload=True,
            with_vectors=False,
        )

        # Sort by salience, highest first
        sorted_results = sorted(
            results,
            key=lambda r: r.payload.get("scores", {}).get("salience", 0),
            reverse=True,
        )

        return [
            SearchResult(
                point_id=str(r.id),
                memory_id=r.payload.get("memory_id", ""),
                score=r.payload.get("scores", {}).get("salience", 0),
                payload=r.payload,
            )
            for r in sorted_results
        ]

    async def find_similar(
        self,
        point_id: str,
        limit: int = 10,
        exclude_self: bool = True,
    ) -> List[SearchResult]:
        """
        Find memories similar to a given memory.

        Uses the existing vector to find neighbors. The initial retrieve is by
        raw point ID; the subsequent search is tenant-filtered, so results
        never include another tenant's points.
        """
        # Get the point's vector
        points = await self.client.retrieve(
            collection_name=self.collection_name,
            ids=[point_id],
            with_vectors=True,
        )

        if not points:
            return []

        vector = points[0].vector

        # Search for similar points (tenant-filtered)
        results = await self.client.search(
            collection_name=self.collection_name,
            query_vector=vector,
            query_filter=self._build_tenant_filter(),
            limit=limit + (1 if exclude_self else 0),
            with_payload=True,
        )

        # Filter out the point itself if requested
        if exclude_self:
            results = [r for r in results if str(r.id) != point_id][:limit]

        return [
            SearchResult(
                point_id=str(r.id),
                memory_id=r.payload.get("memory_id", ""),
                score=r.score,
                payload=r.payload,
            )
            for r in results
        ]

    def _build_filter_conditions(
        self,
        filters: Optional[SearchFilters],
    ) -> List[FieldCondition]:
        """Build Qdrant filter conditions from SearchFilters."""
        if not filters:
            return []

        conditions = []

        if filters.subject_type:
            conditions.append(FieldCondition(
                key="subject.type",
                match=MatchValue(value=filters.subject_type),
            ))

        if filters.subject_id:
            conditions.append(FieldCondition(
                key="subject.id",
                match=MatchValue(value=filters.subject_id),
            ))

        if filters.kinds:
            conditions.append(FieldCondition(
                key="kind",
                match=MatchAny(any=[k.value for k in filters.kinds]),
            ))

        if filters.tags_any:
            conditions.append(FieldCondition(
                key="tags",
                match=MatchAny(any=filters.tags_any),
            ))

        if filters.tags_all:
            # Each required tag becomes its own must-match condition
            for tag in filters.tags_all:
                conditions.append(FieldCondition(
                    key="tags",
                    match=MatchValue(value=tag),
                ))

        if filters.min_salience is not None:
            conditions.append(FieldCondition(
                key="scores.salience",
                range=Range(gte=filters.min_salience),
            ))

        if filters.min_confidence is not None:
            conditions.append(FieldCondition(
                key="scores.confidence",
                range=Range(gte=filters.min_confidence),
            ))

        if filters.from_date or filters.to_date:
            conditions.append(FieldCondition(
                key="created_at",
                range=DatetimeRange(
                    gte=filters.from_date,
                    lte=filters.to_date,
                ),
            ))

        return conditions
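
A filtered-search sketch, assuming a SearchRepository instance `search` built as in the usage example at the top of this document:

# Illustrative: combine semantic relevance with metadata filters.
# (Run inside an async function.)
from datetime import datetime, timedelta, timezone

from memory_domain import MemoryKind

filters = SearchFilters(
    subject_type="user",
    subject_id="user_123",
    kinds=[MemoryKind.FACT],
    tags_any=["preference", "contact"],
    min_salience=0.3,
    from_date=datetime.now(timezone.utc) - timedelta(days=90),
)

results = await search.semantic_search(
    "How does this user like to be contacted?",
    filters=filters,
    limit=10,
    score_threshold=0.25,
)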

Tenant Isolation Tests

# tests/test_repository/test_tenant_isolation.py
"""
CRITICAL TESTS: Qdrant Tenant Isolation

These tests verify that vector data from one tenant is invisible to another.
"""

import pytest

from memory_domain import MemoryBlock, MemoryKind, MemorySource, Subject

pytestmark = pytest.mark.asyncio


class TestVectorTenantIsolation:
    """Verify vector operations respect tenant boundaries."""

    async def test_search_only_returns_own_tenant_vectors(
        self,
        tenant_a_search_repo,
        tenant_b_search_repo,
        tenant_a_vector_repo,
        tenant_b_vector_repo,
        sample_memory_factory,
    ):
        """Semantic search only returns vectors from the same tenant."""
        # Both tenants store memories about "pricing"
        memory_a = sample_memory_factory(
            tenant_id="tenant_a",
            content_text="Our pricing is $99/month for the basic plan.",
        )
        memory_b = sample_memory_factory(
            tenant_id="tenant_b",
            content_text="Pricing starts at $199/month for enterprise.",
        )

        await tenant_a_vector_repo.upsert(memory_a)
        await tenant_b_vector_repo.upsert(memory_b)

        # Tenant A searches for pricing
        results_a = await tenant_a_search_repo.semantic_search(
            "What is the pricing?",
            limit=10,
        )

        # Should only find their own memory
        assert len(results_a) == 1
        assert results_a[0].memory_id == memory_a.id
        assert "99" in results_a[0].payload.get("content_preview", "")

    async def test_cannot_access_other_tenant_by_point_id(
        self,
        tenant_a_vector_repo,
        tenant_b_search_repo,
        sample_memory_factory,
    ):
        """Even with the exact point ID, tenant B cannot see tenant A's data."""
        memory_a = sample_memory_factory(
            tenant_id="tenant_a",
            content_text="Secret tenant A information",
        )

        point_id = await tenant_a_vector_repo.upsert(memory_a)

        # Tenant B tries to find similar memories using the actual point ID.
        # This should return nothing because the point belongs to tenant A.
        results = await tenant_b_search_repo.find_similar(point_id)

        # Should not find anything (including the original)
        assert len(results) == 0

    async def test_subject_search_isolated(
        self,
        tenant_a_search_repo,
        tenant_b_search_repo,
        tenant_a_vector_repo,
        tenant_b_vector_repo,
        sample_memory_factory,
    ):
        """Subject-based search is tenant-isolated."""
        # Both tenants have memories about user_123
        memory_a = sample_memory_factory(
            tenant_id="tenant_a",
            subject=Subject(type="user", id="user_123"),
            content_text="User 123 from tenant A prefers email.",
        )
        memory_b = sample_memory_factory(
            tenant_id="tenant_b",
            subject=Subject(type="user", id="user_123"),
            content_text="User 123 from tenant B prefers phone.",
        )

        await tenant_a_vector_repo.upsert(memory_a)
        await tenant_b_vector_repo.upsert(memory_b)

        # Tenant A searches for user_123
        results = await tenant_a_search_repo.search_by_subject(
            subject_type="user",
            subject_id="user_123",
        )

        # Should only find their own version
        assert len(results) == 1
        assert "email" in results[0].payload.get("content_preview", "")
        assert "phone" not in results[0].payload.get("content_preview", "")

    async def test_delete_only_affects_own_tenant(
        self,
        tenant_a_vector_repo,
        tenant_b_vector_repo,
        tenant_b_search_repo,
        sample_memory_factory,
    ):
        """Deleting a memory doesn't affect other tenants."""
        # Tenant B stores a memory
        memory_b = sample_memory_factory(
            tenant_id="tenant_b",
            content_text="Tenant B important data",
        )
        point_id = await tenant_b_vector_repo.upsert(memory_b)

        # Tenant A tries to delete using the memory ID
        # (they shouldn't be able to, but verify nothing breaks)
        await tenant_a_vector_repo.delete_by_memory_id(memory_b.id)

        # Tenant B's memory should still exist
        results = await tenant_b_search_repo.semantic_search("Tenant B data")
        assert len(results) == 1
        assert results[0].memory_id == memory_b.id


# =========================================
# Test Fixtures
# =========================================

@pytest.fixture
def sample_memory_factory():
    """Factory for creating test memory blocks."""
    counter = 0

    def create(
        tenant_id: str,
        content_text: str,
        subject: Subject | None = None,
    ) -> MemoryBlock:
        nonlocal counter
        counter += 1

        return MemoryBlock(
            id=f"mem_test_{counter}",
            tenant_id=tenant_id,
            subject=subject or Subject(type="user", id="default_user"),
            kind=MemoryKind.FACT,
            source={"origin": MemorySource.CHAT},
            content={"text": content_text},
            scores={"salience": 0.8, "stability": 0.9, "confidence": 0.7},
        )

    return create
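
The tenant repository fixtures referenced above live in tests/conftest.py. A minimal sketch, assuming a Qdrant test instance is reachable via the QDRANT_* environment variables (for example, one started from docker/docker-compose.test.yml); fixture names match those used by the isolation tests:

# tests/conftest.py sketch (illustrative; tenant-B fixtures mirror tenant-A).
import pytest_asyncio
from qdrant_client import AsyncQdrantClient

from memory_store_qdrant.client.collection import setup_collection
from memory_store_qdrant.config import EmbeddingConfig, QdrantConfig
from memory_store_qdrant.embedding.factory import create_embedding_provider
from memory_store_qdrant.repository.search import SearchRepository
from memory_store_qdrant.repository.vector import VectorRepository


@pytest_asyncio.fixture
async def qdrant_client():
    """Connect to the test Qdrant instance and ensure the collection exists."""
    config = QdrantConfig()
    client = AsyncQdrantClient(host=config.host, port=config.port)
    await setup_collection(client, config, EmbeddingConfig())
    yield client
    await client.close()


def _repos(tenant_id: str, client: AsyncQdrantClient):
    """Build a (VectorRepository, SearchRepository) pair for one tenant."""
    cfg = QdrantConfig()
    # EMBEDDING_PROVIDER=local avoids network calls to OpenAI during tests
    provider = create_embedding_provider()
    return (
        VectorRepository(tenant_id, client, cfg.collection_name, provider),
        SearchRepository(tenant_id, client, cfg.collection_name, provider),
    )


@pytest_asyncio.fixture
async def tenant_a_vector_repo(qdrant_client):
    return _repos("tenant_a", qdrant_client)[0]


@pytest_asyncio.fixture
async def tenant_a_search_repo(qdrant_client):
    return _repos("tenant_a", qdrant_client)[1]

# tenant_b_vector_repo / tenant_b_search_repo: same pattern with "tenant_b".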

Environment Variables

Variable                 Required     Default                   Description
QDRANT_HOST              No           localhost                 Qdrant server host
QDRANT_PORT              No           6333                      HTTP port
QDRANT_GRPC_PORT         No           6334                      gRPC port
QDRANT_PREFER_GRPC       No           true                      Use gRPC
QDRANT_COLLECTION_NAME   No           memories                  Collection name
QDRANT_API_KEY           No           -                         API key
EMBEDDING_PROVIDER       No           openai                    openai or local
EMBEDDING_MODEL          No           text-embedding-3-small    Model name
OPENAI_API_KEY           Conditional  -                         Required if using OpenAI
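
A quick, illustrative way to sanity-check the mapping: pydantic-settings reads these variables whenever the config classes are instantiated.

# Illustrative: the variables above flow straight into the config classes.
import os

os.environ["QDRANT_HOST"] = "qdrant.internal"
os.environ["QDRANT_PREFER_GRPC"] = "false"
os.environ["EMBEDDING_PROVIDER"] = "local"
os.environ["EMBEDDING_MODEL"] = "all-MiniLM-L6-v2"

from memory_store_qdrant.config import EmbeddingConfig, QdrantConfig

q = QdrantConfig()
e = EmbeddingConfig()
assert q.host == "qdrant.internal" and q.prefer_grpc is False
assert e.dimensions == 384  # all-MiniLM-L6-v2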

Acceptance Criteria

  • Embedding generation works with OpenAI
  • Embedding generation works with local models (fallback)
  • Tenant isolation tests pass (CRITICAL)
  • Semantic search returns relevant results
  • Batch operations are efficient
  • Score updates work without re-embedding
  • Collection setup is idempotent
  • Performance: <100ms for single embedding
  • Performance: <500ms for search with 1M vectors