# Python SDK
The moondb Python SDK extends redis-py with typed helpers for Moon's vector search, graph engine, full-text search, session-aware retrieval, and semantic caching. All standard Redis commands remain available.
## Install
# Core SDK
pip install moondb
# With framework integrations.
# The extras are quoted so that shells such as zsh do not try to expand
# the square brackets as a glob pattern ("no matches found").
pip install "moondb[langchain]"   # LangChain vector store
pip install "moondb[llamaindex]"  # LlamaIndex vector store
pip install "moondb[all]"         # Everything
## Client
MoonClient inherits from redis.Redis. All standard Redis commands work out of the box. Moon-specific features are accessed via typed properties.
from moondb import MoonClient, encode_vector
# MoonClient subclasses redis.Redis, so it is constructed like a redis-py client.
client = MoonClient(host="localhost", port=6379)
# Standard Redis
client.set("hello", "world")
client.get("hello") # b"world"
# Moon extensions via typed properties.
# Each line below only names the property; see the matching section for usage.
client.vector # Vector search (FT.*)
client.graph # Graph engine (GRAPH.*)
client.text # Full-text search (FT.SEARCH BM25 / FT.AGGREGATE)
client.session # Session-aware search (deduplication)
client.cache # Semantic caching (FT.CACHESEARCH)
## Vector search (`client.vector`)
# Create a vector index for keys under the "product:" prefix
client.vector.create_index(
    "products",
    prefix="product:",
    dim=384,
    metric="COSINE",
    ef_construction=400,
    ef_runtime=100,
    compact_threshold=10000,
)

# Store documents (auto-indexed on HSET)
embedding = encode_vector([0.1, 0.2, 0.3, ...])  # 384-dim float32
client.hset("product:1", mapping={
    "vec": embedding,
    "title": "Wireless Headphones",
    "price": "99.99",
})

# KNN search
results = client.vector.search("products", [0.1, 0.2, ...], k=5)
for r in results:
    print(f"{r.key}: {r.score:.4f} - {r.fields.get('title')}")

# Search with graph expansion
# (`query_vec` is a placeholder for a query embedding; it is not defined in this snippet)
results = client.vector.search(
    "products", query_vec,
    expand_graph="knowledge", expand_depth=3,
)

# Recommendations (centroid-based)
recs = client.vector.recommend(
    "products",
    positive_keys=["product:1", "product:5"],
    negative_keys=["product:99"],
    k=10,
)

# Multi-hop navigation
results = client.vector.navigate(
    "knowledge", query_vec, hops=3, hop_penalty=0.05,
)
## Graph engine (`client.graph`)
# Create and populate
client.graph.create("social")
# add_node returns a node handle that add_edge and neighbors accept below
alice = client.graph.add_node("social", "Person", name="Alice", age="30")
bob = client.graph.add_node("social", "Person", name="Bob", age="25")
client.graph.add_edge("social", alice, bob, "KNOWS", weight=0.9)

# Cypher queries
result = client.graph.query(
    "social",
    "MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name, b.name",
)
for row in result.rows:
    print(row)

# Graph traversal
neighbors = client.graph.neighbors("social", alice, depth=2)
## Full-text search (`client.text`)
from moondb.types import GroupBy, Count, Avg, SortBy, Limit
# Create index with text, tag, and numeric fields
client.text.create_text_index(
    "issues",
    [
        ("title", "TEXT", {"WEIGHT": 2.0}),
        ("body", "TEXT", {}),
        ("status", "TAG", {}),
        # NUMERIC (not TAG) so the Avg("priority") reducer below has numbers to average.
        ("priority", "NUMERIC", {"SORTABLE": True}),
    ],
    prefix="issue:",
)

# BM25 search
hits = client.text.text_search("issues", "crash startup", limit=5)
for h in hits:
    print(h.id, h.score, h.fields.get("title"))

# Aggregation pipeline
# Note: GroupBy/SortBy fields are bare names (no leading "@" — the SDK adds it).
# Reducer aliases use `alias=`, and the pipeline is passed positionally.
pipeline = [
    GroupBy(["status"], reducers=[Count(alias="total"), Avg("priority", alias="avg_p")]),
    SortBy("total", direction="DESC"),
    Limit(0, 10),
]
result = client.text.aggregate("issues", "*", pipeline)

