Skill
Flink Job Generator
Generates comprehensive Apache Flink streaming and batch jobs with optimized configurations, error handling, and monitoring.
Author: VibeBaza
Installation
Copy and paste into your terminal
curl -fsSL https://vibebaza.com/i/flink-job-generator | bash
Flink Job Generator Expert
You are an expert in Apache Flink job development, specializing in creating production-ready streaming and batch processing jobs. You have deep knowledge of Flink's DataStream API, Table API, CEP, state management, checkpointing, and deployment patterns.
Core Flink Job Architecture
Essential Components
- Environment Setup: Configure execution environment with appropriate parallelism and checkpointing
- Source Configuration: Set up reliable data sources with proper serialization
- Processing Logic: Implement transformations, windowing, and aggregations
- Sink Configuration: Configure fault-tolerant output sinks
- State Management: Implement proper state handling and recovery
- Monitoring Integration: Add metrics and logging for observability
DataStream Job Template
public class FlinkStreamingJob {

    public static void main(String[] args) throws Exception {
        // Environment setup
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(60000); // 1 minute
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);

        // Configure state backend and checkpoint storage
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("s3://your-bucket/checkpoints");

        // Source (FlinkKafkaConsumer is deprecated in recent Flink releases;
        // see the KafkaSource/KafkaSink sketch after this template)
        DataStream<String> source = env
            .addSource(new FlinkKafkaConsumer<>(
                "input-topic",
                new SimpleStringSchema(),
                kafkaProps()
            ))
            .uid("kafka-source")
            .name("Kafka Source");

        // Processing
        // NOTE: event-time windows only fire if timestamps and watermarks are assigned
        // upstream (see "Custom Watermark Strategy" below)
        DataStream<ProcessedEvent> processed = source
            .map(new EventParser())
            .uid("event-parser")
            .keyBy(ProcessedEvent::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new EventAggregator())
            .uid("event-aggregator");

        // Sink
        processed
            .addSink(new FlinkKafkaProducer<>(
                "output-topic",
                new EventSerializationSchema(),
                kafkaProps(),
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
            ))
            .uid("kafka-sink")
            .name("Kafka Sink");

        env.execute("Flink Streaming Job");
    }

    private static Properties kafkaProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-consumer-group");
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
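On Flink 1.14+ the KafkaSource/KafkaSink builders supersede the deprecated FlinkKafkaConsumer/FlinkKafkaProducer used above. A sketch of the same source and sink, assuming EventSerializationSchema implements SerializationSchema<ProcessedEvent> and reusing the env and processed streams from the template:

// Modern Kafka connector API (Flink 1.14+); a sketch, not a drop-in replacement
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("input-topic")
    .setGroupId("flink-consumer-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> source = env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .uid("kafka-source");

KafkaSink<ProcessedEvent> kafkaSink = KafkaSink.<ProcessedEvent>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-topic")
        // assumes EventSerializationSchema implements SerializationSchema<ProcessedEvent>
        .setValueSerializationSchema(new EventSerializationSchema())
        .build())
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-streaming-job") // required for EXACTLY_ONCE
    .build();

processed.sinkTo(kafkaSink).uid("kafka-sink").name("Kafka Sink");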
State Management Patterns
Keyed State Example
public class StatefulProcessor extends KeyedProcessFunction<String, Event, Alert> {

    private ValueState<Long> countState;
    private ValueState<Long> lastSeenState;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Long> countDescriptor =
            new ValueStateDescriptor<>("event-count", Long.class);
        countState = getRuntimeContext().getState(countDescriptor);

        ValueStateDescriptor<Long> lastSeenDescriptor =
            new ValueStateDescriptor<>("last-seen", Long.class);
        lastSeenState = getRuntimeContext().getState(lastSeenDescriptor);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Alert> out) throws Exception {
        // State is null until the first update for this key
        Long currentCount = countState.value();
        long newCount = (currentCount == null ? 0L : currentCount) + 1;
        countState.update(newCount);

        if (newCount > 100) {
            out.collect(new Alert(event.getUserId(), "High activity detected"));
            countState.clear();
        }

        // Register a timer to clean up state 24 hours after this event's timestamp
        long cleanupTime = ctx.timestamp() + TimeUnit.HOURS.toMillis(24);
        ctx.timerService().registerEventTimeTimer(cleanupTime);
        lastSeenState.update(ctx.timestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // Clean up state for keys that have been idle
        countState.clear();
        lastSeenState.clear();
    }
}
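Timer-based cleanup as above gives full control; for plain expiry, Flink's state TTL handles it declaratively. A minimal sketch, assuming a 24-hour TTL on the same count state (Time here is org.apache.flink.api.common.time.Time):

// State TTL: entries expire 24 hours after the last write, an alternative
// to the timer-based cleanup shown above
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<Long> countDescriptor =
    new ValueStateDescriptor<>("event-count", Long.class);
countDescriptor.enableTimeToLive(ttlConfig);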
Table API Job Pattern
public class FlinkTableJob {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Create source table
        tableEnv.executeSql("""
            CREATE TABLE user_events (
                user_id STRING,
                event_type STRING,
                timestamp_col TIMESTAMP(3),
                payload ROW<amount DOUBLE, category STRING>,
                WATERMARK FOR timestamp_col AS timestamp_col - INTERVAL '5' SECOND
            ) WITH (
                'connector' = 'kafka',
                'topic' = 'user-events',
                'properties.bootstrap.servers' = 'localhost:9092',
                'properties.group.id' = 'flink-table-group',
                'format' = 'json',
                'scan.startup.mode' = 'earliest-offset'
            )
            """);

        // Create sink table
        tableEnv.executeSql("""
            CREATE TABLE aggregated_metrics (
                user_id STRING,
                window_start TIMESTAMP(3),
                window_end TIMESTAMP(3),
                total_amount DOUBLE,
                event_count BIGINT
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:postgresql://localhost:5432/metrics',
                'table-name' = 'user_metrics',
                'username' = 'flink',
                'password' = 'password'
            )
            """);

        // Processing query (the INSERT submits the job; no env.execute() needed)
        tableEnv.executeSql("""
            INSERT INTO aggregated_metrics
            SELECT
                user_id,
                TUMBLE_START(timestamp_col, INTERVAL '1' HOUR) AS window_start,
                TUMBLE_END(timestamp_col, INTERVAL '1' HOUR) AS window_end,
                SUM(payload.amount) AS total_amount,
                COUNT(*) AS event_count
            FROM user_events
            WHERE event_type = 'purchase'
            GROUP BY user_id, TUMBLE(timestamp_col, INTERVAL '1' HOUR)
            """);
    }
}
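The TUMBLE_START/TUMBLE_END group-window syntax above still works but is the legacy form; Flink 1.13+ also provides window table-valued functions. A sketch of the same hourly aggregation with the TUMBLE TVF, assuming the tables defined above:

// Same aggregation written with the TUMBLE windowing TVF (Flink 1.13+)
tableEnv.executeSql("""
    INSERT INTO aggregated_metrics
    SELECT
        user_id,
        window_start,
        window_end,
        SUM(payload.amount) AS total_amount,
        COUNT(*) AS event_count
    FROM TABLE(
        TUMBLE(TABLE user_events, DESCRIPTOR(timestamp_col), INTERVAL '1' HOUR))
    WHERE event_type = 'purchase'
    GROUP BY user_id, window_start, window_end
    """);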
Advanced Configuration Patterns
Resource Configuration
// Memory management (cluster-level options, normally set in flink-conf.yaml)
Configuration config = new Configuration();
config.setString("taskmanager.memory.process.size", "4gb");
config.setString("taskmanager.memory.flink.size", "3gb");
config.setString("taskmanager.numberOfTaskSlots", "2");

// Checkpointing optimization
env.getCheckpointConfig().setCheckpointTimeout(300000); // 5 minutes
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// Network buffers (keys renamed to taskmanager.memory.network.* since Flink 1.10)
config.setString("taskmanager.memory.network.fraction", "0.2");
config.setString("taskmanager.memory.network.min", "128mb");
Custom Watermark Strategy
WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, timestamp) -> event.getEventTime())
    .withIdleness(Duration.ofMinutes(1));

DataStream<Event> stream = source
    .assignTimestampsAndWatermarks(watermarkStrategy);
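If the stream comes from the KafkaSource sketched earlier, the same strategy can be attached at the source, which enables per-partition watermark generation (kafkaEventSource is a hypothetical KafkaSource<Event>):

// Attaching the WatermarkStrategy at the source gives per-partition watermarks
DataStream<Event> events = env.fromSource(
    kafkaEventSource,        // hypothetical KafkaSource<Event>
    watermarkStrategy,
    "Kafka Source"
);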
Monitoring and Metrics
Custom Metrics
public class MetricsAwareFunction extends RichMapFunction<Event, ProcessedEvent> {

    private transient Counter eventCounter;
    private transient Histogram processingTimeHistogram;

    @Override
    public void open(Configuration config) {
        eventCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("events_processed");
        processingTimeHistogram = getRuntimeContext()
            .getMetricGroup()
            .histogram("processing_time", new DescriptiveStatisticsHistogram(1000));
    }

    @Override
    public ProcessedEvent map(Event event) {
        long startTime = System.currentTimeMillis();
        ProcessedEvent result = processEvent(event);
        eventCounter.inc();
        processingTimeHistogram.update(System.currentTimeMillis() - startTime);
        return result;
    }
}
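Besides counters and histograms, gauges and meters are available from the same metric group. A brief sketch (pendingQueue and the metric names are illustrative, not part of the function above):

// Gauge: reports a value sampled at metric-collection time (pendingQueue is hypothetical)
getRuntimeContext().getMetricGroup()
    .gauge("pending_queue_size", (Gauge<Integer>) () -> pendingQueue.size());

// Meter: per-second rate, averaged over the last 60 seconds
Meter eventRate = getRuntimeContext().getMetricGroup()
    .meter("events_per_second", new MeterView(60));
eventRate.markEvent();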
Deployment Configuration
Application Mode Deployment
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-streaming-job
spec:
  image: flink:1.17.1-scala_2.12-java11
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend: hashmap
    state.checkpoints.dir: s3://checkpoints/
    execution.checkpointing.interval: 60s
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
  job:
    jarURI: s3://jars/flink-streaming-job.jar
    parallelism: 4
    upgradeMode: stateless
Best Practices
- Always set UIDs for operators to ensure state compatibility during upgrades
- Configure appropriate parallelism based on data volume and resource availability
- Use event time processing with proper watermark strategies for out-of-order data
- Implement proper error handling with side outputs for malformed records (see the sketch after this list)
- Monitor backpressure and tune buffer sizes accordingly
- Use async I/O for external system lookups to avoid blocking
- Partition data appropriately to avoid data skew and hotspots
- Test with realistic data volumes and failure scenarios before production deployment
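A minimal side-output sketch for the malformed-record practice above, assuming the String-to-ProcessedEvent parsing step from the streaming template (EventParser.parse is a hypothetical helper):

// Route records that fail to parse to a side output instead of failing the job
final OutputTag<String> malformedTag = new OutputTag<String>("malformed-records") {};

SingleOutputStreamOperator<ProcessedEvent> parsed = source
    .process(new ProcessFunction<String, ProcessedEvent>() {
        @Override
        public void processElement(String value, Context ctx, Collector<ProcessedEvent> out) {
            try {
                out.collect(EventParser.parse(value)); // hypothetical static parse helper
            } catch (Exception e) {
                ctx.output(malformedTag, value); // keep the raw record for inspection
            }
        }
    })
    .uid("event-parser-with-side-output");

// Malformed records can be written to a dead-letter topic or logged for analysis
DataStream<String> malformed = parsed.getSideOutput(malformedTag);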