Building a memory engine
Moon's converged storage (KV, HNSW vectors, CSR graph, BM25 text) with cross-store ACID transactions makes it a natural substrate for AI memory engines. This guide shows how to wire the primitives together using the moondb Python SDK.
Install
Architecture
┌─────────────────────────────────────────────────┐
│                Your Application                 │
│    Ingest Pipeline │ Extractors │ Retrievers    │
├─────────────────────────────────────────────────┤
│                   moondb SDK                    │
├─────────────────────────────────────────────────┤
│                   Moon Server                   │
│  KV │ HNSW Vectors │ CSR Graph │ MQ │ Temporal  │
└─────────────────────────────────────────────────┘
One binary, one connection, one transaction — no cross-store drift.
Storage layout
| What | Moon primitive | Key pattern |
|---|---|---|
| Raw episodes (docs, messages) | KV | ep:{id} |
| Chunks with embeddings | KV + Vector index | ch:{ep_id}:{idx} |
| Entities (people, concepts) | KV + Vector + Graph node | en:{id} |
| Relations (edges) | Graph edge | graph.add_edge() |
| Facts (embedded triples) | KV + Vector | fa:{id} |
| Community summaries | KV + Vector | co:{level}:{id} |
Step 1: Create indexes
from moondb import MoonClient, encode_vector

client = MoonClient(host="localhost", port=6379)

# Settings shared by both vector indexes: 384-d embeddings compared by cosine.
embedding_settings = {"dim": 384, "metric": "COSINE"}

# Chunk embeddings (auto-indexed on HSET), with the chunk text as a TEXT field
client.vector.create_index(
    "chunks",
    prefix="ch:",
    extra_schema=["text", "TEXT"],
    **embedding_settings,
)

# Entity embeddings, with a TEXT name field and a TAG type field
client.vector.create_index(
    "entities",
    prefix="en:",
    extra_schema=["name", "TEXT", "type", "TAG"],
    **embedding_settings,
)

# Knowledge graph
client.graph.create("knowledge")
Step 2: Atomic ingest with transactions
The key advantage — entity, embedding, graph edge, and KV metadata land in a single atomic write. If any step fails, everything rolls back — no orphaned vectors or dangling graph edges.
import json
def ingest_entity(client, name, entity_type, description, embedding, relations=None):
    """Atomically upsert one entity: hash (metadata + vector) and graph data.

    Everything is written between ``TXN BEGIN`` and ``TXN COMMIT``; on any
    exception the transaction is aborted and the exception is re-raised.

    Args:
        client: Connected ``MoonClient``.
        name: Entity name; also used to derive the key ``en:<lowercased_name>``.
        entity_type: Graph node label, also stored in the ``type`` hash field.
        description: Free-text description, stored inside the ``meta`` JSON.
        embedding: Vector passed through ``encode_vector`` into the ``vec`` field.
        relations: Optional iterable of dicts with ``from_id``, ``to_id``,
            ``type`` and an optional ``weight``; one graph edge is added per dict.

    Returns:
        The node id returned by ``client.graph.add_node``.
    """
    eid = f"en:{name.lower().replace(' ', '_')}"

    client.execute_command("TXN", "BEGIN")
    try:
        # Create the graph node first so its id can be stored next to the
        # vector; hybrid_retrieve reads the ``node_id`` field of entity hits.
        # NOTE(review): assumes add_node returns the real id inside a TXN.
        node_id = client.graph.add_node(
            "knowledge", entity_type, name=name,
        )

        # A single hash holds both the metadata and the vector. The key must
        # not also be written with SET: on a Redis-compatible server a key has
        # one type, and HSET against a string key fails with WRONGTYPE.
        client.hset(eid, mapping={
            "vec": encode_vector(embedding),
            "name": name,
            "type": entity_type,
            "node_id": node_id,
            "meta": json.dumps({
                "name": name, "type": entity_type, "desc": description,
            }),
        })

        # Graph edges
        for rel in (relations or []):
            client.graph.add_edge(
                "knowledge", rel["from_id"], rel["to_id"],
                rel["type"], weight=rel.get("weight"),
            )

        client.execute_command("TXN", "COMMIT")
        return node_id
    except Exception:
        client.execute_command("TXN", "ABORT")
        raise
