ml-pipeline-automation
Build end-to-end ML pipelines with automated data processing, training, validation, and deployment using Airflow, Kubeflow, and Jenkins
About ml-pipeline-automation
ml-pipeline-automation is a Claude AI skill developed by aj-geddes. Build end-to-end ML pipelines with automated data processing, training, validation, and deployment using Airflow, Kubeflow, and Jenkins. This powerful Claude Code plugin helps developers automate workflows and enhance productivity with intelligent AI assistance.
Why use ml-pipeline-automation? With 0 stars on GitHub, this skill has been trusted by developers worldwide. Install this Claude skill instantly to enhance your development workflow with AI-powered automation.
| name | ML Pipeline Automation |
| description | Build end-to-end ML pipelines with automated data processing, training, validation, and deployment using Airflow, Kubeflow, and Jenkins |
ML Pipeline Automation
ML pipeline automation orchestrates the entire machine learning workflow from data ingestion through model deployment, ensuring reproducibility, scalability, and reliability.
Pipeline Components
- Data Ingestion: Collecting data from multiple sources
- Data Processing: Cleaning, transformation, feature engineering
- Model Training: Training and hyperparameter tuning
- Validation: Cross-validation and testing
- Deployment: Moving models to production
- Monitoring: Tracking performance metrics
Orchestration Platforms
- Apache Airflow: Workflow scheduling with DAGs
- Kubeflow: Kubernetes-native ML workflows
- Jenkins: CI/CD for ML pipelines
- Prefect: Modern data flow orchestration
- Dagster: Asset-driven orchestration
Python Implementation
import json
import logging
import os
import shutil
from datetime import datetime

import joblib
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Airflow imports
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# MLflow for tracking
import mlflow
import mlflow.sklearn

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Modular Pipeline Functions ===")


# Data ingestion
def ingest_data(**context):
    """Generate a synthetic classification dataset and write it to CSV.

    Args:
        **context: Task context. Must contain ``task_instance``, an object
            exposing ``xcom_push(key=..., value=...)``.

    Returns:
        dict: ``{'status': 'success', 'samples': <row count>}``.

    Side effects:
        Writes ``/tmp/raw_data.csv`` and pushes its path under the XCom key
        ``data_path``.
    """
    logger.info("Starting data ingestion...")

    X, y = make_classification(
        n_samples=2000,
        n_features=30,
        n_informative=20,
        random_state=42,
    )
    data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
    data['target'] = y

    # Save to disk
    data_path = '/tmp/raw_data.csv'
    data.to_csv(data_path, index=False)

    context['task_instance'].xcom_push(key='data_path', value=data_path)
    logger.info(f"Data ingested: {len(data)} rows")

    return {'status': 'success', 'samples': len(data)}


# Data processing
def process_data(**context):
    """Clean the raw dataset: impute, de-duplicate and drop feature outliers.

    Args:
        **context: Task context. Must contain ``task_instance`` exposing
            ``xcom_pull`` and ``xcom_push``.

    Returns:
        dict: ``{'status': 'success', 'rows_remaining': <row count>}``.

    Side effects:
        Reads the CSV whose path is stored under XCom key ``data_path``,
        writes ``/tmp/processed_data.csv`` and pushes its path under the
        XCom key ``processed_path``.
    """
    logger.info("Starting data processing...")

    # Get data path from previous task
    task_instance = context['task_instance']
    data_path = task_instance.xcom_pull(key='data_path', task_ids='ingest_data')
    data = pd.read_csv(data_path)

    # Handle missing values
    data = data.fillna(data.mean())

    # Remove duplicates
    data = data.drop_duplicates()

    # Remove outliers (simple approach). The label column is excluded: for a
    # skewed binary label Q1 == Q3, so the IQR is zero and the filter would
    # discard every row of the minority class.
    numeric_cols = data.select_dtypes(include=[np.number]).columns
    outlier_cols = numeric_cols.drop('target', errors='ignore')
    for col in outlier_cols:
        # Quantiles are recomputed on the already-filtered frame, so the
        # filters compound column by column.
        Q1 = data[col].quantile(0.25)
        Q3 = data[col].quantile(0.75)
        IQR = Q3 - Q1
        data = data[(data[col] >= Q1 - 1.5 * IQR) & (data[col] <= Q3 + 1.5 * IQR)]

    processed_path = '/tmp/processed_data.csv'
    data.to_csv(processed_path, index=False)

    task_instance.xcom_push(key='processed_path', value=processed_path)
    logger.info(f"Data processed: {len(data)} rows after cleaning")

    return {'status': 'success', 'rows_remaining': len(data)}


