Apache Airflow Pocket Book
DAGs & tasks • Schedulers & executors • Connections & XCom • Sensors • TaskFlow API • Deploy & observe
1) What is Apache Airflow?
Airflow is a platform to programmatically author, schedule, and monitor workflows as code (Python). A DAG defines task dependencies; the Scheduler triggers task runs; Executors run tasks on local machines, Celery workers, Kubernetes, etc.
# Quick start (local)
pip install "apache-airflow[azure,celery,cncf.kubernetes,google,amazon]" --constraint <constraints-url>
airflow db init
airflow users create -u admin -p admin -r Admin -e admin@example.com -f Admin -l User
airflow webserver -p 8080 & airflow scheduler
2) Core Concepts
- DAG: Directed Acyclic Graph of tasks with schedules.
- Operator: how work executes (BashOperator, PythonOperator, etc.).
- Task: an Operator instance in a DAG.
- Hook/Connection: credentials & client helpers to external systems.
- XCom: small cross-task messages (metadata, not big data).
3) First DAG (Python)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
def hello():
    print("Hello, Airflow!")

with DAG(
    dag_id="hello_airflow",
    start_date=datetime(2024, 1, 1),
    schedule="0 6 * * *",
    catchup=False,
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
) as dag:
    task = PythonOperator(task_id="say_hello", python_callable=hello)
Drop this file into your dags/ folder and refresh the UI.
4) TaskFlow API (Cleaner Python)
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2024,1,1), schedule="@daily", catchup=False)
def etl():
    @task
    def extract():
        return {"ids": [1, 2, 3]}

    @task
    def transform(data):
        return [i * 10 for i in data["ids"]]

    @task
    def load(rows):
        print(rows)

    load(transform(extract()))

etl()
5) Operators & Sensors
Operators do work; Sensors wait for external conditions (e.g., file in S3, partition in Hive). Use deferrable operators to avoid resource waste while waiting.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
S3KeySensor(task_id="wait_for_file", bucket_key="inbound/data_*.csv",
            bucket_name="landing", wildcard_match=True)  # wildcard_match is required for glob keys
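Sensors that support deferrable mode hand the wait over to the triggerer instead of occupying a worker slot. A minimal sketch, assuming an Amazon provider version that accepts deferrable=True and a running triggerer process:
# Deferrable variant of the sensor above: frees the worker slot while waiting.
S3KeySensor(task_id="wait_for_file_deferrable", bucket_key="inbound/data_*.csv",
            bucket_name="landing", wildcard_match=True, deferrable=True)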
6) Scheduling
Use cron strings or presets (@hourly, @daily, @once). catchup backfills missed runs. Set max_active_runs to limit concurrency per DAG.
with DAG(..., schedule="0 */2 * * *", catchup=True, max_active_runs=1): ...
7) Connections & Variables
Store credentials in the Airflow metadata DB (UI: Admin → Connections). Use env vars or secret backends (Vault, AWS Secrets Manager, GCP Secret Manager) for production.
# CLI example
airflow connections add my_s3 --conn-type aws --conn-extra '{"region_name":"ap-south-1"}'
airflow variables set dataset_bucket my-data-bucket
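Inside DAG code, hooks and Variable lookups resolve these at run time. A minimal sketch reusing the IDs from the CLI example (do the lookups inside tasks, not at module level, to avoid metadata-DB hits on every DAG parse):
from airflow.hooks.base import BaseHook
from airflow.models import Variable

conn = BaseHook.get_connection("my_s3")    # Connection object: login, password, extras, ...
bucket = Variable.get("dataset_bucket")    # plain string value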
8) XComs (Pass Small Data)
Tasks can push/pull small metadata via XCom. For big data, use object stores/DBs and pass references in XCom.
@task
def step1(): return {"path":"s3://bucket/key.parquet"}
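Under TaskFlow the pushed value arrives as a normal argument; classic operators pull it explicitly. A minimal sketch building on step1 above:
from airflow.decorators import task

@task
def step2(ref):
    print("reading from", ref["path"])   # only the S3 reference travelled through XCom

step2(step1())
# Classic-operator equivalent (Jinja): "{{ ti.xcom_pull(task_ids='step1')['path'] }}"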
9) Branching & Trigger Rules
Use BranchPythonOperator to choose paths at runtime. Control downstream behavior with trigger rules (e.g., all_failed, one_success).
from airflow.operators.python import BranchPythonOperator
choose = BranchPythonOperator(task_id="choose", python_callable=lambda: "path_a")
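A minimal sketch wiring the branch to two illustrative paths plus a join task; the join uses none_failed_min_one_success so the skipped branch doesn't also skip it:
from airflow.operators.empty import EmptyOperator

path_a = EmptyOperator(task_id="path_a")
path_b = EmptyOperator(task_id="path_b")
join = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success")
choose >> [path_a, path_b] >> join   # only the task id returned by `choose` runs; the other branch is skipped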
10) SLAs & Alerts
Define SLAs on tasks and configure email/Slack/PagerDuty alerts via callbacks or Alerting integrations.
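A minimal sketch of both hooks on a task, assuming a hypothetical notify_failure helper for the callback body (SLA misses also appear in the UI under Browse → SLA Misses):
from datetime import timedelta
from airflow.operators.python import PythonOperator

def notify_failure(context):
    # Hypothetical callback body: forward context["task_instance"] details to Slack/PagerDuty/email.
    print("Task failed:", context["task_instance"].task_id)

load = PythonOperator(
    task_id="load_warehouse",
    python_callable=lambda: None,
    sla=timedelta(hours=1),              # SLA miss recorded if the task hasn't finished ~1h after the scheduled run start
    on_failure_callback=notify_failure,  # fires when the task fails (after retries are exhausted)
)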
11) Executors
- Sequential/Local: simple dev/test.
- Celery: distributed workers with a broker (Redis/RabbitMQ).
- Kubernetes: each task in its own pod; great isolation & autoscaling.
# airflow.cfg
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:***@postgres/airflow
12) KubernetesPodOperator Example
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
KubernetesPodOperator(
    task_id="spark_job",
    name="spark",
    namespace="airflow",
    image="ghcr.io/org/spark-job:latest",
    cmds=["/opt/app/run.sh"],
    get_logs=True,
)
13) Deploy Options
Docker Compose for dev, Helm chart for Kubernetes, or managed offerings (e.g., Astro, Cloud Composer, MWAA). Package DAGs as a repo; use CI/CD to sync to dags/ (git-sync or artifact bundle).
14) Observability
Use built-in logs and Gantt/Graph views. Export metrics to Prometheus/Grafana; centralize logs in S3/Cloud Logging. Add task-level timing and lineage tags for critical paths.
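One way to get task-level timing without extra infrastructure is a success callback that reads the finished task instance's duration; a minimal sketch (the print stands in for a hypothetical metrics sink):
def record_duration(context):
    ti = context["task_instance"]
    # ti.duration holds the wall-clock seconds of the completed attempt.
    print(f"{ti.dag_id}.{ti.task_id} took {ti.duration:.1f}s")

# Attach per task, or DAG-wide via default_args={"on_success_callback": record_duration}.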
15) Data-Aware Scheduling
Use Datasets to trigger DAGs when upstream datasets are updated, improving event-driven orchestration.
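A minimal producer/consumer sketch (the dataset URI and DAG IDs are illustrative; Datasets require Airflow 2.4+):
from datetime import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

orders = Dataset("s3://lake/orders.parquet")   # illustrative dataset URI

@dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def producer():
    @task(outlets=[orders])      # marks `orders` as updated when this task succeeds
    def publish(): ...
    publish()

@dag(start_date=datetime(2024, 1, 1), schedule=[orders], catchup=False)   # triggered by updates to `orders`
def consumer():
    @task
    def consume(): ...
    consume()

producer()
consumer()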
16) Reliability Patterns
- Retries with exponential backoff; idempotent tasks.
- Time limits on tasks (execution_timeout); detect and kill zombie tasks.
- Use pools/queues to prevent resource contention (see the sketch after this list).
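A minimal sketch of these knobs on one task (the pool name is illustrative and must be created under Admin → Pools):
from datetime import timedelta
from airflow.operators.python import PythonOperator

resilient = PythonOperator(
    task_id="resilient_step",
    python_callable=lambda: None,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,           # 5m, 10m, 20m, ... between attempts
    execution_timeout=timedelta(minutes=30),  # fail the attempt if it runs longer than 30 minutes
    pool="warehouse",                         # caps concurrent slots across tasks sharing this pool
)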
17) Security
Use RBAC, secret backends, network policies (K8s), and restrict code execution sources. Avoid embedding credentials in DAG files; use Connections/Env.
18) Testing DAGs
Use airflow dags test for targeted runs; use pytest for unit tests (e.g., DagBag import checks). Validate DAG loading in CI.
airflow dags test hello_airflow 2025-08-09
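A minimal pytest sketch for the CI load check (assumes the DAG files from this guide sit in the configured dags folder):
from airflow.models import DagBag

def test_dags_load_without_errors():
    dag_bag = DagBag(include_examples=False)       # parses every file in the dags folder
    assert dag_bag.import_errors == {}, dag_bag.import_errors
    assert "hello_airflow" in dag_bag.dags         # the DAG from section 3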
19) Common Pitfalls
- Passing large payloads via XCom → use storage and pass URIs.
- Global imports causing long DAG parse times; keep heavy imports inside task callables (sketch below).
- Too many tasks in one DAG; prefer modular DAGs and Datasets.
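For the parse-time pitfall, the usual fix is deferring heavy imports into the task callable; a minimal sketch (pandas stands in for any heavy dependency):
from airflow.decorators import task

@task
def transform():
    import pandas as pd   # imported when the task runs, not while the scheduler parses the DAG file
    return pd.DataFrame({"x": [1, 2, 3]}).to_dict()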
20) Interview Q&A — 8 Quick Ones
1) Airflow vs cron? Airflow handles dependencies, retries, backfills, and observability; cron is per-host scheduling only.
2) When K8s executor? For strong isolation, autoscaling, and heterogeneous task environments.
3) XCom size? Keep tiny (< a few KB). Store data externally; pass references.
4) Idempotency? Design tasks so re-runs don’t corrupt state; use upserts and checkpoints.
5) Backfill safely? Disable side effects or guard with execution date; write outputs by partition.
6) Sensors at scale? Use deferrable sensors to free worker slots while waiting.
7) Secret management? Use Connections + secret backend; never commit creds.
8) CI/CD? Lint & load-check DAGs; deploy via Helm/Compose; run smoke DAG post-deploy.