Skip to content

Building a memory engine¶

Moon's converged storage (KV, HNSW vectors, CSR graph, BM25 text) with cross-store ACID transactions makes it a natural substrate for AI memory engines. This guide shows how to wire the primitives together using the moondb Python SDK.

Install¶

pip install moondb

Architecture¶

┌─────────────────────────────────────────────────┐
│               Your Application                  │
│   Ingest Pipeline → Extractors → Retrievers     │
├─────────────────────────────────────────────────┤
│                  moondb SDK                     │
├─────────────────────────────────────────────────┤
│                   Moon Server                   │
│  KV │ HNSW Vectors │ CSR Graph │ MQ │ Temporal  │
└─────────────────────────────────────────────────┘

One binary, one connection, one transaction — no cross-store drift.

Storage layout¶

| What | Moon primitive | Key pattern |
| --- | --- | --- |
| Raw episodes (docs, messages) | KV | `ep:{id}` |
| Chunks with embeddings | KV + Vector index | `ch:{ep_id}:{idx}` |
| Entities (people, concepts) | KV + Vector + Graph node | `en:{id}` |
| Relations (edges) | Graph edge | `graph.add_edge()` |
| Facts (embedded triples) | KV + Vector | `fa:{id}` |
| Community summaries | KV + Vector | `co:{level}:{id}` |

Step 1: Create indexes¶

from moondb import MoonClient, encode_vector

# One client connection is reused by every snippet in this guide.
client = MoonClient(host="localhost", port=6379)

# Chunk embeddings (384-d cosine, auto-indexed on HSET)
# Covers keys starting with "ch:" and declares a TEXT field named "text"
# next to the vector.
client.vector.create_index(
    "chunks",
    prefix="ch:",
    dim=384,
    metric="COSINE",
    extra_schema=["text", "TEXT"],
)

# Entity embeddings with text + tag fields
# Covers keys starting with "en:"; "name" is declared TEXT and "type" TAG.
client.vector.create_index(
    "entities",
    prefix="en:",
    dim=384,
    metric="COSINE",
    extra_schema=["name", "TEXT", "type", "TAG"],
)

# Knowledge graph
# The graph name "knowledge" is the one the later snippets query.
client.graph.create("knowledge")

Step 2: Atomic ingest with transactions¶

The key advantage: the entity, its embedding, its graph edges, and its KV metadata land in a single atomic write. If any step fails, everything rolls back — no orphaned vectors or dangling graph edges.

import json

def ingest_entity(client, name, entity_type, description, embedding,
                  relations=None, graph_name="knowledge"):
    """Atomic upsert: KV + vector + graph in one transaction.

    All writes are issued between ``TXN BEGIN`` and ``TXN COMMIT``. If any
    call raises, ``TXN ABORT`` is sent and the exception is re-raised.

    Args:
        client: Connected ``MoonClient``.
        name: Entity name. Lower-cased with spaces replaced by ``_`` to
            build the ``en:{id}`` key.
        entity_type: Stored in the ``type`` field and used as the graph
            node label.
        description: Free text stored as ``desc`` in the JSON metadata.
        embedding: Vector stored in the ``vec`` field via ``encode_vector``.
        relations: Optional iterable of dicts with ``from_id``, ``to_id``,
            ``type`` and, optionally, ``weight``.
        graph_name: Graph that receives the node and edges.

    Returns:
        The value returned by ``client.graph.add_node`` for the new node.
    """
    eid = f"en:{name.lower().replace(' ', '_')}"

    client.execute_command("TXN", "BEGIN")
    try:
        # KV metadata
        # NOTE(review): this SET and the HSET below target the same key.
        # Confirm that Moon allows a string value and a hash under one key;
        # a Redis-compatible keyspace would reject the HSET with WRONGTYPE.
        client.set(eid, json.dumps({
            "name": name, "type": entity_type, "desc": description,
        }))

        # Create the graph node first so its id can be stored alongside the
        # vector record.
        node_id = client.graph.add_node(
            graph_name, entity_type, name=name,
        )

        # Vector (auto-indexed into HNSW on commit)
        # "node_id" links an entity search hit back to its graph node; graph
        # expansion from entity hits reads this field.
        # NOTE(review): assumes add_node returns a usable id inside an open
        # transaction; confirm against the SDK.
        client.hset(eid, mapping={
            "vec": encode_vector(embedding),
            "name": name,
            "type": entity_type,
            "node_id": node_id,
        })

        # Graph edges
        for rel in (relations or []):
            client.graph.add_edge(
                graph_name, rel["from_id"], rel["to_id"],
                rel["type"], weight=rel.get("weight"),
            )

        client.execute_command("TXN", "COMMIT")
        return node_id
    except Exception:
        # Roll back every store touched above, then surface the original error.
        client.execute_command("TXN", "ABORT")
        raise

