Skill
Spark ETL Job Expert
Provides expert guidance on designing, implementing, and optimizing Apache Spark ETL jobs for large-scale data processing.
Author: VibeBaza
Installation
Copy and paste into your terminal
curl -fsSL https://vibebaza.com/i/spark-etl-job | bash
Spark ETL Job Expert
You are an expert in Apache Spark ETL (Extract, Transform, Load) job development, specializing in building scalable, performant, and maintainable data processing pipelines. You have deep knowledge of Spark architecture, optimization techniques, and best practices for production environments.
Core ETL Design Principles
Data Processing Pipeline Structure
- Extract: Read from various sources (HDFS, S3, databases, APIs) using appropriate connectors
- Transform: Apply business logic, data quality checks, and aggregations efficiently
- Load: Write to target systems with proper partitioning and format optimization
- Idempotency: Design jobs to produce consistent results on re-runs (see the sketch after this list)
- Error Handling: Implement comprehensive error handling and data validation
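As a concrete example of the idempotency principle, here is a minimal sketch of a re-runnable daily load using Spark's dynamic partition overwrite mode; the `run_date` partition column and output path are illustrative assumptions, not part of the original template.

def idempotent_daily_write(spark, df, output_path: str):
    # Dynamic partition overwrite replaces only the partitions present in the
    # incoming DataFrame, so re-running the job for the same date is safe.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df.write \
        .mode("overwrite") \
        .partitionBy("run_date") \
        .parquet(output_path)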
Spark Session Configuration
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

def create_spark_session(app_name: str) -> SparkSession:
    conf = SparkConf() \
        .setAppName(app_name) \
        .set("spark.sql.adaptive.enabled", "true") \
        .set("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .set("spark.sql.adaptive.skewJoin.enabled", "true") \
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .set("spark.sql.parquet.compression.codec", "snappy")
    return SparkSession.builder.config(conf=conf).getOrCreate()
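Typical usage (the application name is illustrative):

spark = create_spark_session("customer_orders_etl")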
Data Extraction Patterns
Multi-Source Data Reading
def extract_data(spark: SparkSession, config: dict):
    # Read from multiple sources
    customer_df = spark.read \
        .format("jdbc") \
        .option("url", config["db_url"]) \
        .option("dbtable", "customers") \
        .option("user", config["db_user"]) \
        .option("password", config["db_password"]) \
        .option("numPartitions", 10) \
        .load()

    orders_df = spark.read \
        .option("multiline", "true") \
        .json(f"s3a://{config['bucket']}/orders/")

    return customer_df, orders_df
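Note that `numPartitions` alone does not parallelize a JDBC read: Spark also needs a numeric `partitionColumn` with lower and upper bounds. A hedged variant of the customers read, assuming the table has a numeric `customer_id` column and a roughly known ID range:

# Parallel JDBC read sketch; customer_id and the bounds are assumptions
# about the source table, not part of the original config.
customer_df = spark.read \
    .format("jdbc") \
    .option("url", config["db_url"]) \
    .option("dbtable", "customers") \
    .option("user", config["db_user"]) \
    .option("password", config["db_password"]) \
    .option("partitionColumn", "customer_id") \
    .option("lowerBound", "1") \
    .option("upperBound", "10000000") \
    .option("numPartitions", 10) \
    .load()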
Incremental Data Processing
from pyspark.sql.functions import col, max as spark_max

def incremental_extract(spark: SparkSession, table_name: str, watermark_column: str):
    # Get last processed timestamp
    checkpoint_df = spark.read.table(f"checkpoints.{table_name}")
    last_processed = checkpoint_df.select(spark_max("last_updated")).collect()[0][0]

    # Read only new/updated records
    incremental_df = spark.read.table(table_name) \
        .filter(col(watermark_column) > last_processed)

    return incremental_df, last_processed
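After the incremental batch is loaded successfully, the checkpoint should be advanced so the next run starts from the new high-water mark. A minimal sketch, assuming the single-column `checkpoints.<table>` layout used by `incremental_extract`:

from pyspark.sql.functions import max as spark_max

def update_checkpoint(spark, table_name: str, processed_df, watermark_column: str):
    # Advance the checkpoint to the highest watermark seen in this batch
    new_watermark = processed_df.select(spark_max(watermark_column)).collect()[0][0]
    spark.createDataFrame([(new_watermark,)], ["last_updated"]) \
        .write \
        .mode("overwrite") \
        .saveAsTable(f"checkpoints.{table_name}")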
Advanced Transformation Techniques
Complex Business Logic Implementation
from pyspark.sql.functions import *
from pyspark.sql.window import Window

def transform_customer_metrics(customer_df, orders_df):
    # Window functions for customer analytics
    window_spec = Window.partitionBy("customer_id").orderBy("order_date")

    enriched_orders = orders_df \
        .withColumn("order_rank", row_number().over(window_spec)) \
        .withColumn("running_total", sum("order_amount").over(window_spec)) \
        .withColumn("days_since_last_order",
                    datediff(col("order_date"),
                             lag("order_date", 1).over(window_spec)))

    # Customer segmentation
    customer_metrics = enriched_orders \
        .groupBy("customer_id") \
        .agg(
            sum("order_amount").alias("total_spent"),
            count("order_id").alias("order_count"),
            avg("order_amount").alias("avg_order_value"),
            max("order_date").alias("last_order_date")
        ) \
        .withColumn("customer_tier",
                    when(col("total_spent") > 10000, "Premium")
                    .when(col("total_spent") > 5000, "Gold")
                    .otherwise("Standard"))

    return customer_metrics.join(customer_df, "customer_id", "inner")
Data Quality and Validation
from pyspark.sql.functions import col

def apply_data_quality_checks(df, rules: dict):
    quality_metrics = {}

    # Null checks
    for column in rules.get("required_columns", []):
        null_count = df.filter(col(column).isNull()).count()
        quality_metrics[f"{column}_null_count"] = null_count
        if null_count > 0:
            df = df.filter(col(column).isNotNull())

    # Range validations
    for column, (min_val, max_val) in rules.get("range_checks", {}).items():
        df = df.filter((col(column) >= min_val) & (col(column) <= max_val))

    # Duplicate removal
    if rules.get("remove_duplicates", False):
        initial_count = df.count()
        df = df.dropDuplicates(rules.get("duplicate_keys", []))
        quality_metrics["duplicates_removed"] = initial_count - df.count()

    return df, quality_metrics
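An example rules dictionary and call (the column names are illustrative):

rules = {
    "required_columns": ["customer_id", "order_id"],
    "range_checks": {"order_amount": (0, 100000)},
    "remove_duplicates": True,
    "duplicate_keys": ["order_id"],
}
clean_df, metrics = apply_data_quality_checks(orders_df, rules)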
Optimized Loading Strategies
Partitioned Writing with Dynamic Partitioning
def optimized_write(df, output_path: str, partition_columns: list):
    df.write \
        .mode("overwrite") \
        .option("maxRecordsPerFile", 1000000) \
        .partitionBy(*partition_columns) \
        .parquet(output_path)
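To keep output file counts under control, the DataFrame is often repartitioned by the partition columns before writing. A hedged variant of the same writer (the function name is illustrative):

def optimized_write_compacted(df, output_path: str, partition_columns: list):
    # Repartitioning by the partition columns groups each partition's rows
    # into fewer tasks, which reduces the number of small output files.
    df.repartition(*partition_columns) \
        .write \
        .mode("overwrite") \
        .option("maxRecordsPerFile", 1000000) \
        .partitionBy(*partition_columns) \
        .parquet(output_path)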
Delta Lake Integration
def upsert_to_delta(spark: SparkSession, new_data_df, delta_table_path: str, merge_keys: list):
    from delta.tables import DeltaTable

    if DeltaTable.isDeltaTable(spark, delta_table_path):
        delta_table = DeltaTable.forPath(spark, delta_table_path)

        # Build merge condition
        merge_condition = " AND ".join([f"existing.{key} = updates.{key}" for key in merge_keys])

        delta_table.alias("existing") \
            .merge(new_data_df.alias("updates"), merge_condition) \
            .whenMatchedUpdateAll() \
            .whenNotMatchedInsertAll() \
            .execute()
    else:
        new_data_df.write.format("delta").save(delta_table_path)
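A typical call, assuming the delta-spark package is configured on the session; the path and merge keys below are illustrative:

upsert_to_delta(
    spark,
    new_data_df=customer_metrics,
    delta_table_path="s3a://my-bucket/warehouse/customer_metrics",
    merge_keys=["customer_id"],
)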
Performance Optimization
Broadcast Joins and Bucketing
from pyspark.sql.functions import broadcast

def optimized_joins(large_df, small_df, join_key: str):
    # Broadcast the lookup table when it is small enough to fit in executor memory
    if small_df.count() < 200000:  # threshold for broadcast
        return large_df.join(broadcast(small_df), join_key)

    # Otherwise fall back to a regular shuffle (sort-merge) join
    return large_df.join(small_df, join_key)
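The function above does not itself use bucketing; for repeated large-table joins on the same key, pre-bucketing both sides avoids the shuffle. A minimal sketch with illustrative table and column names:

def write_bucketed(df, table_name: str, bucket_key: str, num_buckets: int = 64):
    # Bucketed data must be saved via saveAsTable (not plain files) so that
    # Spark records the bucketing metadata and can skip the shuffle at join time.
    df.write \
        .mode("overwrite") \
        .bucketBy(num_buckets, bucket_key) \
        .sortBy(bucket_key) \
        .saveAsTable(table_name)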
Memory Management
def configure_memory_optimization(spark: SparkSession):
    # Target ~128MB partitions after shuffles (AQE)
    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "134217728")  # 128MB
    # Initial shuffle partition count before AQE coalescing (Spark 3.x setting)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "2000")
    # Faster data exchange for pandas/Python UDFs
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    return spark
Production ETL Job Template
import logging
from typing import Dict, Any

class SparkETLJob:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.spark = create_spark_session(config["app_name"])
        self.logger = self._setup_logging()

    def _setup_logging(self):
        # Minimal logger setup so the template runs as-is
        logging.basicConfig(level=logging.INFO)
        return logging.getLogger(self.config["app_name"])

    def run(self):
        try:
            self.logger.info("Starting ETL job")

            # Extract
            raw_data = self.extract()
            self.logger.info(f"Extracted {raw_data.count()} records")

            # Transform
            transformed_data = self.transform(raw_data)
            self.logger.info(f"Transformed to {transformed_data.count()} records")

            # Load
            self.load(transformed_data)
            self.logger.info("ETL job completed successfully")
        except Exception as e:
            self.logger.error(f"ETL job failed: {str(e)}")
            raise
        finally:
            self.spark.stop()

    def extract(self):
        # Implementation specific to data sources
        raise NotImplementedError

    def transform(self, df):
        # Business logic implementation
        raise NotImplementedError

    def load(self, df):
        # Write to target systems
        raise NotImplementedError
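A concrete job subclasses the template and overrides the three stages. A minimal sketch with illustrative config values and paths:

class CustomerMetricsJob(SparkETLJob):
    def extract(self):
        return self.spark.read.parquet(self.config["input_path"])

    def transform(self, df):
        return df.dropDuplicates(["customer_id"])

    def load(self, df):
        df.write.mode("overwrite").parquet(self.config["output_path"])

if __name__ == "__main__":
    CustomerMetricsJob({
        "app_name": "customer_metrics_etl",
        "input_path": "s3a://my-bucket/raw/customers/",
        "output_path": "s3a://my-bucket/curated/customers/",
    }).run()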
Monitoring and Observability
- Metrics Collection: Track row counts, processing times, and data quality metrics (see the sketch after this list)
- Logging: Implement structured logging with correlation IDs
- Alerts: Set up monitoring for job failures and performance degradation
- Lineage: Maintain data lineage for compliance and debugging
- Cost Optimization: Monitor cluster utilization and auto-scaling policies
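A minimal sketch of per-stage metrics collection with structured logging; the JSON-per-line format, the `correlation_id` field, and the function name are assumptions for illustration:

import json
import logging
import time

logger = logging.getLogger("etl.metrics")

def log_stage_metrics(stage: str, df, correlation_id: str, started_at: float):
    # Emit one JSON log line per stage with row count and elapsed seconds.
    # Note: count() triggers a Spark job, so use it sparingly on very large data.
    logger.info(json.dumps({
        "stage": stage,
        "correlation_id": correlation_id,
        "row_count": df.count(),
        "elapsed_seconds": round(time.time() - started_at, 2),
    }))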