Step 3: Hybrid retrieval
Combine vector similarity, graph traversal, and BM25 text search using SDK methods.
Vector search (semantic)
# Ten nearest chunks to the query embedding
results = client.vector.search("chunks", query_embedding, k=10)
for r in results:
    # Key, score, and the first 80 characters of the stored text
    print(f"{r.key}: {r.score:.4f} — {r.fields.get('text', '')[:80]}")
Graph traversal (relational)
# Neighbors of node 1 in the "knowledge" graph, traversed to depth 2
neighbors = client.graph.neighbors("knowledge", node_id=1, depth=2)
for n in neighbors:
    print(n.node_id, n.label, n.properties)

# The same graph queried with Cypher: people and the organizations they work at
works_at_cypher = (
    "MATCH (a:Person)-[:WORKS_AT]->(o:Organization) "
    "RETURN a.name, o.name LIMIT 20"
)
result = client.graph.query("knowledge", works_at_cypher)
for row in result.rows:
    print(row)
BM25 text search
# Up to ten text matches for the query string in the "chunks" index
hits = client.text.text_search("chunks", "machine learning", limit=10)
for h in hits:
    # Id, score, and the first 80 characters of the stored text
    print(f"{h.id}: {h.score:.4f} — {h.fields.get('text', '')[:80]}")
Three-way hybrid fusion (vector + BM25 + graph)
def hybrid_retrieve(client, query_text, query_embedding, graph_name="knowledge", k=10):
    """Three-way retrieval: vector + BM25 + graph, fused with RRF.

    Args:
        client: Connected ``MoonClient``.
        query_text: Query string for the text search over ``chunks``.
        query_embedding: Query vector for the ``chunks`` and ``entities`` searches.
        graph_name: Graph to expand matching entities in.
        k: Number of fused results to return. The vector and text searches
            each fetch ``k * 3`` candidates so fusion has more to rank.

    Returns:
        Whatever ``rrf_fuse`` returns for the three ranked key lists.

    Note:
        Vector keys come from ``r.key``, text keys from ``h.id`` and graph
        keys are stringified node ids, so graph results only reinforce a
        chunk if those key spaces actually overlap.
    """
    # 1. Vector search
    vec_results = client.vector.search("chunks", query_embedding, k=k * 3)

    # 2. BM25 text search
    text_results = client.text.text_search("chunks", query_text, limit=k * 3)

    # 3. Graph traversal from matching entities
    entity_hits = client.vector.search("entities", query_embedding, k=5)
    graph_keys = []
    seen_nodes = set()
    for hit in entity_hits:
        node_id = int(hit.fields.get("node_id", 0))
        if not node_id:
            continue  # entity hash carries no graph node id
        for neighbor in client.graph.neighbors(graph_name, node_id, depth=2):
            # Several entity hits can reach the same node; rank it only once.
            key = str(neighbor.node_id)
            if key not in seen_nodes:
                seen_nodes.add(key)
                graph_keys.append(key)

    # rrf_fuse takes its ranked lists positionally (*key_lists); passing them
    # as vec_keys=/text_keys=/graph_keys= raises TypeError.
    return rrf_fuse(
        [r.key for r in vec_results],
        [h.id for h in text_results],
        graph_keys,
        k=k,
    )
def rrf_fuse(*key_lists, k=10, constant=60, **named_key_lists):
    """Fuse ranked key lists with Reciprocal Rank Fusion.

    Every list adds ``1 / (constant + rank)`` to a key's score, where
    ``rank`` is the key's 0-based position among the distinct keys of that
    list. A key repeated within one list is counted once, at its first
    position.

    Args:
        *key_lists: Ranked iterables of hashable keys, best first.
        k: Maximum number of fused results to return.
        constant: Damping constant added to every rank.
        **named_key_lists: More ranked lists passed by keyword (for example
            ``vec_keys=[...]``); the names are labels only and do not affect
            scoring.

    Returns:
        Up to ``k`` ``(key, score)`` tuples, highest score first. Ties keep
        the order in which keys were first seen.
    """
    scores = {}
    for keys in (*key_lists, *named_key_lists.values()):
        # dict.fromkeys drops repeats while preserving first-seen order
        for rank, key in enumerate(dict.fromkeys(keys)):
            scores[key] = scores.get(key, 0) + 1.0 / (constant + rank)
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    return ranked[:k]
Aggregation (FT.AGGREGATE)
from moondb.types import GroupBy, Count, Avg, SortBy, Limit