# Feature engineering
def engineer_features(**context):
    """Add pairwise interaction and squared features to the cleaned dataset.

    Args:
        **context: Task context. Must contain ``task_instance`` exposing
            ``xcom_pull`` and ``xcom_push``.

    Returns:
        dict: ``{'status': 'success', 'features': <column count>}``; the
        count includes the ``target`` column.

    Side effects:
        Reads the CSV stored under XCom key ``processed_path``, writes
        ``/tmp/engineered_data.csv`` and pushes its path under the XCom key
        ``engineered_path``.
    """
    logger.info("Starting feature engineering...")

    task_instance = context['task_instance']
    processed_path = task_instance.xcom_pull(key='processed_path', task_ids='process_data')
    data = pd.read_csv(processed_path)

    # Create interaction features: products of pairs (i, j) with i < j,
    # where i ranges over the first five and j over the first six features.
    feature_cols = [col for col in data.columns if col.startswith('feature_')]
    for i in range(min(5, len(feature_cols))):
        for j in range(i + 1, min(6, len(feature_cols))):
            data[f'interaction_{i}_{j}'] = data[feature_cols[i]] * data[feature_cols[j]]

    # Create polynomial features
    for col in feature_cols[:5]:
        data[f'{col}_squared'] = data[col] ** 2

    engineered_path = '/tmp/engineered_data.csv'
    data.to_csv(engineered_path, index=False)

    task_instance.xcom_push(key='engineered_path', value=engineered_path)
    logger.info(f"Features engineered: {len(data.columns)} total features")

    return {'status': 'success', 'features': len(data.columns)}


# Train model
def train_model(**context):
    """Fit a scaled random-forest classifier and record it in MLflow.

    Args:
        **context: Task context. Must contain ``task_instance`` exposing
            ``xcom_pull`` and ``xcom_push``.

    Returns:
        dict: ``{'status': 'success', 'accuracy': ..., 'f1_score': ...}``
        measured on the held-out 20% split.

    Side effects:
        Writes ``/tmp/model.pkl`` and ``/tmp/scaler.pkl``, pushes their paths
        under XCom keys ``model_path`` and ``scaler_path``, and logs
        parameters, metrics and the model to an MLflow run.
    """
    logger.info("Starting model training...")

    task_instance = context['task_instance']
    engineered_path = task_instance.xcom_pull(key='engineered_path', task_ids='engineer_features')
    data = pd.read_csv(engineered_path)

    X = data.drop('target', axis=1)
    y = data['target']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Scale features; the scaler is fitted on the training split only.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Train model
    model = RandomForestClassifier(n_estimators=100, max_depth=15, random_state=42)
    model.fit(X_train_scaled, y_train)

    # Evaluate
    y_pred = model.predict(X_test_scaled)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    # Save model
    model_path = '/tmp/model.pkl'
    scaler_path = '/tmp/scaler.pkl'
    joblib.dump(model, model_path)
    joblib.dump(scaler, scaler_path)

    task_instance.xcom_push(key='model_path', value=model_path)
    task_instance.xcom_push(key='scaler_path', value=scaler_path)

    # Log to MLflow
    with mlflow.start_run():
        mlflow.log_param('n_estimators', 100)
        mlflow.log_param('max_depth', 15)
        mlflow.log_metric('accuracy', accuracy)
        mlflow.log_metric('f1_score', f1)
        mlflow.sklearn.log_model(model, 'model')

    logger.info(f"Model trained: Accuracy={accuracy:.4f}, F1={f1:.4f}")

    return {'status': 'success', 'accuracy': accuracy, 'f1_score': f1}


