Skip to main content

memory-store-janusgraph

Graph Storage Layer

PropertyValue
Repositorymemory-store-janusgraph
LanguagePython 3.11+
TypeLibrary (pip installable)
DatabaseJanusGraph 1.0+ (Gremlin)

Purpose

Implements the graph persistence layer for the Memory Platform. It provides a Pythonic abstraction over Gremlin to manage the Knowledge Graph, ensuring:

  • Tenant Isolation: Strict segregation of graph data by tenant.
  • Schema Management: Definitions for Vertices, Edges, and Properties.
  • Graph Traversal: Optimized patterns for memory retrieval and exploration.

Repository Structure

memory-store-janusgraph/
├── pyproject.toml
├── README.md
├── CHANGELOG.md
├── src/
│ └── memory_store_janusgraph/
│ ├── __init__.py
│ ├── py.typed
│ │
│ ├── client/
│ │ ├── __init__.py
│ │ ├── connection.py # Gremlin client management
│ │ └── health.py # Health checks
│ │
│ ├── schema/
│ │ ├── __init__.py
│ │ ├── management.py # Schema creation & updates
│ │ ├── definitions.py # Vertex/Edge definitions
│ │ └── migrations/ # Schema migration scripts
│ │
│ ├── repository/
│ │ ├── __init__.py
│ │ ├── base.py # Tenant-aware base repo
│ │ ├── vertices.py # Vertex CRUD
│ │ ├── edges.py # Edge CRUD
│ │ └── traversal.py # Complex graph queries
│ │
│ └── config.py # Configuration

├── tests/
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_schema/
│ ├── test_repository/
│ │ ├── test_vertices.py
│ │ ├── test_edges.py
│ │ └── test_isolation.py
│ └── test_integration/

├── docker/
│ ├── Dockerfile
│ ├── docker-compose.test.yml
│ └── janusgraph-config/ # Custom JanusGraph config

└── docs/
├── schema_reference.md
└── traversal_patterns.md

Dependencies

# pyproject.toml
[project]
name = "memory-store-janusgraph"
version = "0.1.0"
description = "JanusGraph storage layer for Memory Platform"
requires-python = ">=3.11"
dependencies = [
"gremlinpython>=3.7.0",
"memory-domain>=0.1.0",
"pydantic-settings>=2.1",
"structlog>=23.2",
"tenacity>=8.2",
]

[project.optional-dependencies]
dev = [
"pytest>=7.4",
"pytest-asyncio>=0.21",
"pytest-cov>=4.1",
"testcontainers>=3.7",
"ruff>=0.1",
"mypy>=1.7",
]

Configuration

# src/memory_store_janusgraph/config.py
from pydantic_settings import BaseSettings
from pydantic import Field

class JanusGraphConfig(BaseSettings):
"""JanusGraph connection configuration."""

model_config = {"env_prefix": "JANUS_"}

# Connection
url: str = Field(default="ws://localhost:8182/gremlin")
traversal_source: str = Field(default="g")

# Connection Pool
pool_size: int = Field(default=8)
max_workers: int = Field(default=4)

# Auth (if enabled)
username: str | None = Field(default=None)
password: str | None = Field(default=None)

# Timeouts
timeout_seconds: int = Field(default=30)
keepalive_interval_seconds: int = Field(default=60)

# Retry Strategy
retry_attempts: int = Field(default=3)
retry_min_wait_seconds: float = Field(default=0.5)
retry_max_wait_seconds: float = Field(default=5.0)

Default Graph Schema

The internal schema management should enforce these definitions using the JanusGraph Management API.

Vertex Labels

Defined in memory_domain.enums.GraphNodeType:

  • User, Org, Project, Tool, Channel, Document, Topic
  • MemoryBlock, Conversation, Event

Edge Labels

Defined in memory_domain.enums.GraphEdgeType:

  • ABOUT, PREFERS, DERIVED_FROM, MENTIONS
  • BELONGS_TO, RELATED_TO, SUPERSEDES
  • HAS_INTERACTION, WORKS_ON, USES, CONTAINS

