Create a test directory with sample files:
mkdir -p test_data

- test_data/ai_facts.txt:
Artificial Intelligence (AI) was founded as an academic discipline in 1956.
John McCarthy coined the term "Artificial Intelligence".
Deep Learning is a subset of Machine Learning.
Geoffrey Hinton is known as the "Godfather of Deep Learning".
AlphaGo was developed by DeepMind.
- test_data/companies.csv:
Company,Founded,Founder,Industry
OpenAI,2015,Sam Altman,AI
DeepMind,2010,Demis Hassabis,AI
Tesla,2003,Elon Musk,Automotive
SpaceX,2002,Elon Musk,Aerospace

- test_data/research.html:
<!DOCTYPE html>
<html>
<head><title>AI Research</title></head>
<body>
<h1>Recent AI Breakthroughs</h1>
<p>GPT-4 was released by OpenAI in March 2023.</p>
<p>It demonstrates significant improvements in reasoning capabilities.</p>
</body>
</html>

# Ingest text file
curl -X POST http://localhost:8000/api/v1/ingest/file \
-F "file=@test_data/ai_facts.txt" \
-F "source_type=TEXT"
# Ingest CSV
curl -X POST http://localhost:8000/api/v1/ingest/file \
-F "file=@test_data/companies.csv" \
-F "source_type=CSV"
# Ingest HTML from URL
curl -X POST http://localhost:8000/api/v1/ingest \
-H "Content-Type: application/json" \
-d '{
"source": "https://en.wikipedia.org/wiki/Machine_learning",
"source_type": "HTML",
"metadata": {"topic": "machine_learning"}
}'
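These endpoints can also be driven from Python. The following is a minimal sketch using the requests library (assumed installed; the exact response schema depends on your API implementation), mirroring the curl calls above:

import requests

BASE = "http://localhost:8000/api/v1"

# Upload a local file (multipart form, mirroring the -F flags above)
with open("test_data/ai_facts.txt", "rb") as f:
    resp = requests.post(
        f"{BASE}/ingest/file",
        files={"file": f},
        data={"source_type": "TEXT"},
    )
print(resp.status_code, resp.json())

# Ingest from a URL (JSON body, mirroring the -d payload above)
resp = requests.post(
    f"{BASE}/ingest",
    json={
        "source": "https://en.wikipedia.org/wiki/Machine_learning",
        "source_type": "HTML",
        "metadata": {"topic": "machine_learning"},
    },
)
print(resp.status_code, resp.json())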
Watch the worker logs to see the pipeline in action:

docker-compose logs -f worker

You should see:
- Normalization task started
- Extraction task started
- Validation task started
- Fusion task started
- Embedding task started
Once processing completes, query the system:
# Question about founders
curl -X POST http://localhost:8000/api/v1/query \
-H "Content-Type: application/json" \
-d '{
"question": "Who founded OpenAI?",
"top_k_semantic": 5,
"graph_depth": 2,
"min_confidence": 0.7
}'
# Question about relationships
curl -X POST http://localhost:8000/api/v1/query \
-H "Content-Type: application/json" \
-d '{
"question": "What companies did Elon Musk found?",
"top_k_semantic": 5,
"graph_depth": 2
}'
# Question about concepts
curl -X POST http://localhost:8000/api/v1/query \
-H "Content-Type: application/json" \
-d '{
"question": "What is deep learning and who are the key researchers?",
"top_k_semantic": 10,
"graph_depth": 3
}'
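The query endpoint can be exercised the same way from Python; here is a minimal sketch with requests (the printed keys assume the response mirrors the QueryResponse fields answer, verification_status, and confidence used in the service-level tests later in this guide):

import requests

resp = requests.post(
    "http://localhost:8000/api/v1/query",
    json={
        "question": "Who founded OpenAI?",
        "top_k_semantic": 5,
        "graph_depth": 2,
        "min_confidence": 0.7,
    },
)
body = resp.json()
# Assumed response keys, matching the QueryResponse fields used later in this guide
print(body.get("answer"))
print(body.get("verification_status"), body.get("confidence"))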
Open Neo4j Browser at http://localhost:7474 and run:

// View all entities
MATCH (n:Entity)
RETURN n.canonical_name, n.entity_type, n.aliases
LIMIT 25
// View founder relationships
MATCH (company:Entity)-[r:founded_by]->(person:Entity {entity_type: 'Person'})
RETURN company.canonical_name, person.canonical_name, r.confidence
// View the subgraph for a specific entity
MATCH path = (n:Entity {canonical_name: 'OpenAI'})-[*1..2]-(m)
RETURN path

# Connect to MongoDB
docker exec -it graphbuilder-mongodb mongosh
# Switch to database
use graphbuilder
# Check raw documents
db.raw_documents.find().limit(5)
# Check validated triples
db.validated_triples.find({
"validation_result.confidence": {$gte: 0.9}
}).limit(10)
# Check human review queue
db.human_review_queue.find()

Test the ingestion service directly:

import asyncio
from services.ingestion.service import IngestionService
async def test_ingestion():
    service = IngestionService()
    # Test URL ingestion
    doc = await service.ingest_from_url(
        url="https://example.com",
        source_type="HTML",
    )
    print(f"Ingested: {doc.document_id}")
asyncio.run(test_ingestion())

Test the normalization service:

import asyncio
from services.normalization.service import NormalizationService
from shared.models.schemas import DocumentType
async def test_normalization():
    service = NormalizationService()
    # Assume we have a raw document ID
    normalized = await service.normalize_document("doc_123")
    print(f"Sections: {len(normalized.sections)}")
    print(f"Tables: {len(normalized.tables)}")
asyncio.run(test_normalization())

Test the extraction service:

import asyncio
from services.extraction.service import ExtractionService
async def test_extraction():
    service = ExtractionService()
    text = "John McCarthy founded the field of AI in 1956."
    triples = await service.extract_from_text(
        text=text,
        document_id="test_doc",
        section="intro",
    )
    for triple in triples:
        print(f"{triple.subject} -> {triple.predicate} -> {triple.object}")
asyncio.run(test_extraction())

Test the query service:

import asyncio
from services.query.service import QueryService
from shared.models.schemas import QueryRequest
async def test_query():
    service = QueryService()
    request = QueryRequest(
        question="Who founded OpenAI?",
        top_k_semantic=5,
        graph_depth=2,
    )
    response = await service.answer_question(request)
    print(f"Answer: {response.answer}")
    print(f"Verification: {response.verification_status}")
    print(f"Confidence: {response.confidence}")
asyncio.run(test_query())

Test the re-verification agent:

import asyncio
from agents.agents import ReverifyAgent
async def test_reverify():
    agent = ReverifyAgent()
    summary = await agent.run_cycle()
    print(summary)
asyncio.run(test_reverify())

First, create a conflict in Neo4j:
// Create entities
MERGE (openai:Entity {canonical_name: 'OpenAI', entity_type: 'Organization'})
MERGE (sam:Entity {canonical_name: 'Sam Altman', entity_type: 'Person'})
MERGE (elon:Entity {canonical_name: 'Elon Musk', entity_type: 'Person'})
// Create conflicting relationships
CREATE (openai)-[:founded_by {confidence: 0.8, version: 1}]->(sam)
CREATE (openai)-[:founded_by {confidence: 0.6, version: 1}]->(elon)

Then run the agent:
import asyncio
from agents.agents import ConflictResolverAgent
async def test_conflict_resolution():
    agent = ConflictResolverAgent()
    summary = await agent.run_cycle()
    print(summary)
asyncio.run(test_conflict_resolution())

Test the schema suggestor agent:

import asyncio
from agents.agents import SchemaSuggestorAgent
async def test_schema_suggestion():
    agent = SchemaSuggestorAgent()
    summary = await agent.run_cycle()
    print(summary)
asyncio.run(test_schema_suggestion())

Create locustfile.py:
from locust import HttpUser, task, between
class GraphBuilderUser(HttpUser):
    wait_time = between(1, 3)

    @task(3)
    def query(self):
        self.client.post("/api/v1/query", json={
            "question": "What is AI?",
            "top_k_semantic": 5,
            "graph_depth": 2,
        })

    @task(1)
    def stats(self):
        self.client.get("/api/v1/stats")

    @task(1)
    def health(self):
        self.client.get("/health")

Run load test:
pip install locust
locust -f locustfile.py --host http://localhost:8000

Open http://localhost:8089 and start the test.

Benchmark extraction throughput directly:
import asyncio
import time
from services.extraction.service import ExtractionService
async def benchmark_extraction():
    service = ExtractionService()
    text = """
    Artificial Intelligence was founded by John McCarthy in 1956.
    Deep Learning was pioneered by Geoffrey Hinton.
    OpenAI was founded by Sam Altman and Elon Musk.
    """ * 10  # Repeat to simulate larger document
    start = time.time()
    triples = await service.extract_from_text(text, "bench_doc", "test")
    duration = time.time() - start
    print(f"Extracted {len(triples)} triples in {duration:.2f}s")
    print(f"Throughput: {len(triples)/duration:.2f} triples/sec")
asyncio.run(benchmark_extraction())

Create tests/test_integration.py:
import pytest
import asyncio
from services.ingestion.service import IngestionService
from services.normalization.service import NormalizationService
from services.extraction.service import ExtractionService
from services.validation.service import ValidationEngine
from services.fusion.service import FusionService
from services.query.service import QueryService
from shared.models.schemas import QueryRequest, DocumentType
@pytest.mark.asyncio
async def test_full_pipeline():
    """Test complete document processing pipeline."""
    # 1. Ingest
    ingestion = IngestionService()
    raw_doc = await ingestion.ingest_from_url(
        url="https://example.com",
        source_type=DocumentType.HTML,
    )

    # 2. Normalize
    normalization = NormalizationService()
    normalized = await normalization.normalize_document(raw_doc.document_id)

    # 3. Extract
    extraction = ExtractionService()
    candidates = await extraction.extract_triples(normalized.document_id)

    # 4. Validate
    validation = ValidationEngine()
    validated = []
    for candidate in candidates[:10]:  # Test with subset
        result = await validation.validate_triple(candidate)
        if result:
            validated.append(result)

    # 5. Fuse
    fusion = FusionService()
    for triple in validated:
        await fusion.fuse_triple(triple)

    # 6. Query
    query = QueryService()
    response = await query.answer_question(
        QueryRequest(question="Test query", top_k_semantic=5)
    )
    assert response.answer is not None
    assert len(response.evidence_edges) > 0

@pytest.mark.asyncio
async def test_graphverify():
    """Test GraphVerify hallucination detection."""
    query = QueryService()
    # This should be supported
    request = QueryRequest(
        question="What is a fact present in the graph?",
        top_k_semantic=5,
        graph_depth=2,
    )
    response = await query.answer_question(request)
    assert response.verification_status in ["SUPPORTED", "UNKNOWN"]

Run tests:
pytest tests/test_integration.py -v

Update .env:
DEBUG=true
LOG_LEVEL=DEBUG
Restart services:
docker-compose restart

Inspect the state of a Celery task:

from workers.tasks import celery_app
# Get task info
task_id = "abc-123-def-456"
result = celery_app.AsyncResult(task_id)
print(result.state)
print(result.info)

Test the Ollama model directly:

curl http://localhost:11434/api/generate -d '{
"model": "deepseek-r1:1.5b",
"prompt": "Extract entities: John founded OpenAI in 2015.",
"stream": false
}'

# Container stats
docker stats
# MongoDB ops
docker exec -it graphbuilder-mongodb mongosh --eval "db.serverStatus().connections"
# Neo4j metrics
curl http://localhost:7474/db/system/tx/commit \
-H "Content-Type: application/json" \
-d '{"statements":[{"statement":"CALL dbms.listTransactions()"}]}'# Drop all collections in MongoDB
docker exec -it graphbuilder-mongodb mongosh graphbuilder --eval "db.dropDatabase()"
# Clear Neo4j
docker exec -it graphbuilder-neo4j cypher-shell -u neo4j -p password "MATCH (n) DETACH DELETE n"
# Reset FAISS index
rm -rf data/faiss/*
# Restart services
docker-compose restart# Purge all Celery tasks
docker exec -it graphbuilder-worker celery -A workers.tasks purge -f

# Stop and remove all containers and volumes
docker-compose down -v
# Remove data directory
rm -rf data/

If tasks are stuck in the queue, check that the worker is running and connected to Redis:
docker-compose logs worker
docker exec -it graphbuilder-redis redis-cli ping

If extraction results are inconsistent or low quality, adjust the temperature and prompts:
EXTRACTION_TEMPERATURE=0.05 # More deterministic
EXTRACTION_MAX_TOKENS=4096 # Allow longer responses
If vector search becomes slow on large datasets, use an IVF index:
FAISS_INDEX_TYPE=IndexIVFFlat
FAISS_NLIST=100
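For reference, this is roughly how such an index is built with the faiss Python library (the 384-dimensional embeddings and random training data here are placeholders; IVF indexes must be trained before vectors are added, and nlist should grow with corpus size):

import faiss
import numpy as np

dim = 384     # embedding dimension; match your embedding model
nlist = 100   # number of IVF clusters, as in FAISS_NLIST above

quantizer = faiss.IndexFlatL2(dim)  # coarse quantizer over cluster centroids
index = faiss.IndexIVFFlat(quantizer, dim, nlist, faiss.METRIC_L2)

# Train on a representative sample before adding vectors
vectors = np.random.random((10000, dim)).astype("float32")
index.train(vectors)
index.add(vectors)

index.nprobe = 10  # clusters probed per query: higher = better recall, slower
distances, ids = index.search(vectors[:5], k=5)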
If GraphVerify verdicts are too strict or too lenient, adjust the thresholds:
GRAPHVERIFY_SUPPORT_THRESHOLD=0.9
GRAPHVERIFY_CONTRADICTION_THRESHOLD=0.6
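One plausible reading of these thresholds (the authoritative logic lives in the GraphVerify implementation, so treat this as a hypothetical sketch): support and contradiction scores are compared against the two cutoffs to produce the verification status returned with each answer.

def classify_claim(support_score: float, contradiction_score: float) -> str:
    """Hypothetical mapping from GraphVerify scores to a verification status."""
    if support_score >= 0.9:          # GRAPHVERIFY_SUPPORT_THRESHOLD
        return "SUPPORTED"
    if contradiction_score >= 0.6:    # GRAPHVERIFY_CONTRADICTION_THRESHOLD
        return "CONTRADICTED"
    return "UNKNOWN"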
- Customize for your domain:
  - Update ontology rules in shared/config/settings.py
  - Add domain-specific prompts in shared/prompts/templates.py
  - Configure validation constraints
- Add authentication:
  - Implement API key middleware (a sketch follows this list)
  - Add user management
  - Configure RBAC
- Scale for production:
  - Set up database replication
  - Configure load balancing
  - Enable monitoring and alerting
  - Implement backup strategies
- Enhance agent capabilities:
  - Connect to Wikidata/DBpedia APIs
  - Implement entity linking
  - Add feedback loops for model improvement
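For the API key middleware item above, here is a minimal sketch of a header-based check, assuming the API is served with FastAPI (not stated in this guide; adapt to your framework):

from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
VALID_KEYS = {"change-me"}  # placeholder; load from a secrets store in practice

async def require_api_key(api_key: str = Security(api_key_header)) -> str:
    # Reject requests whose X-API-Key header is missing or unrecognized
    if api_key not in VALID_KEYS:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")
    return api_key

app = FastAPI()

@app.get("/api/v1/stats", dependencies=[Depends(require_api_key)])
async def stats():
    return {"status": "ok"}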