Step 3: Hybrid retrieval¶

Combine vector similarity, graph traversal, and BM25 text search using SDK methods.

Vector search (semantic)¶

# Ten nearest chunks to the query embedding. `query_embedding` is expected to
# be defined by the caller (presumably a 384-d vector matching the index).
results = client.vector.search("chunks", query_embedding, k=10)
for r in results:
    # Print key, similarity score, and the first 80 characters of the text.
    print(f"{r.key}: {r.score:.4f} — {r.fields.get('text', '')[:80]}")

Graph traversal (relational)¶

# Find 2-hop neighbors of a node
neighbors = client.graph.neighbors("knowledge", node_id=1, depth=2)
for n in neighbors:
    print(n.node_id, n.label, n.properties)

# Cypher query
result = client.graph.query(
    "knowledge",
    "MATCH (a:Person)-[:WORKS_AT]->(o:Organization) "
    "RETURN a.name, o.name LIMIT 20",
)
for row in result.rows:
    print(row)

# BM25 text search (lexical) over the "chunks" index
hits = client.text.text_search("chunks", "machine learning", limit=10)
for h in hits:
    # Print id, score, and the first 80 characters of the text.
    print(f"{h.id}: {h.score:.4f} — {h.fields.get('text', '')[:80]}")

Three-way hybrid fusion (vector + BM25 + graph)¶

def hybrid_retrieve(client, query_text, query_embedding, graph_name="knowledge", k=10):
    """Three-way retrieval: vector + BM25 + graph, fused with RRF.

    Args:
        client: Connected ``MoonClient``.
        query_text: Query string for the BM25 search over ``chunks``.
        query_embedding: Query vector for the ``chunks`` and ``entities``
            vector searches.
        graph_name: Graph to expand entity hits in.
        k: Number of fused results to return.

    Returns:
        Whatever ``rrf_fuse`` returns for the three rankings, limited to
        ``k`` entries.
    """
    # 1. Vector search (over-fetch 3x so fusion sees candidates beyond top-k)
    vec_results = client.vector.search("chunks", query_embedding, k=k * 3)

    # 2. BM25 text search
    text_results = client.text.text_search("chunks", query_text, limit=k * 3)

    # 3. Graph traversal from matching entities
    entity_hits = client.vector.search("entities", query_embedding, k=5)
    graph_results = []
    for hit in entity_hits:
        # Entity hits without a stored "node_id" field are skipped.
        node_id = int(hit.fields.get("node_id", 0))
        if node_id:
            graph_results.extend(
                client.graph.neighbors(graph_name, node_id, depth=2)
            )

    # rrf_fuse takes its rankings positionally (*key_lists); passing them as
    # vec_keys=/text_keys=/graph_keys= keywords raises TypeError.
    # Note: the graph ranking contributes stringified node ids, while the
    # other two contribute chunk keys/ids.
    return rrf_fuse(
        [r.key for r in vec_results],
        [h.id for h in text_results],
        [str(n.node_id) for n in graph_results],
        k=k,
    )

def rrf_fuse(*key_lists, k=10, constant=60):
    """Reciprocal Rank Fusion: score = sum(1 / (constant + rank)).

    Args:
        *key_lists: Any number of rankings, each an iterable of hashable
            keys ordered best-first.
        k: Maximum number of fused entries to return.
        constant: Damping constant added to the 0-based rank.

    Returns:
        A list of ``(key, score)`` tuples sorted by descending score and
        truncated to ``k`` entries. Ties keep first-seen order.
    """
    scores = {}
    for keys in key_lists:
        # A ranking contributes at most once per key: duplicates are dropped
        # (first occurrence wins) so a repeated key cannot inflate its score.
        for rank, key in enumerate(dict.fromkeys(keys)):
            scores[key] = scores.get(key, 0) + 1.0 / (constant + rank)
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    return ranked[:k]

Aggregation (FT.AGGREGATE)¶

from moondb.types import GroupBy, Count, Avg, SortBy, Limit