# Group entities by their type field and count each group as "total"
count_by_type = GroupBy("@type", reducers=[Count(as_name="total")])
# Largest groups first
largest_first = SortBy("@total", order="DESC")
# Limit(0, 10): presumably offset 0, ten rows, as in FT.AGGREGATE LIMIT
first_page = Limit(0, 10)

pipeline = [count_by_type, largest_first, first_page]
result = client.text.aggregate("entities", "*", steps=pipeline)
Step 4: Temporal queries (point-in-time memory)
Record snapshots and query historical state:
# Record current state as a temporal snapshot
client.execute_command("TEMPORAL.SNAPSHOT_AT")
# ... time passes, data changes ...
# Query vectors at a historical point in time
# NOTE(review): as written, this call passes no AS_OF argument, so nothing in
# it selects a historical timestamp — confirm how AS_OF must be supplied.
results = client.vector.search(
    "entities", query_embedding, k=5,
    # Pass AS_OF via raw search for temporal resolution
)
# Graph at a point in time
# NOTE(review): the query text contains no VALID_AT clause; per the comment
# below it relies on server-side temporal context — confirm how that is set.
result = client.graph.query(
    "knowledge",
    "MATCH (a:Person)-[:WORKS_AT]->(o) RETURN a.name, o.name",
    # VALID_AT is appended by the server when temporal context is active
)
# Invalidate an entity (soft delete with temporal visibility)
# presumably "42" is the id of the node to invalidate in graph "knowledge" —
# TODO confirm the argument order against the command reference
client.execute_command("TEMPORAL.INVALIDATE", "42", "NODE", "knowledge")
Step 5: Multi-tenant isolation with workspaces
Each tenant gets a workspace — complete data isolation, zero code changes:
# Create workspace per tenant
# The value returned by WS CREATE is reused below as the WS AUTH argument.
ws_id = client.execute_command("WS", "CREATE", "tenant-acme")
# New connection bound to workspace (all keys transparently prefixed)
# WS AUTH is issued on this separate connection, not on `client`.
tenant = MoonClient(host="localhost", port=6379)
tenant.execute_command("WS", "AUTH", ws_id)
# Same SDK methods, isolated data
tenant.set("user:1", "alice") # stored as {ws_hex}:user:1
# NOTE(review): `embedding` is not defined in this snippet — supply a query vector.
tenant.vector.search("chunks", embedding, k=5) # only this tenant's data
tenant.graph.create("knowledge") # workspace-scoped graph
tenant.graph.add_node("knowledge", "Person", name="Alice")
Step 6: Background pipelines with message queues
Use MQ triggers for async processing (community detection, summarization):
# Create a durable queue for extraction events
client.execute_command("MQ", "CREATE", "memory:events", "MAXDELIVERY", "3")

# Register a debounced trigger (fires at most every 30s)
client.execute_command(
    "MQ", "TRIGGER", "memory:events",
    "PUBLISH memory:reindex new", "DEBOUNCE", "30000",
)

# Transactional event publishing — only fires on successful commit
client.execute_command("TXN", "BEGIN")
try:
    # One hash holds the entity's metadata and its vector. The same key must
    # not also be written with SET: on a Redis-compatible server a key has a
    # single type, and HSET against a string key fails with WRONGTYPE.
    # NOTE(review): `embedding` must be defined by the caller of this snippet.
    client.hset("en:bob", mapping={
        "vec": encode_vector(embedding),
        "name": "Bob",
        "type": "Person",
        "meta": '{"name":"Bob","type":"Person"}',
    })
    client.graph.add_node("knowledge", "Person", name="Bob")
    # MQ message buffered — only delivered after TXN COMMIT
    client.execute_command(
        "MQ", "PUBLISH", "memory:events",
        "action", "entity_added", "name", "Bob",
    )
    client.execute_command("TXN", "COMMIT")
except Exception:
    client.execute_command("TXN", "ABORT")
    raise

# Consumer: pop and acknowledge events
msgs = client.execute_command("MQ", "POP", "memory:events", "COUNT", "10")
for msg_id, fields in (msgs or []):
    # process event...
    # Acknowledge only after processing, inside the loop, once per message
    client.execute_command("MQ", "ACK", "memory:events", msg_id)

