memory-store-qdrant
Vector/Semantic Storage Layer
| Property | Value |
|---|---|
| Repository | memory-store-qdrant |
| Language | Python 3.11+ |
| Type | Library (pip installable) |
| Database | Qdrant 1.8+ |
Purpose
Provides semantic storage and retrieval capabilities for the Memory Platform:
- Vector storage with tenant isolation
- Semantic similarity search
- Embedding generation (OpenAI + local fallback)
- Hybrid search with metadata filtering
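A minimal end-to-end sketch of the intended flow (import paths follow the repository layout below; whether these names are also re-exported from the package root is an assumption):

# Sketch: embed and store a memory for one tenant, then search it back.
from qdrant_client import AsyncQdrantClient
from memory_store_qdrant.config import QdrantConfig, EmbeddingConfig
from memory_store_qdrant.embedding.factory import create_embedding_provider
from memory_store_qdrant.repository.vector import VectorRepository
from memory_store_qdrant.repository.search import SearchRepository

async def demo(memory) -> None:  # memory: a memory_domain.MemoryBlock
    cfg = QdrantConfig()
    provider = create_embedding_provider(EmbeddingConfig())
    client = AsyncQdrantClient(host=cfg.host, port=cfg.port)
    vectors = VectorRepository("tenant_a", client, cfg.collection_name, provider)
    search = SearchRepository("tenant_a", client, cfg.collection_name, provider)
    await vectors.upsert(memory)
    hits = await search.semantic_search("what does this user prefer?", limit=5)
    print(hits[0].memory_id if hits else "no results")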
Repository Structure
memory-store-qdrant/
├── pyproject.toml
├── README.md
├── CHANGELOG.md
├── src/
│ └── memory_store_qdrant/
│ ├── __init__.py
│ ├── py.typed
│ │
│ ├── client/
│ │ ├── __init__.py
│ │ ├── connection.py # Qdrant client management
│ │ ├── health.py # Health checks
│ │ └── collection.py # Collection management
│ │
│ ├── embedding/
│ │ ├── __init__.py
│ │ ├── base.py # Abstract embedding interface
│ │ ├── openai.py # OpenAI embeddings
│ │ ├── local.py # Sentence-transformers (fallback)
│ │ ├── cache.py # Embedding cache (optional)
│ │ └── factory.py # Embedding provider factory
│ │
│ ├── repository/
│ │ ├── __init__.py
│ │ ├── base.py # Base with tenant isolation
│ │ ├── vector.py # Vector CRUD operations
│ │ └── search.py # Search operations
│ │
│ └── config.py # Configuration
│
├── tests/
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_embedding/
│ │ ├── test_openai.py
│ │ └── test_local.py
│ ├── test_repository/
│ │ ├── test_vector.py
│ │ ├── test_search.py
│ │ └── test_tenant_isolation.py
│ └── test_integration/
│ └── test_full_flow.py
│
├── docker/
│ ├── Dockerfile
│ └── docker-compose.test.yml
│
└── docs/
├── embedding.md
└── search.md
Dependencies
# pyproject.toml
[project]
name = "memory-store-qdrant"
version = "0.1.0"
description = "Qdrant vector storage for Memory Platform"
requires-python = ">=3.11"
dependencies = [
"qdrant-client>=1.7,\<2.0",
"memory-domain>=0.1.0",
"pydantic-settings>=2.1",
"openai>=1.6",
"structlog>=23.2",
"tenacity>=8.2", # Retry logic
]
[project.optional-dependencies]
local = [
"sentence-transformers>=2.3",
"torch>=2.1",
]
dev = [
"pytest>=7.4",
"pytest-asyncio>=0.21",
"pytest-cov>=4.1",
"testcontainers>=3.7",
"ruff>=0.1",
"mypy>=1.7",
]
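The local extra gates the sentence-transformers path. A hedged runtime guard for environments where it may be missing (the helper name is illustrative):

# Sketch: fail fast with an actionable message if the extra is absent.
def ensure_local_extra() -> None:
    try:
        import sentence_transformers  # noqa: F401
    except ImportError as e:
        raise RuntimeError(
            "EMBEDDING_PROVIDER=local requires the optional extra: "
            "pip install 'memory-store-qdrant[local]'"
        ) from e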
Configuration
# src/memory_store_qdrant/config.py
from enum import Enum
from pydantic_settings import BaseSettings
from pydantic import Field
class EmbeddingProvider(str, Enum):
OPENAI = "openai"
LOCAL = "local"
class EmbeddingModel(str, Enum):
# OpenAI models
OPENAI_SMALL = "text-embedding-3-small"
OPENAI_LARGE = "text-embedding-3-large"
# Local models
MINILM = "all-MiniLM-L6-v2"
MPNET = "all-mpnet-base-v2"
EMBEDDING_DIMENSIONS = {
EmbeddingModel.OPENAI_SMALL: 1536,
EmbeddingModel.OPENAI_LARGE: 3072,
EmbeddingModel.MINILM: 384,
EmbeddingModel.MPNET: 768,
}
class QdrantConfig(BaseSettings):
"""Qdrant connection configuration."""
model_config = {"env_prefix": "QDRANT_"}
host: str = Field(default="localhost")
port: int = Field(default=6333)
grpc_port: int = Field(default=6334)
# Use gRPC for better performance
prefer_grpc: bool = Field(default=True)
# Collection settings
collection_name: str = Field(default="memories")
# TLS
https: bool = Field(default=False)
# Authentication
api_key: str | None = Field(default=None)
# Timeout
timeout_seconds: int = Field(default=30)
class EmbeddingConfig(BaseSettings):
"""Embedding generation configuration."""
model_config = {"env_prefix": "EMBEDDING_"}
provider: EmbeddingProvider = Field(default=EmbeddingProvider.OPENAI)
model: EmbeddingModel = Field(default=EmbeddingModel.OPENAI_SMALL)
# OpenAI specific
openai_api_key: str | None = Field(default=None, alias="OPENAI_API_KEY")
openai_org_id: str | None = Field(default=None)
# Retry settings
max_retries: int = Field(default=3)
retry_delay_seconds: float = Field(default=1.0)
# Batching
batch_size: int = Field(default=100)
# Caching
cache_enabled: bool = Field(default=True)
cache_ttl_seconds: int = Field(default=3600)
@property
def dimensions(self) -> int:
return EMBEDDING_DIMENSIONS[self.model]
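For reference, a hedged example of driving this configuration purely from the environment (variable names per the table at the end of this document):

# Example: select the local fallback provider via environment variables.
import os

os.environ["EMBEDDING_PROVIDER"] = "local"
os.environ["EMBEDDING_MODEL"] = "all-MiniLM-L6-v2"

from memory_store_qdrant.config import EmbeddingConfig

cfg = EmbeddingConfig()
assert cfg.dimensions == 384  # resolved through EMBEDDING_DIMENSIONS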
Embedding Generation
# src/memory_store_qdrant/embedding/base.py
from abc import ABC, abstractmethod
from typing import List
class EmbeddingProvider(ABC):
"""Abstract base class for embedding providers."""
@property
@abstractmethod
def dimensions(self) -> int:
"""Return the dimension of generated embeddings."""
pass
@abstractmethod
async def embed_text(self, text: str) -> List[float]:
"""Generate embedding for a single text."""
pass
@abstractmethod
async def embed_batch(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings for multiple texts."""
pass
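Any object satisfying this contract works, which keeps unit tests free of network calls and model downloads; a hedged sketch (the hashing scheme is illustrative only):

# Sketch: deterministic fake provider for unit tests.
import hashlib
from typing import List

class FakeEmbeddingProvider(EmbeddingProvider):
    """Maps text to a fixed-dimension pseudo-embedding via hashing."""

    @property
    def dimensions(self) -> int:
        return 8

    async def embed_text(self, text: str) -> List[float]:
        digest = hashlib.sha256(text.encode()).digest()
        return [b / 255.0 for b in digest[: self.dimensions]]

    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        return [await self.embed_text(t) for t in texts]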
# src/memory_store_qdrant/embedding/openai.py
import structlog
from typing import List
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
from memory_store_qdrant.config import EmbeddingConfig, EmbeddingModel
from memory_store_qdrant.embedding.base import EmbeddingProvider as BaseProvider
from memory_domain import EmbeddingGenerationError
logger = structlog.get_logger()
class OpenAIEmbeddingProvider(BaseProvider):
"""OpenAI embedding provider."""
def __init__(self, config: EmbeddingConfig):
self.config = config
self.client = AsyncOpenAI(
api_key=config.openai_api_key,
organization=config.openai_org_id,
)
self._model = config.model.value
@property
def dimensions(self) -> int:
return self.config.dimensions
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=True
)
async def embed_text(self, text: str) -> List[float]:
"""Generate embedding for a single text."""
try:
response = await self.client.embeddings.create(
model=self._model,
input=text,
encoding_format="float"
)
return response.data[0].embedding
except Exception as e:
logger.error("openai_embedding_failed", error=str(e))
raise EmbeddingGenerationError(
f"Failed to generate embedding: {e}",
original_error=e
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=True
)
async def embed_batch(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings for multiple texts."""
if not texts:
return []
try:
# Process in batches
all_embeddings = []
batch_size = self.config.batch_size
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
response = await self.client.embeddings.create(
model=self._model,
input=batch,
encoding_format="float"
)
# Sort by index to maintain order
sorted_data = sorted(response.data, key=lambda x: x.index)
all_embeddings.extend([d.embedding for d in sorted_data])
return all_embeddings
except Exception as e:
logger.error("openai_batch_embedding_failed", error=str(e), count=len(texts))
raise EmbeddingGenerationError(
f"Failed to generate batch embeddings: {e}",
original_error=e
)
# src/memory_store_qdrant/embedding/local.py
from typing import List
import structlog
from memory_store_qdrant.config import EmbeddingConfig
from memory_store_qdrant.embedding.base import EmbeddingProvider as BaseProvider
logger = structlog.get_logger()
class LocalEmbeddingProvider(BaseProvider):
"""
Local embedding provider using sentence-transformers.
Requires: pip install memory-store-qdrant[local]
"""
def __init__(self, config: EmbeddingConfig):
self.config = config
self._model = None
@property
def model(self):
"""Lazy load the model."""
if self._model is None:
from sentence_transformers import SentenceTransformer
self._model = SentenceTransformer(self.config.model.value)
logger.info("local_model_loaded", model=self.config.model.value)
return self._model
@property
def dimensions(self) -> int:
return self.config.dimensions
    async def embed_text(self, text: str) -> List[float]:
        """Generate embedding for a single text."""
        # encode() is CPU-bound and synchronous; wrap with asyncio.to_thread
        # if this runs inside a latency-sensitive event loop.
        embedding = self.model.encode(text, convert_to_numpy=True)
        return embedding.tolist()
async def embed_batch(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings for multiple texts."""
if not texts:
return []
embeddings = self.model.encode(
texts,
convert_to_numpy=True,
batch_size=self.config.batch_size
)
return embeddings.tolist()
# src/memory_store_qdrant/embedding/factory.py
from memory_store_qdrant.config import EmbeddingConfig, EmbeddingProvider as ProviderEnum
from memory_store_qdrant.embedding.base import EmbeddingProvider
from memory_store_qdrant.embedding.openai import OpenAIEmbeddingProvider
from memory_store_qdrant.embedding.local import LocalEmbeddingProvider
def create_embedding_provider(config: EmbeddingConfig | None = None) -> EmbeddingProvider:
"""Factory function to create embedding provider."""
if config is None:
config = EmbeddingConfig()
if config.provider == ProviderEnum.OPENAI:
return OpenAIEmbeddingProvider(config)
elif config.provider == ProviderEnum.LOCAL:
return LocalEmbeddingProvider(config)
else:
raise ValueError(f"Unknown embedding provider: {config.provider}")
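The factory keys off EMBEDDING_PROVIDER alone; the "local fallback" mentioned under Purpose is a policy the caller implements. One hedged way to express it (the key check is an assumption, not library behavior):

# Sketch: fall back to the local provider when OpenAI is not configured.
from memory_store_qdrant.config import (
    EmbeddingConfig,
    EmbeddingModel,
    EmbeddingProvider as ProviderEnum,
)
from memory_store_qdrant.embedding.factory import create_embedding_provider

def provider_with_fallback():
    cfg = EmbeddingConfig()
    if cfg.provider == ProviderEnum.OPENAI and not cfg.openai_api_key:
        # No API key available: use the sentence-transformers fallback.
        cfg = EmbeddingConfig(
            provider=ProviderEnum.LOCAL,
            model=EmbeddingModel.MINILM,
        )
    return create_embedding_provider(cfg)

Note that switching providers changes the vector dimension (1536 vs 384), so the fallback decision must be made before setup_collection creates the collection.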
Collection Management
# src/memory_store_qdrant/client/collection.py
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import (
    Distance,
    VectorParams,
    PayloadSchemaType,
)
import structlog
from memory_store_qdrant.config import QdrantConfig, EmbeddingConfig
logger = structlog.get_logger()
async def setup_collection(
    client: AsyncQdrantClient,
qdrant_config: QdrantConfig,
embedding_config: EmbeddingConfig
) -> None:
"""
Initialize Qdrant collection with proper schema.
Creates collection and all required payload indices.
"""
collection_name = qdrant_config.collection_name
# Check if collection exists
collections = await client.get_collections()
exists = any(c.name == collection_name for c in collections.collections)
if exists:
logger.info("collection_exists", name=collection_name)
return
# Create collection
await client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(
size=embedding_config.dimensions,
distance=Distance.COSINE,
),
# Enable on-disk storage for large collections
on_disk_payload=True,
)
logger.info("collection_created", name=collection_name)
# Create payload indices for filtering
await _create_payload_indices(client, collection_name)
async def _create_payload_indices(client: AsyncQdrantClient, collection_name: str) -> None:
"""Create indices on payload fields for fast filtering."""
indices = [
# CRITICAL: Tenant isolation index
("tenant_id", PayloadSchemaType.KEYWORD),
# Subject filtering
("subject.type", PayloadSchemaType.KEYWORD),
("subject.id", PayloadSchemaType.KEYWORD),
# Memory metadata
("kind", PayloadSchemaType.KEYWORD),
("tags", PayloadSchemaType.KEYWORD),
# Timestamps for range queries
("created_at", PayloadSchemaType.DATETIME),
("updated_at", PayloadSchemaType.DATETIME),
("accessed_at", PayloadSchemaType.DATETIME),
# Scores for filtering
("scores.salience", PayloadSchemaType.FLOAT),
("scores.confidence", PayloadSchemaType.FLOAT),
]
for field_name, field_type in indices:
await client.create_payload_index(
collection_name=collection_name,
field_name=field_name,
field_schema=field_type,
)
logger.debug("payload_index_created", field=field_name, type=field_type)
logger.info("payload_indices_created", count=len(indices))
Tenant-Isolated Vector Repository
# src/memory_store_qdrant/repository/base.py
from typing import List, Optional
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
import structlog
logger = structlog.get_logger()
class TenantIsolatedVectorRepository:
"""
Base repository with automatic tenant isolation for Qdrant operations.
CRITICAL: All queries automatically include tenant_id filter.
"""
def __init__(
self,
tenant_id: str,
        client: AsyncQdrantClient,
collection_name: str
):
if not tenant_id or not tenant_id.strip():
raise ValueError("tenant_id is required")
self.tenant_id = tenant_id
self.client = client
self.collection_name = collection_name
self._log = logger.bind(tenant_id=tenant_id)
def _build_tenant_filter(
self,
additional_conditions: Optional[List[FieldCondition]] = None
) -> Filter:
"""
Build a filter that always includes tenant_id.
CRITICAL: This ensures tenant isolation on every query.
"""
conditions = [
FieldCondition(
key="tenant_id",
match=MatchValue(value=self.tenant_id)
)
]
if additional_conditions:
conditions.extend(additional_conditions)
return Filter(must=conditions)
def _inject_tenant(self, payload: dict) -> dict:
"""Inject tenant_id into payload. Called on every write."""
return {**payload, "tenant_id": self.tenant_id}
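To make the isolation mechanics concrete, a hedged illustration of what _build_tenant_filter produces (given any AsyncQdrantClient client):

# Sketch: the tenant condition always leads the `must` list.
from qdrant_client.models import FieldCondition, MatchValue

repo = TenantIsolatedVectorRepository("tenant_a", client, "memories")
f = repo._build_tenant_filter([
    FieldCondition(key="kind", match=MatchValue(value="fact")),
])
# Equivalent to:
# Filter(must=[
#     FieldCondition(key="tenant_id", match=MatchValue(value="tenant_a")),
#     FieldCondition(key="kind", match=MatchValue(value="fact")),
# ])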
# src/memory_store_qdrant/repository/vector.py
from typing import List, Optional, Any
from datetime import datetime
import uuid
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import (
    PointStruct,
    PointIdsList,
    FilterSelector,
    FieldCondition,
    MatchValue,
)
from memory_domain import MemoryBlock
from memory_store_qdrant.repository.base import TenantIsolatedVectorRepository
from memory_store_qdrant.embedding.base import EmbeddingProvider
class VectorRepository(TenantIsolatedVectorRepository):
"""CRUD operations for memory vectors with tenant isolation."""
def __init__(
self,
tenant_id: str,
        client: AsyncQdrantClient,
collection_name: str,
embedding_provider: EmbeddingProvider
):
super().__init__(tenant_id, client, collection_name)
self.embedding_provider = embedding_provider
async def upsert(self, memory: MemoryBlock) -> str:
"""
Insert or update a memory block's vector.
        Returns the Qdrant point ID (memory.embedding_id when set, otherwise a new UUID).
"""
# Generate embedding from content
embedding = await self.embedding_provider.embed_text(memory.content.text)
# Generate point ID if not set
point_id = memory.embedding_id or str(uuid.uuid4())
# Build payload (with tenant injection)
payload = self._inject_tenant({
"memory_id": memory.id,
"subject": {
"type": memory.subject.type,
"id": memory.subject.id,
},
"kind": memory.kind.value,
"tags": memory.tags,
"scores": {
"salience": memory.scores.salience,
"stability": memory.scores.stability,
"confidence": memory.scores.confidence,
},
"content_preview": memory.content.text[:500], # First 500 chars
"created_at": memory.created_at.isoformat(),
"updated_at": memory.updated_at.isoformat(),
"accessed_at": memory.accessed_at.isoformat(),
})
# Upsert point
await self.client.upsert(
collection_name=self.collection_name,
points=[
PointStruct(
id=point_id,
vector=embedding,
payload=payload
)
]
)
self._log.info(
"vector_upserted",
point_id=point_id,
memory_id=memory.id
)
return point_id
async def upsert_batch(self, memories: List[MemoryBlock]) -> List[str]:
"""
Batch insert/update memory vectors.
More efficient for bulk operations.
"""
if not memories:
return []
# Generate embeddings in batch
texts = [m.content.text for m in memories]
embeddings = await self.embedding_provider.embed_batch(texts)
# Build points
points = []
point_ids = []
for memory, embedding in zip(memories, embeddings):
point_id = memory.embedding_id or str(uuid.uuid4())
point_ids.append(point_id)
payload = self._inject_tenant({
"memory_id": memory.id,
"subject": {
"type": memory.subject.type,
"id": memory.subject.id,
},
"kind": memory.kind.value,
"tags": memory.tags,
"scores": {
"salience": memory.scores.salience,
"stability": memory.scores.stability,
"confidence": memory.scores.confidence,
},
"content_preview": memory.content.text[:500],
"created_at": memory.created_at.isoformat(),
"updated_at": memory.updated_at.isoformat(),
"accessed_at": memory.accessed_at.isoformat(),
})
points.append(PointStruct(
id=point_id,
vector=embedding,
payload=payload
))
# Upsert all points
await self.client.upsert(
collection_name=self.collection_name,
points=points
)
self._log.info("vectors_batch_upserted", count=len(points))
return point_ids
    async def delete(self, point_id: str) -> bool:
        """
        Delete a vector point by ID.

        Note: Qdrant's delete-by-IDs selector cannot be combined with a
        payload filter, so no tenant check happens here. Point IDs are
        random UUIDs generated on write; callers holding only a memory ID
        should use delete_by_memory_id, which is tenant-filtered.
        """
        await self.client.delete(
            collection_name=self.collection_name,
            points_selector=PointIdsList(points=[point_id]),
        )
        self._log.info("vector_deleted", point_id=point_id)
        return True
async def delete_by_memory_id(self, memory_id: str) -> bool:
"""Delete vector by memory ID with tenant isolation."""
filter_conditions = self._build_tenant_filter([
FieldCondition(
key="memory_id",
match=MatchValue(value=memory_id)
)
])
await self.client.delete(
collection_name=self.collection_name,
points_selector=FilterSelector(filter=filter_conditions)
)
self._log.info("vector_deleted_by_memory_id", memory_id=memory_id)
return True
    async def update_scores(
        self,
        point_id: str,
        salience: Optional[float] = None,
        stability: Optional[float] = None,
        confidence: Optional[float] = None,
        accessed_at: Optional[datetime] = None
    ) -> bool:
        """
        Update score fields without re-embedding.
        Used for decay updates and access tracking.

        Qdrant's set_payload merges top-level keys but replaces nested
        objects wholesale, and dotted keys like "scores.salience" are
        stored as literal field names (which would break the payload
        indices), so we read the current scores, merge, and write the
        whole object back.
        """
        if all(v is None for v in (salience, stability, confidence, accessed_at)):
            return False
        points = await self.client.retrieve(
            collection_name=self.collection_name,
            ids=[point_id],
            with_payload=True,
        )
        # Verify existence and tenant ownership before writing
        if not points or points[0].payload.get("tenant_id") != self.tenant_id:
            return False
        scores = dict(points[0].payload.get("scores", {}))
        if salience is not None:
            scores["salience"] = salience
        if stability is not None:
            scores["stability"] = stability
        if confidence is not None:
            scores["confidence"] = confidence
        payload_update: dict[str, Any] = {"scores": scores}
        if accessed_at is not None:
            payload_update["accessed_at"] = accessed_at.isoformat()
        await self.client.set_payload(
            collection_name=self.collection_name,
            payload=payload_update,
            points=[point_id],
        )
        return True
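A hedged write-path sketch (MemoryBlock field shapes follow the test factory further below):

# Sketch: write path, then a cheap score refresh on later access.
from datetime import datetime, timezone

async def store_and_touch(memory, client, provider) -> None:
    repo = VectorRepository(
        tenant_id=memory.tenant_id,
        client=client,                 # AsyncQdrantClient from setup
        collection_name="memories",
        embedding_provider=provider,
    )
    point_id = await repo.upsert(memory)  # embeds memory.content.text
    # Record an access later without paying for re-embedding:
    await repo.update_scores(
        point_id,
        salience=0.85,
        accessed_at=datetime.now(timezone.utc),
    )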
# src/memory_store_qdrant/repository/search.py
from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import (
FieldCondition,
MatchValue,
MatchAny,
Range,
DatetimeRange,
)
from memory_domain import MemoryKind
from memory_store_qdrant.repository.base import TenantIsolatedVectorRepository
from memory_store_qdrant.embedding.base import EmbeddingProvider
@dataclass
class SearchResult:
"""Single search result with relevance score."""
point_id: str
memory_id: str
    score: float  # Cosine similarity; higher is more similar
payload: dict
@dataclass
class SearchFilters:
"""Filters for semantic search."""
subject_type: Optional[str] = None
subject_id: Optional[str] = None
kinds: Optional[List[MemoryKind]] = None
tags_any: Optional[List[str]] = None
tags_all: Optional[List[str]] = None
min_salience: Optional[float] = None
min_confidence: Optional[float] = None
from_date: Optional[datetime] = None
to_date: Optional[datetime] = None
class SearchRepository(TenantIsolatedVectorRepository):
"""Semantic search operations with tenant isolation."""
def __init__(
self,
tenant_id: str,
        client: AsyncQdrantClient,
collection_name: str,
embedding_provider: EmbeddingProvider
):
super().__init__(tenant_id, client, collection_name)
self.embedding_provider = embedding_provider
async def semantic_search(
self,
query_text: str,
filters: Optional[SearchFilters] = None,
limit: int = 20,
score_threshold: float = 0.0
) -> List[SearchResult]:
"""
Perform semantic similarity search.
Always applies tenant isolation filter.
"""
# Generate query embedding
query_vector = await self.embedding_provider.embed_text(query_text)
# Build filter conditions
filter_conditions = self._build_filter_conditions(filters)
search_filter = self._build_tenant_filter(filter_conditions)
# Execute search
results = await self.client.search(
collection_name=self.collection_name,
query_vector=query_vector,
query_filter=search_filter,
limit=limit,
score_threshold=score_threshold,
with_payload=True,
)
self._log.info(
"semantic_search_executed",
query_length=len(query_text),
result_count=len(results)
)
return [
SearchResult(
point_id=str(r.id),
memory_id=r.payload.get("memory_id", ""),
score=r.score,
payload=r.payload
)
for r in results
]
async def search_by_subject(
self,
subject_type: str,
subject_id: str,
query_text: Optional[str] = None,
limit: int = 50,
min_salience: float = 0.0
) -> List[SearchResult]:
"""
Get memories for a specific subject.
If query_text provided, ranks by semantic similarity.
Otherwise, returns by salience score.
"""
filters = SearchFilters(
subject_type=subject_type,
subject_id=subject_id,
min_salience=min_salience
)
if query_text:
return await self.semantic_search(query_text, filters, limit)
        # No query text: scroll matching points and rank by salience.
        # Note: scroll pages in point-ID order, so with more than `limit`
        # matches this is not a true global top-N; Qdrant 1.8's scroll
        # order_by could push the ordering server-side.
filter_conditions = self._build_filter_conditions(filters)
search_filter = self._build_tenant_filter(filter_conditions)
results, _ = await self.client.scroll(
collection_name=self.collection_name,
scroll_filter=search_filter,
limit=limit,
with_payload=True,
with_vectors=False,
)
# Sort by salience
sorted_results = sorted(
results,
key=lambda r: r.payload.get("scores", {}).get("salience", 0),
reverse=True
)
return [
SearchResult(
point_id=str(r.id),
memory_id=r.payload.get("memory_id", ""),
score=r.payload.get("scores", {}).get("salience", 0),
payload=r.payload
)
for r in sorted_results
]
async def find_similar(
self,
point_id: str,
limit: int = 10,
exclude_self: bool = True
) -> List[SearchResult]:
"""
Find memories similar to a given memory.
Uses the existing vector to find neighbors.
"""
        # Get the point's vector (payload included for the tenant check)
        points = await self.client.retrieve(
            collection_name=self.collection_name,
            ids=[point_id],
            with_payload=True,
            with_vectors=True
        )
        # Never use a vector that belongs to another tenant
        if not points or points[0].payload.get("tenant_id") != self.tenant_id:
            return []
        vector = points[0].vector
# Search for similar
results = await self.client.search(
collection_name=self.collection_name,
query_vector=vector,
query_filter=self._build_tenant_filter(),
limit=limit + (1 if exclude_self else 0),
with_payload=True
)
# Filter out self if requested
if exclude_self:
results = [r for r in results if str(r.id) != point_id][:limit]
return [
SearchResult(
point_id=str(r.id),
memory_id=r.payload.get("memory_id", ""),
score=r.score,
payload=r.payload
)
for r in results
]
def _build_filter_conditions(
self,
filters: Optional[SearchFilters]
) -> List[FieldCondition]:
"""Build Qdrant filter conditions from SearchFilters."""
if not filters:
return []
conditions = []
if filters.subject_type:
conditions.append(FieldCondition(
key="subject.type",
match=MatchValue(value=filters.subject_type)
))
if filters.subject_id:
conditions.append(FieldCondition(
key="subject.id",
match=MatchValue(value=filters.subject_id)
))
if filters.kinds:
conditions.append(FieldCondition(
key="kind",
match=MatchAny(any=[k.value for k in filters.kinds])
))
        if filters.tags_any:
            conditions.append(FieldCondition(
                key="tags",
                match=MatchAny(any=filters.tags_any)
            ))
        if filters.tags_all:
            # Each required tag becomes its own must-condition
            conditions.extend(
                FieldCondition(key="tags", match=MatchValue(value=tag))
                for tag in filters.tags_all
            )
if filters.min_salience is not None:
conditions.append(FieldCondition(
key="scores.salience",
range=Range(gte=filters.min_salience)
))
if filters.min_confidence is not None:
conditions.append(FieldCondition(
key="scores.confidence",
range=Range(gte=filters.min_confidence)
))
if filters.from_date or filters.to_date:
conditions.append(FieldCondition(
key="created_at",
range=DatetimeRange(
gte=filters.from_date,
lte=filters.to_date
)
))
return conditions
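A hedged read-path sketch combining vector similarity with the metadata filters above:

# Sketch: hybrid retrieval for one subject's recent facts.
from datetime import datetime, timedelta, timezone
from memory_domain import MemoryKind

async def recent_user_facts(search_repo: SearchRepository) -> None:
    filters = SearchFilters(
        subject_type="user",
        subject_id="user_123",
        kinds=[MemoryKind.FACT],
        min_salience=0.5,
        from_date=datetime.now(timezone.utc) - timedelta(days=90),
    )
    results = await search_repo.semantic_search(
        "communication preferences",
        filters=filters,
        limit=10,
        score_threshold=0.3,
    )
    for r in results:
        print(f"{r.score:.2f}  {r.payload['content_preview'][:60]}")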
Tenant Isolation Tests
# tests/test_repository/test_tenant_isolation.py
"""
CRITICAL TESTS: Qdrant Tenant Isolation
These tests verify that vector data from one tenant is invisible to another.
"""
import pytest
from memory_domain import MemoryBlock, MemoryKind, Subject, MemorySource
pytestmark = pytest.mark.asyncio
class TestVectorTenantIsolation:
"""Verify vector operations respect tenant boundaries."""
async def test_search_only_returns_own_tenant_vectors(
self,
tenant_a_search_repo,
tenant_b_search_repo,
tenant_a_vector_repo,
tenant_b_vector_repo,
sample_memory_factory
):
"""Semantic search only returns vectors from the same tenant."""
# Both tenants store memories about "pricing"
memory_a = sample_memory_factory(
tenant_id="tenant_a",
content_text="Our pricing is $99/month for the basic plan."
)
memory_b = sample_memory_factory(
tenant_id="tenant_b",
content_text="Pricing starts at $199/month for enterprise."
)
await tenant_a_vector_repo.upsert(memory_a)
await tenant_b_vector_repo.upsert(memory_b)
# Tenant A searches for pricing
results_a = await tenant_a_search_repo.semantic_search(
"What is the pricing?",
limit=10
)
# Should only find their own memory
assert len(results_a) == 1
assert results_a[0].memory_id == memory_a.id
assert "99" in results_a[0].payload.get("content_preview", "")
async def test_cannot_access_other_tenant_by_point_id(
self,
tenant_a_vector_repo,
tenant_b_search_repo,
sample_memory_factory
):
"""Even with exact point ID, tenant B cannot see tenant A's data."""
memory_a = sample_memory_factory(
tenant_id="tenant_a",
content_text="Secret tenant A information"
)
point_id = await tenant_a_vector_repo.upsert(memory_a)
# Tenant B tries to find similar using the actual point ID
# This should return nothing because the point belongs to tenant A
results = await tenant_b_search_repo.find_similar(point_id)
# Should not find anything (including the original)
assert len(results) == 0
async def test_subject_search_isolated(
self,
tenant_a_search_repo,
tenant_b_search_repo,
tenant_a_vector_repo,
tenant_b_vector_repo,
sample_memory_factory
):
"""Subject-based search is tenant-isolated."""
# Both tenants have memories about user_123
memory_a = sample_memory_factory(
tenant_id="tenant_a",
subject=Subject(type="user", id="user_123"),
content_text="User 123 from tenant A prefers email."
)
memory_b = sample_memory_factory(
tenant_id="tenant_b",
subject=Subject(type="user", id="user_123"),
content_text="User 123 from tenant B prefers phone."
)
await tenant_a_vector_repo.upsert(memory_a)
await tenant_b_vector_repo.upsert(memory_b)
# Tenant A searches for user_123
results = await tenant_a_search_repo.search_by_subject(
subject_type="user",
subject_id="user_123"
)
# Should only find their version
assert len(results) == 1
assert "email" in results[0].payload.get("content_preview", "")
assert "phone" not in results[0].payload.get("content_preview", "")
async def test_delete_only_affects_own_tenant(
self,
tenant_a_vector_repo,
tenant_b_vector_repo,
tenant_b_search_repo,
sample_memory_factory
):
"""Deleting a memory doesn't affect other tenants."""
# Tenant B stores a memory
memory_b = sample_memory_factory(
tenant_id="tenant_b",
content_text="Tenant B important data"
)
point_id = await tenant_b_vector_repo.upsert(memory_b)
# Tenant A tries to delete using memory ID
# (They shouldn't be able to, but let's verify nothing breaks)
await tenant_a_vector_repo.delete_by_memory_id(memory_b.id)
# Tenant B's memory should still exist
results = await tenant_b_search_repo.semantic_search("Tenant B data")
assert len(results) == 1
assert results[0].memory_id == memory_b.id
# =========================================
# Test Fixtures
# =========================================
@pytest.fixture
def sample_memory_factory():
"""Factory for creating test memory blocks."""
counter = 0
def create(
tenant_id: str,
content_text: str,
        subject: Subject | None = None
) -> MemoryBlock:
nonlocal counter
counter += 1
return MemoryBlock(
id=f"mem_test_{counter}",
tenant_id=tenant_id,
subject=subject or Subject(type="user", id="default_user"),
kind=MemoryKind.FACT,
source={"origin": MemorySource.CHAT},
content={"text": content_text},
scores={"salience": 0.8, "stability": 0.9, "confidence": 0.7}
)
return create
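The repository fixtures referenced above (tenant_a_vector_repo and friends) live in conftest.py, which is not shown here; a hedged sketch of how they might be wired with testcontainers:

# tests/conftest.py (sketch) -- fixture names match the tests above;
# the wiring itself is an assumption, not the repository's actual conftest.
import pytest
import pytest_asyncio
from qdrant_client import AsyncQdrantClient
from testcontainers.core.container import DockerContainer

from memory_store_qdrant.config import (
    QdrantConfig,
    EmbeddingConfig,
    EmbeddingModel,
    EmbeddingProvider,
)
from memory_store_qdrant.client.collection import setup_collection
from memory_store_qdrant.embedding.factory import create_embedding_provider
from memory_store_qdrant.repository.vector import VectorRepository

@pytest.fixture(scope="session")
def qdrant_url():
    """Throwaway Qdrant instance for the test session."""
    with DockerContainer("qdrant/qdrant:v1.8.0").with_exposed_ports(6333) as c:
        host = c.get_container_host_ip()
        port = int(c.get_exposed_port(6333))
        # A production conftest would also poll /readyz before yielding.
        yield f"http://{host}:{port}"

@pytest_asyncio.fixture
async def tenant_a_vector_repo(qdrant_url):
    client = AsyncQdrantClient(url=qdrant_url)
    embed_cfg = EmbeddingConfig(
        provider=EmbeddingProvider.LOCAL,
        model=EmbeddingModel.MINILM,  # small local model, no API key needed
    )
    await setup_collection(client, QdrantConfig(), embed_cfg)
    provider = create_embedding_provider(embed_cfg)
    yield VectorRepository("tenant_a", client, "memories", provider)

# tenant_b_vector_repo and the *_search_repo fixtures follow the same
# pattern with SearchRepository and tenant_id="tenant_b".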
Environment Variables
| Variable | Required | Default | Description |
|---|---|---|---|
| QDRANT_HOST | No | localhost | Qdrant server host |
| QDRANT_PORT | No | 6333 | HTTP port |
| QDRANT_GRPC_PORT | No | 6334 | gRPC port |
| QDRANT_PREFER_GRPC | No | true | Use gRPC |
| QDRANT_COLLECTION_NAME | No | memories | Collection name |
| QDRANT_API_KEY | No | - | API key |
| EMBEDDING_PROVIDER | No | openai | openai or local |
| EMBEDDING_MODEL | No | text-embedding-3-small | Model name |
| OPENAI_API_KEY | Conditional | - | Required if using OpenAI |
Acceptance Criteria
- Embedding generation works with OpenAI
- Embedding generation works with local models (fallback)
- Tenant isolation tests pass (CRITICAL)
- Semantic search returns relevant results
- Batch operations are efficient
- Score updates work without re-embedding
- Collection setup is idempotent
- Performance: <100ms for single embedding
- Performance: <500ms for search with 1M vectors
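One hedged way to keep the performance criteria honest in CI (thresholds per the list above; the embedding_provider fixture is an assumption):

# Sketch: latency smoke test for the single-embedding criterion.
import time
import pytest

pytestmark = pytest.mark.asyncio

async def test_single_embedding_under_100ms(embedding_provider):
    # Warm up so model load / connection setup stays out of the budget
    await embedding_provider.embed_text("warm up")
    start = time.perf_counter()
    await embedding_provider.embed_text("How fast is a single embedding?")
    elapsed_ms = (time.perf_counter() - start) * 1000
    assert elapsed_ms < 100, f"embedding took {elapsed_ms:.1f}ms"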