Apache Beam Pocket Book
Unified batch & streaming • SDKs (Python/Java) • Windows & triggers • IO connectors • Runners • Testing & ops
1) What is Apache Beam?
Apache Beam is a unified programming model for defining data processing pipelines once and executing them on multiple runners (Dataflow, Spark, Flink, Direct). It supports batch and streaming with the same abstractions.
# Install Python SDK
pip install "apache-beam[gcp]"  # quote the extras for zsh; add others as needed
2) Core Concepts
- PCollection: distributed dataset (bounded or unbounded).
- PTransform: operation on PCollections (ParDo, GroupByKey, Combine).
- Pipeline: container that runs transforms on a runner.
- Windowing & Triggers: group elements in time; decide when to emit.
3) Minimal Pipeline (Python)
import apache_beam as beam
with beam.Pipeline() as p:
    (p
     | "Read" >> beam.io.ReadFromText("input.txt")
     | "Upper" >> beam.Map(str.upper)
     | "Write" >> beam.io.WriteToText("out/prefix", file_name_suffix=".txt"))
4) Minimal Pipeline (Java)
// Maven deps: org.apache.beam:beam-sdks-java-core (plus beam-runners-direct-java to run locally)
Pipeline p = Pipeline.create();
p.apply(TextIO.read().from("input.txt"))
 .apply(MapElements.into(TypeDescriptors.strings())
     .via((String s) -> s.toUpperCase()))
 .apply(TextIO.write().to("out/prefix").withSuffix(".txt"));
p.run().waitUntilFinish();
5) Common Transforms
- ParDo: general map/flatMap with rich context.
- GroupByKey/Combine: aggregations (sum, mean, custom).
- WindowInto: fixed/sliding/session windows for streaming.
- Side Inputs/Outputs: pass extra data; split outputs (see the ParDo sketch below).
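A minimal ParDo sketch combining a singleton side input with tagged outputs (RouteFn and the "big"/"small" tags are illustrative names, not part of the SDK):
import apache_beam as beam

class RouteFn(beam.DoFn):
    def process(self, element, threshold):             # threshold arrives as a side input
        if element > threshold:
            yield beam.pvalue.TaggedOutput("big", element)
        else:
            yield element                               # main output

with beam.Pipeline() as p:
    nums = p | "Nums" >> beam.Create([1, 5, 50, 500])
    limit = p | "Limit" >> beam.Create([10])
    results = nums | beam.ParDo(
        RouteFn(), threshold=beam.pvalue.AsSingleton(limit)).with_outputs("big", main="small")
    small, big = results.small, results.big             # two PCollections from one ParDo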
6) IO Connectors (Python)
lines = (p
         | beam.io.ReadFromPubSub(topic="projects/PROJ/topics/in")
         | beam.Map(lambda b: b.decode("utf-8")))
# rows: a PCollection of dicts keyed by column name
(rows | beam.io.WriteToBigQuery(table="PROJ:DS.table",
                                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Other common IOs: KafkaIO, JDBCIO, GCS/S3, Elasticsearch, Parquet/Avro.
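For example, Parquet (a sketch; assumes the pyarrow dependency is installed and the bucket paths are placeholders):
import pyarrow as pa
import apache_beam as beam

schema = pa.schema([("user", pa.string()), ("amount", pa.float64())])
with beam.Pipeline() as p:
    (p
     | "ReadParquet" >> beam.io.ReadFromParquet("gs://BUCKET/in/*.parquet")   # one dict per row
     | "WriteParquet" >> beam.io.WriteToParquet("gs://BUCKET/out/part", schema))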
7) Fixed & Sliding Windows
events | beam.WindowInto(beam.window.FixedWindows(60))        # 1-min fixed windows
events | beam.WindowInto(beam.window.SlidingWindows(300, 60)) # 5-min windows, sliding every 1 min
Use sessions to group bursts separated by gaps.
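For example (gap size in seconds; the 10-minute gap is an arbitrary choice):
events | beam.WindowInto(beam.window.Sessions(600))           # session windows, 10-min gap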
8) Event Time, Watermarks & Triggers
Beam processes by event time with a watermark that tracks completeness. Triggers control when results emit: after watermark, early firings, late firings.
from apache_beam.transforms import trigger
windowed = (events
            | beam.WindowInto(
                beam.window.FixedWindows(300),
                trigger=trigger.AfterWatermark(
                    early=trigger.AfterProcessingTime(60),
                    late=trigger.AfterCount(100)),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING))
9) Allowed Lateness & Dedup
Handle late data with allowed_lateness and deduplicate using ids or time-based keys.
events | beam.WindowInto(beam.window.FixedWindows(300),
                         allowed_lateness=60)
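Two dedup sketches (assuming either exact duplicates, or records carrying a unique "id" field):
records | beam.Distinct()                          # drop exact duplicates within the window
(records | beam.Map(lambda r: (r["id"], r))        # key by the assumed id field
         | beam.combiners.Latest.PerKey()          # keep one record per id
         | beam.Values())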
10) Combining & Aggregations
# Python combine per key
pairs | beam.CombinePerKey(sum)
# Java custom combine
pcol.apply(Combine.perKey(new SumIntsFn()));
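A custom CombineFn sketch in Python (per-key mean; MeanFn is an illustrative name, and Beam also ships beam.combiners.MeanCombineFn):
class MeanFn(beam.CombineFn):
    def create_accumulator(self):
        return 0.0, 0                        # (running sum, count)
    def add_input(self, acc, x):
        s, c = acc
        return s + x, c + 1
    def merge_accumulators(self, accs):
        sums, counts = zip(*accs)
        return sum(sums), sum(counts)
    def extract_output(self, acc):
        s, c = acc
        return s / c if c else float("nan")

means = pairs | beam.CombinePerKey(MeanFn())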
11) Choose a Runner
- Direct: local dev/testing.
- Google Cloud Dataflow: fully managed, autoscaling, streaming.
- Apache Flink: low-latency streaming, on-prem/K8s.
- Apache Spark: batch/streaming in Spark clusters.
12) Run on Dataflow (Python)
python main.py \
--runner DataflowRunner \
--project PROJECT_ID \
--region us-central1 \
--staging_location gs://BUCKET/staging \
--temp_location gs://BUCKET/tmp \
--job_name beam-etl-$(date +%Y%m%d-%H%M%S)
13) Run on Flink (Java)
mvn package
# Start Flink cluster, then:
java -jar target/app.jar --runner=FlinkRunner --flinkMaster=localhost:8081
14) Packaging & Dependencies
Use --requirements_file (Py) or shaded JAR (Java). Keep container images slim if using custom containers.
python main.py --runner DataflowRunner --requirements_file requirements.txt
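For a custom worker container on Dataflow, point the pipeline at your image (a sketch; assumes a recent SDK, and the image path is a placeholder):
python main.py --runner DataflowRunner --sdk_container_image gcr.io/PROJECT/beam-worker:TAG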
15) Schema-aware & Row-based APIs
Define schemas for typed rows; simplifies joins and BQ sinks.
from apache_beam.typehints.row_type import RowTypeConstraint

@beam.typehints.with_output_types(RowTypeConstraint.from_fields(
    [("user", str), ("amount", float)]))
def parse(line): ...
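Alternatively, emit beam.Row instances and let Beam infer the schema (a sketch assuming comma-separated "user,amount" lines):
rows = (lines
        | "ToRow" >> beam.Map(lambda ln: beam.Row(user=ln.split(",")[0],
                                                  amount=float(ln.split(",")[1]))))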
16) Unit Testing (Python)
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
with TestPipeline() as p:
    out = (p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2))
    assert_that(out, equal_to([2, 4, 6]))
17) Metrics & Logging
Use Metrics (counters, distributions) and structured logs for visibility. Export to your runner’s monitoring (Dataflow, Flink, Spark UI).
from apache_beam.metrics import Metrics
counter = Metrics.counter("etl","bad_lines")
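Typical usage inside a DoFn (a sketch; parse_record stands in for your own parsing logic):
class ParseFn(beam.DoFn):
    def __init__(self):
        self.bad_lines = Metrics.counter("etl", "bad_lines")
        self.line_len = Metrics.distribution("etl", "line_len")
    def process(self, line):
        self.line_len.update(len(line))
        try:
            yield parse_record(line)     # parse_record: placeholder for your own parser
        except ValueError:
            self.bad_lines.inc()         # surfaced in the runner's monitoring UI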
18) Exactly-once Pattern
Design sinks to be idempotent (upserts), or transactional with dedup keys. For BQ, write to staging, de-dup by key, then merge.
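One way to prepare that: attach a deterministic key before the sink so retries produce the same row and the merge step can de-duplicate (a sketch; the insert_id column name is illustrative):
import hashlib, json

def with_insert_id(rec):
    key = hashlib.sha256(json.dumps(rec, sort_keys=True).encode()).hexdigest()
    return {**rec, "insert_id": key}     # same record always gets the same id

staged = rows | "AddInsertId" >> beam.Map(with_insert_id)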
19) Common Pitfalls
- Using processing time instead of event time for late/early data.
- Large side inputs causing memory pressure—use lookups/joins via external stores.
- Emitting huge elements; prefer chunking and file-based shuffles.
20) Interview Q&A — 8 Quick Ones
1) Beam vs Spark? Beam is a model with many runners; Spark is an engine. Beam gives portability; Spark ties you to Spark.
2) Event vs Processing time? Event time = when data happened; processing time = when processed.
3) Watermark? Estimate of completeness for event time; drives trigger firings and late data handling.
4) Triggers? Policies that decide when to emit pane results (on-time, early, late).
5) Stateful DoFn? Keep per-key state/timers for complex patterns (sessions, dedup); see the sketch after this list.
6) Fault tolerance? Checkpointing & replay handled by runner; design idempotent sinks.
7) Backpressure? Handled by runner (Flink/Spark) — design transforms to be non-blocking.
8) Cost control? Right-size workers, autoscale, avoid hot keys, use combiner lifts, and compress IO.
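Stateful DoFn sketch for Q5 (per-key dedup; assumes keyed input and a runner that supports user state):
import apache_beam as beam
from apache_beam.coders import BooleanCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class FirstPerKeyFn(beam.DoFn):
    SEEN = ReadModifyWriteStateSpec("seen", BooleanCoder())
    def process(self, element, seen=beam.DoFn.StateParam(SEEN)):
        key, value = element                 # state is scoped per key and window
        if not seen.read():
            seen.write(True)
            yield value                      # emit only the first value seen for this key

# usage: keyed_records | beam.ParDo(FirstPerKeyFn())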