Apache Hudi Pocket Book

Upserts on data lakes • COW vs MOR • Timeline • Indexing • Incremental pulls • Compaction & clustering • Spark/Flink/Presto/Trino

Section 1 — Fundamentals

1) What is Apache Hudi?

Apache Hudi brings database-like primitives (upsert, delete, change capture) to data lakes on object stores (S3/GCS/ADLS/HDFS). It maintains a timeline of commits and supports fast incremental processing and near-real-time ingestion.

# Spark quick start (PySpark): Hudi ships as a Spark bundle jar, not a pip package
pip install pyspark
pyspark --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"

2) Table Types

  • Copy-on-Write (COW): writes create new Parquet files; simpler, great for read-optimized analytics.
  • Merge-on-Read (MOR): appends to log files, compacts later; lower write latency, good for near real-time; the type is chosen via the writer option sketched below.
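
The type is fixed when the table is first written, via one writer option. A minimal PySpark sketch (table name, path, and sample row are illustrative; assumes the Hudi bundle from item 1 is on the classpath):

# Minimal sketch: choose COPY_ON_WRITE (default) or MERGE_ON_READ at table creation.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("o-1", 42.0, "2025-07-01 12:00:00", "2025-07-01")],
    ["order_id", "amount", "updated_at", "dt"])

(df.write.format("hudi")
  .option("hoodie.table.name", "orders_mor")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")  # or "COPY_ON_WRITE"
  .option("hoodie.datasource.write.recordkey.field", "order_id")
  .option("hoodie.datasource.write.precombine.field", "updated_at")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .mode("append")
  .save("/tmp/hudi/orders_mor"))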

3) Core Keys

  • recordKey: unique id per row (e.g., order_id).
  • partitionPath: directory layout (e.g., dt=YYYY-MM-DD, region=US).
  • preCombineField: picks the latest record when duplicate keys land in the same batch (e.g., ts); all three keys appear in the DDL sketch below.
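
The same three keys can be declared up front with Hudi's Spark SQL DDL; a hedged sketch (property names follow Hudi's SQL docs; assumes a spark session with the Hudi SQL extension enabled, as configured in item 6; table name and path are illustrative):

# Hedged sketch: record key, precombine field, and partitioning as table properties.
spark.sql("""
  CREATE TABLE IF NOT EXISTS orders_hudi (
    order_id STRING, amount DOUBLE, updated_at TIMESTAMP, dt STRING
  ) USING hudi
  TBLPROPERTIES (primaryKey = 'order_id', preCombineField = 'updated_at')
  PARTITIONED BY (dt)
  LOCATION 's3://lake/bronze/orders_hudi'
""")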

4) Operations

insert, upsert, bulk_insert (initial load), delete, insert_overwrite (replace partitions), delete_partition. On MOR tables, these writes are recorded on the timeline as delta_commits.
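
For example, deletes reuse the writer API with the operation switched; a minimal sketch (assumes the orders_hudi table from item 6 and a DataFrame keys_df holding the key, partition, and precombine columns of the rows to remove):

# Hedged sketch: hard delete by key.
(keys_df.write.format("hudi")
  .option("hoodie.table.name", "orders_hudi")
  .option("hoodie.datasource.write.operation", "delete")
  .option("hoodie.datasource.write.recordkey.field", "order_id")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.write.precombine.field", "updated_at")
  .mode("append")
  .save("s3://lake/bronze/orders_hudi"))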

5) Query Types

  • Snapshot: latest view (COW/MOR).
  • Read Optimized: base Parquet files only on MOR, skipping the log merge; faster but can lag behind recent writes (query sketch below).
  • Incremental: rows changed since a commit time.
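
Snapshot and incremental reads are shown in item 7; the read-optimized view of a MOR table needs just one extra option (a sketch, assuming the spark session and table path from items 6-7):

# Hedged sketch: read-optimized query on a MOR table (base Parquet only, no log merge).
ro = (spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "read_optimized")
  .load("s3://lake/bronze/orders_hudi"))
ro.show()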

Section 2 — Write & Query (Spark/Flink)

6) Spark Write — Upsert (Python)

from pyspark.sql import SparkSession
# Hudi Spark bundle must be on the classpath (e.g., via --packages as in item 1)
spark = (SparkSession.builder
  .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions","org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate())

df = spark.read.json("s3://raw/orders/*.json")

(base_path, table) = ("s3://lake/bronze/orders_hudi", "orders_hudi")

(df.write.format("hudi")
  .option("hoodie.table.name", table)
  .option("hoodie.datasource.write.recordkey.field", "order_id")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.write.precombine.field", "updated_at")
  .option("hoodie.datasource.write.operation", "upsert")
  .mode("append")
  .save(base_path))

7) Spark Read — Snapshot / Incremental

# Snapshot
snap = (spark.read.format("hudi").load("s3://lake/bronze/orders_hudi"))
# Incremental since a commit time
inc = (spark.read.format("hudi")
  .option("hoodie.datasource.query.type","incremental")
  .option("hoodie.datasource.read.begin.instanttime","20250701000000")
  .load("s3://lake/bronze/orders_hudi"))

8) Flink Streaming Ingest (Flink SQL)

-- Flink SQL example (simplified); 'path' is required by the Hudi connector
CREATE TABLE orders_hudi (
  order_id STRING,
  amount DOUBLE,
  updated_at TIMESTAMP(3),
  dt STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = 's3://lake/bronze/orders_hudi',
  'table.type' = 'MERGE_ON_READ',
  'precombine.field' = 'updated_at'
);

9) Query Engines

Use Presto/Trino/Athena/Hive/Spark SQL for interactive queries. For MOR, the snapshot view merges base and log files for the freshest results; the read-optimized view scans only base Parquet files, so it is faster but can lag until the next compaction.

10) Indexing

Hudi maintains an index (Bloom, HBase, Simple, or Bucket) to map incoming record keys to existing file groups during upserts and deletes. Choose based on table scale and key distribution; the bucket index hashes keys to a fixed set of buckets, which keeps very large upserts cheap by skipping per-record lookups (option sketch below).
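
The index is selected through writer configs; a hedged sketch in the same add-these-options style used below (key names from Hudi's index configuration; the bucket count is illustrative):

# Index selection, added to the upsert writer's options
.option("hoodie.index.type", "BLOOM")  # or SIMPLE / GLOBAL_BLOOM / GLOBAL_SIMPLE / HBASE
# bucket index variant for very large upsert workloads:
.option("hoodie.index.type", "BUCKET")
.option("hoodie.bucket.index.num.buckets", "256")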

Section 3 — Timeline, Maintenance & Performance

11) Timeline & Commits

Hudi records actions as instants (commit, delta_commit, clean, compaction, restore). Use the timeline to run incremental ETL and CDC.
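
Every row also carries the _hoodie_commit_time metadata column, a convenient checkpoint for incremental readers; a small sketch (assumes the spark session and table path from item 6):

# Hedged sketch: latest commit instant, usable as the next incremental begin time.
from pyspark.sql import functions as F

latest = (spark.read.format("hudi")
  .load("s3://lake/bronze/orders_hudi")
  .agg(F.max("_hoodie_commit_time").alias("latest_instant"))
  .collect()[0]["latest_instant"])
print(latest)  # feed into hoodie.datasource.read.begin.instanttime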

12) Compaction (MOR)

Merges log files into Parquet to keep reads fast. Schedule and run compaction during low-traffic windows or continuously with Flink.

# Inline compaction via writer options (add these to the MOR writer in item 6)
.option("hoodie.compact.inline","true")
.option("hoodie.compact.inline.max.delta.commits","10")

13) Clustering (File Layout)

Rewrites file layout for better query performance (e.g., sort columns, larger files). Can be async and incremental.

.option("hoodie.clustering.inline","true")
.option("hoodie.clustering.plan.strategy.sort.columns","dt,region")

14) Cleaning & Retention

Automatically removes obsolete file versions to control storage.

.option("hoodie.cleaner.policy","KEEP_LATEST_COMMITS")
.option("hoodie.cleaner.commits.retained","20")

15) Schema Evolution

Out of the box, Hudi accepts backward-compatible changes such as adding nullable columns and widening types (e.g., int to long); dropping or renaming columns needs Hudi's full schema-on-read evolution support. Keep evolution consistent across producers and validate with a schema registry when possible.
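
A minimal sketch of a backward-compatible change, adding a nullable column before an upsert (assumes the DataFrame and table from item 6; the column name is hypothetical):

# Hedged sketch: evolve the schema by writing a batch with one extra nullable column.
from pyspark.sql import functions as F

df_v2 = df.withColumn("coupon_code", F.lit(None).cast("string"))  # new nullable field
(df_v2.write.format("hudi")
  .option("hoodie.table.name", "orders_hudi")
  .option("hoodie.datasource.write.recordkey.field", "order_id")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.write.precombine.field", "updated_at")
  .option("hoodie.datasource.write.operation", "upsert")
  .mode("append")
  .save("s3://lake/bronze/orders_hudi"))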

Section 4 — Design, Integrations & Patterns

16) Partitioning & File Sizes

  • Prefer hierarchical partitions with manageable cardinality (e.g., dt=YYYY-MM-DD, region).
  • Tune target file size (e.g., 128–512 MB) for scan efficiency; see the size options sketched below.
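
The sizing knobs are plain writer options; a hedged sketch (values are illustrative; Hudi's defaults are on the order of 100-120 MB):

# File sizing, added to the writer's options
.option("hoodie.parquet.max.file.size", str(256 * 1024 * 1024))     # target max base file size
.option("hoodie.parquet.small.file.limit", str(128 * 1024 * 1024))  # smaller files get bin-packed on later writes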

17) Lakehouse Interop

Hudi competes with, and interoperates with, Iceberg and Delta Lake. Choose based on upsert latency (Hudi MOR shines here), catalog/ACID features, engine support, and organizational standardization.

18) CDC & Incremental ETL

Ingest source CDC (Debezium/Kafka) into Hudi; downstream jobs read incrementally using the begin instant to avoid full scans.
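
A common shape for this is a small job that reads only new commits from a bronze Hudi table and upserts them into silver; a hedged sketch (the last processed instant would normally come from a checkpoint store, here it is just a variable; the silver path is illustrative):

# Hedged sketch: incremental bronze -> silver hop keyed off the last processed instant.
last_instant = "20250701000000"  # load from your checkpoint store in practice

changed = (spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", last_instant)
  .load("s3://lake/bronze/orders_hudi"))

(changed.write.format("hudi")
  .option("hoodie.table.name", "orders_silver")
  .option("hoodie.datasource.write.recordkey.field", "order_id")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.write.precombine.field", "updated_at")
  .option("hoodie.datasource.write.operation", "upsert")
  .mode("append")
  .save("s3://lake/silver/orders_hudi"))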

19) Common Pitfalls

  • Skewed keys → slow upserts; consider bucket index or repartitioning.
  • Unmanaged compaction → growing log files → slow reads.
  • Over-partitioning → too many small files; tune clustering/file size.
  • Reading MOR in read-optimized mode when up-to-the-minute freshness is required.

20) Interview Q&A — 8 Quick Ones

1) Why Hudi over plain Parquet on S3? Upserts, deletes, incremental queries, and consistent snapshots on a lake.

2) COW vs MOR? COW = simpler, great read perf; MOR = lower write latency with log + compaction.

3) How does incremental pull work? Use commit times on the timeline to read only changed rows.

4) What is preCombineField? It breaks ties among duplicate keys within a batch, keeping the record with the latest value of that field.

5) How to handle late data? Use preCombine timestamp and partition design; MOR helps absorb late arrivals.

6) Speeding up upserts? Proper indexing (bucket/Bloom), co-locate data, tune parallelism & file sizes.

7) Compaction vs Clustering? Compaction merges logs; clustering rewrites file layout for query efficiency.

8) Query engines? Spark/Hive/Presto/Trino/Athena support snapshot/read-optimized modes.