# memory-curator-worker

LLM-Powered Knowledge Extraction
| Property | Value |
|---|---|
| Repository | memory-curator-worker |
| Language | Python 3.11+ |
| Type | Background Worker Service |
| Framework | NATS JetStream Consumer |
## Purpose

Background worker that processes events and extracts structured memories using an LLM:
- Conversation curation (extract facts, preferences, insights)
- Semantic memory extraction from documents (building on structural parsing)
- Tool result memorization
- Salience decay management
## Repository Structure

```
memory-curator-worker/
├── pyproject.toml
├── README.md
├── Dockerfile
├── docker-compose.yml
│
├── src/
│   └── memory_curator/
│       ├── __init__.py
│       ├── main.py              # Worker entry point
│       ├── config.py            # Configuration
│       │
│       ├── consumer/
│       │   ├── __init__.py
│       │   ├── jetstream.py     # NATS JetStream consumer
│       │   └── handlers.py      # Event routing
│       │
│       ├── processors/
│       │   ├── __init__.py
│       │   ├── base.py          # Base processor class
│       │   ├── conversation.py  # Conversation curation
│       │   ├── document.py      # Document ingestion
│       │   ├── tool_result.py   # Tool output processing
│       │   └── decay.py         # Salience decay
│       │
│       ├── extraction/
│       │   ├── __init__.py
│       │   ├── llm_client.py    # LLM API client
│       │   ├── prompts.py       # Extraction prompts
│       │   ├── parser.py        # Output parsing
│       │   └── chunker.py       # Document chunking
│       │
│       ├── integrations/
│       │   ├── __init__.py
│       │   ├── google_drive.py  # Google Drive client
│       │   ├── notion.py        # Notion client
│       │   └── transcript.py    # Transcript fetching
│       │
│       └── utils/
│           ├── __init__.py
│           └── retry.py         # Retry logic
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_processors/
│   │   ├── test_conversation.py
│   │   └── test_document.py
│   └── test_extraction/
│       ├── test_prompts.py
│       └── test_parser.py
│
└── docs/
    ├── processors.md
    └── prompts.md
```
## Dependencies

```toml
# pyproject.toml
[project]
name = "memory-curator-worker"
version = "0.1.0"
description = "LLM-powered knowledge extraction worker"
requires-python = ">=3.11"
dependencies = [
"nats-py>=2.6",
"memory-domain>=0.1.0",
"openai>=1.6",
"anthropic>=0.18",
"httpx>=0.26",
"pydantic>=2.5",
"pydantic-settings>=2.1",
"structlog>=23.2",
"tenacity>=8.2",
"tiktoken>=0.5",
"google-api-python-client>=2.111",
"google-auth>=2.25",
]
[project.optional-dependencies]
dev = [
"pytest>=7.4",
"pytest-asyncio>=0.21",
"pytest-cov>=4.1",
"ruff>=0.1",
"mypy>=1.7",
]
```
## Configuration

```python
# src/memory_curator/config.py
from enum import Enum
from pydantic_settings import BaseSettings
from pydantic import Field
class LLMProvider(str, Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
class CuratorConfig(BaseSettings):
"""Curator worker configuration."""
model_config = {"env_prefix": "CURATOR_"}
# Worker settings
worker_id: str = Field(default="curator-1")
concurrency: int = Field(default=5, ge=1, le=20)
# LLM settings
llm_provider: LLMProvider = Field(default=LLMProvider.OPENAI)
llm_model: str = Field(default="gpt-4o")
llm_temperature: float = Field(default=0.1, ge=0, le=1)
llm_max_tokens: int = Field(default=4000)
# API keys
openai_api_key: str | None = Field(default=None)
anthropic_api_key: str | None = Field(default=None)
# Memory Engine
engine_url: str = Field(default="http://localhost:8000")
engine_api_key: str | None = Field(default=None)
# Processing settings
max_conversation_messages: int = Field(default=100)
max_document_chunk_size: int = Field(default=4000)
max_memories_per_conversation: int = Field(default=10)
# Decay settings
decay_batch_size: int = Field(default=1000)
decay_schedule_cron: str = Field(default="0 3 * * *") # 3 AM daily
class NatsConfig(BaseSettings):
"""NATS configuration."""
model_config = {"env_prefix": "NATS_"}
url: str = Field(default="nats://localhost:4222")
stream_name: str = Field(default="MEMORY_EVENTS")
# Consumer settings
consumer_name: str = Field(default="memory-curator")
ack_wait_seconds: int = Field(default=60)
max_deliver: int = Field(default=3)
class Settings(BaseSettings):
"""Combined settings."""
curator: CuratorConfig = Field(default_factory=CuratorConfig)
nats: NatsConfig = Field(default_factory=NatsConfig)
```
## JetStream Consumer