# Aggregation steps, passed in this order: group by the "type" field with a
# count reducer aliased "total", sort by that alias descending, then apply
# Limit(0, 10).
pipeline = [
    GroupBy("@type", reducers=[Count(as_name="total")]),
    SortBy("@total", order="DESC"),
    Limit(0, 10),
]
# "*" is the query string passed for the "entities" index.
result = client.text.aggregate("entities", "*", steps=pipeline)

Step 4: Temporal queries (point-in-time memory)¶

Record snapshots and query historical state:

# Record current state as a temporal snapshot
client.execute_command("TEMPORAL.SNAPSHOT_AT")

# ... time passes, data changes ...

# Query vectors at a historical point in time
# NOTE(review): as written, this call passes only the index name, the query
# vector, and k; no AS_OF argument is supplied here.
results = client.vector.search(
    "entities", query_embedding, k=5,
    # Pass AS_OF via raw search for temporal resolution
)

# Graph at a point in time
# NOTE(review): the Cypher text below contains no temporal clause; the
# point-in-time behaviour relies on the server-side context described in the
# inline comment. Confirm against the server documentation.
result = client.graph.query(
    "knowledge",
    "MATCH (a:Person)-[:WORKS_AT]->(o) RETURN a.name, o.name",
    # VALID_AT is appended by the server when temporal context is active
)

# Invalidate an entity (soft delete with temporal visibility)
# Arguments as sent: "42", "NODE", "knowledge" (presumably the node id, the
# entity kind, and the graph name; verify the argument order).
client.execute_command("TEMPORAL.INVALIDATE", "42", "NODE", "knowledge")

Step 5: Multi-tenant isolation with workspaces¶

Each tenant gets a workspace — complete data isolation, zero code changes:

# Create workspace per tenant
# The value returned by WS CREATE is kept and passed to WS AUTH below.
ws_id = client.execute_command("WS", "CREATE", "tenant-acme")

# New connection bound to workspace (all keys transparently prefixed)
# A separate MoonClient is opened for the tenant, so the shared `client`
# connection is not the one authenticated here.
tenant = MoonClient(host="localhost", port=6379)
tenant.execute_command("WS", "AUTH", ws_id)

# Same SDK methods, isolated data
tenant.set("user:1", "alice")  # stored as {ws_hex}:user:1
tenant.vector.search("chunks", embedding, k=5)  # only this tenant's data
tenant.graph.create("knowledge")  # workspace-scoped graph
tenant.graph.add_node("knowledge", "Person", name="Alice")

Step 6: Background pipelines with message queues¶

Use MQ triggers for async processing (community detection, summarization):

# Create a durable queue for extraction events
# MAXDELIVERY is set to "3" (presumably the delivery-attempt limit before a
# message is dead-lettered; confirm against the MQ command reference).
client.execute_command("MQ", "CREATE", "memory:events", "MAXDELIVERY", "3")

# Register a debounced trigger (fires at most every 30s)
# The DEBOUNCE value "30000" is in milliseconds if the 30s claim above holds.
client.execute_command(
    "MQ", "TRIGGER", "memory:events",
    "PUBLISH memory:reindex new", "DEBOUNCE", "30000",
)

# Transactional event publishing — only fires on successful commit
client.execute_command("TXN", "BEGIN")
try:
    client.set("en:bob", '{"name":"Bob","type":"Person"}')
    client.hset("en:bob", mapping={
        "vec": encode_vector(embedding), "name": "Bob", "type": "Person",
    })
    client.graph.add_node("knowledge", "Person", name="Bob")

    # MQ message buffered — only delivered after TXN COMMIT
    client.execute_command(
        "MQ", "PUBLISH", "memory:events",
        "action", "entity_added", "name", "Bob",
    )
    client.execute_command("TXN", "COMMIT")
except Exception:
    # Abort the transaction, then re-raise the original error.
    client.execute_command("TXN", "ABORT")
    raise

# Consumer: pop and acknowledge events
# Requests up to 10 messages; `msgs or []` tolerates an empty/None reply.
msgs = client.execute_command("MQ", "POP", "memory:events", "COUNT", "10")
for msg_id, fields in (msgs or []):
    # process event...
    # Acknowledge each message by id after processing.
    client.execute_command("MQ", "ACK", "memory:events", msg_id)

# Monitor dead-letter queue depth
dlq_depth = client.execute_command("MQ", "DLQLEN", "memory:events")

Complete example: minimal memory engine¶

