| name | dataflow |
| description | Kailash DataFlow - zero-config database framework with automatic model-to-node generation. Use when asking about 'database operations', 'DataFlow', 'database models', 'CRUD operations', 'bulk operations', 'database queries', 'database migrations', 'multi-tenancy', 'multi-instance', 'database transactions', 'PostgreSQL', 'MySQL', 'SQLite', 'MongoDB', 'pgvector', 'vector search', 'document database', 'RAG', 'semantic search', 'existing database', 'database performance', 'database deployment', 'database testing', or 'TDD with databases'. DataFlow is NOT an ORM - it generates 11 workflow nodes per SQL model, 8 nodes for MongoDB, and 3 nodes for vector operations. |
Kailash DataFlow - Zero-Config Database Framework
DataFlow is a zero-config database framework built on Kailash Core SDK that automatically generates workflow nodes from database models.
Overview
DataFlow transforms database models into workflow nodes automatically, providing:
- Automatic Node Generation: 11 nodes per model (@db.model decorator)
- Multi-Database Support: PostgreSQL, MySQL, SQLite (SQL) + MongoDB (Document) + pgvector (Vector Search)
- Enterprise Features: Multi-tenancy, multi-instance isolation, transactions
- Zero Configuration: String IDs preserved, deferred schema operations
- Integration Ready: Works with Nexus for multi-channel deployment
- Specialized Adapters: SQL (11 nodes/model), Document (8 nodes), Vector (3 nodes)
🛠️ Developer Experience Tools
Enhanced Error System
DataFlow provides comprehensive error enhancement across all database operations, strict mode validation for build-time error prevention, and an intelligent debug agent for automated error diagnosis.
Error Enhancement
What It Is: Automatic transformation of Python exceptions into rich, actionable error messages with context, root causes, and solutions.
All DataFlow errors include:
- Error codes: DF-XXX format (DataFlow) or KS-XXX (Core SDK)
- Context: Node, parameters, workflow state
- Root causes: Why the error occurred (3-5 possibilities with probability scores)
- Solutions: How to fix it (with code examples)
Example:
```python
# Missing parameter error shows:
# - Error Code: DF-101
# - Missing parameter: "id"
# - 3 solutions with code examples
# - Link to documentation
workflow.add_node("UserCreateNode", "create", {
    "name": "Alice"  # Missing "id" - error enhanced automatically
})
```
Error Categories:
- DF-1XX: Parameter errors (missing, type mismatch, validation)
- DF-2XX: Connection errors (missing, circular, type mismatch)
- DF-3XX: Migration errors (schema, constraints)
- DF-4XX: Configuration errors (database URL, auth)
- DF-5XX: Runtime errors (timeouts, resources)
Architecture:
```
# BaseErrorEnhancer - Shared abstraction
# ├─ CoreErrorEnhancer     - KS-501 to KS-508 (Core SDK)
# └─ DataFlowErrorEnhancer - DF-XXX codes (DataFlow)
```
Strict Mode Validation
What It Is: Build-time validation system with 4 layers to catch errors before workflow execution.
Validation Layers:
- Model Validation - Primary keys, auto-fields, reserved fields, field types
- Parameter Validation - Required parameters, types, values, CreateNode structure
- Connection Validation - Source/target nodes, type compatibility, dot notation
- Workflow Validation - Structure, circular dependencies
Configuration:
```python
from dataflow import DataFlow
from dataflow.validation.strict_mode import StrictModeConfig

config = StrictModeConfig(
    enabled=True,
    validate_models=True,
    validate_parameters=True,
    validate_connections=True,
    validate_workflows=True,
    fail_fast=True,   # Stop on first error
    verbose=False     # Minimal output
)

db = DataFlow("postgresql://...", strict_mode_config=config)
```
When to Use:
- ✅ Development: Catch errors early
- ✅ CI/CD: Validate workflows before deployment
- ✅ Production: Prevent invalid workflow execution
Documentation:
- HOW-TO Guide: dataflow-strict-mode
- Architecture Guide: dataflow-validation-layers
Debug Agent
What It Is: Intelligent error analysis system that automatically diagnoses errors and provides ranked, actionable solutions.
5-Stage Pipeline:
- Capture - Stack traces, context, error chains
- Categorize - 50+ patterns across 5 categories (PARAMETER, CONNECTION, MIGRATION, RUNTIME, CONFIGURATION)
- Analyze - Inspector integration for workflow analysis
- Suggest - 60+ solution templates with relevance scoring
- Format - CLI (color-coded), JSON (machine-readable), dict (programmatic)
Usage:
```python
from dataflow.debug.debug_agent import DebugAgent
from dataflow.debug.knowledge_base import KnowledgeBase
from dataflow.platform.inspector import Inspector

# Initialize once (singleton pattern)
kb = KnowledgeBase("patterns.yaml", "solutions.yaml")
inspector = Inspector(db)
debug_agent = DebugAgent(kb, inspector)

# Debug errors automatically
try:
    runtime.execute(workflow.build())
except Exception as e:
    report = debug_agent.debug(e, max_solutions=5, min_relevance=0.3)
    print(report.to_cli_format())  # Rich terminal output
```
Output Formats:
```python
# CLI format (color-coded, ANSI)
print(report.to_cli_format())

# JSON format (machine-readable)
json_output = report.to_json()

# Dictionary format (programmatic)
data = report.to_dict()
```
Performance: 5-50ms per error, 92%+ confidence for known patterns
Documentation:
- Skill Guide: dataflow-debug-agent
- User Guide: docs/guides/debug-agent-user-guide.md
- Developer Guide: docs/guides/debug-agent-developer-guide.md
Build-Time Validation: Catch Errors Early
Validation Modes: OFF, WARN (default), STRICT
Catch 80% of configuration errors at model registration time (not runtime):
```python
from dataflow import DataFlow

db = DataFlow("postgresql://...")

# Default: Warn mode (backward compatible)
@db.model
class User:
    id: int      # Validates: primary key named 'id'
    name: str
    email: str

# Strict mode: Raises errors on validation failures
@db.model(strict=True)
class Product:
    id: int
    name: str
    price: float

# Skip validation (advanced users)
@db.model(skip_validation=True)
class Advanced:
    custom_pk: int   # Custom primary key allowed
```
Validation Checks:
- VAL-002: Missing primary key (error)
- VAL-003: Primary key not named 'id' (warning)
- VAL-004: Composite primary key (warning)
- VAL-005: Auto-managed field conflicts (created_at, updated_at)
- VAL-006: DateTime without timezone
- VAL-007: String/Text without length
- VAL-008: camelCase field names (should be snake_case)
- VAL-009: SQL reserved words as field names
- VAL-010: Missing delete cascade in relationships
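For illustration, a model like the one below would trip several of these checks in WARN mode. The model and field names are hypothetical; the exact warning text comes from DataFlow itself:

```python
from dataflow import DataFlow

db = DataFlow("postgresql://...")

@db.model  # WARN mode (default): issues are reported, registration still succeeds
class OrderRecord:
    id: int          # satisfies VAL-002/VAL-003: primary key named 'id'
    createdAt: str   # VAL-008: camelCase field name (should be snake_case)
    created_at: str  # VAL-005: conflicts with the auto-managed created_at field
    order: str       # VAL-009: 'order' is a SQL reserved word
```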
When to Use Each Mode:
- OFF: Legacy code migration, custom implementations
- WARN (default): Development, catches issues without blocking
- STRICT: Production deployments, enforce standards
ErrorEnhancer: Actionable Error Messages
Automatic error enhancement with context, root causes, and solutions:
```python
from dataflow import DataFlow
from dataflow.core.error_enhancer import ErrorEnhancer

db = DataFlow("postgresql://...")

# ErrorEnhancer automatically integrated into DataFlow engine
# Enhanced errors show:
# - Error code (DF-101, DF-102, etc.)
# - Context (node, parameters, workflow state)
# - Root causes with probability scores
# - Actionable solutions with code templates
# - Documentation links

try:
    # Missing parameter error
    workflow.add_node("UserCreateNode", "create", {})
except Exception as e:
    # ErrorEnhancer automatically catches and enriches
    # Shows: DF-101 with specific fixes
    pass
```
Key Features:
- 40+ Error Codes: DF-101 (missing parameter) through DF-805 (runtime errors)
- Pattern Matching: Automatic error detection and classification
- Contextual Solutions: Code templates with variable substitution
- Color-Coded Output: Emojis and formatting for readability
- Documentation Links: Direct links to relevant guides
Common Errors Covered:
- DF-101: Missing required parameter
- DF-102: Type mismatch (expected dict, got str)
- DF-103: Auto-managed field conflict (created_at, updated_at)
- DF-104: Wrong node pattern (CreateNode vs UpdateNode)
- DF-105: Primary key 'id' missing/wrong name
- DF-201: Invalid connection - source output not found
- DF-301: Migration failed - table already exists
See: sdk-users/apps/dataflow/troubleshooting/top-10-errors.md
Inspector API: Self-Service Debugging
Introspection API for workflows, nodes, connections, and parameters:
```python
from dataflow.platform.inspector import Inspector

inspector = Inspector(dataflow_instance)
inspector.workflow_obj = workflow.build()

# Connection Analysis
connections = inspector.connections()           # List all connections
broken = inspector.find_broken_connections()    # Find issues
validation = inspector.validate_connections()   # Check validity

# Parameter Tracing
trace = inspector.trace_parameter("create_user", "data")
print(f"Source: {trace.source_node}")
dependencies = inspector.parameter_dependencies("create_user")

# Node Analysis
deps = inspector.node_dependencies("create_user")       # Upstream
dependents = inspector.node_dependents("create_user")   # Downstream
order = inspector.execution_order()                     # Topological sort

# Workflow Validation
report = inspector.workflow_validation_report()
if not report['is_valid']:
    print(f"Errors: {report['errors']}")
    print(f"Warnings: {report['warnings']}")
    print(f"Suggestions: {report['suggestions']}")

# High-Level Overview
summary = inspector.workflow_summary()
metrics = inspector.workflow_metrics()
```
Inspector Methods (18 total):
- Connection Analysis (5): connections(), connection_chain(), connection_graph(), validate_connections(), find_broken_connections()
- Parameter Tracing (5): trace_parameter(), parameter_flow(), find_parameter_source(), parameter_dependencies(), parameter_consumers()
- Node Analysis (5): node_dependencies(), node_dependents(), execution_order(), node_schema(), compare_nodes()
- Workflow Analysis (3): workflow_summary(), workflow_metrics(), workflow_validation_report()
Use Cases:
- Diagnose "missing parameter" errors
- Find broken connections
- Trace parameter flow through workflows
- Validate workflows before execution
- Generate workflow documentation
- Debug complex workflows
Performance: <1ms per method call (cached operations)
CLI Tools: Industry-Standard Workflow Validation
Command-line tools matching pytest/mypy patterns for workflow validation and debugging:
```bash
# Validate workflow structure and connections
dataflow-validate workflow.py --output text
dataflow-validate workflow.py --fix                # Auto-fix common issues
dataflow-validate workflow.py --output json > report.json

# Analyze workflow metrics and complexity
dataflow-analyze workflow.py --verbosity 2
dataflow-analyze workflow.py --format json

# Generate reports and documentation
dataflow-generate workflow.py report --output-dir ./reports
dataflow-generate workflow.py diagram              # ASCII workflow diagram
dataflow-generate workflow.py docs --output-dir ./docs

# Debug workflows with breakpoints
dataflow-debug workflow.py --breakpoint create_user
dataflow-debug workflow.py --inspect-node create_user
dataflow-debug workflow.py --step                  # Step-by-step execution

# Profile performance and detect bottlenecks
dataflow-perf workflow.py --bottlenecks
dataflow-perf workflow.py --recommend
dataflow-perf workflow.py --format json > perf.json
```
CLI Commands (5 total):
- dataflow-validate: Validate workflow structure, connections, and parameters with --fix flag
- dataflow-analyze: Workflow metrics, complexity analysis, and execution order
- dataflow-generate: Generate reports, diagrams (ASCII), and documentation
- dataflow-debug: Interactive debugging with breakpoints and node inspection
- dataflow-perf: Performance profiling, bottleneck detection, and recommendations
Use Cases:
- CI/CD integration for workflow validation
- Pre-deployment validation checks
- Performance profiling and optimization
- Documentation generation
- Interactive debugging sessions
Performance: <100ms startup, in line with industry-standard CLI tools
Common Pitfalls Guide
New: Comprehensive guides for common DataFlow mistakes
CreateNode vs UpdateNode (saves 1-2 hours):
- Side-by-side comparison
- Decision tree for node selection
- 10+ working examples
- Common mistakes and fixes
- See: sdk-users/apps/dataflow/guides/create-vs-update.md (a minimal sketch follows below)
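As a rough illustration of the distinction: the Create node inserts a brand-new record from the supplied data, while the Update node targets an existing record by its primary key and applies only the changed fields. This is a minimal sketch; the parameter names used for the Update node ("id", "data") are assumptions, so defer to the guide above for the authoritative node signature.

```python
from kailash.workflow.builder import WorkflowBuilder

workflow = WorkflowBuilder()

# Create: insert a brand-new record (no existing id required)
workflow.add_node("User_Create", "create_user", {
    "data": {"name": "Alice", "email": "alice@example.com"}
})

# Update: modify an existing record, addressed by its primary key.
# The parameter names ("id", "data") are illustrative assumptions here;
# the create-vs-update guide documents the actual structure.
workflow.add_node("User_Update", "update_user", {
    "id": "user-123",
    "data": {"email": "alice@new-domain.com"}
})
```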
Top 10 Errors (saves 30-120 minutes per error):
- Quick fix guide for 90% of issues
- Error code reference (DF-101 through DF-805)
- Diagnosis decision tree
- Prevention checklist
- Inspector commands for debugging
- See: sdk-users/apps/dataflow/troubleshooting/top-10-errors.md
Quick Start
```python
from dataflow import DataFlow
from kailash.workflow.builder import WorkflowBuilder
from kailash.runtime.local import LocalRuntime

# Initialize DataFlow
db = DataFlow(connection_string="postgresql://user:pass@localhost/db")

# Define model (generates 11 nodes automatically)
@db.model
class User:
    id: str      # String IDs preserved
    name: str
    email: str

# Use generated nodes in workflows
workflow = WorkflowBuilder()
workflow.add_node("User_Create", "create_user", {
    "data": {"name": "John", "email": "john@example.com"}
})

# Execute
runtime = LocalRuntime()
results, run_id = runtime.execute(workflow.build())
user_id = results["create_user"]["result"]  # Access pattern
```
Reference Documentation
Getting Started
- dataflow-quickstart - Quick start guide and core concepts
- dataflow-installation - Installation and setup
- dataflow-models - Defining models with @db.model decorator
- dataflow-connection-config - Database connection configuration
Core Operations
- dataflow-crud-operations - Create, Read, Update, Delete operations
- dataflow-queries - Query patterns and filtering
- dataflow-bulk-operations - Batch operations for performance
- dataflow-transactions - Transaction management
- dataflow-connection-isolation - ⚠️ CRITICAL: Connection isolation and ACID guarantees
- dataflow-result-access - Accessing results from nodes
Advanced Features
- dataflow-multi-instance - Multiple database instances
- dataflow-multi-tenancy - Multi-tenant architectures
- dataflow-existing-database - Working with existing databases
- dataflow-migrations-quick - Database migrations
- dataflow-custom-nodes - Creating custom database nodes
- dataflow-performance - Performance optimization
Integration & Deployment
- dataflow-nexus-integration - Deploying with Nexus platform
- dataflow-deployment - Production deployment patterns
- dataflow-dialects - Supported database dialects
- dataflow-monitoring - Monitoring and observability
Testing & Quality
- dataflow-tdd-mode - Test-driven development with DataFlow
- dataflow-tdd-api - Testing API for DataFlow
- dataflow-tdd-best-practices - Testing best practices
- dataflow-compliance - Compliance and standards
Troubleshooting & Debugging
- create-vs-update guide - CreateNode vs UpdateNode comprehensive guide
- top-10-errors - Quick fix guide for 90% of issues
- dataflow-gotchas - Common pitfalls and solutions
- dataflow-strict-mode - Strict mode validation HOW-TO guide (Week 9)
- dataflow-validation-layers - 4-layer validation architecture (Week 9)
- dataflow-debug-agent - Intelligent error analysis with 5-stage pipeline (Week 10)
- ErrorEnhancer: Automatic error enhancement (integrated in DataFlow engine) - Enhanced in Week 7
- Inspector API: Self-service debugging (18 introspection methods)
- CLI Tools: Industry-standard command-line validation and debugging tools (5 commands)
Key Concepts
Not an ORM
DataFlow is NOT an ORM. It's a workflow framework that:
- Generates workflow nodes from models
- Operates within Kailash's workflow execution model
- Uses string-based result access patterns
- Integrates seamlessly with other workflow nodes
Automatic Node Generation
Each @db.model class generates 11 nodes:
- {Model}_Create - Create single record
- {Model}_Read - Read by ID
- {Model}_Update - Update record
- {Model}_Delete - Delete record
- {Model}_List - List with filters
- {Model}_Upsert - Insert or update (atomic)
- {Model}_Count - Efficient COUNT(*) queries
- {Model}_BulkCreate - Bulk insert
- {Model}_BulkUpdate - Bulk update
- {Model}_BulkDelete - Bulk delete
- {Model}_BulkUpsert - Bulk upsert
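A rough sketch of how the generated nodes are referenced in a workflow, following the Quick Start pattern above. The node names follow the {Model}_* convention; the "filter" parameter name is an assumption for illustration (see dataflow-queries for the actual parameters):

```python
from dataflow import DataFlow
from kailash.workflow.builder import WorkflowBuilder
from kailash.runtime.local import LocalRuntime

db = DataFlow(connection_string="postgresql://user:pass@localhost/db")

@db.model
class User:
    id: str
    name: str
    email: str

workflow = WorkflowBuilder()
# List records matching a filter, then count all records.
# The "filter" parameter name is illustrative, not confirmed API.
workflow.add_node("User_List", "list_users", {
    "filter": {"email": "john@example.com"}
})
workflow.add_node("User_Count", "count_users", {})

runtime = LocalRuntime()
results, run_id = runtime.execute(workflow.build())
print(results["list_users"]["result"])
print(results["count_users"]["result"])
```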
Critical Rules
- ✅ String IDs preserved (no UUID conversion)
- ✅ Deferred schema operations (safe for Docker/FastAPI)
- ✅ Multi-instance isolation (one DataFlow per database)
- ✅ Result access: results["node_id"]["result"]
- ❌ NEVER use truthiness checks on filter/data parameters (empty dict {} is falsy)
- ✅ ALWAYS use key existence checks: if "filter" in kwargs instead of if kwargs.get("filter") (see the sketch after this list)
- ❌ NEVER use direct SQL when DataFlow nodes exist
- ❌ NEVER use SQLAlchemy/Django ORM alongside DataFlow
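The truthiness rule is plain Python behavior, shown below with a hypothetical kwargs dict (not a DataFlow API):

```python
kwargs = {"filter": {}}  # caller supplied an intentionally empty filter

# Wrong: {} is falsy, so the branch is skipped even though "filter" was supplied
if kwargs.get("filter"):
    print("filter provided")   # never runs

# Right: key existence distinguishes "not supplied" from "supplied but empty"
if "filter" in kwargs:
    print("filter provided")   # runs as expected
```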
Database Support
- SQL Databases: PostgreSQL, MySQL, SQLite (11 nodes per @db.model)
- Document Database: MongoDB with flexible schema (8 specialized nodes)
- Vector Search: PostgreSQL pgvector for RAG/AI (3 vector nodes)
- 100% Feature Parity: SQL databases support identical workflows
When to Use This Skill
Use DataFlow when you need to:
- Perform database operations in workflows
- Generate CRUD APIs automatically (with Nexus)
- Implement multi-tenant systems
- Work with existing databases
- Build database-first applications
- Handle bulk data operations
- Implement enterprise data management
Integration Patterns
With Nexus (Multi-Channel)
```python
from dataflow import DataFlow
from nexus import Nexus

db = DataFlow(connection_string="...")

@db.model
class User:
    id: str
    name: str

# Auto-generates API + CLI + MCP
nexus = Nexus(db.get_workflows())
nexus.run()  # Instant multi-channel platform
```
With Core SDK (Custom Workflows)
```python
from dataflow import DataFlow
from kailash.workflow.builder import WorkflowBuilder

db = DataFlow(connection_string="...")

# Use db-generated nodes in custom workflows
workflow = WorkflowBuilder()
workflow.add_node("User_Create", "user1", {...})
```
Multi-Database Support Matrix
SQL Databases (DatabaseAdapter)
- PostgreSQL: Full support with advanced features (asyncpg driver, pgvector extension, native arrays)
- MySQL: Full support with 100% feature parity (aiomysql driver)
- SQLite: Full support for development/testing/mobile (aiosqlite + custom pooling)
- Nodes Generated: 11 per @db.model (Create, Read, Update, Delete, List, Upsert, Count, BulkCreate, BulkUpdate, BulkDelete, BulkUpsert)
Document Databases (MongoDBAdapter)
- MongoDB: Complete NoSQL support (Motor async driver)
- Features: Flexible schema, aggregation pipelines, text search, geospatial queries
- Workflow Nodes: 8 specialized nodes (DocumentInsert, DocumentFind, DocumentUpdate, DocumentDelete, BulkDocumentInsert, Aggregate, CreateIndex, DocumentCount)
- Use Cases: E-commerce catalogs, content management, user profiles, event logs
Vector Databases (PostgreSQLVectorAdapter)
- PostgreSQL pgvector: Semantic similarity search for RAG/AI (pgvector extension)
- Features: Cosine/L2/inner product distance, HNSW/IVFFlat indexes
- Workflow Nodes: 3 vector nodes (VectorSearch, VectorInsert, VectorUpdate)
- Use Cases: RAG applications, semantic search, recommendation engines
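A hypothetical usage sketch, assuming the vector nodes follow the same add_node pattern as the SQL nodes. Both the node naming ("Document_VectorInsert", "Document_VectorSearch") and the parameter names ("data", "embedding", "limit") are assumptions for illustration, not the confirmed API:

```python
from kailash.workflow.builder import WorkflowBuilder

workflow = WorkflowBuilder()

# Hypothetical: index a document embedding, then run a similarity search.
workflow.add_node("Document_VectorInsert", "index_doc", {
    "data": {"content": "DataFlow quickstart notes", "embedding": [0.12, 0.08, 0.44]}
})
workflow.add_node("Document_VectorSearch", "search_similar", {
    "embedding": [0.11, 0.09, 0.40],  # query vector
    "limit": 5                        # top-k results
})
```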
Architecture
- BaseAdapter: Minimal interface for all adapter types (adapter_type, database_type, health_check)
- DatabaseAdapter: SQL-specific (inherits BaseAdapter)
- MongoDBAdapter: Document database (inherits BaseAdapter)
- PostgreSQLVectorAdapter: Vector operations (inherits DatabaseAdapter)
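The inheritance relationships listed above can be pictured as the following skeleton. This is an illustrative sketch of the hierarchy, not DataFlow's actual source; only the names and the minimal interface members are taken from the list above:

```python
# Illustrative skeleton of the adapter hierarchy described above
class BaseAdapter:
    adapter_type: str      # e.g. "sql", "document", "vector"
    database_type: str     # e.g. "postgresql", "mongodb"

    def health_check(self) -> bool:   # sync/async signature not specified here
        raise NotImplementedError

class DatabaseAdapter(BaseAdapter):              # SQL: PostgreSQL, MySQL, SQLite
    pass

class MongoDBAdapter(BaseAdapter):               # Document: MongoDB (Motor)
    pass

class PostgreSQLVectorAdapter(DatabaseAdapter):  # Vector: pgvector on PostgreSQL
    pass
```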
Planned Extensions
- TimescaleDB: Time-series data optimization (PostgreSQL extension)
- Qdrant/Milvus: Dedicated vector databases with advanced filtering
- Redis: Caching and key-value operations
- Neo4j: Graph database with Cypher queries
Related Skills
- 01-core-sdk - Core workflow patterns
- 03-nexus - Multi-channel deployment
- 04-kaizen - AI agent integration
- 17-gold-standards - Best practices
Support
For DataFlow-specific questions, invoke:
- dataflow-specialist - DataFlow implementation and patterns
- testing-specialist - DataFlow testing strategies (NO MOCKING policy)
- framework-advisor - Choose between Core SDK and DataFlow