```python
# src/memory_curator/consumer/jetstream.py
import asyncio
from typing import Callable, Awaitable
import nats
from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, AckPolicy, DeliverPolicy
import structlog
from memory_curator.config import NatsConfig
logger = structlog.get_logger()
class JetStreamConsumer:
"""NATS JetStream durable consumer."""
def __init__(self, config: NatsConfig):
self.config = config
self._nc: nats.NATS | None = None
self._js: JetStreamContext | None = None
self._subscription = None
self._running = False
async def connect(self) -> None:
"""Connect to NATS and setup consumer."""
self._nc = await nats.connect(self.config.url)
self._js = self._nc.jetstream()
# Create durable consumer
try:
await self._js.add_consumer(
self.config.stream_name,
config=ConsumerConfig(
durable_name=self.config.consumer_name,
ack_policy=AckPolicy.EXPLICIT,
deliver_policy=DeliverPolicy.ALL,
ack_wait=self.config.ack_wait_seconds,
max_deliver=self.config.max_deliver,
)
)
except Exception:
# Consumer may already exist
pass
logger.info(
"jetstream_connected",
stream=self.config.stream_name,
consumer=self.config.consumer_name
)
async def subscribe(
self,
handler: Callable[[dict], Awaitable[None]],
subjects: list[str] | None = None
) -> None:
"""Subscribe to messages with handler."""
if not self._js:
raise RuntimeError("Not connected")
# Default to all memory events
subject = subjects[0] if subjects else "memory.>"
async def message_handler(msg):
try:
import json
event = json.loads(msg.data.decode())
logger.debug(
"message_received",
subject=msg.subject,
event_type=event.get("event_type")
)
await handler(event)
await msg.ack()
except Exception as e:
logger.error(
"message_processing_failed",
subject=msg.subject,
error=str(e)
)
# Will be redelivered up to max_deliver times
await msg.nak()
self._subscription = await self._js.pull_subscribe(
subject,
durable=self.config.consumer_name,
)
self._running = True
# Pull loop
while self._running:
try:
messages = await self._subscription.fetch(batch=10, timeout=5)
for msg in messages:
await message_handler(msg)
except nats.errors.TimeoutError:
continue
except Exception as e:
logger.error("pull_error", error=str(e))
await asyncio.sleep(1)
async def close(self) -> None:
"""Close connection."""
self._running = False
if self._nc:
await self._nc.close()
logger.info("jetstream_disconnected")
## Event Handlers

```python
# src/memory_curator/consumer/handlers.py
import structlog
from memory_curator.config import Settings
from memory_curator.processors.conversation import ConversationProcessor
from memory_curator.processors.document import DocumentProcessor
from memory_curator.processors.tool_result import ToolResultProcessor
from memory_curator.processors.decay import DecayProcessor
logger = structlog.get_logger()
class EventRouter:
"""Routes events to appropriate processors."""
def __init__(self, settings: Settings):
self.settings = settings
# Initialize processors
self.conversation_processor = ConversationProcessor(settings)
self.document_processor = DocumentProcessor(settings)
self.tool_processor = ToolResultProcessor(settings)
self.decay_processor = DecayProcessor(settings)
async def handle_event(self, event: dict) -> None:
"""Route event to appropriate processor."""
event_type = event.get("event_type", "")
tenant_id = event.get("tenant_id")
log = logger.bind(event_type=event_type, tenant_id=tenant_id)
try:
if event_type == "conversation.ended":
await self.conversation_processor.process(event)
elif event_type == "document.uploaded":
await self.document_processor.process(event)
elif event_type == "tool.executed":
await self.tool_processor.process(event)
elif event_type == "maintenance.salience_decay":
await self.decay_processor.process(event)
else:
log.warning("unknown_event_type")
except Exception as e:
log.error("event_processing_failed", error=str(e))
raise
```
## Conversation Processor

```python
# src/memory_curator/processors/conversation.py
from typing import List
import httpx
import structlog
from memory_domain import (
MemoryBlock,
MemoryKind,
MemorySource,
MemoryCreateRequest,
)
from memory_curator.config import Settings
from memory_curator.extraction.llm_client import LLMClient
from memory_curator.extraction.prompts import CONVERSATION_CURATION_PROMPT
from memory_curator.extraction.parser import parse_extraction_response
from memory_curator.processors.base import BaseProcessor
logger = structlog.get_logger()
class ConversationProcessor(BaseProcessor):
"""Extracts memories from completed conversations."""
def __init__(self, settings: Settings):
super().__init__(settings)
self.llm = LLMClient(settings.curator)
self.max_memories = settings.curator.max_memories_per_conversation
async def process(self, event: dict) -> None:
"""Process conversation.ended event."""
tenant_id = event["tenant_id"]
conversation_id = event["data"]["conversation_id"]
user_id = event["data"]["user_id"]
log = logger.bind(
tenant_id=tenant_id,
conversation_id=conversation_id
)
log.info("processing_conversation")
# Fetch transcript
transcript = await self._fetch_transcript(event)
if not transcript:
log.warning("no_transcript_available")
return
# Fetch existing context about user
existing_memories = await self._fetch_existing_memories(
tenant_id, user_id
)
# Build extraction prompt
prompt = self._build_prompt(
transcript=transcript,
existing_memories=existing_memories,
user_id=user_id
)
# Call LLM for extraction
response = await self.llm.complete(prompt)
# Parse extracted memories
extracted = parse_extraction_response(response)
if not extracted:
log.info("no_memories_extracted")
return
# Limit number of memories
extracted = extracted[:self.max_memories]
# Create memories via engine API
created_count = 0
for memory_data in extracted:
try:
request = MemoryCreateRequest(
subject={"type": "user", "id": user_id},
kind=MemoryKind(memory_data["kind"]),
text=memory_data["content"]["text"],
tags=memory_data.get("tags", []),
conversation_id=conversation_id,
salience=memory_data["scores"]["salience"],
stability=memory_data["scores"]["stability"],
confidence=memory_data["scores"]["confidence"],
)
await self.engine_client.create_memory(request)
created_count += 1
except Exception as e:
log.error("memory_creation_failed", error=str(e))
log.info(
"conversation_processed",
extracted=len(extracted),
created=created_count
)
async def _fetch_transcript(self, event: dict) -> str | None:
"""Fetch conversation transcript."""
transcript_url = event["data"].get("transcript_url")
if transcript_url:
# Fetch from URL
async with httpx.AsyncClient() as client:
response = await client.get(transcript_url)
return response.text
# Fallback: try to get from conversation service
# Implementation depends on your infrastructure
return None
async def _fetch_existing_memories(
self,
tenant_id: str,
user_id: str
) -> List[dict]:
"""Fetch existing memories about user for context."""
try:
memories = await self.engine_client.get_working_set(
subject_type="user",
subject_id=user_id,
max_blocks=20
)
return [
{
"kind": m["kind"],
"text": m["content"]["text"],
"tags": m.get("tags", [])
}
for m in memories
]
except Exception:
return []
def _build_prompt(
self,
transcript: str,
existing_memories: List[dict],
user_id: str
) -> str:
"""Build the extraction prompt."""
context_str = ""
if existing_memories:
context_str = "\n".join([
f"- [{m['kind']}] {m['text']}"
for m in existing_memories
])
else:
context_str = "No existing memories."
return CONVERSATION_CURATION_PROMPT.format(
subject=f"User: {user_id}",
context_memories=context_str,
transcript=transcript
)
```
## Extraction Prompts

````python
# src/memory_curator/extraction/prompts.py
CONVERSATION_CURATION_PROMPT = """You are a memory curator for an AI assistant. Your job is to extract structured, memorable knowledge from conversations.
**Subject:** {subject}
**Existing Knowledge About This Subject:**
{context_memories}
**New Conversation:**
{transcript}
**Your Task:**
Extract memories that are:
1. **Facts** - Verifiable, stable information that doesn't change often
2. **Preferences** - Stated likes, dislikes, requirements, or behavioral patterns
3. **Insights** - Patterns, opportunities, risks, or discoveries you infer
**Guidelines:**
- Extract 3-10 memories (focus on quality over quantity)
- Avoid duplicating existing knowledge unless it's an update
- Facts should be specific and verifiable
- Use clear, concise language that an LLM can easily understand
- Each memory should be self-contained and meaningful on its own
**Output Format (JSON):**
```json
{{
"blocks": [
{{
"kind": "fact|preference|insight",
"content": {{
"text": "Natural language summary suitable for LLM context"
}},
"tags": ["list", "of", "relevant", "concepts"],
"scores": {{
"salience": 0.0-1.0,
"stability": 0.0-1.0,
"confidence": 0.0-1.0
}},
"reasoning": "Why this is worth remembering"
}}
]
}}
```

Extract memories now:"""
DOCUMENT_EXTRACTION_PROMPT = """You are a memory curator extracting knowledge from a document.
Document Title: {title}
Document Type: {mime_type}
Associated Subject: {subject}
Document Content: {content}
Your Task: Extract key facts, insights, and notable information from this document that would be valuable to remember.
Guidelines:
- Focus on information that would help an AI assistant better serve the subject
- Extract specific details, not general summaries
- Include relevant context so memories are self-contained
- Mark confidence based on how clear the information is in the document
Output Format (JSON):
{{
"blocks": [
{{
"kind": "fact|insight|summary",
"content": {{
"text": "Extracted information",
"structured": {{}}
}},
"tags": ["relevant", "topics"],
"scores": {{
"salience": 0.0-1.0,
"stability": 0.0-1.0,
"confidence": 0.0-1.0
}}
}}
]
}}
Extract memories now:"""
TOOL_RESULT_PROMPT = """Extract memorable information from this tool execution result.
Tool: {tool_name}
Input: {input_summary}
Output: {output_summary}

Extract any facts or insights worth remembering:"""
````
---
## LLM Client
```python
# src/memory_curator/extraction/llm_client.py
from typing import Optional
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential
from memory_curator.config import CuratorConfig, LLMProvider
logger = structlog.get_logger()
class LLMClient:
"""Client for LLM completions."""
def __init__(self, config: CuratorConfig):
self.config = config
self._client = None
@property
def client(self):
"""Lazy initialize LLM client."""
if self._client is None:
if self.config.llm_provider == LLMProvider.OPENAI:
from openai import AsyncOpenAI
self._client = AsyncOpenAI(api_key=self.config.openai_api_key)
elif self.config.llm_provider == LLMProvider.ANTHROPIC:
from anthropic import AsyncAnthropic
self._client = AsyncAnthropic(api_key=self.config.anthropic_api_key)
return self._client
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=30)
)
async def complete(self, prompt: str) -> str:
"""Generate completion from LLM."""
logger.debug("llm_request", provider=self.config.llm_provider)
if self.config.llm_provider == LLMProvider.OPENAI:
response = await self.client.chat.completions.create(
model=self.config.llm_model,
messages=[{"role": "user", "content": prompt}],
temperature=self.config.llm_temperature,
max_tokens=self.config.llm_max_tokens,
)
return response.choices[0].message.content
elif self.config.llm_provider == LLMProvider.ANTHROPIC:
response = await self.client.messages.create(
model=self.config.llm_model,
messages=[{"role": "user", "content": prompt}],
temperature=self.config.llm_temperature,
max_tokens=self.config.llm_max_tokens,
)
return response.content[0].text
raise ValueError(f"Unknown provider: {self.config.llm_provider}")
```
## Document Processor
The DocumentProcessor performs semantic extraction from documents. Unlike the structural parsing done in memory-store-documents, this processor focuses on identifying valuable knowledge blocks that should be added to the Memory Store.
### Updated Flow
- **Event**: Receives `document.parsed` (not `document.uploaded`).
- **Fetch**: Calls the `memory-store-documents` API to get the structural pages and text elements (a hedged sketch of this call follows the list).
- **Analyze**: Uses the LLM to extract facts, insights, and summaries from the text element hierarchy.
- **Reference**: Includes the `document_id` and character offsets in the resulting `MemoryBlock` for provenance.
- **Store**: Sends create requests to `memory-engine-service`.
```python
# src/memory_curator/processors/document.py
from typing import List
import httpx
import structlog
from memory_domain import MemoryKind, MemoryCreateRequest
from memory_curator.config import Settings
from memory_curator.extraction.llm_client import LLMClient
from memory_curator.extraction.prompts import DOCUMENT_EXTRACTION_PROMPT
from memory_curator.extraction.parser import parse_extraction_response
from memory_curator.extraction.chunker import chunk_document
from memory_curator.processors.base import BaseProcessor
logger = structlog.get_logger()
class DocumentProcessor(BaseProcessor):
"""Processes uploaded documents for memory extraction."""
def __init__(self, settings: Settings):
super().__init__(settings)
self.llm = LLMClient(settings.curator)
self.chunk_size = settings.curator.max_document_chunk_size
async def process(self, event: dict) -> None:
"""Process document.uploaded event."""
tenant_id = event["tenant_id"]
data = event["data"]
document_id = data["document_id"]
log = logger.bind(
tenant_id=tenant_id,
document_id=document_id
)
log.info("processing_document", file_name=data["file_name"])
# Download document content
content = await self._download_document(data["download_url"])
if not content:
log.warning("document_download_failed")
return
# Chunk if large
chunks = chunk_document(
content,
max_size=self.chunk_size,
mime_type=data["mime_type"]
)
# Process each chunk
all_memories = []
for i, chunk in enumerate(chunks):
log.debug("processing_chunk", chunk_index=i, total_chunks=len(chunks))
prompt = DOCUMENT_EXTRACTION_PROMPT.format(
title=data["file_name"],
mime_type=data["mime_type"],
subject=f"document:{document_id}",
content=chunk
)
response = await self.llm.complete(prompt)
extracted = parse_extraction_response(response)
all_memories.extend(extracted)
# Deduplicate and create memories
unique_memories = self._deduplicate(all_memories)
created_count = 0
for memory_data in unique_memories[:20]: # Limit per document
try:
request = MemoryCreateRequest(
subject={"type": "document", "id": document_id},
kind=MemoryKind(memory_data["kind"]),
text=memory_data["content"]["text"],
tags=memory_data.get("tags", []),
document_id=document_id,
confidence=memory_data["scores"]["confidence"],
)
await self.engine_client.create_memory(request)
created_count += 1
except Exception as e:
log.error("memory_creation_failed", error=str(e))
log.info(
"document_processed",
chunks=len(chunks),
extracted=len(all_memories),
created=created_count
)
async def _download_document(self, url: str) -> str | None:
"""Download document content."""
try:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=60)
response.raise_for_status()
return response.text
except Exception as e:
logger.error("document_download_error", error=str(e))
return None
def _deduplicate(self, memories: List[dict]) -> List[dict]:
"""Remove duplicate memories based on content similarity."""
seen_texts = set()
unique = []
for mem in memories:
text = mem["content"]["text"].lower().strip()
# Simple dedup - could use embeddings for better similarity
if text not in seen_texts:
seen_texts.add(text)
unique.append(mem)
return unique
```
## Decay Processor

```python
# src/memory_curator/processors/decay.py
from datetime import datetime
import structlog
from memory_domain.scoring.decay import calculate_salience_decay
from memory_curator.config import Settings
from memory_curator.processors.base import BaseProcessor
logger = structlog.get_logger()
class DecayProcessor(BaseProcessor):
"""Applies salience decay to memories."""
def __init__(self, settings: Settings):
super().__init__(settings)
self.batch_size = settings.curator.decay_batch_size
async def process(self, event: dict) -> None:
"""Process maintenance.salience_decay event."""
tenant_id = event.get("tenant_id")
log = logger.bind(tenant_id=tenant_id)
log.info("starting_decay_processing")
# If tenant_id is None, process all tenants
# This is typically triggered by a cron job
total_updated = 0
offset = 0
while True:
# Fetch batch of memories
memories = await self._fetch_memories_batch(
tenant_id,
offset,
self.batch_size
)
if not memories:
break
# Calculate and apply decay
updates = []
for memory in memories:
new_salience = calculate_salience_decay(
base_salience=memory["scores"]["salience"],
last_accessed=datetime.fromisoformat(memory["accessed_at"]),
memory_kind=memory["kind"]
)
# Only update if changed significantly
if abs(new_salience - memory["scores"]["salience"]) > 0.01:
updates.append({
"memory_id": memory["id"],
"embedding_id": memory.get("embedding_id"),
"new_salience": new_salience
})
# Batch update
if updates:
await self._apply_decay_updates(updates)
total_updated += len(updates)
offset += self.batch_size
# Safety limit
if offset > 100000:
log.warning("decay_processing_limit_reached")
break
log.info("decay_processing_complete", total_updated=total_updated)
async def _fetch_memories_batch(
self,
tenant_id: str | None,
offset: int,
limit: int
) -> list:
"""Fetch batch of memories for decay processing."""
# This would use a special admin endpoint
# that can list all memories with pagination
pass
async def _apply_decay_updates(self, updates: list) -> None:
"""Apply decay updates to memories."""
# Batch update endpoint
for update in updates:
try:
await self.engine_client.update_memory_scores(
memory_id=update["memory_id"],
salience=update["new_salience"]
)
except Exception as e:
logger.error(
"decay_update_failed",
memory_id=update["memory_id"],
error=str(e)
)
```
## Main Entry Point

```python
# src/memory_curator/main.py
import asyncio
import signal
import structlog
from memory_curator.config import Settings
from memory_curator.consumer.jetstream import JetStreamConsumer
from memory_curator.consumer.handlers import EventRouter
logger = structlog.get_logger()
async def main():
"""Main worker entry point."""
settings = Settings()
# Setup logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
)
logger.info(
"starting_curator_worker",
worker_id=settings.curator.worker_id
)
# Initialize consumer
consumer = JetStreamConsumer(settings.nats)
await consumer.connect()
# Initialize event router
router = EventRouter(settings)
# Setup shutdown handling
shutdown_event = asyncio.Event()
def handle_shutdown(sig):
logger.info("shutdown_signal_received", signal=sig)
shutdown_event.set()
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, handle_shutdown, sig)
# Run consumer
try:
consumer_task = asyncio.create_task(
consumer.subscribe(router.handle_event)
)
# Wait for shutdown
await shutdown_event.wait()
# Graceful shutdown
consumer_task.cancel()
await consumer.close()
except Exception as e:
logger.error("worker_error", error=str(e))
raise
logger.info("curator_worker_stopped")
if __name__ == "__main__":
asyncio.run(main())
```
## Dockerfile

```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY pyproject.toml .
COPY src/ src/
RUN pip install --no-cache-dir .
ENV PYTHONPATH=/app/src
ENV PYTHONUNBUFFERED=1
RUN useradd -m -u 1000 appuser
USER appuser
CMD ["python", "-m", "memory_curator.main"]
## Environment Variables
| Variable | Required | Default | Description |
|---|---|---|---|
| `CURATOR_WORKER_ID` | No | `curator-1` | Worker identifier |
| `CURATOR_CONCURRENCY` | No | `5` | Parallel processing |
| `CURATOR_LLM_PROVIDER` | No | `openai` | `openai` or `anthropic` |
| `CURATOR_LLM_MODEL` | No | `gpt-4o` | Model name |
| `CURATOR_OPENAI_API_KEY` | Conditional | - | Required for OpenAI |
| `CURATOR_ANTHROPIC_API_KEY` | Conditional | - | Required for Anthropic |
| `CURATOR_ENGINE_URL` | No | `http://localhost:8000` | Engine service URL |
| `CURATOR_ENGINE_API_KEY` | Yes | - | Engine API key |
| `NATS_URL` | No | `nats://localhost:4222` | NATS server |
## Acceptance Criteria
- Conversation extraction produces quality memories
- Document chunking handles large files
- LLM failures are retried appropriately
- Decay processing is efficient
- Events are acknowledged correctly
- Dead letter handling works
- Worker scales horizontally