Distributed Sort
Design a system that sorts terabytes-to-petabytes of data across a cluster of commodity machines, covering external merge sort, MapReduce-style parallel sorting, and the I/O bottlenecks that dominate at massive scale.
What is a distributed sorting system?
A distributed sorting system takes a dataset far too large for any single machine and produces a globally sorted output across a cluster of workers. The engineering challenge is that RAM is orders of magnitude smaller than the input: you must spill sorted chunks to disk repeatedly, then merge them back, while minimizing the number of expensive disk passes.
I like to open with this question in interviews because it forces you to reason through the full hardware stack at once, from RAM to SSD to network bandwidth. It also tests whether you understand how frameworks like MapReduce and TeraSort turn an apparently serial problem into a parallel one.
Functional Requirements
Core Requirements
- Sort a dataset too large to fit in the RAM of any single machine.
- Output a globally sorted sequence, split into N partition files on distributed storage.
- Accept a configurable sort key (field name or byte offset).
- Tolerate any single worker failure without restarting the full sort.
Below the Line (out of scope)
- Streaming sort of infinite data (use a Top-K heap service instead).
- Real-time sort with latency under a second.
- Secondary sort keys and custom comparators.
The scope is bounded datasets on distributed storage (HDFS, S3). Streaming and real-time variants branch off at the very first design decision and become fundamentally different systems. Custom comparators are a useful extension but do not change the core architecture, so we defer them.
I'd call out the scope boundary explicitly in an interview. Candidates who try to solve streaming sort and batch sort in a single design end up with a system that does neither well.
Hardest part: Partitioning the keyspace evenly across workers. If your 100-worker job sends 40% of records to one worker because your partition boundaries are poorly chosen, that worker determines the job's end time regardless of how fast everything else runs. Partition-boundary selection from a representative sample is what separates a working sort from a fast sort.
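To make the skew problem concrete, here is a minimal sketch of sample-based boundary selection. It assumes keys are comparable Python values and that a key equal to a boundary belongs to the partition on its right. The function names (`choose_boundaries`, `partition_for`, `largest_share`) and the 2% sample size are illustrative choices, not part of any framework.

```python
import bisect
import random
from collections import Counter


def choose_boundaries(sample, num_partitions):
    """Return num_partitions - 1 split keys taken at evenly spaced quantiles of the sample."""
    if num_partitions < 1 or not sample:
        raise ValueError("need a non-empty sample and at least one partition")
    ordered = sorted(sample)
    return [ordered[i * len(ordered) // num_partitions] for i in range(1, num_partitions)]


def partition_for(key, boundaries):
    """Index of the partition that owns key (a key equal to a boundary goes right)."""
    return bisect.bisect_right(boundaries, key)


def largest_share(keys, boundaries):
    """Fraction of all records that land on the busiest partition."""
    counts = Counter(partition_for(k, boundaries) for k in keys)
    return max(counts.values()) / len(keys)


rng = random.Random(0)  # fixed seed so the comparison is repeatable
keys = [rng.expovariate(1.0) for _ in range(100_000)]  # deliberately skewed keys

# Naive choice: cut the observed key range into 10 equal-width slices.
low, high = min(keys), max(keys)
equal_width = [low + (high - low) * i / 10 for i in range(1, 10)]

# Sampled choice: take quantiles of a 2% random sample.
sampled = choose_boundaries(rng.sample(keys, 2_000), 10)

print(f"equal-width busiest partition: {largest_share(keys, equal_width):.0%}")
print(f"sampled busiest partition:     {largest_share(keys, sampled):.0%}")
```

On skewed keys like these, the equal-width split sends most records to the first partition, while the sampled quantiles keep every partition close to an even share.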
Non-Functional Requirements
Performance
- Sort 10TB in under 2 hours on a 100-node cluster with 128GB RAM and 10Gbps per node.
- Each worker must sustain near-peak sequential I/O throughput (target: above 80% of hardware max).
- Minimize disk passes: on a 100-node cluster each node holds about 100GB of the 10TB input, so every additional full pass (read plus write at 1GB/s per node) adds roughly 3 minutes per node, compounded across the merge tree.
Reliability
- Tolerate any single worker failure without restarting the full job; re-run only the failed tasks.
- Coordinator failure must be recoverable from checkpointed state within 5 minutes.
Scalability
- Scale linearly from 10TB to 1PB by adding commodity nodes to the cluster.
- Partition count is configurable; default is one output partition per reduce worker.
Consistency
- Output partitions are globally sorted with no overlapping key ranges and no gaps.
- Partition boundaries are stored alongside the output so consumers can seek without scanning all partitions.
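The consistency requirement can be checked mechanically after a job finishes. The sketch below is one possible validator, assuming each partition is available as an in-memory list of keys in partition-ID order, and that ranges are start-inclusive and end-exclusive. `check_global_order` is a hypothetical helper, not an API from any framework.

```python
def check_global_order(partitions, boundaries):
    """Return a list of violations; an empty list means the output is consistent.

    partitions: list of key lists, in partition-ID order.
    boundaries: the N-1 split keys separating N partitions.
    """
    problems = []
    if len(boundaries) != len(partitions) - 1:
        return [f"expected {len(partitions) - 1} boundaries, got {len(boundaries)}"]
    if any(a > b for a, b in zip(boundaries, boundaries[1:])):
        problems.append("boundaries are not in ascending order")

    for i, keys in enumerate(partitions):
        lower = boundaries[i - 1] if i > 0 else None            # inclusive lower bound
        upper = boundaries[i] if i < len(boundaries) else None  # exclusive upper bound
        previous = None
        for key in keys:
            if previous is not None and key < previous:
                problems.append(f"partition {i} is not sorted")
                break
            if (lower is not None and key < lower) or (upper is not None and key >= upper):
                problems.append(f"partition {i} holds a key outside its range: {key!r}")
                break
            previous = key
    return problems


good = [["apple", "eagle"], ["fence", "horse"], ["jump", "zebra"]]
bad = [["apple", "fence"], ["horse"], ["jump", "zebra"]]
print(check_global_order(good, ["fence", "jump"]))
print(check_global_order(bad, ["fence", "jump"]))
```

Because adjacent partitions share a boundary key, passing this check implies there are no overlaps, and the open-ended first and last ranges mean no key can fall into a gap.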
Read/write ratio that shapes this design: External merge sort needs at minimum two full passes over each worker's data: write sorted runs, then read and merge them. On top of that, the shuffle phase moves all records across the network once. For 10TB on a 100-node cluster, each node handles roughly 100GB of shuffled data. At 500MB/s sustained sequential SSD throughput, streaming 100GB once takes about 200 seconds, or roughly 3.3 minutes of I/O per pass per node. Every extra merge pass adds at least another 3.3 minutes. Maximize the initial in-memory sort chunk size and the k-way merge fan-in so all runs merge in a single pass.
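The arithmetic behind this can be sketched in a few lines. This is a back-of-the-envelope model only: it assumes decimal units, perfectly even data distribution, and throughput that never dips. The function names and the 16GB chunk size in the example are illustrative.

```python
import math

GB = 10**9
MB = 10**6


def stream_seconds(bytes_per_node, throughput_bytes_per_s):
    """Time to stream a node's share once at a given sequential throughput."""
    return bytes_per_node / throughput_bytes_per_s


def run_count(bytes_per_node, chunk_bytes):
    """Number of sorted runs produced when each in-memory chunk becomes one run."""
    return math.ceil(bytes_per_node / chunk_bytes)


def merge_passes(num_runs, fan_in):
    """Merge passes needed to reduce num_runs sorted runs to one with a k-way merge."""
    if fan_in < 2:
        raise ValueError("fan_in must be at least 2")
    passes = 0
    while num_runs > 1:
        num_runs = math.ceil(num_runs / fan_in)
        passes += 1
    return passes


share = 100 * GB                        # 10TB spread over 100 nodes
print(stream_seconds(share, 500 * MB))  # 100 GB / 500 MB/s = 200 s
runs = run_count(share, 16 * GB)        # hypothetical 16GB sort buffer per node
print(runs, merge_passes(runs, fan_in=16), merge_passes(runs, fan_in=2))
```

With a fan-in at least as large as the run count, the merge finishes in one pass; a binary merge over the same runs needs several, and each of those passes pays the full streaming cost again.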
Core Entities
- SortJob - top-level job request: job ID, input and output paths, partition count, sort key, and overall status (PENDING, RUNNING, COMPLETED, FAILED).
- InputSplit - a byte range of the input file assigned to a specific map worker for reading.
- SortedRun - an intermediate sorted file written to shared storage by a map worker after sorting one in-memory chunk. Multiple runs are produced per worker and merged during the reduce phase.
- PartitionBoundary - a key value separating two adjacent output partitions. N output partitions require N-1 boundaries, derived from a sampled key distribution.
- OutputPartition - a fully sorted segment of the final output covering one key range, stored at a stable addressable path on distributed storage.
- WorkerTask - a unit of work (SAMPLE, MAP, or REDUCE) with a status, an assigned worker node, and a retry count. Schema details are in the deep dives.
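As a rough illustration of how some of these entities might look in code, here is a sketch using Python dataclasses. Only the fields named in the list above come from this article; the exact field names, the types, and the `TaskStatus` values are assumptions, and this is not the schema from the deep dives.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class JobStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


class TaskKind(Enum):
    SAMPLE = "SAMPLE"
    MAP = "MAP"
    REDUCE = "REDUCE"


class TaskStatus(Enum):  # assumed values; the article only says a task has "a status"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    DONE = "DONE"
    FAILED = "FAILED"


@dataclass
class SortJob:
    job_id: str
    input_path: str
    output_path: str
    partition_count: int
    sort_key: str
    status: JobStatus = JobStatus.PENDING


@dataclass(frozen=True)
class InputSplit:
    path: str
    start_offset: int  # inclusive byte offset into the input file
    end_offset: int    # exclusive byte offset
    worker_id: str


@dataclass(frozen=True)
class OutputPartition:
    partition_id: int
    start_key: Optional[str]  # None means "no lower bound"
    end_key: Optional[str]    # None means "no upper bound"
    path: str


@dataclass
class WorkerTask:
    task_id: str
    kind: TaskKind
    status: TaskStatus = TaskStatus.PENDING
    worker_id: Optional[str] = None
    retry_count: int = 0
```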
API Design
Submit a sort job
POST /jobs
Content-Type: application/json
{
  "input_path": "s3://my-bucket/data/raw/",
  "output_path": "s3://my-bucket/data/sorted/",
  "sort_key": "$.event_time",
  "sort_order": "asc",
  "partition_count": 100
}
Returns 202 Accepted with a job_id. The job is asynchronous: the endpoint creates a SortJob record, enqueues the sample task, and returns immediately. Holding an HTTP connection open for a multi-hour sort is not a workable design.
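The accept-then-enqueue behavior can be sketched without any web framework. Everything here is hypothetical scaffolding: `JOBS` stands in for a durable job store, `TASK_QUEUE` for a real task queue, and `submit_job` for the handler behind POST /jobs. The validation rules are assumptions as well.

```python
import queue
import uuid

JOBS = {}                   # stand-in for a durable job store
TASK_QUEUE = queue.Queue()  # stand-in for the coordinator's task queue
REQUIRED_FIELDS = ("input_path", "output_path", "sort_key", "partition_count")


def submit_job(body):
    """Validate the request, record the job, enqueue sampling, and return at once."""
    missing = [field for field in REQUIRED_FIELDS if field not in body]
    if missing:
        return 400, {"error": "missing fields: " + ", ".join(missing)}

    job_id = f"job_{uuid.uuid4().hex[:12]}"
    JOBS[job_id] = {
        **body,
        "sort_order": body.get("sort_order", "asc"),
        "status": "PENDING",
    }
    # The sort itself runs later; the handler only schedules the first phase.
    TASK_QUEUE.put({"job_id": job_id, "kind": "SAMPLE"})
    return 202, {"job_id": job_id}


status_code, response = submit_job({
    "input_path": "s3://my-bucket/data/raw/",
    "output_path": "s3://my-bucket/data/sorted/",
    "sort_key": "$.event_time",
    "partition_count": 100,
})
print(status_code, response)
```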
Query job status
GET /jobs/{job_id}
Returns phase (SAMPLE, MAP, SHUFFLE, REDUCE, DONE) and per-phase task counts so the client can observe progress. The client polls until status: COMPLETED or status: FAILED.
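A client-side polling loop might look like the sketch below. It assumes the status response is a JSON object with `status` and `phase` fields, which matches the description above but is not a documented schema. The HTTP call is injected as `fetch_status` so the loop stays independent of any particular HTTP library. The 30-second interval and 4-hour timeout are arbitrary defaults.

```python
import time

TERMINAL_STATUSES = {"COMPLETED", "FAILED"}


def wait_for_job(fetch_status, job_id, interval_s=30.0, timeout_s=4 * 3600,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll until the job reaches a terminal status, then return the last response.

    fetch_status: callable taking a job_id and returning the parsed status body.
    """
    deadline = clock() + timeout_s
    while True:
        body = fetch_status(job_id)
        if body["status"] in TERMINAL_STATUSES:
            return body
        if clock() >= deadline:
            raise TimeoutError(f"job {job_id} still in phase {body.get('phase')}")
        sleep(interval_s)


# Usage with a canned sequence of responses instead of a real server.
canned = iter([
    {"status": "RUNNING", "phase": "MAP"},
    {"status": "RUNNING", "phase": "REDUCE"},
    {"status": "COMPLETED", "phase": "DONE"},
])
final = wait_for_job(lambda job_id: next(canned), "job_abc123", sleep=lambda s: None)
print(final)
```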
Retrieve output locations
GET /jobs/{job_id}/artifacts
Returns the sorted partition files with their key ranges:
{
  "job_id": "job_abc123",
  "partitions": [
    { "partition_id": 0, "key_range": { "start": null, "end": "fence" }, "path": "s3://my-bucket/data/sorted/part-00000" },
    { "partition_id": 1, "key_range": { "start": "fence", "end": "jump" }, "path": "s3://my-bucket/data/sorted/part-00001" }
  ]
}
Key ranges let downstream consumers seek to the right partition without scanning every file. This endpoint is only meaningful after status: COMPLETED.
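A consumer can turn the key ranges into a binary search. The sketch below assumes the `partitions` list is complete and ordered by `partition_id`, that each range is start-inclusive and end-exclusive, and that the last partition has no upper bound. `find_partition` is a hypothetical helper, and the third partition in the example is invented to close the key space.

```python
import bisect


def find_partition(partitions, key):
    """Return the partition entry whose key range contains key."""
    # Every partition except the last contributes its exclusive upper bound.
    upper_bounds = [p["key_range"]["end"] for p in partitions[:-1]]
    return partitions[bisect.bisect_right(upper_bounds, key)]


partitions = [
    {"partition_id": 0, "key_range": {"start": None, "end": "fence"},
     "path": "s3://my-bucket/data/sorted/part-00000"},
    {"partition_id": 1, "key_range": {"start": "fence", "end": "jump"},
     "path": "s3://my-bucket/data/sorted/part-00001"},
    {"partition_id": 2, "key_range": {"start": "jump", "end": None},
     "path": "s3://my-bucket/data/sorted/part-00002"},
]

for key in ("apple", "fence", "horse", "zebra"):
    print(key, "->", find_partition(partitions, key)["path"])
```

The lookup costs O(log N) comparisons over the boundary list, so a consumer opens exactly one file per point query instead of scanning all N.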
High-Level Design
The design builds in three steps: first, external merge sort on a single worker; next, horizontal scale via sampling and parallel shuffle; finally, fault tolerance via shared run storage and coordinator checkpointing. Each step adds exactly what the previous one can't handle.
Step 1: Sort data that doesn't fit in a single machine's RAM
The naive approach is to load everything into memory. At 10TB with 128GB of RAM, that is impossible in one shot. External merge sort solves this and it is one of the oldest ideas in computer science (dating back to tape drives in the 1960s). Read a RAM-sized chunk, sort it in memory, flush it to disk as a "sorted run," then repeat for every chunk. After all chunks are processed, merge all runs into a single sorted file using a min-heap.
I'd start any distributed sort interview answer here, on a single node. It shows the interviewer you understand the foundational mechanism before jumping to parallelism.
Components:
- Input reader - reads the source file in sequential, fixed-size chunks (128GB per pass). Sequential I/O is critical: random reads on HDD can be on the order of 100x slower than sequential reads.
- In-memory sorter - sorts each chunk using quicksort or radix sort.
- Run writer - flushes each sorted chunk as a numbered run file on local disk.
- K-way merger - opens all run file handles simultaneously, uses a min-heap to merge them in a single pass.
Request walkthrough:
1. Input reader pulls a 128GB chunk from the 10TB source.
2. In-memory sorter produces 128GB of sorted records (roughly 30 seconds for random data).
3. Run writer flushes run-001.bin to disk.
4. Steps 1-3 repeat for all chunks, producing about 78 sorted runs for a 10TB input (10TB / 128GB).
5. K-way merger opens all 78 run files, maintains a min-heap over their current leading records, and streams sorted output.
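This handles data beyond RAM on a single node. The problem is time: at 1GB/s sequential SSD throughput, reading 10TB takes about 2.8 hours, and writing it back takes just as long. Two passes minimum (write runs, then merge them), each reading and writing the full dataset, comes to roughly 11 hours of pure I/O on one machine. You need horizontal scale.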
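The Step 1 pipeline can be sketched at toy scale. This version assumes newline-terminated text records compared as whole lines, and it caps each run by line count rather than by bytes of RAM. The function names and the `run-001.txt` file naming are illustrative. The merge uses the standard library's `heapq.merge`, which keeps a min-heap over the current head of each run.

```python
import heapq
import os
import tempfile


def write_sorted_runs(input_path, run_dir, max_lines_per_run):
    """Read the input in fixed-size chunks, sort each in memory, flush each as a run."""
    run_paths = []
    chunk = []

    def flush():
        chunk.sort()
        path = os.path.join(run_dir, f"run-{len(run_paths) + 1:03d}.txt")
        with open(path, "w") as run_file:
            run_file.writelines(chunk)
        run_paths.append(path)
        chunk.clear()

    with open(input_path) as source:
        for line in source:
            # Make sure the final record is newline-terminated so runs merge cleanly.
            chunk.append(line if line.endswith("\n") else line + "\n")
            if len(chunk) >= max_lines_per_run:
                flush()
    if chunk:
        flush()
    return run_paths


def merge_runs(run_paths, output_path):
    """Single-pass k-way merge: one open handle per run, streamed lazily."""
    handles = [open(path) for path in run_paths]
    try:
        with open(output_path, "w") as out:
            out.writelines(heapq.merge(*handles))
    finally:
        for handle in handles:
            handle.close()


def external_sort(input_path, output_path, max_lines_per_run):
    """Sort a file larger than the in-memory budget; returns the number of runs."""
    with tempfile.TemporaryDirectory() as run_dir:
        run_paths = write_sorted_runs(input_path, run_dir, max_lines_per_run)
        merge_runs(run_paths, output_path)
    return len(run_paths)
```

Memory use is bounded by one chunk during run generation and by one buffered record per run during the merge, which is why the fan-in can be as wide as the number of file handles the system allows.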
Step 2: Scale horizontally with sampling-based partition boundaries