Skill
Schema Drift Detector
Enables Claude to design and implement comprehensive schema drift detection systems for data pipelines and databases.
Author: VibeBaza
Installation
Copy and paste into your terminal
curl -fsSL https://vibebaza.com/i/schema-drift-detector | bash
Schema Drift Detector Expert
You are an expert in designing and implementing schema drift detection systems for data engineering pipelines. You understand how to monitor, detect, and alert on schema changes across various data sources, formats, and storage systems to ensure data pipeline reliability and prevent downstream failures.
Core Principles
Schema Drift Types
- Structural Drift: Column additions, deletions, reordering
- Type Drift: Data type changes (string to int, precision changes)
- Constraint Drift: Nullability, uniqueness, foreign key changes
- Semantic Drift: Same structure but different meaning or format
- Cardinality Drift: Significant changes in distinct value counts
Detection Strategies
- Proactive Monitoring: Continuous schema validation
- Reactive Detection: Post-ingestion analysis
- Baseline Comparison: Version-controlled schema snapshots
- Statistical Profiling: Data distribution and pattern analysis (see the cardinality sketch below)
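Most of the checks below focus on structure and types; cardinality and semantic drift only show up in the data itself, so a lightweight profiling pass complements them. A minimal sketch in pandas (the 50% threshold is an arbitrary illustration, not a recommended default):

import pandas as pd

def detect_cardinality_drift(baseline: pd.DataFrame, current: pd.DataFrame,
                             threshold: float = 0.5) -> dict:
    """Flag columns whose distinct-value count moved by more than `threshold` (relative)."""
    drifted = {}
    for col in baseline.columns.intersection(current.columns):
        base_card = baseline[col].nunique()
        curr_card = current[col].nunique()
        if base_card == 0:
            continue
        relative_change = abs(curr_card - base_card) / base_card
        if relative_change > threshold:
            drifted[col] = {
                'baseline': int(base_card),
                'current': int(curr_card),
                'relative_change': round(relative_change, 2),
            }
    return drifted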
Implementation Patterns
Python-Based Schema Detector
import pandas as pd
import json
import hashlib
from typing import Dict, List, Any
from dataclasses import dataclass
from datetime import datetime


@dataclass
class SchemaChange:
    change_type: str
    column_name: str
    old_value: Any
    new_value: Any
    severity: str
    timestamp: datetime


class SchemaDriftDetector:
    def __init__(self, baseline_schema: Dict[str, Any]):
        self.baseline_schema = baseline_schema
        self.change_history: List[SchemaChange] = []

    def detect_drift(self, current_data: pd.DataFrame) -> List[SchemaChange]:
        changes = []
        current_schema = self._extract_schema(current_data)

        # Detect structural changes
        changes.extend(self._detect_structural_changes(current_schema))

        # Detect type changes
        changes.extend(self._detect_type_changes(current_schema))

        # Detect constraint changes
        changes.extend(self._detect_constraint_changes(current_data, current_schema))

        self.change_history.extend(changes)
        return changes

    def _extract_schema(self, df: pd.DataFrame) -> Dict[str, Any]:
        schema = {
            'columns': list(df.columns),
            'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
            'null_counts': df.isnull().sum().to_dict(),
            'row_count': len(df),
            'schema_hash': self._generate_schema_hash(df)
        }
        return schema

    def _detect_structural_changes(self, current_schema: Dict) -> List[SchemaChange]:
        changes = []
        baseline_cols = set(self.baseline_schema['columns'])
        current_cols = set(current_schema['columns'])

        # Detect added columns
        added_cols = current_cols - baseline_cols
        for col in added_cols:
            changes.append(SchemaChange(
                change_type='column_added',
                column_name=col,
                old_value=None,
                new_value=current_schema['dtypes'][col],
                severity='medium',
                timestamp=datetime.now()
            ))

        # Detect removed columns
        removed_cols = baseline_cols - current_cols
        for col in removed_cols:
            changes.append(SchemaChange(
                change_type='column_removed',
                column_name=col,
                old_value=self.baseline_schema['dtypes'][col],
                new_value=None,
                severity='high',
                timestamp=datetime.now()
            ))

        return changes

    def _detect_type_changes(self, current_schema: Dict) -> List[SchemaChange]:
        # Minimal type-drift check: compare stored dtype strings for columns present in both schemas
        changes = []
        baseline_dtypes = self.baseline_schema['dtypes']
        current_dtypes = current_schema['dtypes']
        for col in set(baseline_dtypes) & set(current_dtypes):
            if baseline_dtypes[col] != current_dtypes[col]:
                changes.append(SchemaChange(
                    change_type='type_changed',
                    column_name=col,
                    old_value=baseline_dtypes[col],
                    new_value=current_dtypes[col],
                    severity='high',
                    timestamp=datetime.now()
                ))
        return changes

    def _detect_constraint_changes(self, df: pd.DataFrame, current_schema: Dict) -> List[SchemaChange]:
        # Minimal constraint check: flag columns that had no nulls in the baseline
        # but contain nulls in the current batch
        changes = []
        baseline_nulls = self.baseline_schema.get('null_counts', {})
        for col, null_count in current_schema['null_counts'].items():
            if col in baseline_nulls and baseline_nulls[col] == 0 and null_count > 0:
                changes.append(SchemaChange(
                    change_type='nullability_changed',
                    column_name=col,
                    old_value=0,
                    new_value=int(null_count),
                    severity='medium',
                    timestamp=datetime.now()
                ))
        return changes

    def _generate_schema_hash(self, df: pd.DataFrame) -> str:
        schema_str = json.dumps({
            'columns': sorted(df.columns.tolist()),
            'dtypes': sorted([(col, str(dtype)) for col, dtype in df.dtypes.items()])
        }, sort_keys=True)
        return hashlib.md5(schema_str.encode()).hexdigest()
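A minimal end-to-end use of the detector: build a baseline dict in the same shape _extract_schema produces, then compare each new batch against it. The CSV paths are hypothetical placeholders.

# Build the baseline from a known-good extract (paths are placeholders)
baseline_df = pd.read_csv("data/customers_baseline.csv")
baseline_schema = {
    'columns': list(baseline_df.columns),
    'dtypes': {col: str(dtype) for col, dtype in baseline_df.dtypes.items()},
    'null_counts': baseline_df.isnull().sum().to_dict(),
}
detector = SchemaDriftDetector(baseline_schema)

# Compare the latest batch and report every detected change
new_batch = pd.read_csv("data/customers_latest.csv")
for change in detector.detect_drift(new_batch):
    print(f"{change.severity}: {change.change_type} on {change.column_name} "
          f"({change.old_value} -> {change.new_value})")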
SQL-Based Schema Monitoring
-- Create schema monitoring tables
CREATE TABLE schema_baselines (
    table_name VARCHAR(255),
    column_name VARCHAR(255),
    data_type VARCHAR(100),
    is_nullable VARCHAR(3),  -- 'YES'/'NO', matching information_schema.columns
    column_default TEXT,
    ordinal_position INTEGER,
    baseline_date TIMESTAMP,
    PRIMARY KEY (table_name, column_name, baseline_date)
);

CREATE TABLE schema_drift_log (
    id SERIAL PRIMARY KEY,
    table_name VARCHAR(255),
    change_type VARCHAR(50),
    column_name VARCHAR(255),
    old_value TEXT,
    new_value TEXT,
    severity VARCHAR(20),
    detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Schema drift detection query
WITH current_schema AS (
    SELECT
        table_name,
        column_name,
        data_type,
        is_nullable,
        column_default,
        ordinal_position
    FROM information_schema.columns
    WHERE table_schema = 'public'
),
baseline_latest AS (
    SELECT DISTINCT ON (table_name, column_name)
        table_name,
        column_name,
        data_type,
        is_nullable,
        column_default,
        ordinal_position
    FROM schema_baselines
    ORDER BY table_name, column_name, baseline_date DESC
)
SELECT
    COALESCE(c.table_name, b.table_name) AS table_name,
    COALESCE(c.column_name, b.column_name) AS column_name,
    CASE
        WHEN b.column_name IS NULL THEN 'COLUMN_ADDED'
        WHEN c.column_name IS NULL THEN 'COLUMN_REMOVED'
        WHEN c.data_type != b.data_type THEN 'TYPE_CHANGED'
        WHEN c.is_nullable != b.is_nullable THEN 'NULLABILITY_CHANGED'
        WHEN c.ordinal_position != b.ordinal_position THEN 'POSITION_CHANGED'
    END AS change_type,
    b.data_type AS old_type,
    c.data_type AS new_type
FROM current_schema c
FULL OUTER JOIN baseline_latest b
    ON c.table_name = b.table_name AND c.column_name = b.column_name
WHERE (
    b.column_name IS NULL OR
    c.column_name IS NULL OR
    c.data_type != b.data_type OR
    c.is_nullable != b.is_nullable OR
    c.ordinal_position != b.ordinal_position
);
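The detection query compares the live catalog against the most recent rows in schema_baselines, so a snapshot has to be captured whenever a baseline is approved or refreshed. A minimal sketch of that step from Python, assuming a PostgreSQL database reachable through psycopg2 (the DSN is a placeholder):

import psycopg2

SNAPSHOT_SQL = """
    INSERT INTO schema_baselines
        (table_name, column_name, data_type, is_nullable,
         column_default, ordinal_position, baseline_date)
    SELECT table_name, column_name, data_type, is_nullable,
           column_default, ordinal_position, CURRENT_TIMESTAMP
    FROM information_schema.columns
    WHERE table_schema = 'public';
"""

def capture_baseline(dsn: str) -> None:
    # Snapshot the current public schema as a new baseline version
    conn = psycopg2.connect(dsn)
    try:
        with conn, conn.cursor() as cur:  # the connection block commits on success
            cur.execute(SNAPSHOT_SQL)
    finally:
        conn.close()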
Apache Spark Integration
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType
from typing import Any, Dict
import json


class SparkSchemaDriftDetector:
    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.baseline_schemas = {}

    def register_baseline(self, table_name: str, df: DataFrame):
        """Register a baseline schema for a table"""
        schema_json = df.schema.json()
        self.baseline_schemas[table_name] = {
            'schema': StructType.fromJson(json.loads(schema_json)),
            'schema_json': schema_json,
            'column_count': len(df.columns),
            'columns': df.columns
        }

    def detect_drift(self, table_name: str, current_df: DataFrame) -> Dict[str, Any]:
        if table_name not in self.baseline_schemas:
            raise ValueError(f"No baseline schema registered for {table_name}")

        baseline = self.baseline_schemas[table_name]
        current_schema = current_df.schema

        drift_report = {
            'table_name': table_name,
            'has_drift': False,
            'changes': []
        }

        try:
            # Compare field by field
            baseline_fields = {f.name: f for f in baseline['schema'].fields}
            current_fields = {f.name: f for f in current_schema.fields}

            # Detect added fields
            added_fields = set(current_fields.keys()) - set(baseline_fields.keys())
            for field_name in added_fields:
                drift_report['changes'].append({
                    'type': 'field_added',
                    'field': field_name,
                    'new_type': str(current_fields[field_name].dataType)
                })
                drift_report['has_drift'] = True

            # Detect removed fields
            removed_fields = set(baseline_fields.keys()) - set(current_fields.keys())
            for field_name in removed_fields:
                drift_report['changes'].append({
                    'type': 'field_removed',
                    'field': field_name,
                    'old_type': str(baseline_fields[field_name].dataType)
                })
                drift_report['has_drift'] = True

            # Detect type changes
            common_fields = set(baseline_fields.keys()) & set(current_fields.keys())
            for field_name in common_fields:
                baseline_type = baseline_fields[field_name].dataType
                current_type = current_fields[field_name].dataType
                if baseline_type != current_type:
                    drift_report['changes'].append({
                        'type': 'type_changed',
                        'field': field_name,
                        'old_type': str(baseline_type),
                        'new_type': str(current_type)
                    })
                    drift_report['has_drift'] = True

        except Exception as e:
            # Any failure while reading or comparing schemas is reported as incompatibility
            drift_report['has_drift'] = True
            drift_report['changes'].append({
                'type': 'schema_incompatible',
                'error': str(e)
            })

        return drift_report
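Typical usage, continuing from the detector above; the table name and Parquet paths are hypothetical placeholders:

spark = SparkSession.builder.appName("schema-drift-check").getOrCreate()
detector = SparkSchemaDriftDetector(spark)

# Register yesterday's validated load as the baseline
baseline_df = spark.read.parquet("s3://warehouse/orders/2024-06-01/")
detector.register_baseline("orders", baseline_df)

# Check the incoming batch before it is allowed downstream
incoming_df = spark.read.parquet("s3://warehouse/orders/incoming/")
report = detector.detect_drift("orders", incoming_df)
if report['has_drift']:
    for change in report['changes']:
        print(change)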
Best Practices
Monitoring Configuration
- Sensitivity Levels: Configure different alert thresholds for different change types (see the configuration sketch after this list)
- Whitelist Patterns: Allow expected schema changes (e.g., new optional columns)
- Batch vs Streaming: Different detection strategies for batch and real-time data
- Multi-Environment: Separate baselines for dev, staging, and production
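These settings can live in a small declarative config that the detector reads at startup; a sketch with illustrative field names and thresholds (not tied to any specific tool):

DRIFT_MONITOR_CONFIG = {
    # Separate baselines per environment
    "environments": {
        "prod": {"baseline_table": "schema_baselines_prod"},
        "staging": {"baseline_table": "schema_baselines_staging"},
    },
    # Alert level per change type
    "sensitivity": {
        "column_removed": "critical",
        "type_changed": "critical",
        "nullability_changed": "warning",
        "column_added": "info",
    },
    # Expected changes that should never alert (regexes on column names)
    "whitelist_patterns": [r"^optional_.*", r".*_v2$"],
    # Batch sources are checked per run, streaming sources per micro-batch
    "detection_mode": {"orders": "batch", "events_stream": "streaming"},
}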
Alert Management
- Severity Classification: Critical (breaking changes), Warning (compatible changes), Info (metadata changes)
- Notification Channels: Integration with Slack, email, or PagerDuty based on severity (see the routing sketch after this list)
- Auto-Recovery: Automatic baseline updates for approved changes
- Change Approval: Workflow integration for schema change management
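Severity-based routing keeps the noise level manageable; a sketch of the dispatch step, reusing the SchemaChange dataclass from above and treating the notifier callables as placeholders:

SEVERITY_CHANNELS = {
    "critical": ["pagerduty", "slack"],
    "warning": ["slack"],
    "info": ["email"],
}

def route_alert(change: SchemaChange, notifiers: dict) -> None:
    # Send the alert to every channel configured for this severity (default: email)
    message = (f"[{change.severity.upper()}] {change.change_type} on "
               f"{change.column_name}: {change.old_value} -> {change.new_value}")
    for channel in SEVERITY_CHANNELS.get(change.severity, ["email"]):
        notifiers[channel](message)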
Integration Patterns
Apache Airflow Integration
import pandas as pd
from airflow.sensors.base import BaseSensorOperator

class SchemaDriftSensor(BaseSensorOperator):
    def __init__(self, table_name: str, baseline_schema: dict, **kwargs):
        super().__init__(**kwargs)
        self.table_name = table_name
        self.baseline_schema = baseline_schema

    def poke(self, context) -> bool:
        # Reuses the SchemaDriftDetector defined earlier in this document
        detector = SchemaDriftDetector(self.baseline_schema)
        current_data = self._fetch_current_data()
        changes = detector.detect_drift(current_data)
        if changes:
            self._send_alert(changes)
            return False  # keep poking; the sensor times out and fails if drift persists
        return True

    def _fetch_current_data(self) -> pd.DataFrame:
        # Hook: load the latest batch for self.table_name (implementation-specific)
        raise NotImplementedError

    def _send_alert(self, changes) -> None:
        # Hook: route drift alerts to Slack, email, PagerDuty, etc.
        raise NotImplementedError
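Wiring the sensor into a DAG could look like the following; the DAG id, schedule, and the load_baseline helper are hypothetical placeholders:

from datetime import datetime
from airflow import DAG

with DAG(
    dag_id="orders_schema_drift_check",          # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    check_schema = SchemaDriftSensor(
        task_id="check_orders_schema",
        table_name="orders",
        baseline_schema=load_baseline("orders"),  # hypothetical helper returning the stored baseline dict
        poke_interval=300,                        # re-check every 5 minutes
        timeout=3600,                             # fail after an hour of persistent drift
        mode="reschedule",                        # release the worker slot between pokes
    )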
Great Expectations Integration
import pandas as pd
import great_expectations as ge

# Note: the Great Expectations API differs between releases; the fluent style
# below assumes a recent installation and may need adjusting for your version.
def create_schema_expectations(df: pd.DataFrame):
    context = ge.get_context()
    context.sources.add_pandas("schema_validation")  # register a pandas datasource for later validation

    # Expectation suite describing the current schema
    suite = context.add_expectation_suite("schema_drift_suite")

    # Column existence expectations
    for column in df.columns:
        suite.add_expectation(
            ge.expectations.ExpectColumnToExist(column=column)
        )

    # Data type expectations
    for column, dtype in df.dtypes.items():
        suite.add_expectation(
            ge.expectations.ExpectColumnValuesToBeOfType(
                column=column,
                type_=str(dtype)
            )
        )

    return context, suite
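Once the suite is in place, each incoming batch can be validated against it through a checkpoint, so schema drift surfaces as failed expectations in the validation results and Data Docs rather than as silent downstream breakage.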
Performance Optimization
- Sampling Strategy: Use representative samples for large datasets
- Caching: Cache schema metadata to reduce computation
- Incremental Detection: Focus on new/changed partitions
- Parallel Processing: Distribute schema analysis across multiple workers
- Schema Fingerprinting: Use hash-based quick comparisons before detailed analysis, as sketched below
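Hash-based fingerprinting lets a pipeline skip the detailed comparison entirely when nothing has changed. A sketch reusing the hashing idea from the pandas detector above:

import hashlib
import json
import pandas as pd

def schema_fingerprint(df: pd.DataFrame) -> str:
    # Stable hash of column names and dtypes, independent of column order
    payload = json.dumps(sorted((col, str(dtype)) for col, dtype in df.dtypes.items()))
    return hashlib.md5(payload.encode()).hexdigest()

def needs_full_check(df: pd.DataFrame, baseline_fingerprint: str) -> bool:
    # Cheap comparison before any column-by-column or row-level analysis
    return schema_fingerprint(df) != baseline_fingerprint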