Apache Beam Pocket Book

Unified batch & streaming • SDKs (Python/Java) • Windows & triggers • IO connectors • Runners • Testing & ops

Section 1 — Fundamentals

1) What is Apache Beam?

Apache Beam is a unified programming model for defining data processing pipelines once and executing them on multiple runners (Dataflow, Spark, Flink, Direct). It supports batch and streaming with the same abstractions.

# Install Python SDK
pip install apache-beam[gcp]  # add extras as needed

2) Core Concepts

  • PCollection: distributed dataset (bounded or unbounded).
  • PTransform: operation on PCollections (ParDo, GroupByKey, Combine).
  • Pipeline: container that runs transforms on a runner.
  • Windowing & Triggers: group elements in time; decide when to emit.

3) Minimal Pipeline (Python)

import apache_beam as beam
with beam.Pipeline() as p:
  (p
   | "Read" >> beam.io.ReadFromText("input.txt")
   | "Upper" >> beam.Map(str.upper)
   | "Write" >> beam.io.WriteToText("out/prefix", file_name_suffix=".txt"))

4) Minimal Pipeline (Java)

// Maven deps: org.apache.beam:beam-sdks-java-core
Pipeline p = Pipeline.create();
p.apply(TextIO.read().from("input.txt"))
 .apply(MapElements.into(TypeDescriptors.strings())
        .via((String s) -> s.toUpperCase()))
 .apply(TextIO.write().to("out/prefix").withSuffix(".txt"));
p.run().waitUntilFinish();

5) Common Transforms

  • ParDo: general map/flatMap with rich context.
  • GroupByKey/Combine: aggregations (sum, mean, custom).
  • WindowInto: fixed/sliding/session windows for streaming.
  • Side Inputs/Outputs: pass extra data; split outputs into tagged streams (see the ParDo sketch below).
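
A minimal ParDo sketch combining a side input with a tagged side output, assuming it runs inside `with beam.Pipeline() as p:` as in the minimal pipeline above; the names (FilterFn, "accepted", "rejected") are illustrative, not Beam APIs.

class FilterFn(beam.DoFn):
  def process(self, element, allowed):
    # `allowed` arrives as a side input (materialized list).
    if element in allowed:
      yield element                                        # main output
    else:
      yield beam.pvalue.TaggedOutput("rejected", element)  # tagged side output

allowed = p | "Allowed" >> beam.Create(["a", "b"])
results = (p
  | beam.Create(["a", "x"])
  | beam.ParDo(FilterFn(), allowed=beam.pvalue.AsList(allowed))
      .with_outputs("rejected", main="accepted"))
accepted, rejected = results.accepted, results.rejected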

Section 2 — IO, Windowing & Triggers

6) IO Connectors (Python)

# Read from Pub/Sub (requires a streaming pipeline, e.g. --streaming):
lines = (p | beam.io.ReadFromPubSub(topic="projects/PROJ/topics/in")
           | beam.Map(lambda b: b.decode("utf-8")))
# Write dicts to BigQuery (table must exist, or supply a schema):
(rows | beam.io.WriteToBigQuery(
    table="PROJ:DS.table",
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

Other common IOs: KafkaIO, JDBCIO, GCS/S3, Elasticsearch, Parquet/Avro.
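
A hedged example of one of these: reading Parquet files with beam.io.ReadFromParquet, which yields one dict per record (the path is illustrative).

records = p | beam.io.ReadFromParquet("gs://BUCKET/data/*.parquet")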

7) Fixed & Sliding Windows

events | beam.WindowInto(beam.window.FixedWindows(60))         # 1-minute fixed windows
events | beam.WindowInto(beam.window.SlidingWindows(300, 60))  # 5-minute windows, sliding every minute

Use sessions to group bursts separated by gaps.
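
A sessions sketch to go with the windows above: per key, events separated by a gap of 10 minutes or more land in different windows (the gap size, in seconds, is illustrative).

events | beam.WindowInto(beam.window.Sessions(10 * 60))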

8) Event Time, Watermarks & Triggers

Beam processes by event time with a watermark that tracks completeness. Triggers control when results are emitted: at the watermark, with early (speculative) firings before it, and with late firings after it.

from apache_beam.transforms import trigger
windowed = (events
  | beam.WindowInto(
      beam.window.FixedWindows(300),
      trigger=trigger.AfterWatermark(
        early=trigger.AfterProcessingTime(60),   # speculative firings on a 60s processing-time delay
        late=trigger.AfterCount(100)),           # re-fire after every 100 late elements
      accumulation_mode=trigger.AccumulationMode.ACCUMULATING))
# Pair with allowed_lateness (see 9) so late firings can actually happen.

9) Allowed Lateness & Dedup

Handle late data with allowed_lateness and deduplicate using ids or time-based keys.

events | beam.WindowInto(beam.window.FixedWindows(300),
                         allowed_lateness=60)
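
A minimal dedup sketch: beam.Distinct() drops exact duplicates within a window; for id-based dedup, key by the id and keep one element per key (the "id" field and step names are illustrative).

deduped = events | beam.Distinct()
one_per_id = (events
  | "KeyById" >> beam.Map(lambda e: (e["id"], e))
  | "OnePerId" >> beam.CombinePerKey(lambda elems: next(iter(elems))))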

10) Combining & Aggregations

# Python combine per key
pairs | beam.CombinePerKey(sum)

// Java custom combine (SumIntsFn is a user-defined CombineFn)
pcol.apply(Combine.perKey(new SumIntsFn()));
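
A sketch of a custom Python CombineFn computing a per-key mean (the class name is illustrative):

class MeanFn(beam.CombineFn):
  def create_accumulator(self):
    return (0.0, 0)                       # (running sum, count)
  def add_input(self, acc, value):
    return (acc[0] + value, acc[1] + 1)
  def merge_accumulators(self, accs):
    sums, counts = zip(*accs)
    return (sum(sums), sum(counts))
  def extract_output(self, acc):
    return acc[0] / acc[1] if acc[1] else float("nan")

pairs | beam.CombinePerKey(MeanFn())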

Section 3 — Runners & Deployment

11) Choose a Runner

  • Direct: local dev/testing.
  • Google Cloud Dataflow: fully managed, autoscaling, streaming.
  • Apache Flink: low-latency streaming, on-prem/K8s.
  • Apache Spark: batch/streaming in Spark clusters.

12) Run on Dataflow (Python)

python main.py \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --region us-central1 \
  --staging_location gs://BUCKET/staging \
  --temp_location gs://BUCKET/tmp \
  --job_name beam-etl-$(date +%Y%m%d-%H%M%S)

13) Run on Flink (Java)

mvn package
# Start Flink cluster, then:
java -jar target/app.jar --runner=FlinkRunner --flinkMaster=localhost:8081

14) Packaging & Dependencies

Use --requirements_file or --setup_file (Python), or a shaded/uber JAR (Java). Keep container images slim if using custom containers.

python main.py --runner DataflowRunner --requirements_file requirements.txt
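
A hedged variant for custom containers on Dataflow (the image URI is illustrative; flag availability depends on your Beam version):

python main.py --runner DataflowRunner \
  --sdk_container_image gcr.io/PROJECT_ID/beam-worker:latest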

15) Schema-aware & Row-based APIs

Define schemas for typed rows; this simplifies joins and BigQuery sinks.

from apache_beam.typehints.row_type import RowTypeConstraint

@beam.typehints.with_output_types(RowTypeConstraint.from_fields(
  [("user", str), ("amount", float)]))
def parse(line): ...
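
An equivalent sketch using beam.Row, which infers the schema from the constructed fields (field names and parsing are illustrative):

rows = (lines
  | beam.Map(lambda line: line.split(","))
  | beam.Map(lambda parts: beam.Row(user=parts[0], amount=float(parts[1]))))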

Section 4 — Testing, Observability & Patterns

16) Unit Testing (Python)

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

with TestPipeline() as p:
  out = (p | beam.Create([1,2,3]) | beam.Map(lambda x: x*2))
  assert_that(out, equal_to([2,4,6]))
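
For streaming logic, a hedged TestStream sketch: inject elements, advance the watermark, then assert (values are illustrative; the DirectRunner needs streaming mode enabled).

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_stream import TestStream

opts = PipelineOptions()
opts.view_as(StandardOptions).streaming = True

stream = (TestStream()
  .add_elements([1, 2])
  .advance_watermark_to(60)
  .add_elements([3])
  .advance_watermark_to_infinity())

with TestPipeline(options=opts) as p:
  out = p | stream | beam.Map(lambda x: x * 2)
  assert_that(out, equal_to([2, 4, 6]))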

17) Metrics & Logging

Use Metrics (counters, distributions) and structured logs for visibility. Export to your runner’s monitoring (Dataflow, Flink, Spark UI).

from apache_beam.metrics import Metrics
counter = Metrics.counter("etl", "bad_lines")  # (namespace, name)
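
A usage sketch: increment the counter from inside a DoFn when a record fails to parse (the parsing logic is illustrative).

class ParseFn(beam.DoFn):
  bad_lines = Metrics.counter("etl", "bad_lines")

  def process(self, line):
    try:
      yield int(line)
    except ValueError:
      self.bad_lines.inc()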

18) Exactly-once Pattern

Design sinks to be idempotent (upserts), or transactional with dedup keys. For BQ, write to staging, de-dup by key, then merge.
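
A hedged sketch of the staging step: derive a deterministic key per record so retries and replays collapse into a single row at merge time (field names and the hashing scheme are illustrative).

import hashlib

def with_dedup_key(rec):
  raw = f'{rec["user"]}|{rec["event_ts"]}'.encode("utf-8")
  return {**rec, "dedup_key": hashlib.sha256(raw).hexdigest()}

staged = rows | "AddDedupKey" >> beam.Map(with_dedup_key)
# Write `staged` to the staging table, then MERGE into the target on dedup_key.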

19) Common Pitfalls

  • Using processing time where event time is needed, so early/late data is mishandled.
  • Large side inputs causing memory pressure; prefer lookups against external stores or a CoGroupByKey join.
  • Emitting huge elements; prefer chunking and file-based shuffles.

20) Interview Q&A — 8 Quick Ones

1) Beam vs Spark? Beam is a programming model with many runners; Spark is an execution engine. Beam gives portability across engines; Spark code is tied to Spark.

2) Event vs Processing time? Event time = when data happened; processing time = when processed.

3) Watermark? Estimate of completeness for event time; drives trigger firings and late data handling.

4) Triggers? Policies that decide when to emit pane results (on-time, early, late).

5) Stateful DoFn? Keep per-key state/timers for complex patterns (sessions, dedup).
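
A minimal stateful DoFn sketch for per-key dedup; input must be keyed (key, value) pairs, and the names are illustrative.

from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class DedupFn(beam.DoFn):
  SEEN = ReadModifyWriteStateSpec("seen", beam.coders.BooleanCoder())

  def process(self, element, seen=beam.DoFn.StateParam(SEEN)):
    key, value = element
    if not seen.read():          # None the first time this key is seen
      seen.write(True)
      yield value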

6) Fault tolerance? Checkpointing & replay handled by runner; design idempotent sinks.

7) Backpressure? Handled by the runner (Flink/Spark); design transforms to be non-blocking.

8) Cost control? Right-size workers, autoscale, avoid hot keys, use combiner lifting (partial combines before the shuffle), and compress IO.