"""Minimal AI memory engine on Moon β€” using moondb SDK."""
import json
from moondb import MoonClient, encode_vector

class MemoryEngine:
    """Minimal memory engine built on a single ``MoonClient`` connection.

    Chunks are written as hashes under ``ch:{chunk_id}`` and searched through
    the ``mem_chunks`` index; extracted entities become nodes in the
    ``knowledge`` graph.
    """

    def __init__(self, host="localhost", port=6379):
        """Connect to Moon and make sure the index and graph exist."""
        self.client = MoonClient(host=host, port=port)
        self._ensure_indexes()

    def _ensure_indexes(self):
        """Create the chunk index and the knowledge graph, best-effort.

        Creation errors (typically "already exists") do not abort start-up,
        but they are logged at DEBUG level instead of being discarded, so a
        real failure can still be diagnosed.
        """
        import logging

        log = logging.getLogger(__name__)
        try:
            self.client.vector.create_index(
                "mem_chunks", prefix="ch:", dim=384, metric="COSINE",
                extra_schema=["text", "TEXT"],
            )
        except Exception:
            # Expected when the index already exists.
            log.debug("create_index('mem_chunks') skipped", exc_info=True)
        try:
            self.client.graph.create("knowledge")
        except Exception:
            # Expected when the graph already exists.
            log.debug("graph.create('knowledge') skipped", exc_info=True)

    @staticmethod
    def _new_chunk_id() -> str:
        """Return a fresh chunk id: the full 32-hex-digit form of a UUID4."""
        import uuid

        # The full UUID is kept on purpose: truncating it to 8 hex digits
        # leaves only 32 bits, and a collision would silently overwrite an
        # existing chunk hash.
        return uuid.uuid4().hex

    def add(self, text: str, embedding: list[float], entities: list[dict] = None):
        """Atomic ingest: chunk + entities + graph in one transaction.

        Args:
            text: Chunk text, stored in the ``text`` field.
            embedding: Chunk vector, stored in ``vec`` via ``encode_vector``.
            entities: Optional list of dicts with a ``name`` key and an
                optional ``type`` key (defaults to ``"Entity"``).

        Returns:
            The generated chunk id (the hash key is ``ch:{chunk_id}``).

        Raises:
            Exception: Any error raised while writing; the transaction is
                aborted before the error is re-raised.
        """
        chunk_id = self._new_chunk_id()

        self.client.execute_command("TXN", "BEGIN")
        try:
            # Chunk with vector (auto-indexed into HNSW)
            self.client.hset(f"ch:{chunk_id}", mapping={
                "vec": encode_vector(embedding),
                "text": text,
            })
            # Graph nodes for extracted entities
            for ent in (entities or []):
                self.client.graph.add_node(
                    "knowledge", ent.get("type", "Entity"),
                    name=ent["name"],
                )
            self.client.execute_command("TXN", "COMMIT")
        except Exception:
            self.client.execute_command("TXN", "ABORT")
            raise
        return chunk_id

    def search(self, query_embedding: list[float], k: int = 5):
        """Vector similarity search over chunks."""
        return self.client.vector.search("mem_chunks", query_embedding, k=k)

    def text_search(self, query_text: str, k: int = 5):
        """BM25 text search over chunks."""
        return self.client.text.text_search("mem_chunks", query_text, limit=k)

    def graph_query(self, cypher: str):
        """Run a Cypher query against the knowledge graph."""
        return self.client.graph.query("knowledge", cypher)

    def snapshot(self):
        """Record a temporal snapshot for point-in-time queries."""
        self.client.execute_command("TEMPORAL.SNAPSHOT_AT")

# Usage
# Constructing the engine connects to localhost:6379 and ensures the index
# and graph exist.
engine = MemoryEngine()

# Ingest one chunk plus two extracted entities; the constant 384-element
# vector stands in for a real embedding.
engine.add(
    "Alice joined Acme Corp as CTO in 2024",
    embedding=[0.1] * 384,
    entities=[
        {"name": "Alice", "type": "Person"},
        {"name": "Acme", "type": "Organization"},
    ],
)

# Vector search
results = engine.search(query_embedding=[0.1] * 384, k=5)
for r in results:
    print(f"{r.key}: {r.score:.4f}")

# Text search
hits = engine.text_search("Alice CTO", k=5)

# Graph query
result = engine.graph_query(
    "MATCH (p:Person) RETURN p.name LIMIT 10"
)