Dask Pocket Book — Uplatz
50 deep-dive flashcards • Wide layout • Fewer scrolls • 20+ Interview Q&A • Readable code examples
1) What is Dask?
Dask is a flexible parallel computing library for Python. It scales familiar interfaces like NumPy, pandas, and Python iterators from a single machine to clusters by building task graphs and executing them with pluggable schedulers. Core collections: dask.array (n-dim arrays), dask.dataframe (tabular), dask.bag (semi-structured), plus dask.delayed/futures for custom workflows. It shines for out-of-core workloads, interactive analysis, and incremental migration from local notebooks to distributed clusters.
# Install (prefer conda for scientific stacks)
conda install -c conda-forge dask distributed
# or
pip install "dask[complete]" distributed
2) Why Dask? Core Strengths & Tradeoffs
Strengths: Python-native, minimal code changes from NumPy/pandas, fine-grained graphs, rich dashboard, and deploy-anywhere clusters. It handles datasets larger than memory via chunking and spilling. Tradeoffs: requires thought about chunk/partition sizing, shuffles can be costly, and debugging distributed state needs tooling discipline. Mitigate by profiling early, designing for locality, and persisting key intermediates.
# Create a client (local multicore)
from dask.distributed import Client
client = Client() # dashboard at http://localhost:8787
3) Task Graph & Scheduler: Mental Model
Dask builds a DAG of fine-grained tasks (functions + data dependencies). Execution is lazy until you call .compute() or client.compute(). The scheduler prioritizes tasks, balances work across workers, spills excess data to disk, and cleans up intermediates. Transforms add nodes/edges; compute triggers graph optimization (fusion, blockwise) and execution.
import dask.array as da
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
y = ((x - x.mean(0)) / x.std(0)).sum(axis=1)
result = y.compute()
Tip: use the dashboard’s Graph and Profile tabs to verify fusion and hotspots.
4) Schedulers: Threads, Processes, Distributed
Dask offers multiple schedulers: single-threaded (debug), threaded (good for NumPy/pandas releasing the GIL), multiprocessing (for pure Python/GIL-bound code), and distributed (networked cluster with dashboard and resilience). Choose based on workload characteristics; test locally, then scale out.
import dask
dask.config.set(scheduler="threads") # or "processes" / "single-threaded"
from dask.distributed import Client, LocalCluster
client = Client(LocalCluster(n_workers=4, threads_per_worker=2))
5) Dask vs pandas/Spark/NumPy
Dask vs pandas/NumPy: same APIs scaled via partitions/chunks; great for iterative analytics and interactive notebooks. Dask vs Spark: Dask is Python-first with fine-grained tasks and a broader scientific ecosystem (xarray, scikit-learn, RAPIDS). Spark excels for JVM shops, SQL-heavy ETL, and massive shuffles. Pick based on team skills, ecosystem, and workload patterns.
import dask.dataframe as dd
df = dd.read_parquet("s3://bucket/path/")
res = df[df.amount > 0].groupby("user").amount.mean().compute()
6) Environments & Versions
Prefer conda/conda-forge for scientific stacks. Keep dask, distributed, pandas, numpy, and IO libs (e.g., pyarrow) compatible. Pin envs for production and bake them into images. Version mismatches across workers can cause serialization errors or behavior differences.
conda create -n dask-env -c conda-forge python=3.11 dask distributed pandas pyarrow
conda activate dask-env
7) Collections Overview
Array: chunked NumPy-like tensors. DataFrame: partitioned pandas frames. Bag: lists/JSON/logs. Use delayed to wrap Python functions into graphs, and futures for immediate execution with result handles.
from dask import delayed
@delayed
def clean_record(x): ...
lazy = [clean_record(r) for r in records]
out = delayed(sum)(lazy).compute()
8) LTS vs Latest
Dask moves quickly alongside pandas/NumPy/Arrow. For production, pin versions you validate in CI and upgrade periodically. For notebooks, latest often gives performance wins (new shuffles, blockwise fusion, parquet engine improvements).
# environment.yml (pin exact versions after testing)
name: dask-prod
channels: [conda-forge]
dependencies:
- python=3.11
- dask
- distributed
- pandas
- pyarrow
9) LocalCluster & Client Basics
Client connects your Python session to a scheduler, starting workers locally by default. The dashboard at /status shows tasks, memory, workers, and bandwidth. Use client.upload_file to ship helpers; prefer packaging for real deployments.
from dask.distributed import Client
client = Client()
print(client.dashboard_link)
10) Q&A — “How does Dask parallelize with the GIL?”
Answer: Many NumPy/pandas ops release the GIL, so the threaded scheduler runs them in parallel. For pure-Python GIL-bound code, use multiprocessing or the distributed scheduler (multiple processes) or offload to native/GPU code.
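A minimal sketch of picking a scheduler for GIL-bound work; slow_python is a placeholder for a pure-Python function.
import dask
from dask import delayed
tasks = [delayed(slow_python)(i) for i in range(8)]    # slow_python: placeholder pure-Python work
results = dask.compute(*tasks, scheduler="processes")  # separate processes sidestep the GIL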
11) dask.array Essentials
Replicates NumPy with chunked arrays. Choose chunk sizes that fit memory and align with downstream ops. Operations are fused; reductions aggregate across chunks.
import dask.array as da
x = da.arange(10_000_000, chunks=1_000_000)
y = (x**2 + 3).mean().compute()
12) dask.dataframe Essentials
Partitioned pandas. Avoid row-wise Python UDFs; prefer vectorized ops and aggregations. Set meaningful divisions (index ranges) to speed loc and joins.
import dask.dataframe as dd
ddf = dd.read_parquet("data/*.parquet")
ddf = ddf.assign(ratio = ddf.sales / ddf.cost)
out = ddf.groupby("store").ratio.mean().compute()
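A small sketch of setting divisions via a sorted index; the column name "date" is illustrative.
ddf = ddf.set_index("date", sorted=True)  # sorted=True skips the shuffle when data is pre-ordered
print(ddf.known_divisions)                # True once divisions are known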
13) IO: Parquet/CSV/Cloud
dd.read_parquet/to_parquet via pyarrow is the sweet spot. Use fsspec URLs (s3://, gs://, abfs://) with appropriate auth. Prefer column pruning and predicate pushdown; write with partition_on.
ddf = dd.read_parquet("s3://bucket/ds/", storage_options={"anon": False})
ddf.to_parquet("s3://bucket/out/", partition_on=["date"])
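A sketch of column pruning and predicate pushdown at read time; the column names and filter values are illustrative.
ddf = dd.read_parquet(
    "s3://bucket/ds/",
    columns=["user", "amount", "date"],      # read only these columns
    filters=[("date", ">=", "2024-01-01")],  # push the predicate down to row groups
)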
14) Persist, Compute, Cache
.compute() returns in-memory results (NumPy/pandas). .persist() materializes a collection across the cluster for reuse, keeping a lazy facade. Persist expensive intermediates before repeated downstream steps.
ddf = dd.read_parquet("...").query("amount > 0")
ddf_p = ddf.persist()
summary = ddf_p.groupby("user").amount.mean().compute()
15) Bag: Semi-Structured Pipelines
For logs/JSONlines/text. Map/filter/reduction over Python objects; convert to DataFrame when schema stabilizes. Good for ETL from messy sources.
import json, dask.bag as db
b = db.read_text("logs/*.json").map(lambda s: json.loads(s))
errors = b.filter(lambda r: r["level"]=="error").count().compute()
16) Delayed for Custom Graphs
Wrap Python functions to build graphs explicitly. Compose many small tasks, then compute once. Great for bespoke workflows not covered by collections.
from dask import delayed
@delayed
def load(p): ...
@delayed
def transform(x): ...
@delayed
def save(x, p): ...
d = save(transform(load("in.csv")), "out.parquet")
d.compute()
17) Futures & Real-Time
client.submit schedules a function immediately and returns a Future; client.map batches. Use for streaming results, custom backpressure, or interactive control.
from dask.distributed import Client
client = Client()
futs = client.map(lambda x: x**2, range(10_000))
total = client.submit(sum, futs).result()
18) Priorities & Resources
Annotate tasks with priorities and resource tags (e.g., GPU, memory). The scheduler honors constraints to place tasks on capable workers.
from dask import annotate
with annotate(priority=10, resources={"GPU": 1}):
    y = x.map_blocks(cuda_op)
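Annotations only take effect if workers advertise matching resources; a sketch of the worker flag (older releases spell the command dask-worker).
dask worker scheduler:8786 --resources "GPU=1"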
19) Dashboard 101
Key tabs: Status, Task Stream, Graph, Workers, Progress, Profile. Verify fusion, watch bandwidth, and check for spilling/churn.
from dask.distributed import performance_report
with performance_report(filename="report.html"):
    result = workflow()
20) Q&A — “DataFrame vs Bag vs Delayed?”
Answer: Use DataFrame for tabular/columnar ops. Use Bag for heterogeneous/semi-structured records (then convert). Use Delayed for custom, function-oriented pipelines that don’t fit collections.
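A small sketch of promoting a Bag to a DataFrame once the schema stabilizes; the records are illustrative.
import dask.bag as db
b = db.from_sequence([{"user": "a", "amount": 3}, {"user": "b", "amount": 5}])
ddf = b.to_dataframe()
print(ddf.groupby("user").amount.sum().compute())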
21) Laziness & Graph Fusion
Operations queue up lazily; Dask optimizes by fusion (combining adjacent tasks), blockwise (tile-wise ops), and culling (dropping unused branches). Aim for vectorized transforms for better fusion.
z = x.map_blocks(lambda b: (b - b.mean())/b.std()).sum()
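To confirm fusion outside the dashboard, render the optimized graph (a sketch; requires the graphviz package).
z.visualize(filename="graph.svg", optimize_graph=True)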
22) Chunk & Partition Sizing
Rule of thumb: chunks that take ~50–500ms each and fit comfortably in memory (e.g., 100MB–1GB per task for arrays; 100k–1M rows for dataframes). Oversized chunks reduce parallelism; undersized increase overhead.
x = da.from_zarr("s3://zarr/ds", chunks=(240, 256, 256))  # (time, lat, lon) chunk sizes
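A sketch of adjusting sizes after the fact, assuming an array x and a DataFrame ddf from earlier cards; the targets are illustrative.
x = x.rechunk((240, 512, 512))                 # fewer, larger array chunks
ddf = ddf.repartition(partition_size="256MB")  # aim for ~256MB partitions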
23) Shuffles: Task-Based vs P2P
Groupbys/joins require shuffles. Newer peer-to-peer (P2P) shuffles reduce scheduler load and scale better than task-based shuffles. Prefer Parquet partitioning to avoid unnecessary shuffles and pre-partition by keys.
ddf = ddf.shuffle("user", shuffle="p2p")  # newer releases spell this shuffle_method="p2p"
24) Memory & Spilling
Workers spill to disk when nearing memory limits. Avoid thrashing by persisting key datasets, using repartition
/coalesce
, and limiting concurrency. Tune --memory-limit
and --nthreads
.
dask worker scheduler:8786 --nthreads 2 --memory-limit 8GB --nworkers 4  # older releases: dask-worker / --nprocs
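Spill/pause thresholds can also be set in config (fractions of each worker's memory limit); set them before creating the cluster. The values below are illustrative.
import dask
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling to disk
    "distributed.worker.memory.spill": 0.70,      # spill more aggressively
    "distributed.worker.memory.pause": 0.80,      # pause new task execution
    "distributed.worker.memory.terminate": 0.95,  # restart the worker
})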
25) Work Stealing & Adaptive
The scheduler steals tasks from busy workers to idle ones. Adaptive scaling grows/shrinks clusters based on backlog — great for bursty notebooks and cost control.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
cluster.adapt(minimum=2, maximum=20)
26) Map-Reduce Patterns
Structure pipelines as map → combine → reduce. Use reduction in dask.array and grouped aggregations in DataFrame. Combine partials to minimize data movement.
import dask.array as da
x = da.random.random(10_000_000, chunks=1_000_000)
# chunk: per-block partial sums; aggregate: sum of the partials; divide by size for the mean
total = da.reduction(x,
    chunk=lambda b, axis, keepdims: b.sum(axis=axis, keepdims=keepdims),
    aggregate=lambda p, axis, keepdims: p.sum(axis=axis, keepdims=keepdims),
    dtype="f8")
mean = (total / x.size).compute()
27) Checkpoints & Persist Strategy
Materialize expensive steps to reduce recomputation after failures. Persist after heavy shuffles or IO, then branch into multiple analyses. Use client.rebalance to spread memory evenly.
ddf = dd.read_parquet("...").persist()
client.rebalance()  # spread the persisted partitions evenly across workers
28) Retries & Resilience
Dask re-runs tasks lost to worker failures and can retry failing tasks when you request retries. For flaky sources, wrap with idempotent IO and add retry logic at the function level.
from dask.distributed import Client
client = Client(timeout="120s")                   # connection timeout
fut = client.submit(flaky_load, path, retries=3)  # per-task retries (flaky_load/path are placeholders)
29) Backpressure & Flow Control
With futures, control inflight task counts to avoid overwhelming memory or remote services. Use semaphores/queues or chunk inputs.
from dask.distributed import Semaphore
sem = Semaphore(10)
def guarded(x):
    with sem:
        return do_work(x)
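Another option is a rolling window of futures via as_completed; a sketch reusing guarded from above (window size and inputs are illustrative).
from dask.distributed import Client, as_completed
client = Client()
inputs = iter(range(100_000))
window = [client.submit(guarded, next(inputs)) for _ in range(50)]
ac = as_completed(window)
for fut in ac:
    fut.result()
    try:
        ac.add(client.submit(guarded, next(inputs)))  # keep ~50 tasks in flight
    except StopIteration:
        pass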
30) Q&A — “persist() vs compute() vs cache?”
Answer: compute() returns concrete local results. persist() executes and keeps data distributed in cluster memory for reuse, returning the same lazy collection type for method chaining. Opportunistic caching (dask.cache.Cache) transparently keeps frequently reused intermediates on local schedulers. Persist when branching or iterating downstream.
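A sketch of sharing upstream work between branches, assuming the persisted frame ddf_p from card 14.
import dask
hi = ddf_p[ddf_p.amount > 100].amount.sum()
lo = ddf_p[ddf_p.amount <= 100].amount.sum()
hi_total, lo_total = dask.compute(hi, lo)  # one execution, shared intermediates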
dask-ml adds scalable estimators and utilities (incremental learning, parallel grid search). Use for large hyperparameter sweeps or out-of-core preprocessing; many scikit-learn estimators work with Dask arrays.
from dask_ml.model_selection import GridSearchCV
from sklearn.linear_model import SGDClassifier
est = SGDClassifier()
grid = {"alpha": [1e-4, 1e-3, 1e-2]}
search = GridSearchCV(est, grid)
search.fit(X_dask, y_dask)
32) XGBoost & LightGBM
Integrate with Dask for distributed training. Benefits: parallel data loading, multi-node training, and cluster resource management.
from dask.distributed import Client
from xgboost.dask import DaskXGBClassifier
client = Client()                            # the estimator uses the active client
clf = DaskXGBClassifier(tree_method="hist")
clf.fit(X_dask, y_dask)
33) GPUs & RAPIDS
Use dask-cuda + RAPIDS (cudf, cupy) for GPU-accelerated ETL/ML. UCX enables high-speed GPU-to-GPU comms (NVLink, InfiniBand). Partition data per GPU; prefer columnar formats.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster()
client = Client(cluster)
34) Xarray: Labeled N-D
Xarray uses Dask under the hood for lazy, chunked computations on labeled arrays (climate, earth science). Choose chunking along time/space wisely for resampling and reductions.
import xarray as xr
ds = xr.open_zarr("s3://bucket/climate.zarr", chunks={"time": 240})
annual = ds.temp.resample(time="1Y").mean().compute()
35) Geo at Scale
dask-geopandas parallelizes GeoPandas; rioxarray/rasterio work with Xarray for rasters. Use spatial partitioning and windowed reads; watch shuffles on spatial joins.
import dask_geopandas as dg
gdf = dg.read_parquet("s3://.../tiles.parquet")
result = gdf.sjoin(polygons).compute()
36) Kubernetes
Spin up clusters inside K8s using KubeCluster or Dask Gateway. Package envs in images, mount secrets for cloud credentials, enable adaptive scaling, and expose the dashboard securely.
from dask.distributed import Client
from dask_kubernetes import KubeCluster  # classic API; newer releases use dask_kubernetes.operator
cluster = KubeCluster.from_yaml("worker-spec.yaml")
cluster.adapt(minimum=1, maximum=50)
client = Client(cluster)
37) Cloud Storage & fsspec
Dask uses fsspec for filesystems. Configure creds via env or storage_options. Co-locate compute with data to minimize egress; use blocksize aligned to object stores.
ddf = dd.read_parquet("gs://bucket/data/", storage_options={"token": "cloud"})
ddf.to_parquet("gs://bucket/out/")
38) Orchestration: Prefect/Airflow
Wrap Dask tasks in flows/DAGs for scheduling, retries, and audit trails. Use Dask as the execution engine; emit task metadata to your orchestrator.
# Prefect example
from prefect_dask import DaskTaskRunner
task_runner = DaskTaskRunner(cluster_class="distributed.LocalCluster")
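A sketch of wiring the runner into a Prefect 2 flow; inc and my_flow are illustrative names.
from prefect import flow, task

@task(retries=2)
def inc(x):
    return x + 1

@flow(task_runner=task_runner)  # tasks execute on the Dask cluster
def my_flow():
    futures = [inc.submit(i) for i in range(10)]
    return [f.result() for f in futures]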
39) Communications & Security
Transport backends include TCP, TLS, and UCX. Enable TLS for encryption/auth between workers; mount certs via secrets. UCX accelerates GPU and high-speed interconnects.
# configure TLS via environment variables (or the YAML config shown in 41)
export DASK_DISTRIBUTED__COMM__TLS__CA_FILE=ca.pem
export DASK_DISTRIBUTED__COMM__TLS__KEY=key.pem
export DASK_DISTRIBUTED__COMM__TLS__CERT=cert.pem
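On the client side, certificates can be passed explicitly; the paths and scheduler address are illustrative.
from dask.distributed import Client
from distributed.security import Security
sec = Security(tls_ca_file="ca.pem", tls_client_cert="cert.pem",
               tls_client_key="key.pem", require_encryption=True)
client = Client("tls://scheduler-address:8786", security=sec)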
40) Q&A — “When is Spark a better fit?”
Answer: Heavy SQL ETL in JVM ecosystems, batch jobs with massive wide shuffles, or existing Spark infra/skills. Dask is ideal for Pythonic analytics, scientific stacks, custom Python functions, and interactive notebooks that gradually scale.
41) Security Fundamentals
Secure the scheduler/dashboard with network policies and TLS; avoid exposing to the public internet. Protect cloud creds, sanitize logs, and validate inputs before distributed execution. Use least-privilege IAM roles for object storage.
# Example: TLS in config.yaml
distributed:
  comm:
    tls:
      ca-file: ca.pem
      key: key.pem
      cert: cert.pem
42) Reproducibility & Data Contracts
Pin environments, lock dataset versions/paths, and validate schemas (Pandera/Great Expectations). Include run metadata (git SHA, env, parameters) in outputs for lineage.
import pandera as pa

class Sales(pa.DataFrameModel):
    amount: float = pa.Field(gt=0)

Sales.validate(ddf.head(10_000))  # head() already returns a pandas DataFrame
43) Testing Dask Code
Unit test pure functions with small pandas/NumPy inputs. Use pytest with a LocalCluster fixture for integration tests. Assert on graph structure (HighLevelGraph) and results.
import pytest
from dask.distributed import Client, LocalCluster

@pytest.fixture
def client():
    c = Client(LocalCluster(n_workers=2))
    yield c
    c.close()
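An example integration test using the fixture above; the data and expected value are illustrative.
import pandas as pd
import dask.dataframe as dd

def test_groupby_mean(client):
    pdf = pd.DataFrame({"store": ["a", "a", "b"], "ratio": [1.0, 3.0, 5.0]})
    ddf = dd.from_pandas(pdf, npartitions=2)
    out = ddf.groupby("store").ratio.mean().compute()
    assert out["a"] == 2.0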
44) Linting, Formatting & Types
Black + Ruff/Flake8 for style; MyPy/pyright for types. Type Dask wrappers with typing.Protocol or pandas/numpy types. Keep functions pure and side-effect free for easier graphing.
pip install black ruff mypy
black .
ruff check .
mypy src/
45) Performance & Profiling
Use the dashboard’s Profile/Task Stream, performance_report, and client.profile. Look for skewed partitions, small-task overhead, and spilling. Optimize IO (predicate pushdown), tune chunk sizes, and reduce wide shuffles.
from dask.distributed import performance_report
with performance_report("perf.html"):
    result = pipeline()
46) Deployment Options
LocalCluster/SSHCluster for simple setups; Kubernetes or Dask Gateway for multi-tenant; managed hosting or VM auto-scaling for convenience. Bake Docker images with pinned conda envs; mount secrets; define resource limits.
# Minimal Dockerfile for Dask worker
FROM mambaorg/micromamba:latest
COPY --chown=$MAMBA_USER:$MAMBA_USER env.yml /tmp/env.yml
RUN micromamba install -y -n base -f /tmp/env.yml && micromamba clean --all -y
CMD ["dask-worker","tcp://scheduler:8786"]
47) Observability
Enable Prometheus metrics, structured logs, and tracing around critical functions. Track queue sizes, task durations, memory, and bandwidth. Set SLOs for end-to-end latency and success rates.
# scrape /metrics endpoints when available in your deployment
# adjust confidential data handling as needed
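A minimal sketch of polling the scheduler's Prometheus endpoint (served at <dashboard>/metrics when prometheus_client is installed); the scheduler address is an assumption.
import requests
text = requests.get("http://scheduler:8787/metrics", timeout=5).text
print([line for line in text.splitlines() if line.startswith("dask_")][:10])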
48) Prod Checklist
- Pinned envs & image reproducibility
- Secure scheduler/dashboard (TLS, network policies)
- Partitioning aligned with access patterns
- Persist key intermediates; limit wide shuffles
- Resource limits; autoscaling & quotas
- Dashboards/alerts & runbooks for failures
49) Common Pitfalls
Too many tiny tasks; row-wise Python UDFs; unbounded shuffles; mismatched package versions; reading data from afar (egress); overfilling worker memory; forgetting to persist before branching; ignoring skew.
50) Interview Q&A — 20 Practical Questions (Expanded)
1) Why Dask for Python teams? It scales NumPy/pandas/Xarray code with minimal changes and keeps you in Python.
2) Lazy vs eager? Dask builds DAGs lazily; compute()/persist() trigger execution.
3) Threads vs processes? Threads for NumPy/pandas ops that release the GIL; processes for pure-Python/GIL-bound code.
4) What is a shuffle? A data re-partition by key (groupby/join); expensive due to network/data movement.
5) Avoiding tiny tasks? Increase chunk/partition sizes; use blockwise/vectorized ops; fuse tasks.
6) When to persist? After expensive IO/shuffles and before branching/iterating downstream.
7) Handling skew? Repartition by size, pre-hash keys, or sample to balance partitions.
8) Monitoring hotspots? Use Task Stream, Profile, and Bandwidth plots on the dashboard.
9) Cloud storage tips? Co-locate compute with data; enable predicate pushdown; tune blocksize.
10) UCX use case? High-speed GPU/GPU or IB/NVLink clusters for RAPIDS workloads.
11) Futures vs delayed? Futures execute immediately and return handles; delayed stays lazy until compute.
12) DataFrame vs Bag? DataFrame for tabular; Bag for semi-structured/JSONlines.
13) Memory thrash fix? Reduce concurrency, increase chunk size, persist, and rebalance.
14) Adaptive scaling? Automatically resizes the cluster based on backlog.
15) Checkpointing strategy? Persist key states and write Parquet snapshots between stages.
16) Schema contracts? Validate with Pandera/Great Expectations before expensive steps.
17) Secure clusters? TLS, private networking, locked dashboard, minimal permissions.
18) Integration with sklearn? Use dask-ml for parallel hyperparam search and scalable preprocessing.
19) Choosing chunk sizes? Aim for 50–500ms per task; tune with the dashboard.
20) Spark or Dask? JVM/SQL-heavy batch → Spark; Pythonic science/interactive → Dask.