A Technical Framework for Production AI Systems
Data Pipelines on ACID™ – NPI Labs turns existing data warehouses and APIs into AI-ready semantic endpoints. By wrapping validation, enrichment, and intelligent generation around your data sources, ACID lets LLMs answer business questions reliably without rearchitecting your stack. Start with a single high-value endpoint (e.g., get_at_risk_customers) and get measurable insights in hours, with enterprise-grade observability, conditional human review, and deployment models that keep data inside your cloud.
Most organisations have data infrastructure designed for human analysts, not AI systems. Traditional ETL pipelines feed warehouses optimised for SQL queries and BI dashboards. When you ask an LLM to answer a business question using this infrastructure, it fails.
The LLM does not natively know your database schema. It cannot interpret cryptic table names like dim_user_attributes_v3 or ltv_pred_90d, nor can it join tables across systems or apply business logic hidden in dbt models, Airflow DAGs, or undocumented tribal knowledge.
The real problem: Data is machine-readable but not AI-interpretable. There is no semantic layer that translates a natural question into the complex joins, filters, validations, and transformations needed to answer it correctly.
Text-to-SQL assumes warehouse schemas map neatly to business concepts. In practice, answering a single question may require five or more joins, conditional logic, and context that exists only in documentation or engineers' heads.
RAG (Retrieval-Augmented Generation) helps with document search, but RAG alone cannot handle multi-table joins, aggregations, real-time structured queries, or data validation at scale.
We build protocol layers between LLMs and data infrastructure—providing semantic context, intelligent routing, validation, enrichment, and structured intelligence delivery. This is what we call Data Pipelines on ACID.
For modern AI infrastructure, we redefine the ACID acronym as four pipeline properties:
- Self-discovering, self-documenting data access. No manual configuration per query.
- Data arrives with relationships, metadata, and business logic, not flat tables.
- Built-in validation, enrichment, and generation. Your pipeline thinks, not just passes data.
- Routes adapt to agent reasoning patterns. No hardcoded workflows that break when logic changes.
Our infrastructure consists of four core pipeline stages that transform data access for AI systems:
Production-ready architecture from data sources to client interfaces
// AI-Native Data Pipeline Architecture

LLM Layer (Claude, GPT-4, Llama, etc.)
        │  Natural language queries
        ▼
Source & Router
  • Query planning & decomposition
  • Multi-source orchestration
  • Intent classification
        │
        ▼
Validation Layer
  • Schema validation
  • Business rule enforcement
  • Data quality checks
        │
        ▼
Enrichment Layer
  • Feature engineering
  • Entity resolution
  • Context augmentation
        │
        ▼
Generation Layer
  • Structured intelligence delivery
  • Content generation
  • Response formatting
        │
        ▼
Data Infrastructure (Warehouses, APIs, Streams)
This is not a replacement for existing systems but an abstraction layer that makes data AI-accessible. Your warehouses, APIs, and databases remain unchanged—we add the intelligence layer on top.
MCP is an open standard developed by Anthropic for connecting AI systems to data sources. MCP servers expose semantic endpoints that LLMs can discover and use—moving beyond raw database queries to intelligent, validated operations.
{
"name": "customer-intelligence",
"version": "1.0",
"description": "Semantic layer for customer behaviour analysis",
"endpoints": [
{
"name": "get_at_risk_customers",
"description": "Returns customers at risk of churn with intervention recommendations",
"parameters": {
"region": {
"type": "string",
"enum": ["UK", "US", "EU", "APAC"],
"required": true
},
"risk_threshold": {
"type": "number",
"default": 0.7
}
},
"returns": {
"type": "array",
"items": {
"customer_id": {"type": "string"},
"risk_probability": {"type": "number"},
"lifetime_value": {"type": "number"},
"recommended_intervention": {"type": "string"}
}
}
}
]
}

Notice how get_at_risk_customers(region, threshold) encodes domain knowledge, not just SELECT * FROM customers. The endpoint validates inputs, enriches data with calculated risk scores, and returns actionable intelligence.
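As an illustrative sketch, the handler behind such an endpoint might look like the following. All names, the stubbed warehouse query, and the intervention rule are hypothetical, not a real NPI Labs API; the point is that schema enforcement and business logic live in the endpoint, not in the LLM prompt.

```python
from dataclasses import dataclass

VALID_REGIONS = {"UK", "US", "EU", "APAC"}

@dataclass
class AtRiskCustomer:
    customer_id: str
    risk_probability: float
    lifetime_value: float
    recommended_intervention: str

def fetch_customer_rows(region: str) -> list[dict]:
    # Stand-in for the warehouse query; returns pre-scored rows here.
    return [
        {"customer_id": "c_001", "risk": 0.82, "ltv": 12400.0},
        {"customer_id": "c_002", "risk": 0.41, "ltv": 3100.0},
    ]

def pick_intervention(row: dict) -> str:
    # Illustrative business rule: high-value accounts get a human touch.
    return "account_manager_call" if row["ltv"] > 10_000 else "discount_email"

def get_at_risk_customers(region: str, risk_threshold: float = 0.7) -> list[AtRiskCustomer]:
    # The endpoint, not the LLM, enforces the schema declared in the manifest.
    if region not in VALID_REGIONS:
        raise ValueError(f"region must be one of {sorted(VALID_REGIONS)}")
    if not 0.0 <= risk_threshold <= 1.0:
        raise ValueError("risk_threshold must be between 0 and 1")
    return [
        AtRiskCustomer(r["customer_id"], r["risk"], r["ltv"], pick_intervention(r))
        for r in fetch_customer_rows(region)
        if r["risk"] >= risk_threshold
    ]
```

An invalid region fails fast with a ValueError the LLM can surface, rather than producing a silently wrong query.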
Complex questions require multi-step reasoning, validation at each stage, error handling, and refinement. We build custom stateful, conditional agent workflows using SQS and EventBridge that compose multiple operations with intelligent routing based on confidence scores and data quality.
// User query: "Which high-value customers are at risk in the UK?"
Step 1: Source & Route
└─ Call get_at_risk_customers(region="UK", threshold=0.7)
Step 2: Validation
└─ Filter for customers with >£10k lifetime value
└─ Validate behavioural data completeness
Step 3: Enrichment
└─ Call get_engagement_history(customer_id)
└─ Calculate intervention ROI scores
└─ Rank by value × risk × intervention likelihood
Step 4: Generation
└─ Generate personalised intervention strategies
└─ Route to appropriate retention workflow
└─ Format for campaign system integration

Production AI systems require sophisticated routing between agents based on confidence scores, data quality, and business rules. We build intelligent message queues using AWS SQS and EventBridge that route operations dynamically through validation, enrichment, and generation agents.
Live visualization: Agent workflows routing through validation (green), enrichment (orange), and generation (pink) stages
// Event-driven agent orchestration with intelligent routing

API Request
     │
     ▼
Router Agent
  • Classify intent
  • Route to appropriate queue
     │
     ├─────────────┬─────────────┬─────────────┐
     ▼             ▼             ▼             ▼
Validation     Enrichment    Generation   Human Review
  Queue          Queue         Queue         Queue
  (SQS)          (SQS)         (SQS)         (SQS)
     │             │             │             │
     ▼             ▼             ▼             ▼
Validation     Enrichment    Generation      Human
  Agent          Agent         Agent        Reviewer
     │             │             │             │
     └─────────────┴─────────────┴─────────────┘
                   │
                   ▼
             EventBridge
   (Routes based on metadata)
                   │
     ├─────────────┼─────────────┐
     ▼             ▼             ▼
Next Agent    Retry Queue   Dead Letter
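As an illustrative sketch, the router agent's decision step reduces to a mapping from classified intent to queue. The intent labels, queue names, and toy classifier below are hypothetical; in production each queue name maps to an SQS queue URL and the classifier combines model output with business rules.

```python
# Hypothetical intent-to-queue routing table.
INTENT_QUEUES = {
    "needs_validation": "validation_queue",
    "needs_enrichment": "enrichment_queue",
    "needs_generation": "generation_queue",
}

def classify_intent(operation: dict) -> str:
    # Toy classifier: unvalidated work goes to validation first,
    # incomplete records go to enrichment, complete ones to generation.
    if not operation.get("validated"):
        return "needs_validation"
    if operation.get("missing_fields"):
        return "needs_enrichment"
    return "needs_generation"

def route(operation: dict) -> str:
    return INTENT_QUEUES[classify_intent(operation)]
```

Keeping the routing table as data rather than branching code is what lets routes adapt without hardcoded workflows.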
When generating customer communications, confidence scores determine whether content proceeds automatically or requires human review.
// Generation agent output with routing metadata
{
"operation_id": "gen_8f7a2b",
"content": {
"subject": "We've noticed you haven't been active lately",
"body": "Based on your viewing history...",
"generated_by": "claude-sonnet-4"
},
"confidence_score": 0.68,
"quality_checks": {
"tone_appropriate": true,
"factual_accuracy": true,
"brand_compliance": true
},
"routing": {
"next_agent": "human_review_queue",
"reason": "confidence_below_threshold",
"threshold": 0.75,
"priority": "medium",
"sqs_queue_url": "https://sqs.eu-west-2.amazonaws.com/.../human-review"
}
}
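A dispatcher acting on this metadata can be small. The sketch below is illustrative: the 0.75 threshold mirrors the example above, and the queue names are placeholders for real SQS destinations.

```python
# Threshold below which generated content requires human review
# (mirrors the 0.75 in the routing metadata above; illustrative).
REVIEW_THRESHOLD = 0.75

def route_generated_content(result: dict) -> dict:
    checks_pass = all(result.get("quality_checks", {}).values())
    if result["confidence_score"] >= REVIEW_THRESHOLD and checks_pass:
        return {"next_agent": "publishing_queue", "reason": "auto_approved"}
    return {
        "next_agent": "human_review_queue",
        "reason": (
            "confidence_below_threshold"
            if result["confidence_score"] < REVIEW_THRESHOLD
            else "quality_check_failed"
        ),
        "threshold": REVIEW_THRESHOLD,
    }
```
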
// EventBridge rule for conditional routing
{
"source": ["npi.generation.completed"],
"detail": {
"confidence_score": [
{"numeric": ["<", 0.75]}
]
},
"target": "human_review_queue"
}
// High confidence → automatic publishing
{
"detail": {
"confidence_score": [
{"numeric": [">=", 0.75]}
]
},
"target": "publishing_queue"
}

Content generation often requires multiple enrichment passes. The router determines the enrichment sequence based on data completeness.
// Initial validation agent output
{
"entity_id": "artist_9821",
"validation_status": "passed",
"data_completeness": {
"basic_metadata": true,
"social_metrics": false,
"genre_classification": false,
"biographical_text": false
},
"routing": {
"next_agents": [
{
"agent": "enrichment_social_metrics",
"priority": 1,
"sqs_queue": "enrichment_social_queue",
"estimated_duration": "2s"
},
{
"agent": "enrichment_genre_classification",
"priority": 2,
"sqs_queue": "enrichment_ml_queue",
"estimated_duration": "5s"
},
{
"agent": "generation_biography",
"priority": 3,
"depends_on": ["enrichment_social_metrics", "enrichment_genre_classification"],
"sqs_queue": "generation_queue"
}
],
"execution_mode": "sequential"
}
}
// After social enrichment completes
{
"entity_id": "artist_9821",
"enrichment_completed": ["social_metrics"],
"data_completeness": {
"social_metrics": true,
"genre_classification": false
},
"routing": {
"next_agent": "enrichment_genre_classification",
"sqs_queue": "enrichment_ml_queue",
"carry_forward_metadata": true
}
}
// After all enrichments → generation
{
"entity_id": "artist_9821",
"enrichment_completed": ["social_metrics", "genre_classification"],
"data_completeness": {
"social_metrics": true,
"genre_classification": true
},
"routing": {
"next_agent": "generation_biography",
"sqs_queue": "generation_queue",
"all_dependencies_met": true
}
}

Intelligent routing includes sophisticated error recovery with exponential backoff and dead letter queues.
// Enrichment agent failure with retry routing
{
"operation_id": "enrich_4f2b9a",
"entity_id": "product_7721",
"status": "failed",
"error": {
"type": "external_api_timeout",
"message": "Social metrics API timeout after 5s",
"retryable": true
},
"retry_metadata": {
"attempt": 2,
"max_attempts": 3,
"backoff_seconds": 60
},
"routing": {
"next_agent": "enrichment_social_metrics",
"sqs_queue": "enrichment_retry_queue",
"delay_seconds": 60,
"fallback_on_final_failure": "enrichment_social_metrics_cached"
}
}
// After max retries → dead letter queue with human escalation
{
"operation_id": "enrich_4f2b9a",
"status": "failed_permanent",
"retry_metadata": {
"attempt": 3,
"max_attempts": 3,
"all_retries_exhausted": true
},
"routing": {
"next_agent": "dead_letter_handler",
"sqs_queue": "dead_letter_queue",
"escalation": {
"notify_ops_team": true,
"priority": "high",
"slack_channel": "#ai-ops-alerts"
}
}
}

Our infrastructure processes complex, multi-source data across validation, enrichment, and generation pipelines, and is built for continuous operation at scale.
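The backoff policy shown in the retry messages above can be sketched as a single routing function. The constants and queue names are illustrative; in an SQS deployment the computed delay would be set as the message's delay before redelivery.

```python
# Illustrative retry policy: exponential backoff, then dead-letter hand-off.
MAX_ATTEMPTS = 3
BASE_BACKOFF_SECONDS = 30

def next_route(attempt: int, retryable: bool) -> dict:
    # Non-retryable errors and exhausted retries both escalate to humans.
    if not retryable or attempt >= MAX_ATTEMPTS:
        return {"sqs_queue": "dead_letter_queue", "escalate": True}
    # 30s, 60s, 120s... doubling per attempt, applied as an SQS message delay.
    delay = BASE_BACKOFF_SECONDS * (2 ** (attempt - 1))
    return {
        "sqs_queue": "enrichment_retry_queue",
        "delay_seconds": delay,
        "escalate": False,
    }
```

With these constants, the second attempt waits 60 seconds, matching the backoff_seconds in the example above.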
Traditional debugging tools don't fit AI systems. We implement structured logging and tracing with OpenTelemetry to follow each operation, LLM decision, and outcome across the pipeline.
// Example trace for customer analysis workflow
Trace: customer_retention_analysis (2.3s)
├── Span: query_planning (120ms)
│ └── Result: 3 operations identified
├── Span: get_at_risk_customers (450ms)
│ ├── Tag: region=UK
│ └── Result: 847 customers
├── Span: enrich_customer_data (980ms)
│ ├── Span: fetch_engagement_history (340ms)
│ ├── Span: calculate_intervention_roi (420ms)
│ └── Span: rank_by_priority (220ms)
└── Span: generate_recommendations (750ms)
└── Result: 847 personalised strategies

Include span taxonomies (plan → validate → enrich → generate) and redact sensitive data at the trace level. This enables debugging without exposing customer PII or proprietary logic.
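The redaction step can be sketched as a small helper applied to span attributes before export. The field names are illustrative; in an OpenTelemetry deployment this would run inside a span processor or exporter hook.

```python
# Illustrative trace-level redaction: sensitive attribute keys are masked
# before spans leave the process, so traces stay debuggable without PII.
SENSITIVE_KEYS = {"email", "customer_name", "postcode"}

def redact_attributes(attrs: dict) -> dict:
    return {
        key: ("[REDACTED]" if key in SENSITIVE_KEYS else value)
        for key, value in attrs.items()
    }
```
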
Building AI-native data infrastructure requires disciplined execution. Below is a proven roadmap from discovery to production deployment over six weeks.
Objective 1: Identify the highest-value use case and map existing data infrastructure.
Objective 2: Design MCP endpoints that model business logic, not raw database tables.
Objective 3: Build and deploy your first pipeline with production-grade validation and enrichment.
Objective 4: Build workflows that compose multiple pipeline operations to answer complex queries.
Objective 5: Instrument your system and deploy to internal users.
Resist the urge to build a complete semantic layer upfront. Start with one endpoint solving one problem. Validate value, then expand incrementally. A working get_at_risk_customers() endpoint is worth more than a comprehensive schema that never ships.
Don't expose raw database queries. Endpoints like get_trending_products(category, timeframe) should encapsulate business logic, validation rules, and calculated metrics.
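For instance, a hypothetical get_trending_products handler (all names and the stub ranking below are illustrative) keeps SQL, joins, and the category taxonomy behind the endpoint boundary:

```python
# Timeframes the endpoint accepts, mapped to query windows in days.
ALLOWED_TIMEFRAMES = {"7d": 7, "30d": 30, "90d": 90}

def rank_trending(category: str, days: int) -> list[dict]:
    # Stand-in for the real trend-scoring query and its joins.
    return [{"product_id": "p_1", "trend_score": 0.91}]

def get_trending_products(category: str, timeframe: str = "7d") -> dict:
    # Callers express intent; they never see table names or SQL.
    if timeframe not in ALLOWED_TIMEFRAMES:
        raise ValueError(f"timeframe must be one of {sorted(ALLOWED_TIMEFRAMES)}")
    days = ALLOWED_TIMEFRAMES[timeframe]
    return {
        "category": category,
        "window_days": days,
        "products": rank_trending(category, days),
    }
```

The enum of allowed timeframes doubles as documentation the LLM can discover through the endpoint's manifest.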
AI tooling evolves rapidly. Protocol-based design ensures your infrastructure remains relevant as models and frameworks change. Version your endpoints (e.g., v1/get_trending_products) to avoid breaking existing workflows.
You cannot debug what you cannot see. Structured tracing is not optional—it's the foundation of reliable AI systems.
AI-driven data infrastructure must enforce enterprise-grade controls. Security cannot be an afterthought when LLMs query sensitive data.
The gap between "we have data" and "AI can use our data" remains large for most organisations. Data Pipelines on ACID introduces a protocol layer that bridges this divide through semantic access, intelligent validation and enrichment, structured reasoning, and production-grade observability.
The path forward is clear: build incrementally, start with one endpoint solving one problem, measure results, and expand with confidence. Six weeks from discovery to production is achievable with disciplined execution.
The organisations that master this architecture will unlock AI capabilities impossible with traditional data infrastructure—not by replacing their systems, but by making them AI-interpretable.
From finance to e-commerce and entertainment, we've deployed AI-native infrastructure processing billions of operations. If we can handle the complexity of multi-source entity resolution and continuous data enrichment at scale, we can handle your domain.