memory-store-janusgraph
Graph Storage Layer
| Property | Value |
|---|---|
| Repository | memory-store-janusgraph |
| Language | Python 3.11+ |
| Type | Library (pip installable) |
| Database | JanusGraph 1.0+ (Gremlin) |
Purpose
Implements the graph persistence layer for the Memory Platform. It provides a Pythonic abstraction over Gremlin to manage the Knowledge Graph, ensuring:
- Tenant Isolation: Strict segregation of graph data by tenant.
- Schema Management: Definitions for Vertices, Edges, and Properties.
- Graph Traversal: Optimized patterns for memory retrieval and exploration.
Repository Structure
```
memory-store-janusgraph/
├── pyproject.toml
├── README.md
├── CHANGELOG.md
├── src/
│   └── memory_store_janusgraph/
│       ├── __init__.py
│       ├── py.typed
│       │
│       ├── client/
│       │   ├── __init__.py
│       │   ├── connection.py      # Gremlin client management
│       │   └── health.py          # Health checks
│       │
│       ├── schema/
│       │   ├── __init__.py
│       │   ├── management.py      # Schema creation & updates
│       │   ├── definitions.py     # Vertex/Edge definitions
│       │   └── migrations/        # Schema migration scripts
│       │
│       ├── repository/
│       │   ├── __init__.py
│       │   ├── base.py            # Tenant-aware base repo
│       │   ├── vertices.py        # Vertex CRUD
│       │   ├── edges.py           # Edge CRUD
│       │   └── traversal.py       # Complex graph queries
│       │
│       └── config.py              # Configuration
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_schema/
│   ├── test_repository/
│   │   ├── test_vertices.py
│   │   ├── test_edges.py
│   │   └── test_isolation.py
│   └── test_integration/
│
├── docker/
│   ├── Dockerfile
│   ├── docker-compose.test.yml
│   └── janusgraph-config/         # Custom JanusGraph config
│
└── docs/
    ├── schema_reference.md
    └── traversal_patterns.md
```
Dependencies
```toml
# pyproject.toml
[project]
name = "memory-store-janusgraph"
version = "0.1.0"
description = "JanusGraph storage layer for Memory Platform"
requires-python = ">=3.11"
dependencies = [
    "gremlinpython>=3.7.0",
    "memory-domain>=0.1.0",
    "pydantic-settings>=2.1",
    "structlog>=23.2",
    "tenacity>=8.2",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.4",
    "pytest-asyncio>=0.21",
    "pytest-cov>=4.1",
    "testcontainers>=3.7",
    "ruff>=0.1",
    "mypy>=1.7",
]
```
Configuration
```python
# src/memory_store_janusgraph/config.py
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict


class JanusGraphConfig(BaseSettings):
    """JanusGraph connection configuration."""

    # Every field can be overridden by an environment variable, e.g. JANUS_URL.
    model_config = SettingsConfigDict(env_prefix="JANUS_")

    # Connection
    url: str = Field(default="ws://localhost:8182/gremlin")
    traversal_source: str = Field(default="g")

    # Connection pool
    pool_size: int = Field(default=8)
    max_workers: int = Field(default=4)

    # Auth (if enabled)
    username: str | None = Field(default=None)
    password: str | None = Field(default=None)

    # Timeouts
    timeout_seconds: int = Field(default=30)
    keepalive_interval_seconds: int = Field(default=60)

    # Retry strategy
    retry_attempts: int = Field(default=3)
    retry_min_wait_seconds: float = Field(default=0.5)
    retry_max_wait_seconds: float = Field(default=5.0)
```
Default Graph Schema
The internal schema management should enforce these definitions using the JanusGraph Management API.
Vertex Labels
Defined in memory_domain.enums.GraphNodeType:
User, Org, Project, Tool, Channel, Document, Topic, MemoryBlock, Conversation, Event
Edge Labels
Defined in memory_domain.enums.GraphEdgeType:
ABOUT, PREFERS, DERIVED_FROM, MENTIONS, BELONGS_TO, RELATED_TO, SUPERSEDES, HAS_INTERACTION, WORKS_ON, USES, CONTAINS
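The JanusGraph Management API is a JVM API and is not reachable through gremlinpython's bytecode traversals, so one common approach is to submit a Groovy script to Gremlin Server. The following is a minimal sketch of rendering such a script for the labels listed above, assuming the server binds the JanusGraph instance as `graph`; `label_schema_script` is a hypothetical helper. The `contains*` guards are there so the script can be re-run safely.

```python
# Hypothetical helper, not part of memory-store-janusgraph: renders Groovy for Gremlin Server.
import re

_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def label_schema_script(vertex_labels: list[str], edge_labels: list[str]) -> str:
    """Render an idempotent JanusGraph management script that creates missing labels."""
    for name in (*vertex_labels, *edge_labels):
        # Names are interpolated into Groovy, so accept plain identifiers only.
        if not _IDENT.fullmatch(name):
            raise ValueError(f"unsafe label name: {name!r}")
    lines = ["mgmt = graph.openManagement()"]
    lines += [
        f"if (!mgmt.containsVertexLabel('{n}')) mgmt.makeVertexLabel('{n}').make()"
        for n in vertex_labels
    ]
    lines += [
        f"if (!mgmt.containsEdgeLabel('{n}')) mgmt.makeEdgeLabel('{n}').make()"
        for n in edge_labels
    ]
    lines.append("mgmt.commit()")
    return "\n".join(lines)
```

The inputs would typically be the enum values (for example `[t.value for t in GraphNodeType]`), and the result could be sent with gremlinpython's `Client(url, "g").submit(script).all().result()`.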
Property Keys
| Key | Type | Purpose |
|---|---|---|
| tenant_id | String | Partition key for isolation |
| id | String | External ID / UUID |
| name | String | Human-readable label |
| kind | String | Enum string |
| salience_score | Float | Sorting/Filtering |
| created_at | Long | Timestamp |
| updated_at | Long | Timestamp |
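Each row of the table maps to one `makePropertyKey(...).dataType(...)` call in the Management API. The following is a minimal sketch of turning such rows into an idempotent Groovy script, assuming the same `graph` binding on Gremlin Server and JanusGraph's default `SINGLE` cardinality; `GROOVY_TYPES` and `property_key_script` are hypothetical names.

```python
# Hypothetical helper, not part of memory-store-janusgraph: renders Groovy for Gremlin Server.
import re

_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

# Groovy class literal for each data type used in the property-key table.
GROOVY_TYPES = {"String": "String.class", "Float": "Float.class", "Long": "Long.class"}


def property_key_script(keys: dict[str, str]) -> str:
    """Render a script that creates each missing key (default SINGLE cardinality)."""
    lines = ["mgmt = graph.openManagement()"]
    for name, type_name in keys.items():
        # Names are interpolated into Groovy, so accept plain identifiers only.
        if not _IDENT.fullmatch(name):
            raise ValueError(f"unsafe property key name: {name!r}")
        if type_name not in GROOVY_TYPES:
            raise ValueError(f"no Groovy mapping for type {type_name!r}")
        lines.append(
            f"if (!mgmt.containsPropertyKey('{name}')) "
            f"mgmt.makePropertyKey('{name}').dataType({GROOVY_TYPES[type_name]}).make()"
        )
    lines.append("mgmt.commit()")
    return "\n".join(lines)
```

Because a key's data type cannot be changed once it is created, generating the script from one source of truth helps keep the table and the live schema in step.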
Indices (Critical)
- Composite index byTenantAndType: [tenant_id] on all vertices.
- Composite index userByTenantAndId: [tenant_id, id], unique, on User.
- Composite index orgByTenantAndId: [tenant_id, id], unique, on Org.
- Mixed index memoryBlockFullText: [name] (Text) on MemoryBlock.
Tenant Isolation Strategy
CRITICAL RULE: Every Gremlin traversal MUST start with a tenant filter.
```python
# src/memory_store_janusgraph/repository/base.py
import asyncio

import structlog
from gremlin_python.process.graph_traversal import GraphTraversal, GraphTraversalSource

logger = structlog.get_logger()


class TenantIsolatedGraphRepository:
    """
    Base repository that enforces tenant isolation on all graph operations.
    """

    def __init__(self, tenant_id: str, client: GraphTraversalSource):
        self.tenant_id = tenant_id
        self.g = client
        self._log = logger.bind(tenant_id=tenant_id)

    def _t(self) -> GraphTraversal:
        """
        Start a traversal scoped to the current tenant.
        Use this instead of self.g.V() directly.
        """
        return self.g.V().has("tenant_id", self.tenant_id)

    async def get_vertex(self, vertex_id: int | str):
        """Get a vertex by JanusGraph ID, or None if it is absent or owned by another tenant."""
        # JanusGraph assigns numeric (long) vertex IDs by default.
        t = self.g.V(vertex_id).has("tenant_id", self.tenant_id).limit(1)
        # gremlinpython traversals block and are not awaitable; run them off the event loop.
        results = await asyncio.to_thread(t.to_list)
        return results[0] if results else None
```
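The critical rule can be enforced in tests (for example in `tests/test_repository/test_isolation.py`) by inspecting a traversal's bytecode before it is ever submitted. In gremlinpython, `traversal.bytecode.step_instructions` is a list of `[step_name, *args]` lists. The following is a minimal sketch, assuming that shape; `find_unscoped_v_steps` is a hypothetical helper that flags every `V()` step, including mid-traversal ones, that is not immediately followed by `has("tenant_id", <tenant>)`.

```python
# Hypothetical test helper, not part of memory-store-janusgraph.
from typing import Any, Sequence


def find_unscoped_v_steps(steps: Sequence[Sequence[Any]], tenant_id: str) -> list[int]:
    """Return indexes of V() steps that are not immediately tenant-filtered."""
    unscoped: list[int] = []
    for i, step in enumerate(steps):
        if not step or step[0] != "V":
            continue
        nxt = steps[i + 1] if i + 1 < len(steps) else None
        # Deliberately strict: only an exact has("tenant_id", tenant_id) counts.
        if nxt is None or list(nxt) != ["has", "tenant_id", tenant_id]:
            unscoped.append(i)
    return unscoped
```

A test might call `find_unscoped_v_steps(repo._t().out("ABOUT").bytecode.step_instructions, repo.tenant_id)` and expect an empty list. Building a traversal does not send it, so no server is needed.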
Core Operations
1. Vertex Management
```python
# src/memory_store_janusgraph/repository/vertices.py
import asyncio
from typing import Any

from memory_domain.enums import GraphNodeType

from .base import TenantIsolatedGraphRepository


class VertexRepository(TenantIsolatedGraphRepository):
    async def create_vertex(
        self,
        node_type: GraphNodeType,
        properties: dict[str, Any],
    ) -> int | str:
        """Create a new vertex with forced tenant_id and return its JanusGraph ID."""
        # Copy instead of mutating the caller's dict; tenant_id goes last so it always wins.
        props = {**properties, "tenant_id": self.tenant_id}

        t = self.g.add_v(node_type.value)
        for k, v in props.items():
            t = t.property(k, v)

        # gremlinpython traversals block; run them off the event loop.
        vertex = await asyncio.to_thread(t.next)
        return vertex.id

    async def get_by_external_id(
        self,
        node_type: GraphNodeType,
        external_id: str,
    ) -> dict | None:
        """Get a vertex by domain-specific ID, or None if this tenant has no match."""
        t = (
            self._t()
            .has_label(node_type.value)
            .has("id", external_id)
            .limit(1)
            .element_map()
        )
        results = await asyncio.to_thread(t.to_list)
        return results[0] if results else None
```
2. Edge Management
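```python
# src/memory_store_janusgraph/repository/edges.py
import asyncio
from typing import Any

from memory_domain.enums import GraphEdgeType

from .base import TenantIsolatedGraphRepository


class EdgeRepository(TenantIsolatedGraphRepository):
    async def create_edge(
        self,
        from_id: int | str,
        to_id: int | str,
        edge_type: GraphEdgeType,
        properties: dict[str, Any] | None = None,
    ):
        """
        Create edge between two vertices.
        Validates that BOTH vertices belong to the current tenant.
        Returns the new edge, or None if either endpoint is missing
        or belongs to another tenant.
        """
        # Each endpoint is looked up with its own tenant filter. If either lookup
        # is empty, the traversal yields nothing and no edge is written.
        t = (
            self.g.V(from_id).has("tenant_id", self.tenant_id).as_("from")
            .V(to_id).has("tenant_id", self.tenant_id).as_("to")
            .add_e(edge_type.value).from_("from").to("to")
        )
        for k, v in (properties or {}).items():
            t = t.property(k, v)

        # gremlinpython traversals block; run them off the event loop.
        results = await asyncio.to_thread(t.to_list)
        return results[0] if results else None
```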
3. Traversal Patterns
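```python
# src/memory_store_janusgraph/repository/traversal.py
import asyncio

from gremlin_python.process.graph_traversal import __

from .base import TenantIsolatedGraphRepository


class GraphTraversalRepository(TenantIsolatedGraphRepository):
    async def get_neighborhood(self, vertex_id: int | str, depth: int = 1) -> list:
        """
        Get surrounding graph context as paths of exactly `depth` hops.
        Useful for visualizing context around a memory or entity.
        """
        t = (
            self.g.V(vertex_id).has("tenant_id", self.tenant_id)
            .repeat(
                # Every hop re-checks the tenant; simple_path() drops cycles.
                __.both_e().other_v().has("tenant_id", self.tenant_id).simple_path()
            )
            .times(depth)
            .path()
            .by(__.element_map())
        )
        # gremlinpython traversals block; run them off the event loop.
        return await asyncio.to_thread(t.to_list)

    async def find_related_topics(self, memory_id: int | str) -> list:
        """Find topics associated with a memory block."""
        t = (
            self.g.V(memory_id).has("tenant_id", self.tenant_id)
            .out("ABOUT")
            .has_label("Topic")
            .has("tenant_id", self.tenant_id)  # defensive: topics must share the tenant
            .value_map()
        )
        return await asyncio.to_thread(t.to_list)
```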
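`repeat(...).times(depth)` only emits traversers that complete all `depth` iterations. As a result, `get_neighborhood(v, 2)` returns two-hop paths only: a one-hop neighbor appears just as the second element of longer paths, and a dead-end neighbor with no further simple path does not appear at all. Adding `.emit()` after `repeat(...)` also returns the shorter, intermediate paths. The following pure-Python sketch enumerates the same vertex sequences for both variants on a hypothetical adjacency-dict graph. Unlike Gremlin's `path()`, it omits the traversed edges and collapses parallel edges, and Gremlin does not guarantee this output order. `simple_paths` is a hypothetical name.

```python
# Hypothetical illustration of repeat()/times()/emit() semantics; not library code.
from collections.abc import Hashable, Mapping


def simple_paths(
    graph: Mapping[Hashable, set],
    tenant_of: Mapping[Hashable, str],
    tenant_id: str,
    start: Hashable,
    depth: int,
    emit: bool = False,
) -> list[tuple]:
    """Vertex sequences from repeat(hop).times(depth), or with emit=True, repeat(hop).emit().times(depth)."""
    if tenant_of.get(start) != tenant_id:
        return []  # the start vertex fails the outer has("tenant_id", ...)
    frontier: list[tuple] = [(start,)]
    emitted: list[tuple] = []
    for _ in range(depth):
        next_frontier: list[tuple] = []
        for path in frontier:
            # sorted() only makes this sketch's output order deterministic
            for nbr in sorted(graph.get(path[-1], ())):
                # has("tenant_id", ...) and simple_path() inside repeat()
                if tenant_of.get(nbr) == tenant_id and nbr not in path:
                    next_frontier.append(path + (nbr,))
        frontier = next_frontier
        if emit:
            emitted.extend(frontier)
    return emitted if emit else frontier
```

On an undirected graph `a-b`, `a-c`, `b-d`, `d-x`, where `x` belongs to another tenant, depth 2 from `a` yields only `(a, b, d)`. With `emit=True` it also yields `(a, b)` and `(a, c)`, and depth 3 yields nothing because the only extension reaches `x`.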