# Validate model
def validate_model(**context):
    """Re-score the saved model and compare accuracy against a 0.85 gate.

    Args:
        **context: Task context. Must contain ``task_instance`` exposing
            ``xcom_pull`` and ``xcom_push``.

    Returns:
        dict: ``status`` (``'success'`` when accuracy > 0.85, otherwise
        ``'failed'``), ``accuracy``, ``threshold`` and an ISO ``timestamp``.

    Side effects:
        Loads the model and scaler from the paths stored in XCom and pushes
        the JSON-encoded result under the XCom key ``validation_result``.
    """
    logger.info("Starting model validation...")

    task_instance = context['task_instance']
    model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
    engineered_path = task_instance.xcom_pull(key='engineered_path', task_ids='engineer_features')

    model = joblib.load(model_path)
    data = pd.read_csv(engineered_path)

    X = data.drop('target', axis=1)
    y = data['target']

    # Same test_size and random_state as train_model, so this reproduces the
    # training step's held-out split.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    scaler_path = task_instance.xcom_pull(key='scaler_path', task_ids='train_model')
    scaler = joblib.load(scaler_path)
    X_test_scaled = scaler.transform(X_test)

    # Validate
    y_pred = model.predict(X_test_scaled)
    accuracy = accuracy_score(y_test, y_pred)

    validation_result = {
        'status': 'success' if accuracy > 0.85 else 'failed',
        'accuracy': accuracy,
        'threshold': 0.85,
        'timestamp': datetime.now().isoformat()
    }

    task_instance.xcom_push(key='validation_result', value=json.dumps(validation_result))
    logger.info(f"Validation result: {validation_result}")

    return validation_result


# Deploy model
def deploy_model(**context):
    """Copy the model and scaler to the deployment directory if validated.

    Args:
        **context: Task context. Must contain ``task_instance`` exposing
            ``xcom_pull``.

    Returns:
        dict: ``{'status': 'skipped', 'reason': 'validation_failed'}`` when
        the stored validation status is not ``'success'``; otherwise
        ``{'status': 'success', 'deploy_path': '/tmp/deployed_model/'}``.

    Side effects:
        On success, creates ``/tmp/deployed_model/`` and copies ``model.pkl``
        and ``scaler.pkl`` into it.
    """
    logger.info("Starting model deployment...")

    task_instance = context['task_instance']
    validation_result = json.loads(task_instance.xcom_pull(
        key='validation_result', task_ids='validate_model'))

    if validation_result['status'] != 'success':
        logger.warning("Validation failed, deployment skipped")
        return {'status': 'skipped', 'reason': 'validation_failed'}

    model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
    scaler_path = task_instance.xcom_pull(key='scaler_path', task_ids='train_model')

    # Simulate deployment
    deploy_path = '/tmp/deployed_model/'
    os.makedirs(deploy_path, exist_ok=True)

    shutil.copy(model_path, os.path.join(deploy_path, 'model.pkl'))
    shutil.copy(scaler_path, os.path.join(deploy_path, 'scaler.pkl'))

    logger.info(f"Model deployed to {deploy_path}")

    return {'status': 'success', 'deploy_path': deploy_path}


# 2. Airflow DAG Definition
print("\n=== 2. Airflow DAG ===")

# Illustrative DAG source kept as a string; it is not executed by this script.
dag_definition = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_pipeline_dag',
    default_args=default_args,
    description='End-to-end ML pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Task 1: Ingest Data
    ingest = PythonOperator(
        task_id='ingest_data',
        python_callable=ingest_data,
    )

    # Task 2: Process Data
    process = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
    )

    # Task 3: Engineer Features
    engineer = PythonOperator(
        task_id='engineer_features',
        python_callable=engineer_features,
    )

    # Task 4: Train Model
    train = PythonOperator(
        task_id='train_model',
        python_callable=train_model,
    )

    # Task 5: Validate Model
    validate = PythonOperator(
        task_id='validate_model',
        python_callable=validate_model,
    )

    # Task 6: Deploy Model
    deploy = PythonOperator(
        task_id='deploy_model',
        python_callable=deploy_model,
    )

    # Define dependencies
    ingest >> process >> engineer >> train >> validate >> deploy
