Apache Hudi Pocket Book
Upserts on data lakes • COW vs MOR • Timeline • Indexing • Incremental pulls • Compaction & clustering • Spark/Flink/Presto/Trino
1) What is Apache Hudi?
Apache Hudi brings database-like primitives (upsert, delete, change capture) to data lakes on object stores (S3/GCS/ADLS/HDFS). It maintains a timeline of commits and supports fast incremental processing and near-real-time ingestion.
# Spark quick start (PySpark)
pip install pyspark
# There is no "apache-hudi" pip package; Hudi ships as a Spark bundle jar.
# Match the bundle to your Spark/Scala versions, e.g.:
pyspark --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0
2) Table Types
- Copy-on-Write (COW): writes create new Parquet files; simpler, great for read-optimized analytics.
- Merge-on-Read (MOR): appends to log files, compacts later; lower write latency, good for near real-time.
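The table type is picked per table via a writer option; a minimal sketch (COPY_ON_WRITE is the default):
# Append to the Spark writer options shown in section 6
.option("hoodie.datasource.write.table.type","MERGE_ON_READ")  # or "COPY_ON_WRITE"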
3) Core Keys
- recordKey: unique id per row (e.g., order_id).
- partitionPath: directory layout (e.g., dt=YYYY-MM-DD, region=US).
- preCombineField: choose latest record when duplicates land within a batch (e.g., ts).
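For composite keys, list the fields comma-separated and switch the key generator; a hedged sketch (line_no is an illustrative column):
.option("hoodie.datasource.write.recordkey.field","order_id,line_no")
.option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator")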
4) Operations
insert, upsert, bulk_insert (initial load), delete, insert_overwrite (replace partitions), delete_partition. Note: delta_commit is not a write operation; it is the timeline action that MOR writes produce. See the delete sketch below.
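A minimal delete sketch (Spark datasource); deletes_df is a hypothetical DataFrame holding the key, partition, and precombine columns of the rows to remove:
(deletes_df.write.format("hudi")
 .option("hoodie.table.name", "orders_hudi")
 .option("hoodie.datasource.write.recordkey.field", "order_id")
 .option("hoodie.datasource.write.partitionpath.field", "dt")
 .option("hoodie.datasource.write.precombine.field", "updated_at")
 .option("hoodie.datasource.write.operation", "delete")
 .mode("append")
 .save("s3://lake/bronze/orders_hudi"))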
5) Query Types
- Snapshot: latest view (COW/MOR).
- Read Optimized: Parquet only (MOR without merging logs).
- Incremental: rows changed since a commit time.
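A hedged read-optimized query sketch (MOR base files only, no log merge):
ro = (spark.read.format("hudi")
 .option("hoodie.datasource.query.type","read_optimized")
 .load("s3://lake/bronze/orders_hudi"))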
6) Spark Write — Upsert (Python)
from pyspark.sql import SparkSession
spark = (SparkSession.builder
# Put the Hudi Spark bundle on the classpath (adjust to your Spark/Scala versions)
.config("spark.jars.packages","org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.0")
.config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.getOrCreate())
df = spark.read.json("s3://raw/orders/*.json")
(base_path, table) = ("s3://lake/bronze/orders_hudi", "orders_hudi")
(df.write.format("hudi")
.option("hoodie.table.name", table)
.option("hoodie.datasource.write.recordkey.field", "order_id")
.option("hoodie.datasource.write.partitionpath.field", "dt")
.option("hoodie.datasource.write.precombine.field", "updated_at")
.option("hoodie.datasource.write.operation", "upsert")
.mode("append")
.save(base_path))
7) Spark Read — Snapshot / Incremental
# Snapshot
snap = (spark.read.format("hudi").load("s3://lake/bronze/orders_hudi"))
# Incremental since a commit time
inc = (spark.read.format("hudi")
.option("hoodie.datasource.query.type","incremental")
.option("hoodie.datasource.read.begin.instanttime","20250701000000")
.load("s3://lake/bronze/orders_hudi"))
8) Flink Streaming Ingest (Java/SQL)
-- Flink SQL example (simplified)
CREATE TABLE orders_hudi (
  order_id STRING,
  amount DOUBLE,
  updated_at TIMESTAMP(3),
  dt STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = 's3://lake/bronze/orders_hudi',  -- base path on storage (required)
  'precombine.field' = 'updated_at',
  'table.type' = 'MERGE_ON_READ'
);
9) Query Engines
Use Presto/Trino/Athena/Hive/Spark SQL for interactive queries. For MOR, snapshot queries return the merged (fresh) view; read-optimized queries are faster but may be slightly stale.
10) Indexing
Hudi maintains indexes (Bloom, HBase, Simple, Bucket) to find file groups during upsert. Choose based on scale and key distribution; bucket index improves large-scale upserts.
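A hedged sketch of enabling the bucket index (Spark writer options; the bucket count is illustrative and is typically fixed once chosen with the simple bucket engine):
.option("hoodie.index.type","BUCKET")
.option("hoodie.bucket.index.num.buckets","64")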
11) Timeline & Commits
Hudi records actions as instants (commit, delta_commit, clean, compaction, restore). Use the timeline to run incremental ETL and CDC.
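A hedged sketch that reads the latest completed commit time from the _hoodie_commit_time metadata column (handy as the next incremental begin instant):
from pyspark.sql import functions as F
latest_instant = (spark.read.format("hudi")
 .load("s3://lake/bronze/orders_hudi")
 .agg(F.max("_hoodie_commit_time"))
 .first()[0])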
12) Compaction (MOR)
Merges log files into Parquet to keep reads fast. Schedule and run compaction during low-traffic windows or continuously with Flink.
# Inline compaction trigger (append to the MOR writer options from section 6)
.option("hoodie.compact.inline","true")
.option("hoodie.compact.inline.max.delta.commits","10")
13) Clustering (File Layout)
Rewrites file layout for better query performance (e.g., sort columns, larger files). Can be async and incremental.
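# Inline clustering trigger (append to the writer options, as with compaction)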
.option("hoodie.clustering.inline","true")
.option("hoodie.clustering.plan.strategy.sort.columns","dt,region")
14) Cleaning & Retention
Automatically removes obsolete file versions to control storage.
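# Cleaner settings (append to the writer options)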
.option("hoodie.cleaner.policy","KEEP_LATEST_COMMITS")
.option("hoodie.cleaner.commits.retained","20")
15) Schema Evolution
Supports add/drop nullable columns and compatible type changes. Keep evolution consistent across producers; validate with a schema registry when possible.
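A hedged sketch: adding a nullable column on write is a backward-compatible change Hudi accepts (coupon_code is an illustrative column):
from pyspark.sql import functions as F
evolved = df.withColumn("coupon_code", F.lit(None).cast("string"))  # new nullable column
# then upsert with the same writer options as in section 6; existing rows read the column as null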
16) Partitioning & File Sizes
- Prefer hierarchical partitions with manageable cardinality (e.g., dt=YYYY-MM-DD, region).
- Tune target file size (e.g., 128–512 MB) for scan efficiency.
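A hedged sketch of the file-sizing knobs (values are bytes; the numbers here are illustrative):
.option("hoodie.parquet.max.file.size", str(256 * 1024 * 1024))    # target base file size
.option("hoodie.parquet.small.file.limit", str(100 * 1024 * 1024)) # route new inserts into files below this size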
17) Lakehouse Interop
Hudi competes and interoperates with Iceberg and Delta Lake. Choose based on upsert latency (Hudi MOR shines here), catalog/ACID features, engine support, and org standardization.
18) CDC & Incremental ETL
Ingest source CDC (Debezium/Kafka) into Hudi; downstream jobs read incrementally using the begin instant to avoid full scans.
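A hedged incremental-ETL sketch; last_instant would come from your own checkpoint store (hypothetical value below):
last_instant = "20250701000000"  # e.g., loaded from a checkpoint table or file
changes = (spark.read.format("hudi")
 .option("hoodie.datasource.query.type","incremental")
 .option("hoodie.datasource.read.begin.instanttime", last_instant)
 .load("s3://lake/bronze/orders_hudi"))
# transform and write downstream, then persist max(_hoodie_commit_time) as the next checkpoint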
19) Common Pitfalls
- Skewed keys → slow upserts; consider bucket index or repartitioning.
- Unmanaged compaction → growing log files → slow reads.
- Over-partitioning → too many small files; tune clustering/file size.
- Reading MOR in read-optimized mode (no log merge) when freshness is required; use snapshot queries instead.
20) Interview Q&A — 8 Quick Ones
1) Why Hudi over plain Parquet on S3? Upserts, deletes, incremental queries, and consistent snapshots on a lake.
2) COW vs MOR? COW = simpler, great read perf; MOR = lower write latency with log + compaction.
3) How does incremental pull work? Use commit times on the timeline to read only changed rows.
4) What is preCombineField? Break ties among duplicate keys in a batch, keeping the latest version.
5) How to handle late data? Use preCombine timestamp and partition design; MOR helps absorb late arrivals.
6) Speeding up upserts? Proper indexing (bucket/Bloom), co-locate data, tune parallelism & file sizes.
7) Compaction vs Clustering? Compaction merges logs; clustering rewrites file layout for query efficiency.
8) Query engines? Spark/Hive/Presto/Trino/Athena support snapshot/read-optimized modes.