Property Keys

KeyTypePurpose
tenant_idStringPartition Key for isolation
idStringExternal ID / UUID
nameStringHuman readable label
kindStringEnum string
salience_scoreFloatSorting/Filtering
created_atLongTimestamp
updated_atLongTimestamp

Indices (Critical)

  1. Composite Index byTenantAndType: [tenant_id] on all vertices.
  2. Composite Index userByTenantAndId: [tenant_id, id] unique on User.
  3. Composite Index orgByTenantAndId: [tenant_id, id] unique on Org.
  4. Mixed Index memoryBlockFullText: [name] (Text) on MemoryBlock.

Tenant Isolation Strategy

CRITICAL RULE: Every Gremlin traversal MUST start with a tenant filter.

# src/memory_store_janusgraph/repository/base.py
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.process.graph_traversal import GraphTraversalSource
import structlog

logger = structlog.get_logger()

class TenantIsolatedGraphRepository:
"""
Base repository that enforces tenant isolation on all graph operations.
"""

def __init__(self, tenant_id: str, client: GraphTraversalSource):
self.tenant_id = tenant_id
self.g = client
self._log = logger.bind(tenant_id=tenant_id)

def _t(self):
"""
Start a traversal scoped to the current tenant.
Use this instead of self.g.V() directly.
"""
return self.g.V().has("tenant_id", self.tenant_id)

async def get_vertex(self, vertex_id: str):
"""Get vertex by JanusGraph ID (checked against tenant)."""
return await self.g.V(vertex_id).has("tenant_id", self.tenant_id).next()

Core Operations

1. Vertex Management

# src/memory_store_janusgraph/repository/vertices.py
from typing import Any, Dict
from memory_domain.enums import GraphNodeType

class VertexRepository(TenantIsolatedGraphRepository):

async def create_vertex(
self,
node_type: GraphNodeType,
properties: Dict[str, Any]
) -> str:
"""Create a new vertex with forced tenant_id."""

# Inject tenant
properties["tenant_id"] = self.tenant_id

# Start traversal
t = self.g.addV(node_type.value)

# Add properties
for k, v in properties.items():
t = t.property(k, v)

result = await t.next()
return result.id

async def get_by_external_id(
self,
node_type: GraphNodeType,
external_id: str
):
"""Get vertex by domain-specific ID."""
return await self._t()\
.has_label(node_type.value)\
.has("id", external_id)\
.element_map()\
.next()

2. Edge Management

# src/memory_store_janusgraph/repository/edges.py
from memory_domain.enums import GraphEdgeType

class EdgeRepository(TenantIsolatedGraphRepository):

async def create_edge(
self,
from_id: str,
to_id: str,
edge_type: GraphEdgeType,
properties: Dict[str, Any] = None
):
"""
Create edge between two vertices.
Validates that BOTH vertices belong to the current tenant.
"""
# Verification is implicitly handled if we find nodes via _t()
# but explicit check is safer for data integrity

t = self._t().has_id(from_id).as_("from")\
.V(to_id).has("tenant_id", self.tenant_id).as_("to")\
.addE(edge_type.value).from_("from").to("to")

if properties:
for k, v in properties.items():
t = t.property(k, v)

return await t.next()

3. Traversal Patterns

# src/memory_store_janusgraph/repository/traversal.py

class GraphTraversalRepository(TenantIsolatedGraphRepository):

async def get_neighborhood(self, vertex_id: str, depth: int = 1):
"""
Get surrounding graph context.
useful for visualizing context around a memory or entity.
"""
return await self.g.V(vertex_id).has("tenant_id", self.tenant_id)\
.repeat(
__.bothE().otherV().has("tenant_id", self.tenant_id).simplePath()
).times(depth)\
.path()\
.by(__.element_map())\
.to_list()

async def find_related_topics(self, memory_id: str):
"""Find topics associated with a memory block."""
return await self.g.V(memory_id).has("tenant_id", self.tenant_id)\
.out("ABOUT")\
.has_label("Topic")\
.value_map()\
.to_list()