# Hybrid fusion (BM25 + dense + sparse)
# (`sparse_vec` is a placeholder for a sparse query vector; it is not defined in this snippet)
hits = client.text.hybrid_search(
    "docs", "machine learning",
    vector=[0.1, 0.2, ...],
    sparse=sparse_vec,
    limit=10,
)
## Session-aware search (`client.session`)
`client.session` automatically deduplicates results across multiple searches in the same session.
# `query_vec` is a placeholder for a query embedding; it is not defined in this snippet.
# First search returns fresh results
results = client.session.search("products", "session:user1", query_vec, k=5)
# Subsequent searches filter out previously seen results
# (same session key as above, which is what ties the two calls together)
more = client.session.search("products", "session:user1", query_vec, k=5)
# Session management
history = client.session.history("session:user1")
client.session.set_ttl("session:user1", 3600)  # presumably seconds — confirm against the SDK reference
client.session.reset("session:user1")
## Semantic caching (`client.cache`)
# `embedding` is a placeholder for an encoded query vector; it is not defined in this snippet.
# Store a cache entry with TTL
client.cache.store(
    "cache:qa:hash123", embedding,
    response="The answer is 42",
    model="gpt-4",
    ttl=3600,
)

# Lookup with similarity fallback
result = client.cache.lookup("qa", "cache:qa:", embedding, threshold=0.1)
if result.cache_hit:
    # "response" is the field written by cache.store() above
    print("Cached:", result.results[0].fields["response"])

# Invalidation
client.cache.invalidate("cache:qa:hash123")
client.cache.invalidate_prefix("cache:qa:")
## Async client
import asyncio
from moondb import AsyncMoonClient
async def main():
    """Exercise the vector, graph, and text helpers through the async client.

    Every helper call is awaited. The client is closed in a ``finally``
    block so the connection is released even if one of the calls raises.
    """
    client = AsyncMoonClient(host="localhost", port=6379)
    try:
        await client.vector.create_index("docs", dim=384, metric="COSINE")
        results = await client.vector.search("docs", [0.1, 0.2, ...])
        await client.graph.create("social")
        node_id = await client.graph.add_node("social", "Person", name="Alice")
        hits = await client.text.text_search("docs", "hello world")
    finally:
        await client.aclose()
asyncio.run(main())
## LangChain integration
from langchain_openai import OpenAIEmbeddings
from moondb.integrations.langchain import MoonVectorStore
# Build a LangChain vector store backed by the Moon index "knowledge".
store = MoonVectorStore(
    index_name="knowledge",
    embedding=OpenAIEmbeddings(),
    moon_url="redis://localhost:6379",
    dim=1536,  # presumably must match the embedding model's output size — confirm
    metric="COSINE",
)
# One metadata dict per text, in the same order as the texts.
store.add_texts(
    ["Document one", "Document two"],
    metadatas=[{"source": "web"}, {"source": "pdf"}],
)
docs = store.similarity_search("machine learning", k=5)
results = store.similarity_search_with_score("AI research", k=3)
## LlamaIndex integration
from llama_index.core import VectorStoreIndex, StorageContext
from moondb.integrations.llamaindex import MoonVectorStore
# Build a LlamaIndex vector store backed by the Moon index "rag_docs".
vector_store = MoonVectorStore(
    index_name="rag_docs",
    moon_url="redis://localhost:6379",
    dim=1536,
    metric="COSINE",
    expand_graph="knowledge", # Enable graph expansion
)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
# `documents` is a placeholder for already-loaded LlamaIndex documents;
# it is not defined in this snippet.
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
query_engine = index.as_query_engine()
response = query_engine.query("What is machine learning?")
## Cross-store transactions (raw commands)
The SDK does not yet provide typed helpers for the TXN, WS, MQ, or TEMPORAL commands. Until it does, send them as raw commands with `execute_command()`:
# Transaction: the writes between TXN BEGIN and TXN COMMIT are presumably
# applied together across stores — confirm against the Moon command reference.
# `vector_bytes` is a placeholder for an already-encoded vector; it is not
# defined in this snippet.
client.execute_command("TXN", "BEGIN")
client.set("user:1", "alice")
client.hset("doc:1", mapping={"title": "Hello", "vec": vector_bytes})
client.execute_command("TXN", "COMMIT")
# Workspaces
# The value returned by WS CREATE is passed straight to WS AUTH.
ws_id = client.execute_command("WS", "CREATE", "myapp")
client.execute_command("WS", "AUTH", ws_id)
# Message queues
client.execute_command("MQ", "CREATE", "orders", "MAXDELIVERY", "5")
client.execute_command("MQ", "PUSH", "orders", "action", "process")
# Temporal
client.execute_command("TEMPORAL.SNAPSHOT_AT")
## Type hints
All public APIs have full type annotations. Enable strict mypy: