
memory-curator-worker

LLM-Powered Knowledge Extraction

| Property | Value |
| --- | --- |
| Repository | memory-curator-worker |
| Language | Python 3.11+ |
| Type | Background Worker Service |
| Framework | NATS JetStream Consumer |

Purpose

Background worker that processes events and extracts structured memories using an LLM:

  • Conversation curation (extract facts, preferences, insights)
  • Semantic memory extraction from documents (building on structural parsing)
  • Tool result memorization
  • Salience decay management
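
The worker consumes events from the MEMORY_EVENTS JetStream stream (subjects under `memory.>`). The event envelope is defined outside this repository; as a rough illustration, a `conversation.ended` payload carrying the fields the handlers below read (`event_type`, `tenant_id`, `data`) might look like the following, with placeholder values:

```json
{
  "event_type": "conversation.ended",
  "tenant_id": "acme",
  "data": {
    "conversation_id": "conv_123",
    "user_id": "user_456",
    "transcript_url": "https://example.invalid/transcripts/conv_123.txt"
  }
}
```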

Repository Structure

```
memory-curator-worker/
├── pyproject.toml
├── README.md
├── Dockerfile
├── docker-compose.yml
│
├── src/
│   └── memory_curator/
│       ├── __init__.py
│       ├── main.py              # Worker entry point
│       ├── config.py            # Configuration
│       │
│       ├── consumer/
│       │   ├── __init__.py
│       │   ├── jetstream.py     # NATS JetStream consumer
│       │   └── handlers.py      # Event routing
│       │
│       ├── processors/
│       │   ├── __init__.py
│       │   ├── base.py          # Base processor class
│       │   ├── conversation.py  # Conversation curation
│       │   ├── document.py      # Document ingestion
│       │   ├── tool_result.py   # Tool output processing
│       │   └── decay.py         # Salience decay
│       │
│       ├── extraction/
│       │   ├── __init__.py
│       │   ├── llm_client.py    # LLM API client
│       │   ├── prompts.py       # Extraction prompts
│       │   ├── parser.py        # Output parsing
│       │   └── chunker.py       # Document chunking
│       │
│       ├── integrations/
│       │   ├── __init__.py
│       │   ├── google_drive.py  # Google Drive client
│       │   ├── notion.py        # Notion client
│       │   └── transcript.py    # Transcript fetching
│       │
│       └── utils/
│           ├── __init__.py
│           └── retry.py         # Retry logic
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_processors/
│   │   ├── test_conversation.py
│   │   └── test_document.py
│   └── test_extraction/
│       ├── test_prompts.py
│       └── test_parser.py
│
└── docs/
    ├── processors.md
    └── prompts.md
```

Dependencies

```toml
# pyproject.toml
[project]
name = "memory-curator-worker"
version = "0.1.0"
description = "LLM-powered knowledge extraction worker"
requires-python = ">=3.11"
dependencies = [
    "nats-py>=2.6",
    "memory-domain>=0.1.0",
    "openai>=1.6",
    "anthropic>=0.18",
    "httpx>=0.26",
    "pydantic>=2.5",
    "pydantic-settings>=2.1",
    "structlog>=23.2",
    "tenacity>=8.2",
    "tiktoken>=0.5",
    "google-api-python-client>=2.111",
    "google-auth>=2.25",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.4",
    "pytest-asyncio>=0.21",
    "pytest-cov>=4.1",
    "ruff>=0.1",
    "mypy>=1.7",
]
```

Configuration

```python
# src/memory_curator/config.py
from enum import Enum

from pydantic_settings import BaseSettings
from pydantic import Field


class LLMProvider(str, Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"


class CuratorConfig(BaseSettings):
    """Curator worker configuration."""

    model_config = {"env_prefix": "CURATOR_"}

    # Worker settings
    worker_id: str = Field(default="curator-1")
    concurrency: int = Field(default=5, ge=1, le=20)

    # LLM settings
    llm_provider: LLMProvider = Field(default=LLMProvider.OPENAI)
    llm_model: str = Field(default="gpt-4o")
    llm_temperature: float = Field(default=0.1, ge=0, le=1)
    llm_max_tokens: int = Field(default=4000)

    # API keys
    openai_api_key: str | None = Field(default=None)
    anthropic_api_key: str | None = Field(default=None)

    # Memory Engine
    engine_url: str = Field(default="http://localhost:8000")
    engine_api_key: str | None = Field(default=None)

    # Processing settings
    max_conversation_messages: int = Field(default=100)
    max_document_chunk_size: int = Field(default=4000)
    max_memories_per_conversation: int = Field(default=10)

    # Decay settings
    decay_batch_size: int = Field(default=1000)
    decay_schedule_cron: str = Field(default="0 3 * * *")  # 3 AM daily


class NatsConfig(BaseSettings):
    """NATS configuration."""

    model_config = {"env_prefix": "NATS_"}

    url: str = Field(default="nats://localhost:4222")
    stream_name: str = Field(default="MEMORY_EVENTS")

    # Consumer settings
    consumer_name: str = Field(default="memory-curator")
    ack_wait_seconds: int = Field(default=60)
    max_deliver: int = Field(default=3)


class Settings(BaseSettings):
    """Combined settings."""

    curator: CuratorConfig = Field(default_factory=CuratorConfig)
    nats: NatsConfig = Field(default_factory=NatsConfig)
```
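
`base.py` and the Memory Engine HTTP client are not shown in this section. As a minimal sketch of how the processors' shared `engine_client` could be wired from `CuratorConfig`: the `EngineClient` name and the `/v1/memories` and `/v1/working-set` paths are illustrative assumptions, not the engine's confirmed API.

```python
# src/memory_curator/processors/base.py (sketch, not the actual implementation)
import httpx

from memory_curator.config import Settings


class EngineClient:
    """Hypothetical thin wrapper around the Memory Engine HTTP API."""

    def __init__(self, base_url: str, api_key: str | None):
        self._http = httpx.AsyncClient(
            base_url=base_url,
            headers={"Authorization": f"Bearer {api_key}"} if api_key else {},
            timeout=30,
        )

    async def create_memory(self, request) -> dict:
        # Assumed endpoint path; assumes MemoryCreateRequest is a pydantic model.
        resp = await self._http.post("/v1/memories", json=request.model_dump(mode="json"))
        resp.raise_for_status()
        return resp.json()

    async def get_working_set(
        self, subject_type: str, subject_id: str, max_blocks: int = 20
    ) -> list[dict]:
        # Assumed endpoint path and query parameters.
        resp = await self._http.get(
            "/v1/working-set",
            params={"subject_type": subject_type, "subject_id": subject_id, "max_blocks": max_blocks},
        )
        resp.raise_for_status()
        return resp.json()


class BaseProcessor:
    """Shared base: every processor gets settings and an engine client."""

    def __init__(self, settings: Settings):
        self.settings = settings
        self.engine_client = EngineClient(
            settings.curator.engine_url,
            settings.curator.engine_api_key,
        )
```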

JetStream Consumer

```python
# src/memory_curator/consumer/jetstream.py
import asyncio
import json
from typing import Callable, Awaitable

import nats
import structlog
from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, AckPolicy, DeliverPolicy

from memory_curator.config import NatsConfig

logger = structlog.get_logger()


class JetStreamConsumer:
    """NATS JetStream durable consumer."""

    def __init__(self, config: NatsConfig):
        self.config = config
        self._nc: nats.NATS | None = None
        self._js: JetStreamContext | None = None
        self._subscription = None
        self._running = False

    async def connect(self) -> None:
        """Connect to NATS and set up the durable consumer."""
        self._nc = await nats.connect(self.config.url)
        self._js = self._nc.jetstream()

        # Create durable consumer
        try:
            await self._js.add_consumer(
                self.config.stream_name,
                config=ConsumerConfig(
                    durable_name=self.config.consumer_name,
                    ack_policy=AckPolicy.EXPLICIT,
                    deliver_policy=DeliverPolicy.ALL,
                    ack_wait=self.config.ack_wait_seconds,
                    max_deliver=self.config.max_deliver,
                ),
            )
        except Exception:
            # Consumer may already exist
            pass

        logger.info(
            "jetstream_connected",
            stream=self.config.stream_name,
            consumer=self.config.consumer_name,
        )

    async def subscribe(
        self,
        handler: Callable[[dict], Awaitable[None]],
        subjects: list[str] | None = None,
    ) -> None:
        """Subscribe to messages and dispatch them to the handler."""
        if not self._js:
            raise RuntimeError("Not connected")

        # Default to all memory events
        subject = subjects[0] if subjects else "memory.>"

        async def message_handler(msg):
            try:
                event = json.loads(msg.data.decode())

                logger.debug(
                    "message_received",
                    subject=msg.subject,
                    event_type=event.get("event_type"),
                )

                await handler(event)
                await msg.ack()

            except Exception as e:
                logger.error(
                    "message_processing_failed",
                    subject=msg.subject,
                    error=str(e),
                )
                # Will be redelivered up to max_deliver times
                await msg.nak()

        self._subscription = await self._js.pull_subscribe(
            subject,
            durable=self.config.consumer_name,
        )

        self._running = True

        # Pull loop
        while self._running:
            try:
                messages = await self._subscription.fetch(batch=10, timeout=5)
                for msg in messages:
                    await message_handler(msg)
            except nats.errors.TimeoutError:
                continue
            except Exception as e:
                logger.error("pull_error", error=str(e))
                await asyncio.sleep(1)

    async def close(self) -> None:
        """Close the NATS connection."""
        self._running = False
        if self._nc:
            await self._nc.close()
        logger.info("jetstream_disconnected")
```

Event Handlers

```python
# src/memory_curator/consumer/handlers.py
import structlog

from memory_curator.config import Settings
from memory_curator.processors.conversation import ConversationProcessor
from memory_curator.processors.document import DocumentProcessor
from memory_curator.processors.tool_result import ToolResultProcessor
from memory_curator.processors.decay import DecayProcessor

logger = structlog.get_logger()


class EventRouter:
    """Routes events to the appropriate processor."""

    def __init__(self, settings: Settings):
        self.settings = settings

        # Initialize processors
        self.conversation_processor = ConversationProcessor(settings)
        self.document_processor = DocumentProcessor(settings)
        self.tool_processor = ToolResultProcessor(settings)
        self.decay_processor = DecayProcessor(settings)

    async def handle_event(self, event: dict) -> None:
        """Route an event to the appropriate processor."""
        event_type = event.get("event_type", "")
        tenant_id = event.get("tenant_id")

        log = logger.bind(event_type=event_type, tenant_id=tenant_id)

        try:
            if event_type == "conversation.ended":
                await self.conversation_processor.process(event)

            elif event_type == "document.uploaded":
                await self.document_processor.process(event)

            elif event_type == "tool.executed":
                await self.tool_processor.process(event)

            elif event_type == "maintenance.salience_decay":
                await self.decay_processor.process(event)

            else:
                log.warning("unknown_event_type")

        except Exception as e:
            log.error("event_processing_failed", error=str(e))
            raise
```

Conversation Processor

```python
# src/memory_curator/processors/conversation.py
from typing import List

import httpx
import structlog

from memory_domain import (
    MemoryBlock,
    MemoryKind,
    MemorySource,
    MemoryCreateRequest,
)
from memory_curator.config import Settings
from memory_curator.extraction.llm_client import LLMClient
from memory_curator.extraction.prompts import CONVERSATION_CURATION_PROMPT
from memory_curator.extraction.parser import parse_extraction_response
from memory_curator.processors.base import BaseProcessor

logger = structlog.get_logger()


class ConversationProcessor(BaseProcessor):
    """Extracts memories from completed conversations."""

    def __init__(self, settings: Settings):
        super().__init__(settings)
        self.llm = LLMClient(settings.curator)
        self.max_memories = settings.curator.max_memories_per_conversation

    async def process(self, event: dict) -> None:
        """Process a conversation.ended event."""
        tenant_id = event["tenant_id"]
        conversation_id = event["data"]["conversation_id"]
        user_id = event["data"]["user_id"]

        log = logger.bind(
            tenant_id=tenant_id,
            conversation_id=conversation_id,
        )

        log.info("processing_conversation")

        # Fetch transcript
        transcript = await self._fetch_transcript(event)

        if not transcript:
            log.warning("no_transcript_available")
            return

        # Fetch existing context about the user
        existing_memories = await self._fetch_existing_memories(
            tenant_id, user_id
        )

        # Build extraction prompt
        prompt = self._build_prompt(
            transcript=transcript,
            existing_memories=existing_memories,
            user_id=user_id,
        )

        # Call LLM for extraction
        response = await self.llm.complete(prompt)

        # Parse extracted memories
        extracted = parse_extraction_response(response)

        if not extracted:
            log.info("no_memories_extracted")
            return

        # Limit number of memories
        extracted = extracted[: self.max_memories]

        # Create memories via engine API
        created_count = 0
        for memory_data in extracted:
            try:
                request = MemoryCreateRequest(
                    subject={"type": "user", "id": user_id},
                    kind=MemoryKind(memory_data["kind"]),
                    text=memory_data["content"]["text"],
                    tags=memory_data.get("tags", []),
                    conversation_id=conversation_id,
                    salience=memory_data["scores"]["salience"],
                    stability=memory_data["scores"]["stability"],
                    confidence=memory_data["scores"]["confidence"],
                )

                await self.engine_client.create_memory(request)
                created_count += 1

            except Exception as e:
                log.error("memory_creation_failed", error=str(e))

        log.info(
            "conversation_processed",
            extracted=len(extracted),
            created=created_count,
        )

    async def _fetch_transcript(self, event: dict) -> str | None:
        """Fetch the conversation transcript."""
        transcript_url = event["data"].get("transcript_url")

        if transcript_url:
            # Fetch from URL
            async with httpx.AsyncClient() as client:
                response = await client.get(transcript_url)
                return response.text

        # Fallback: try to get it from the conversation service
        # Implementation depends on your infrastructure
        return None

    async def _fetch_existing_memories(
        self,
        tenant_id: str,
        user_id: str,
    ) -> List[dict]:
        """Fetch existing memories about the user for context."""
        try:
            memories = await self.engine_client.get_working_set(
                subject_type="user",
                subject_id=user_id,
                max_blocks=20,
            )
            return [
                {
                    "kind": m["kind"],
                    "text": m["content"]["text"],
                    "tags": m.get("tags", []),
                }
                for m in memories
            ]
        except Exception:
            return []

    def _build_prompt(
        self,
        transcript: str,
        existing_memories: List[dict],
        user_id: str,
    ) -> str:
        """Build the extraction prompt."""
        if existing_memories:
            context_str = "\n".join(
                f"- [{m['kind']}] {m['text']}" for m in existing_memories
            )
        else:
            context_str = "No existing memories."

        return CONVERSATION_CURATION_PROMPT.format(
            subject=f"User: {user_id}",
            context_memories=context_str,
            transcript=transcript,
        )
```

Extraction Prompts

````python
# src/memory_curator/extraction/prompts.py

CONVERSATION_CURATION_PROMPT = """You are a memory curator for an AI assistant. Your job is to extract structured, memorable knowledge from conversations.

**Subject:** {subject}

**Existing Knowledge About This Subject:**
{context_memories}

**New Conversation:**
{transcript}

**Your Task:**
Extract memories that are:
1. **Facts** - Verifiable, stable information that doesn't change often
2. **Preferences** - Stated likes, dislikes, requirements, or behavioral patterns
3. **Insights** - Patterns, opportunities, risks, or discoveries you infer

**Guidelines:**
- Extract 3-10 memories (focus on quality over quantity)
- Avoid duplicating existing knowledge unless it's an update
- Facts should be specific and verifiable
- Use clear, concise language that an LLM can easily understand
- Each memory should be self-contained and meaningful on its own

**Output Format (JSON):**
```json
{{
  "blocks": [
    {{
      "kind": "fact|preference|insight",
      "content": {{
        "text": "Natural language summary suitable for LLM context"
      }},
      "tags": ["list", "of", "relevant", "concepts"],
      "scores": {{
        "salience": 0.0-1.0,
        "stability": 0.0-1.0,
        "confidence": 0.0-1.0
      }},
      "reasoning": "Why this is worth remembering"
    }}
  ]
}}
```

Extract memories now:"""

DOCUMENT_EXTRACTION_PROMPT = """You are a memory curator extracting knowledge from a document.

Document Title: {title}
Document Type: {mime_type}
Associated Subject: {subject}

Document Content:
{content}

Your Task: Extract key facts, insights, and notable information from this document that would be valuable to remember.

Guidelines:
- Focus on information that would help an AI assistant better serve the subject
- Extract specific details, not general summaries
- Include relevant context so memories are self-contained
- Mark confidence based on how clear the information is in the document

Output Format (JSON):
```json
{{
  "blocks": [
    {{
      "kind": "fact|insight|summary",
      "content": {{
        "text": "Extracted information",
        "structured": {{}}
      }},
      "tags": ["relevant", "topics"],
      "scores": {{
        "salience": 0.0-1.0,
        "stability": 0.0-1.0,
        "confidence": 0.0-1.0
      }}
    }}
  ]
}}
```

Extract memories now:"""

TOOL_RESULT_PROMPT = """Extract memorable information from this tool execution result.

Tool: {tool_name}
Input: {input_summary}
Output: {output_summary}

Extract any facts or insights worth remembering:"""
````


LLM Client

```python
# src/memory_curator/extraction/llm_client.py
from typing import Optional

import structlog
from tenacity import retry, stop_after_attempt, wait_exponential

from memory_curator.config import CuratorConfig, LLMProvider

logger = structlog.get_logger()


class LLMClient:
    """Client for LLM completions."""

    def __init__(self, config: CuratorConfig):
        self.config = config
        self._client = None

    @property
    def client(self):
        """Lazily initialize the underlying LLM client."""
        if self._client is None:
            if self.config.llm_provider == LLMProvider.OPENAI:
                from openai import AsyncOpenAI
                self._client = AsyncOpenAI(api_key=self.config.openai_api_key)
            elif self.config.llm_provider == LLMProvider.ANTHROPIC:
                from anthropic import AsyncAnthropic
                self._client = AsyncAnthropic(api_key=self.config.anthropic_api_key)
        return self._client

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=30),
    )
    async def complete(self, prompt: str) -> str:
        """Generate a completion from the configured LLM."""
        logger.debug("llm_request", provider=self.config.llm_provider)

        if self.config.llm_provider == LLMProvider.OPENAI:
            response = await self.client.chat.completions.create(
                model=self.config.llm_model,
                messages=[{"role": "user", "content": prompt}],
                temperature=self.config.llm_temperature,
                max_tokens=self.config.llm_max_tokens,
            )
            return response.choices[0].message.content

        elif self.config.llm_provider == LLMProvider.ANTHROPIC:
            response = await self.client.messages.create(
                model=self.config.llm_model,
                messages=[{"role": "user", "content": prompt}],
                temperature=self.config.llm_temperature,
                max_tokens=self.config.llm_max_tokens,
            )
            return response.content[0].text

        raise ValueError(f"Unknown provider: {self.config.llm_provider}")
```
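
`parser.py` is not shown in this section. A minimal sketch of `parse_extraction_response`, assuming the LLM returns the JSON structure requested by the prompts (possibly wrapped in a ```json fence), might look like the following; the exact error handling and field checks are assumptions.

```python
# src/memory_curator/extraction/parser.py (sketch; assumptions noted above)
import json
import re
from typing import List

import structlog

logger = structlog.get_logger()

# Matches an optional ```json ... ``` fence around the model output.
_FENCE_RE = re.compile(r"```(?:json)?\s*(.*?)\s*```", re.DOTALL)


def parse_extraction_response(response: str) -> List[dict]:
    """Parse LLM output into a list of memory-block dicts.

    Returns an empty list when the output cannot be parsed, so callers can
    treat "nothing extracted" and "unparseable output" the same way.
    """
    text = response.strip()

    # Strip a surrounding code fence if the model added one.
    match = _FENCE_RE.search(text)
    if match:
        text = match.group(1)

    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        logger.warning("extraction_parse_failed", preview=text[:200])
        return []

    blocks = payload.get("blocks", []) if isinstance(payload, dict) else []
    # Keep only blocks that carry the fields downstream code relies on.
    return [
        b for b in blocks
        if isinstance(b, dict) and "kind" in b and "content" in b and "scores" in b
    ]
```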

Document Processor

The DocumentProcessor performs semantic extraction from documents. Unlike the structural parsing done in memory-store-documents, this processor focuses on identifying valuable knowledge blocks that should be added to the Memory Store.

Updated Flow

  1. Event: Receives document.parsed (not document.uploaded).
  2. Fetch: Calls memory-store-documents API to get the structural pages and text elements.
  3. Analyze: Uses LLM to extract facts, insights, and summaries from the text element hierarchy.
  4. Reference: Includes the document_id and character offsets in the resulting MemoryBlock for provenance.
  5. Store: Sends Create requests to memory-engine-service.
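
The full listing below still reflects the original document.uploaded flow (download URL plus chunking); the document.parsed path described above is not implemented in it yet. As a rough sketch of step 2, assuming a hypothetical memory-store-documents endpoint and element shape:

```python
# Sketch only: step 2 of the updated flow. The /v1/documents/... path and the
# element fields are assumptions, not the documents service's confirmed API.
import httpx


async def fetch_parsed_elements(documents_api_url: str, document_id: str) -> list[dict]:
    """Fetch the structural pages/text elements for a parsed document."""
    async with httpx.AsyncClient(base_url=documents_api_url) as client:
        resp = await client.get(f"/v1/documents/{document_id}/elements")
        resp.raise_for_status()
        # Assumed element shape: {"text": ..., "page": ..., "char_start": ..., "char_end": ...}
        return resp.json()
```

For step 4, the character offsets from each element would be carried into the resulting MemoryCreateRequest alongside document_id; the exact field used for that depends on the memory-domain model.
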
```python
# src/memory_curator/processors/document.py
from typing import List

import httpx
import structlog

from memory_domain import MemoryKind, MemoryCreateRequest
from memory_curator.config import Settings
from memory_curator.extraction.llm_client import LLMClient
from memory_curator.extraction.prompts import DOCUMENT_EXTRACTION_PROMPT
from memory_curator.extraction.parser import parse_extraction_response
from memory_curator.extraction.chunker import chunk_document
from memory_curator.processors.base import BaseProcessor

logger = structlog.get_logger()


class DocumentProcessor(BaseProcessor):
    """Processes uploaded documents for memory extraction."""

    def __init__(self, settings: Settings):
        super().__init__(settings)
        self.llm = LLMClient(settings.curator)
        self.chunk_size = settings.curator.max_document_chunk_size

    async def process(self, event: dict) -> None:
        """Process a document.uploaded event."""
        tenant_id = event["tenant_id"]
        data = event["data"]
        document_id = data["document_id"]

        log = logger.bind(
            tenant_id=tenant_id,
            document_id=document_id,
        )

        log.info("processing_document", file_name=data["file_name"])

        # Download document content
        content = await self._download_document(data["download_url"])

        if not content:
            log.warning("document_download_failed")
            return

        # Chunk if large
        chunks = chunk_document(
            content,
            max_size=self.chunk_size,
            mime_type=data["mime_type"],
        )

        # Process each chunk
        all_memories = []
        for i, chunk in enumerate(chunks):
            log.debug("processing_chunk", chunk_index=i, total_chunks=len(chunks))

            prompt = DOCUMENT_EXTRACTION_PROMPT.format(
                title=data["file_name"],
                mime_type=data["mime_type"],
                subject=f"document:{document_id}",
                content=chunk,
            )

            response = await self.llm.complete(prompt)
            extracted = parse_extraction_response(response)
            all_memories.extend(extracted)

        # Deduplicate and create memories
        unique_memories = self._deduplicate(all_memories)

        created_count = 0
        for memory_data in unique_memories[:20]:  # Limit per document
            try:
                request = MemoryCreateRequest(
                    subject={"type": "document", "id": document_id},
                    kind=MemoryKind(memory_data["kind"]),
                    text=memory_data["content"]["text"],
                    tags=memory_data.get("tags", []),
                    document_id=document_id,
                    confidence=memory_data["scores"]["confidence"],
                )

                await self.engine_client.create_memory(request)
                created_count += 1

            except Exception as e:
                log.error("memory_creation_failed", error=str(e))

        log.info(
            "document_processed",
            chunks=len(chunks),
            extracted=len(all_memories),
            created=created_count,
        )

    async def _download_document(self, url: str) -> str | None:
        """Download document content."""
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url, timeout=60)
                response.raise_for_status()
                return response.text
        except Exception as e:
            logger.error("document_download_error", error=str(e))
            return None

    def _deduplicate(self, memories: List[dict]) -> List[dict]:
        """Remove duplicate memories based on exact text match."""
        seen_texts = set()
        unique = []

        for mem in memories:
            text = mem["content"]["text"].lower().strip()
            # Simple dedup - could use embeddings for better similarity
            if text not in seen_texts:
                seen_texts.add(text)
                unique.append(mem)

        return unique
```
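
`chunk_document` (extraction/chunker.py) is not shown in this section. A minimal sketch, assuming token-based splitting with tiktoken (already a declared dependency) and treating `max_size` as an approximate token budget, is below; the real implementation may instead split on structural boundaries and use `mime_type` to pick a strategy.

```python
# src/memory_curator/extraction/chunker.py (sketch; assumptions noted above)
from typing import List

import tiktoken


def chunk_document(content: str, max_size: int, mime_type: str) -> List[str]:
    """Split document text into chunks of at most max_size tokens.

    mime_type is accepted for parity with the caller; a fuller implementation
    could use it to split on paragraphs, headings, or rows instead of tokens.
    """
    encoding = tiktoken.get_encoding("cl100k_base")
    tokens = encoding.encode(content)

    if len(tokens) <= max_size:
        return [content]

    chunks: List[str] = []
    for start in range(0, len(tokens), max_size):
        window = tokens[start:start + max_size]
        chunks.append(encoding.decode(window))
    return chunks
```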

Decay Processor

```python
# src/memory_curator/processors/decay.py
from datetime import datetime

import structlog

from memory_domain.scoring.decay import calculate_salience_decay
from memory_curator.config import Settings
from memory_curator.processors.base import BaseProcessor

logger = structlog.get_logger()


class DecayProcessor(BaseProcessor):
    """Applies salience decay to memories."""

    def __init__(self, settings: Settings):
        super().__init__(settings)
        self.batch_size = settings.curator.decay_batch_size

    async def process(self, event: dict) -> None:
        """Process a maintenance.salience_decay event."""
        tenant_id = event.get("tenant_id")

        log = logger.bind(tenant_id=tenant_id)
        log.info("starting_decay_processing")

        # If tenant_id is None, process all tenants.
        # This is typically triggered by a cron job.

        total_updated = 0
        offset = 0

        while True:
            # Fetch batch of memories
            memories = await self._fetch_memories_batch(
                tenant_id,
                offset,
                self.batch_size,
            )

            if not memories:
                break

            # Calculate and apply decay
            updates = []
            for memory in memories:
                new_salience = calculate_salience_decay(
                    base_salience=memory["scores"]["salience"],
                    last_accessed=datetime.fromisoformat(memory["accessed_at"]),
                    memory_kind=memory["kind"],
                )

                # Only update if changed significantly
                if abs(new_salience - memory["scores"]["salience"]) > 0.01:
                    updates.append({
                        "memory_id": memory["id"],
                        "embedding_id": memory.get("embedding_id"),
                        "new_salience": new_salience,
                    })

            # Batch update
            if updates:
                await self._apply_decay_updates(updates)
                total_updated += len(updates)

            offset += self.batch_size

            # Safety limit
            if offset > 100000:
                log.warning("decay_processing_limit_reached")
                break

        log.info("decay_processing_complete", total_updated=total_updated)

    async def _fetch_memories_batch(
        self,
        tenant_id: str | None,
        offset: int,
        limit: int,
    ) -> list:
        """Fetch a batch of memories for decay processing."""
        # This would use a special admin endpoint
        # that can list all memories with pagination.
        pass

    async def _apply_decay_updates(self, updates: list) -> None:
        """Apply decay updates to memories."""
        # Batch update endpoint
        for update in updates:
            try:
                await self.engine_client.update_memory_scores(
                    memory_id=update["memory_id"],
                    salience=update["new_salience"],
                )
            except Exception as e:
                logger.error(
                    "decay_update_failed",
                    memory_id=update["memory_id"],
                    error=str(e),
                )
```
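
`_fetch_memories_batch` is left as a stub above because it depends on an admin listing capability on the engine. A hedged sketch of what the method body could look like follows; the `list_memories` helper and its parameters are hypothetical, not the engine's confirmed API.

```python
# Sketch only: body of DecayProcessor._fetch_memories_batch. The
# engine_client.list_memories helper and its parameters are assumptions.
async def _fetch_memories_batch(
    self,
    tenant_id: str | None,
    offset: int,
    limit: int,
) -> list:
    """Page through memories via an assumed admin listing endpoint."""
    params = {"offset": offset, "limit": limit}
    if tenant_id:
        params["tenant_id"] = tenant_id
    # Hypothetical helper on the engine client; the real admin API may differ.
    return await self.engine_client.list_memories(**params)
```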

Main Entry Point

```python
# src/memory_curator/main.py
import asyncio
import contextlib
import signal

import structlog

from memory_curator.config import Settings
from memory_curator.consumer.jetstream import JetStreamConsumer
from memory_curator.consumer.handlers import EventRouter

logger = structlog.get_logger()


async def main():
    """Main worker entry point."""
    settings = Settings()

    # Setup logging
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
    )

    logger.info(
        "starting_curator_worker",
        worker_id=settings.curator.worker_id,
    )

    # Initialize consumer
    consumer = JetStreamConsumer(settings.nats)
    await consumer.connect()

    # Initialize event router
    router = EventRouter(settings)

    # Setup shutdown handling
    shutdown_event = asyncio.Event()

    def handle_shutdown(sig):
        logger.info("shutdown_signal_received", signal=sig)
        shutdown_event.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, handle_shutdown, sig)

    # Run consumer
    try:
        consumer_task = asyncio.create_task(
            consumer.subscribe(router.handle_event)
        )

        # Wait for shutdown
        await shutdown_event.wait()

        # Graceful shutdown
        consumer_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await consumer_task
        await consumer.close()

    except Exception as e:
        logger.error("worker_error", error=str(e))
        raise

    logger.info("curator_worker_stopped")


if __name__ == "__main__":
    asyncio.run(main())
```

Dockerfile

```dockerfile
FROM python:3.11-slim

WORKDIR /app

# Copy sources before installing so `pip install .` can build the package
COPY pyproject.toml .
COPY src/ src/
RUN pip install --no-cache-dir .

ENV PYTHONPATH=/app/src
ENV PYTHONUNBUFFERED=1

RUN useradd -m -u 1000 appuser
USER appuser

CMD ["python", "-m", "memory_curator.main"]
```

Environment Variables

| Variable | Required | Default | Description |
| --- | --- | --- | --- |
| CURATOR_WORKER_ID | No | curator-1 | Worker identifier |
| CURATOR_CONCURRENCY | No | 5 | Parallel processing |
| CURATOR_LLM_PROVIDER | No | openai | openai or anthropic |
| CURATOR_LLM_MODEL | No | gpt-4o | Model name |
| CURATOR_OPENAI_API_KEY | Conditional | - | Required for OpenAI |
| CURATOR_ANTHROPIC_API_KEY | Conditional | - | Required for Anthropic |
| CURATOR_ENGINE_URL | No | http://localhost:8000 | Engine service URL |
| CURATOR_ENGINE_API_KEY | Yes | - | Engine API key |
| NATS_URL | No | nats://localhost:4222 | NATS server |
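
As an illustrative local-development configuration (all values are placeholders; the service hostnames assume a docker-compose network):

```
# .env (example values; replace with real keys)
CURATOR_WORKER_ID=curator-1
CURATOR_LLM_PROVIDER=openai
CURATOR_LLM_MODEL=gpt-4o
CURATOR_OPENAI_API_KEY=sk-...
CURATOR_ENGINE_URL=http://memory-engine:8000
CURATOR_ENGINE_API_KEY=changeme
NATS_URL=nats://nats:4222
```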

Acceptance Criteria

  • Conversation extraction produces quality memories
  • Document chunking handles large files
  • LLM failures are retried appropriately
  • Decay processing is efficient
  • Events are acknowledged correctly
  • Dead letter handling works
  • Worker scales horizontally