Apache Airflow Pocket Book

DAGs & tasks • Schedulers & executors • Connections & XCom • Sensors • TaskFlow API • Deploy & observe

Section 1 — Fundamentals

1) What is Apache Airflow?

Airflow is a platform to programmatically author, schedule, and monitor workflows as code (Python). A DAG defines task dependencies; the Scheduler triggers task runs; Executors run tasks on local machines, Celery workers, Kubernetes, etc.

# Quick start (local)
pip install "apache-airflow[azure,celery,cncf.kubernetes,google,amazon]" --constraint <constraints-url>
airflow db init
airflow users create -u admin -p admin -r Admin -e admin@example.com -f Admin -l User
airflow webserver -p 8080 & airflow scheduler

2) Core Concepts

  • DAG: Directed Acyclic Graph of tasks with schedules.
  • Operator: how work executes (BashOperator, PythonOperator, etc.).
  • Task: an Operator instance in a DAG.
  • Hook/Connection: credentials & client helpers to external systems.
  • XCom: small cross-task messages (metadata, not big data).

3) First DAG (Python)

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

def hello():
    print("Hello, Airflow!")

with DAG(
    dag_id="hello_airflow",
    start_date=datetime(2024,1,1),
    schedule="0 6 * * *",
    catchup=False,
    default_args={"retries":1,"retry_delay":timedelta(minutes=5)}
) as dag:
    task = PythonOperator(task_id="say_hello", python_callable=hello)

Drop this file into your dags/ folder and refresh the UI.

4) TaskFlow API (Cleaner Python)

from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2024,1,1), schedule="@daily", catchup=False)
def etl():
    @task
    def extract(): return {"ids":[1,2,3]}
    @task
    def transform(data): return [i*10 for i in data["ids"]]
    @task
    def load(rows): print(rows)
    load(transform(extract()))
etl()

5) Operators & Sensors

Operators do work; Sensors wait for external conditions (e.g., file in S3, partition in Hive). Use deferrable operators to avoid resource waste while waiting.

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
S3KeySensor(task_id="wait_for_file", bucket_key="inbound/data_*.csv", bucket_name="landing", wildcard_match=True)
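
On recent Amazon provider versions the same sensor can run in deferrable mode, handing the wait to the triggerer so no worker slot is held. A minimal sketch, assuming a provider release that supports deferrable=True on S3KeySensor:

# Assumes an apache-airflow-providers-amazon version with deferrable support.
S3KeySensor(
    task_id="wait_for_file_deferrable",
    bucket_name="landing",
    bucket_key="inbound/data_*.csv",
    wildcard_match=True,
    deferrable=True,   # the wait runs in the triggerer, not a worker slot
)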

Section 2 — Scheduling, Connections & Data

6) Scheduling

Use cron strings or presets (@hourly, @daily, @once). catchup=True backfills the intervals missed since start_date; max_active_runs limits concurrent runs per DAG.

with DAG(..., schedule="0 */2 * * *", catchup=True, max_active_runs=1): ...

7) Connections & Variables

Store credentials in the Airflow metadata DB (UI: Admin → Connections). Use env vars or secret backends (Vault, AWS Secrets Manager, GCP Secret Manager) for production.

# CLI example
airflow connections add my_s3 --conn-type aws --conn-extra '{"region_name":"ap-south-1"}'
airflow variables set dataset_bucket my-data-bucket
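
In production the same objects can come from environment variables (or a secret backend) rather than the metadata DB. A minimal sketch using the AIRFLOW_CONN_* / AIRFLOW_VAR_* prefixes; the URI below mirrors the my_s3 connection above:

# Environment-variable form: URI for connections, plain value for variables
export AIRFLOW_CONN_MY_S3='aws://?region_name=ap-south-1'
export AIRFLOW_VAR_DATASET_BUCKET='my-data-bucket'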

8) XComs (Pass Small Data)

Tasks can push/pull small metadata via XCom. For big data, use object stores/DBs and pass references in XCom.

@task
def step1(): return {"path":"s3://bucket/key.parquet"}
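
A downstream task receives that reference through TaskFlow's implicit XCom; a minimal sketch continuing the fragment above:

@task
def step2(meta):
    # Only the URI travelled through XCom; read the real data from object storage here.
    print(f"reading {meta['path']}")

step2(step1())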

9) Branching & Trigger Rules

Use BranchPythonOperator to choose paths at runtime. Control downstream behavior with trigger rules (e.g., all_failed, one_success).

from airflow.operators.python import BranchPythonOperator
choose = BranchPythonOperator(task_id="choose", python_callable=lambda: "path_a")
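
A fuller branch-and-join sketch, using EmptyOperator as a stand-in for the real paths; note the join's trigger rule, which tolerates the skipped branch:

from airflow.operators.empty import EmptyOperator

path_a = EmptyOperator(task_id="path_a")
path_b = EmptyOperator(task_id="path_b")
join = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success")
choose >> [path_a, path_b] >> join   # whichever task_id "choose" returns runs; the other is skipped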

10) SLAs & Alerts

Define SLAs on tasks and send email/Slack/PagerDuty alerts via callbacks (on_failure_callback, sla_miss_callback) or provider notification integrations.
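
A minimal sketch: an SLA on a task plus an on-failure callback. The notify_failure helper is a hypothetical placeholder for your Slack/PagerDuty hand-off, and the lambda stands in for the real work:

from datetime import timedelta
from airflow.operators.python import PythonOperator

def notify_failure(context):
    # Hypothetical hook-up point: forward details from context["task_instance"]
    # to Slack, PagerDuty, email, etc.
    print(f"FAILED: {context['task_instance'].task_id}")

PythonOperator(
    task_id="nightly_load",
    python_callable=lambda: None,       # stand-in for the real work
    sla=timedelta(hours=1),             # missed SLAs appear under Browse → SLA Misses
    on_failure_callback=notify_failure,
)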

Section 3 — Execution, Deployment & Observability

11) Executors

  • Sequential/Local: simple dev/test.
  • Celery: distributed workers with a broker (Redis/RabbitMQ).
  • Kubernetes: each task in its own pod; great isolation & autoscaling.
# airflow.cfg (executor lives under [core]; broker/result settings under [celery])
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:***@postgres/airflow

12) KubernetesPodOperator Example

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
KubernetesPodOperator(
  task_id="spark_job",
  name="spark",
  namespace="airflow",
  image="ghcr.io/org/spark-job:latest",
  cmds=["/opt/app/run.sh"],
  get_logs=True
)

13) Deploy Options

Docker Compose for dev, Helm chart for Kubernetes, or managed offerings (e.g., Astro, Cloud Composer, MWAA). Package DAGs as a repo; use CI/CD to sync to dags/ (git-sync or artifact bundle).

14) Observability

Use built-in logs and Gantt/Graph views. Export metrics to Prometheus/Grafana; centralize logs in S3/Cloud Logging. Add task-level timing and lineage tags for critical paths.
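
Airflow's metrics are emitted over StatsD; a minimal airflow.cfg sketch, assuming a statsd-exporter (bridging to Prometheus) reachable at statsd-exporter:9125:

# airflow.cfg
[metrics]
statsd_on = True
statsd_host = statsd-exporter
statsd_port = 9125
statsd_prefix = airflow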

15) Data-Aware Scheduling

Use Datasets to trigger DAGs when upstream datasets are updated, improving event-driven orchestration.
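
A minimal producer/consumer sketch, assuming Airflow 2.4+ where Datasets were introduced; the s3:// URI is just an identifier:

from datetime import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

orders = Dataset("s3://landing/orders.parquet")

@dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def producer():
    @task(outlets=[orders])
    def write_orders():
        ...  # writing the file marks the dataset as updated
    write_orders()

@dag(start_date=datetime(2024, 1, 1), schedule=[orders], catchup=False)
def consumer():
    @task
    def read_orders():
        ...  # runs whenever the producer updates the dataset
    read_orders()

producer()
consumer()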

Section 4 — Reliability, Security & Patterns

16) Reliability Patterns

  • Retries with exponential delay; idempotent tasks.
  • Time limits on tasks; detect and kill zombie tasks.
  • Use pools/queues to prevent resource contention.
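
All of these are ordinary operator arguments, usable per task or via default_args; a minimal sketch (the "warehouse" pool is an assumed pool created under Admin → Pools):

from datetime import timedelta

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=1),
    "retry_exponential_backoff": True,        # 1m, 2m, 4m, ... capped by max_retry_delay
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(hours=1),  # hard per-run time limit
    "pool": "warehouse",                      # assumed pool to cap concurrent slots
}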

17) Security

Use RBAC, secret backends, network policies (K8s), and restrict code execution sources. Avoid embedding credentials in DAG files; use Connections/Env.

18) Testing DAGs

Use airflow dags test for targeted local runs, and pytest (e.g., a DagBag import check) for unit tests. Validate that every DAG file loads in CI.

airflow dags test hello_airflow 2025-08-09
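
The CI load check is commonly a DagBag import test; a minimal pytest sketch, assuming AIRFLOW_HOME points at your project:

from airflow.models import DagBag

def test_dags_load_without_errors():
    # Parsing every DAG file catches syntax errors and broken imports before deploy.
    dag_bag = DagBag(include_examples=False)
    assert dag_bag.import_errors == {}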

19) Common Pitfalls

  • Passing large payloads via XCom → use storage and pass URIs.
  • Heavy top-level code/imports slowing DAG parsing; keep expensive imports inside callables (see the sketch after this list).
  • Too many tasks in one DAG; prefer modular DAGs and datasets.
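
For the parse-time pitfall above, a minimal sketch; pandas stands in for any heavy dependency:

from airflow.decorators import task

@task
def transform():
    import pandas as pd  # imported when the task runs, not when the DAG file is parsed
    return pd.DataFrame({"id": [1, 2, 3]}).shape[0]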

20) Interview Q&A — 8 Quick Ones

1) Airflow vs cron? Airflow handles dependencies, retries, backfills, and observability; cron is per-host scheduling only.

2) When K8s executor? For strong isolation, autoscaling, and heterogeneous task environments.

3) XCom size? Keep tiny (< a few KB). Store data externally; pass references.

4) Idempotency? Design tasks so re-runs don’t corrupt state; use upserts and checkpoints.

5) Backfill safely? Disable side effects or guard with execution date; write outputs by partition.

6) Sensors at scale? Use deferrable sensors to free worker slots while waiting.

7) Secret management? Use Connections + secret backend; never commit creds.

8) CI/CD? Lint & load-check DAGs; deploy via Helm/Compose; run smoke DAG post-deploy.