# Monitor dead-letter queue depth
dlq_depth = client.execute_command("MQ", "DLQLEN", "memory:events")
Complete example: minimal memory engine
"""Minimal AI memory engine on Moon — using moondb SDK."""
import json
from moondb import MoonClient, encode_vector
class MemoryEngine:
    """Minimal memory engine: text chunks in a vector index plus a graph.

    Chunks are stored as hashes under the ``ch:`` prefix (index
    ``mem_chunks``); extracted entities become nodes in the ``knowledge``
    graph.
    """

    def __init__(self, host="localhost", port=6379):
        """Connect to the server and ensure the index and graph exist."""
        self.client = MoonClient(host=host, port=port)
        self._ensure_indexes()

    def _ensure_indexes(self):
        """Create the chunk index and the knowledge graph, best effort.

        A failure is treated as "already exists" and does not stop start-up,
        but it is logged at debug level so other causes are not lost.
        """
        import logging

        log = logging.getLogger(__name__)
        try:
            self.client.vector.create_index(
                "mem_chunks", prefix="ch:", dim=384, metric="COSINE",
                extra_schema=["text", "TEXT"],
            )
        except Exception as exc:
            # Presumably the index already exists; kept non-fatal on purpose.
            log.debug("create_index('mem_chunks') skipped: %s", exc)
        try:
            self.client.graph.create("knowledge")
        except Exception as exc:
            # Presumably the graph already exists; kept non-fatal on purpose.
            log.debug("graph.create('knowledge') skipped: %s", exc)

    def add(self, text: str, embedding: list[float], entities: list[dict] = None):
        """Atomic ingest: chunk + entities + graph in one transaction.

        Args:
            text: Chunk text, stored in the ``text`` hash field.
            embedding: Chunk vector, encoded into the ``vec`` hash field.
            entities: Optional dicts with a ``name`` and an optional ``type``
                (default ``"Entity"``); one graph node is added per dict.

        Returns:
            The generated chunk id; the chunk is stored at ``ch:<chunk_id>``.

        Raises:
            Exception: Whatever the client raised; the transaction is aborted
                before the exception propagates.
        """
        import uuid

        # Full 128-bit id. Truncating to 8 hex chars leaves only 32 bits, so
        # collisions become likely after tens of thousands of chunks, and a
        # collision silently overwrites an existing chunk.
        chunk_id = uuid.uuid4().hex

        self.client.execute_command("TXN", "BEGIN")
        try:
            # Chunk with vector (auto-indexed into HNSW)
            self.client.hset(f"ch:{chunk_id}", mapping={
                "vec": encode_vector(embedding),
                "text": text,
            })
            # Graph nodes for extracted entities
            for ent in (entities or []):
                self.client.graph.add_node(
                    "knowledge", ent.get("type", "Entity"),
                    name=ent["name"],
                )
            self.client.execute_command("TXN", "COMMIT")
        except Exception:
            self.client.execute_command("TXN", "ABORT")
            raise
        return chunk_id

    def search(self, query_embedding: list[float], k: int = 5):
        """Vector similarity search over chunks."""
        return self.client.vector.search("mem_chunks", query_embedding, k=k)

    def text_search(self, query_text: str, k: int = 5):
        """BM25 text search over chunks."""
        return self.client.text.text_search("mem_chunks", query_text, limit=k)

    def graph_query(self, cypher: str):
        """Run a Cypher query against the knowledge graph."""
        return self.client.graph.query("knowledge", cypher)

    def snapshot(self):
        """Record a temporal snapshot for point-in-time queries."""
        self.client.execute_command("TEMPORAL.SNAPSHOT_AT")
# Usage: ingest one sentence with two extracted entities, then query it back
engine = MemoryEngine()

extracted_entities = [
    {"name": "Alice", "type": "Person"},
    {"name": "Acme", "type": "Organization"},
]
engine.add(
    "Alice joined Acme Corp as CTO in 2024",
    embedding=[0.1] * 384,
    entities=extracted_entities,
)

# Vector search: print key and score of each hit
results = engine.search(query_embedding=[0.1] * 384, k=5)
for r in results:
    print(f"{r.key}: {r.score:.4f}")

# Text search
hits = engine.text_search("Alice CTO", k=5)

# Graph query: names of up to ten Person nodes
person_names_cypher = "MATCH (p:Person) RETURN p.name LIMIT 10"
result = engine.graph_query(person_names_cypher)