'''

print("Airflow DAG defined with 6 tasks")

# 3. Pipeline execution summary
print("\n=== 3. Pipeline Execution ===")


class PipelineOrchestrator:
    """Run the pipeline tasks in order, in-process, without Airflow.

    The instance passes itself to each task as ``task_instance`` and provides
    the ``xcom_push`` / ``xcom_pull`` methods the tasks call, backed by a
    plain dict.
    """

    def __init__(self):
        self.execution_log = []  # (task name, task return value) pairs
        self.start_time = None
        self.end_time = None  # set only when every task finishes
        self.xcom_storage = {}  # key -> value; one shared namespace

    def run_pipeline(self):
        """Execute all six tasks sequentially, logging any failure.

        An exception raised by a task stops the run. It is logged with its
        traceback and not re-raised; ``end_time`` then stays ``None`` and
        ``execution_log`` holds only the tasks that completed.
        """
        self.start_time = datetime.now()
        logger.info("Starting ML pipeline execution")

        tasks = (
            ('ingest_data', ingest_data),
            ('process_data', process_data),
            ('engineer_features', engineer_features),
            ('train_model', train_model),
            ('validate_model', validate_model),
            ('deploy_model', deploy_model),
        )

        try:
            # Execute pipeline tasks
            for task_name, task in tasks:
                result = task(task_instance=self)
                self.execution_log.append((task_name, result))

            self.end_time = datetime.now()
            logger.info("Pipeline execution completed successfully")
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception(f"Pipeline execution failed: {str(e)}")

    def xcom_push(self, key, value):
        """Store ``value`` under ``key``, overwriting any earlier value."""
        self.xcom_storage[key] = value

    def xcom_pull(self, key, task_ids):
        """Return the value stored under ``key``, or ``None`` if absent.

        ``task_ids`` is accepted so tasks can call this like the Airflow
        method, but it is ignored: storage is keyed by ``key`` alone.
        """
        return self.xcom_storage.get(key)

    def get_summary(self):
        """Return timing and per-task results of the most recent run.

        Returns:
            dict: ``start_time`` / ``end_time`` as ISO strings (or ``None``),
            ``duration_seconds`` (0 when ``end_time`` is unset),
            ``tasks_executed`` and the raw ``execution_log``.
        """
        duration = (self.end_time - self.start_time).total_seconds() if self.end_time else 0
        return {
            'start_time': self.start_time.isoformat() if self.start_time else None,
            'end_time': self.end_time.isoformat() if self.end_time else None,
            'duration_seconds': duration,
            'tasks_executed': len(self.execution_log),
            'execution_log': self.execution_log
        }


# Execute pipeline
orchestrator = PipelineOrchestrator()
orchestrator.run_pipeline()
summary = orchestrator.get_summary()

print("\n=== Pipeline Summary ===")
for key, value in summary.items():
    if key != 'execution_log':
        print(f"{key}: {value}")

print("\nTask Execution Log:")
for task_name, result in summary['execution_log']:
    print(f" {task_name}: {result}")

print("\nML pipeline automation setup completed!")
Pipeline Best Practices
- Modularity: Each step should be independent
- Idempotency: Tasks should be safely repeatable
- Error Handling: Graceful degradation and alerting
- Versioning: Track data, code, and model versions
- Monitoring: Track execution metrics and logs
Scheduling Strategies
- Daily: Standard for daily retraining
- Weekly: For larger feature engineering
- On-demand: Triggered by data updates
- Real-time: For streaming applications
Deliverables
- Automated pipeline DAG
- Task dependency graph
- Execution logs and monitoring
- Performance metrics
- Rollback procedures
- Documentation

aj-geddes
useful-ai-prompts
Download Skill Files
View Installation Guide. Download the complete skill directory, including SKILL.md and all related files.