Design a system that tracks the top K most popular items in real time across multiple time windows, from simple in-memory heaps to Count-Min Sketch and distributed stream aggregation at LinkedIn or Amazon scale.
37 min read · 2026-03-29 · hard · top-k · heavy-hitters · system-design · streaming · distributed-systems
A top-K heavy hitters system identifies the K items seen most frequently in a stream of events, such as the top 10 trending hashtags on Twitter or the top 100 searched products on Amazon. The core loop (count everything, rank it, return the top K) sounds simple. The hard part is two things: counting accurately at millions of events per second without blowing out memory, and merging local top-K lists across dozens of distributed stream processors into a single globally correct ranking without re-processing history.
I'd open with the merge problem on a whiteboard because it catches most candidates off guard. Everyone knows how to count; the interesting design work is in how you combine counts from 32 machines into one correct ranking without shipping all the data.
Exact deduplication of repeat events from the same user session
The hardest part in scope: Merging distributed top-K results correctly. Naively unioning the top-K list from each stream processor is wrong: an item ranked 11th on every individual node can be globally first if its traffic is spread evenly across partitions, and you miss it entirely. The solution, merging Count-Min Sketches, is the deepest technical challenge in this system and the subject of Deep Dive 3.
User authentication is below the line because it does not change the counting or ranking path. To add it, attach a user_id to each event but route the event through the same Kafka topic and count pipeline unchanged.
Item metadata enrichment is below the line because it belongs in a separate lookup service. The ranking layer stores only item IDs and approximate counts. Display names and images are joined at the API response layer using a separate catalog service.
Exact deduplication is below the line because it requires a per-user Bloom filter or a Redis SET of (user_session, item_id) pairs, which adds significant storage complexity. Approximate deduplication is acceptable for trending signals: if a user clicks the same product 20 times in a session, the trend still reflects genuine engagement.
Throughput: Handle 1 billion events per day (roughly 11,500 events per second at steady state, with 3x peak burst = 35,000 events per second).
Staleness: Rankings are stale by at most 30 seconds. Sub-second freshness is not required; 30-second staleness is imperceptible for trending content.
Memory: Memory per time window is bounded regardless of item cardinality. At 1 million distinct items per day, exact counting needs 8 MB per window. Count-Min Sketch reduces this to under 200 KB per window.
Availability: 99.9% uptime. A brief gap in ranking freshness is acceptable; returning stale results from a pre-computed snapshot is always preferable to a total outage.
Query latency: Top-K reads return in under 10 ms p99. Rankings are pre-computed, so reads resolve to a single Redis ZREVRANGE command.
Exact (non-approximate) counts for all items (audit-grade counting requires storing every event individually, which does not bound memory)
Sub-second ranking latency (requires a hot path with no batching window, approaching the Redis throughput ceiling at 35K events/second)
Read/write ratio: 1 billion events per day vs roughly 100,000 top-K queries per day gives a 10,000:1 write-to-read ratio. This is an extremely write-heavy system. The write path is the entire design challenge: ingestion throughput, stream aggregation, and memory-bounded counting all exist to handle this asymmetry. The read path is trivial by comparison, because pre-computed snapshots reduce every query to a Redis ZREVRANGE.
The 10,000:1 ratio tells you immediately that the system must decouple writes from reads. You cannot compute rankings synchronously on every write event, and you do not need to: a 30-second staleness window gives you time to batch-aggregate and snapshot asynchronously.
I'd write this ratio on the whiteboard early because it immediately kills any design that tries to compute rankings on every incoming event. Once the interviewer sees 10,000:1, the conversation shifts to "how do we absorb writes cheaply and serve reads from a snapshot?"
Event: A single occurrence of an item being counted (a search query, a product view, a trending signal). Carries item_id, event_type, and timestamp.
Counter: The running approximate frequency for an item_id within a specific time window. Stored in-memory inside stream processors as a Count-Min Sketch structure, not as individual key-value rows.
TopKSnapshot: A pre-computed, point-in-time list of the top K items and their approximate counts for a given time window. Stored in Redis as a sorted set and refreshed every 30 seconds.
TimeWindow: A named time boundary (for example, last_1m, last_1h, last_24h) that scopes a TopKSnapshot. The system maintains one active TopKSnapshot per TimeWindow.
Schema details for the snapshot store (sorted set key naming, TTL policy, serialization format) are deferred to the deep dives. The four entities above are sufficient to drive the API design and High-Level Design.
The system exposes two endpoints: one to ingest events and one to query rankings. Events are ingested asynchronously because the write volume (35K events/sec at peak) makes synchronous counting impractical at the app server layer.
FR 1 and FR 3 - Ingest an event:
POST /events
Body: { item_id: "product:abc123", event_type: "view", timestamp: 1711670400 }
Response: HTTP 202 Accepted
Body: { message: "Event queued for processing" }
202 instead of 200: the event is not counted the moment the POST completes. It enters a Kafka queue and is counted asynchronously by a stream processor. Returning 200 would imply the event was counted immediately, which is false and misleading to callers. 202 accurately signals acceptance for later processing.
The response includes snapshot_age_seconds so callers know exactly how stale the pre-computed result is. This transparency lets downstream services decide whether to display the result immediately or wait for a fresher snapshot before refreshing a high-visibility leaderboard.
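For symmetry, a sketch of the query endpoint in the same style; the response field names beyond snapshot_age_seconds (items, approx_count) are illustrative, not fixed by the design:

GET /top-k?window=last_1h&k=10
Response: HTTP 200 OK
Body: { window: "last_1h", snapshot_age_seconds: 12, items: [{ item_id: "product:abc123", approx_count: 48210 }, ...] }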
Direct database writes saturate the primary well below the 35K events/second peak, making a write queue the first non-negotiable addition to the system.
The simplest design routes every event from the app server directly to a relational database. At 35,000 events per second at peak, a single Postgres instance cannot sustain the write rate without growing queue depth unboundedly (typical ceiling: 10,000-20,000 simple writes per second before latency degrades). The fix is a Kafka topic that absorbs burst writes and lets stream processors consume at their own pace.
Components:
Client: Web or mobile client that generates item events (product views, search queries, clicks).
App Server: Accepts POST /events, validates the payload, and publishes the event to Kafka. Does not perform any counting.
Kafka: Durable event queue that absorbs write bursts. Decouples the ingestion rate from the stream processing rate. Events are retained for 24 hours to support replay after processor failure.
Stream Processor Fleet: A horizontally scalable fleet (Kafka Streams, Flink, or Spark Streaming) that consumes from Kafka and maintains in-memory frequency counts. Each processor owns one or more Kafka partitions.
Redis Counter Store: Receives aggregated item counts flushed from stream processors every 30 seconds. Stores counts as Redis hashes keyed by time window and item ID.
Request walkthrough:
Client sends POST /events with item_id and event_type.
App Server validates the payload and publishes to the Kafka topic item-events, using item_id as the partition key (so all events for the same item always go to the same processor).
Kafka durably persists the event.
Each stream processor reads from its assigned partitions and increments an in-memory count for the item_id.
Every 30 seconds, each processor flushes aggregated counts to Redis Counter Store using HINCRBY batches.
This diagram covers the write path only. The query path and ranking pre-computation come in the next section.
I'd pause here in an interview and explicitly say: "The app server does zero counting. It is a dumb pipe into Kafka." That single sentence communicates that you understand the separation of concerns, and it prevents the interviewer from wondering whether the app server is a bottleneck.
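The count-and-flush loop from the walkthrough above can be sketched in a few lines. This is a minimal illustration, not production code: a plain dict stands in for the Redis Counter Store, and the flush is the moral equivalent of a pipeline of HINCRBY counts:{window} {item_id} {delta} commands. Class and field names are assumptions.

```python
from collections import Counter

# Sketch of a single stream processor's count-and-flush loop. Each event is
# an O(1) in-memory increment; the network is touched only once per flush.
class StreamProcessor:
    def __init__(self):
        self.local_counts = Counter()    # counts accumulated since the last flush

    def on_event(self, item_id):
        self.local_counts[item_id] += 1  # no per-event network call

    def flush(self, counter_store, window="last_1h"):
        # Called every 30 seconds: one batched write instead of one write per event,
        # mirroring a pipelined batch of HINCRBY commands against Redis.
        bucket = counter_store.setdefault(f"counts:{window}", Counter())
        bucket.update(self.local_counts)
        self.local_counts.clear()

store = {}
p = StreamProcessor()
for _ in range(3):
    p.on_event("product:abc123")
p.on_event("product:xyz789")
p.flush(store)
print(store["counts:last_1h"])  # Counter({'product:abc123': 3, 'product:xyz789': 1})
```

The batching is what converts 35K writes/second at the app layer into a handful of bulk Redis operations per flush interval.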
Scanning all counters on every query is O(N) across potentially millions of items; pre-computing a TopKSnapshot every 30 seconds and serving reads from a Redis sorted set eliminates all per-query computation.
The naive query approach reads all counters from Redis and sorts them in application memory. At 1 million distinct items, that is a million hash entries fetched via HGETALL and sorted per query. For bulk query traffic (thousands of users refreshing a leaderboard simultaneously), on-demand sorting is a bottleneck that pre-computation eliminates entirely.
Components (new and changed):
Snapshot Builder: A background process that runs every 30 seconds. Reads all counters from Redis Counter Store for each time window, computes top K using a min-heap, and writes the result to a Redis sorted set.
Redis Sorted Set (TopKSnapshot): Stores the pre-computed top-K list per time window. Key pattern: topk:{window}. ZADD writes the score (approximate count) and member (item_id). ZREVRANGE reads the top K in O(log N + K) time.
Query API: Serves GET /top-k by calling ZREVRANGE on the appropriate Redis sorted set. No ranking computation at query time.
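The ranking step the Snapshot Builder performs is a size-K heap selection. A minimal sketch, assuming the flushed counters have been read back into a plain dict:

```python
import heapq

# Top-K extraction with a min-heap of size K (heapq.nlargest maintains
# exactly such a heap internally). The result is ready to be written to the
# topk:{window} sorted set, one ZADD (score, member) pair per entry.
def build_snapshot(counters, k):
    return heapq.nlargest(k, counters.items(), key=lambda kv: kv[1])

counters = {"product:a": 5, "product:b": 9, "product:c": 1, "product:d": 7}
print(build_snapshot(counters, 2))  # [('product:b', 9), ('product:d', 7)]
```

The heap keeps the selection O(N log K) over N counters rather than O(N log N) for a full sort, which matters when N is a million items and K is 10.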
Request walkthrough (query path):
Client sends GET /top-k?window=last_1h&k=10.
Query API resolves the Redis key: topk:last_1h.
Query API calls ZREVRANGE topk:last_1h 0 9 WITHSCORES.
Redis returns the top 10 item_id values and their approximate counts in O(log N + K) time.
Query API attaches snapshot_age_seconds (computed from the ZADD timestamp stored alongside the set) and returns the response.
This diagram adds the full query path. Section 3 extends the stream processor layer to handle multiple time windows simultaneously without re-processing Kafka history.
The key insight I'd highlight here: every single read is a Redis ZREVRANGE. There is no computation at query time. If the interviewer asks "what happens during a traffic spike on the read side?", the answer is "nothing changes, because reads never touch the counting pipeline."
Maintaining one Count-Min Sketch per active time window inside each stream processor eliminates the need to replay Kafka history when a new window interval begins.
The naive approach to supporting three time windows (1 minute, 1 hour, 24 hours) runs three independent Kafka consumer groups, each consuming all events and maintaining its own counters. This triples infrastructure cost and, more critically, cannot support sliding window queries: you cannot subtract old minutes from an approximate counter, because CMS supports only increments and decrementing would invalidate its one-sided error guarantee.
Components (new and changed):
Stream Processor Fleet (windowed): Each processor now maintains one Count-Min Sketch per active time window. On receiving an event, it increments all active sketches simultaneously in a single pass. No re-processing required when a new window slot begins.
Window Merge Layer: Every 30 seconds, deserializes Count-Min Sketch bytes published by all processor nodes and adds the matrices element-by-element to produce a globally merged sketch per window.
Snapshot Builder (multi-window): Reads the merged CMS for each window, extracts top K via a min-heap, and writes the TopKSnapshot to the appropriate Redis sorted set.
The Redis Counter Store from Section 2 is retired; stream processors now flush serialized sketch bytes to the merge layer instead of raw counts.
Request walkthrough:
Stream processor receives an event for item_id="product:abc123".
Processor increments the CMS for all active windows in one pass: cms_1m, cms_1h, cms_24h.
Every 30 seconds, each processor serializes its CMS per window (108 KB each) and publishes the bytes to a merge Kafka topic.
The Window Merge Layer deserializes all sketch messages for the current period and adds them element-by-element (CMS is additively decomposable).
The merged CMS per window flows to the Snapshot Builder, which extracts top K and writes to the Redis sorted sets.
Each stream processor handles all windows in a single pass over the event stream. This design eliminates re-processing: a new 1-hour window starts its ring buffer slot clean, and old slot data expires automatically when the ring index wraps around (covered in Deep Dive 4).
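The element-wise merge the Window Merge Layer performs can be sketched directly; the nested lists stand in for the deserialized d x w sketch matrices (a tiny 2 x 4 shape here rather than the real 10 x 2718):

```python
# Element-wise addition of CMS matrices from two processor nodes. This is
# valid only when every node uses identical (d, w) dimensions and identical
# hash seeds; the small matrices here are for illustration.
def merge_cms(a, b):
    assert len(a) == len(b) and len(a[0]) == len(b[0]), "sketches must share shape and seeds"
    return [[x + y for x, y in zip(row_a, row_b)] for row_a, row_b in zip(a, b)]

node1 = [[3, 0, 1, 0],
         [0, 3, 0, 1]]
node2 = [[2, 0, 0, 1],
         [1, 2, 0, 0]]
merged = merge_cms(node1, node2)
print(merged)  # [[5, 0, 1, 1], [1, 5, 0, 1]]
```

Because estimates are the minimum across rows at an item's hashed columns, the merged sketch answers queries as if it had counted the combined stream directly, which is what makes sketch merging globally correct where list unioning is not.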
Three architecturally distinct options exist: an exact hash map (exact counts, unbounded memory), Redis sorted sets (pre-built ranking structure, write saturation), or a Count-Min Sketch with a min-heap (approximate counts, bounded memory). I'd lead with the sketch in an interview after briefly naming the others, because the bounded-memory property is what makes the system viable at scale.
The architecture above uses true streaming (Kafka plus stream processors). The alternative is batch (hourly or micro-batch Spark jobs). The decision is driven by the 30-second staleness NFR: batch does not achieve it, and micro-batch barely achieves it with high overhead.
I've seen candidates jump to Spark because it is the tool they know best. The problem is that Spark's scheduling overhead dominates when the trigger interval drops below a few minutes. I'd name the three options quickly and then justify Kafka Streams by the staleness requirement.
Each stream processor sees only a slice of traffic (its assigned Kafka partitions). To produce a globally correct top-K, you must merge partial results from all nodes. The naive approach (union the local top-K lists) is subtly and severely wrong, and it is the mistake I see most often from candidates who have studied top-K systems but not implemented one.
Naively unioning local top-K lists produces a globally incorrect result. An item ranked 11th on every single processor node can be globally ranked first. Never skip this detail in an interview.
Tumbling windows are the right starting point. The challenge is how to answer a variable-width sliding window query (such as "top K in the last 3 hours") from pre-computed tumbling slots, and how to expire old window data without replaying Kafka.
This deep dive is where I'd spend the most whiteboard time. The ring buffer idea is elegant but non-obvious, and walking through the slot expiry logic step by step shows the interviewer you can reason about data structures at the systems level, not just pick libraries off a shelf.
What data structure tracks item frequency at a single node? Count-Min Sketch (d=10, w=2718) for approximate frequency estimation at 108 KB per window regardless of item cardinality. A min-heap of size K extracts the top-K from a Space-Saving candidate set. Never use an exact hash map: memory grows unboundedly with cardinality.
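A minimal Count-Min Sketch illustrating the one-sided error property. The salted-hash scheme and the small (d, w) used in the demo are illustrative choices, not the production d=10, w=2718 sizing:

```python
import hashlib

# Minimal Count-Min Sketch: d hash rows, w counters per row. add() bumps one
# counter per row; estimate() takes the min across rows, so it can overcount
# on collisions but never undercounts.
class CountMinSketch:
    def __init__(self, d=4, w=64):
        self.d, self.w = d, w
        self.table = [[0] * w for _ in range(d)]

    def _cols(self, item):
        # One salted hash per row; any d independent-ish hash functions work.
        for row in range(self.d):
            digest = hashlib.md5(f"{row}:{item}".encode()).hexdigest()
            yield int(digest, 16) % self.w

    def add(self, item, count=1):
        for row, col in enumerate(self._cols(item)):
            self.table[row][col] += count

    def estimate(self, item):
        return min(self.table[row][col] for row, col in enumerate(self._cols(item)))

cms = CountMinSketch()
for _ in range(100):
    cms.add("hot-item")
cms.add("rare-item")
assert cms.estimate("hot-item") >= 100  # one-sided error: never below the true count
```

Memory is d x w counters regardless of how many distinct items pass through, which is the bounded-memory property the NFRs depend on.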
What is the Space-Saving structure and why is it needed? Space-Saving tracks top-K candidates with O(K) memory and guarantees no heavy hitter is missed. It provides the candidate set that CMS estimate() is called against, since CMS has no built-in way to enumerate all items.
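A compact sketch of the Space-Saving algorithm; the linear min-scan keeps the demo short (production implementations pair the counters with a heap or the Stream-Summary linked structure for O(1) updates):

```python
# Space-Saving with capacity k: tracked items are incremented in place; an
# untracked item evicts the current minimum and inherits its count, so the
# new entry's count is an overestimate but no heavy hitter is ever missed.
class SpaceSaving:
    def __init__(self, k):
        self.k = k
        self.counts = {}

    def add(self, item):
        if item in self.counts:
            self.counts[item] += 1
        elif len(self.counts) < self.k:
            self.counts[item] = 1
        else:
            victim = min(self.counts, key=self.counts.get)  # evict the smallest counter
            floor = self.counts.pop(victim)
            self.counts[item] = floor + 1                   # inherit evicted count

ss = SpaceSaving(k=2)
for item in ["a", "a", "a", "b", "c", "a"]:
    ss.add(item)
print(ss.counts)  # {'a': 4, 'c': 2}: the heavy hitter 'a' survives with its true count
```

The candidate set ss.counts is what CMS estimate() is then called against to produce the final ranked counts.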
How real-time does the ranking need to be? 30 seconds is the right answer for trending content (Twitter, Amazon, YouTube). Sub-second freshness requires removing the batching window and doing per-event Redis ZINCRBY, which approaches the single-instance Redis throughput ceiling at 35K events/second.
Why 202 instead of 200 for event ingestion? 202 signals that the event was accepted for asynchronous processing; a Kafka stream processor counts it seconds later. 200 would falsely imply the event was counted at the moment the response was sent.
How do you aggregate top-K correctly across distributed nodes? Merge Count-Min Sketch matrices by element-wise addition, not by unioning local top-K lists. A globally popular item may rank outside the top-K on every individual node if its traffic is evenly partitioned. Sketch merging is mathematically correct and uses 75x less bandwidth than shipping full count maps.
How do you support multiple time windows without re-processing Kafka history? Use a ring buffer of 30-second tumbling CMS slots (2880 slots = 24 hours, ~313 MB per processor node). Any window query merges the trailing N slots: 1 minute = 2 slots, 1 hour = 120 slots, 24 hours = 2880 slots. Memory is constant regardless of runtime duration.
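The ring-buffer arithmetic and trailing-slot merge can be sketched with plain Counters standing in for serialized CMS matrices (an assumption that keeps the demo self-contained; the merge step is the same element-wise addition either way):

```python
from collections import Counter

SLOT_SECONDS = 30
NUM_SLOTS = 24 * 3600 // SLOT_SECONDS  # 2880 thirty-second slots cover 24 hours

def slot_index(epoch_seconds):
    # Wrapping index: when it wraps, a slot's old contents are overwritten,
    # which is exactly how data older than 24 hours expires with no replay.
    return (epoch_seconds // SLOT_SECONDS) % NUM_SLOTS

def window_slots(window_seconds):
    return window_seconds // SLOT_SECONDS

def query_window(ring, now_slot, window_seconds):
    # Answer a variable-width window query by merging the trailing N slots.
    merged = Counter()
    for i in range(window_slots(window_seconds)):
        merged += ring[(now_slot - i) % NUM_SLOTS]
    return merged

ring = [Counter() for _ in range(NUM_SLOTS)]
ring[10]["item"] = 4  # counts landing in the current slot
ring[9]["item"] = 3   # counts from the previous slot
print(NUM_SLOTS, window_slots(60), window_slots(3600))           # 2880 2 120
print(query_window(ring, now_slot=10, window_seconds=60)["item"])  # 7
```

The fixed-size array is why memory stays constant regardless of how long the processor runs.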
Stream vs batch? Use Kafka Streams with stateful in-memory CMS. Hourly batch Spark misses the 30-second staleness NFR by 120x and wastes compute re-processing unchanged events. Micro-batch Spark (30-second trigger) is operationally complex with high checkpoint I/O overhead per trigger.
What happens if a stream processor crashes? Kafka Streams re-reads from the last committed Kafka offset and rebuilds the CMS from recent events. For the 1-minute window this takes a few seconds. For the 24-hour window, restore from a RocksDB-backed state store that persists the ring buffer every 30 seconds; full restoration takes under 10 seconds from disk.
What are the system memory bounds? CMS per window per node: 108 KB. Ring buffer per node for 24 hours: ~313 MB (2,880 slots x 108 KB per sketch). Redis sorted sets: O(K) per window (top-K items only). Contrast with exact counting: 8 MB per window per node at 1M distinct items, growing linearly with cardinality.
Can you use Redis ZINCRBY instead? Yes for small cardinality (under 100K distinct items). At 1M+ distinct items, Redis sorted set memory grows to 80-100 MB per window and 105K ZINCRBY calls/second approaches the single-instance throughput ceiling. Use ZINCRBY for prototyping or low-cardinality cases, switch to CMS when memory or Redis write throughput becomes a constraint.
How do you scale the query path? Pre-computed Redis sorted sets make every query a single ZREVRANGE command. Add Redis read replicas and route query API nodes to replicas. Query API nodes are stateless and scale horizontally with no coordination.
How does the system handle the 10,000:1 write/read asymmetry? Kafka decouples the write path (35K events/sec ingestion) from the read path (pre-computed snapshot reads). The two paths never share a transaction: events flow through Kafka into stream processors, and reads consume from Redis sorted sets that are updated separately on a 30-second snapshot cycle.
What is the error bound of Count-Min Sketch? With probability at least 1 - delta (delta = e^-d, roughly 5e-5 at d=10), the frequency overcount for any item is at most epsilon x (total events in the window). At epsilon=0.001 and 1B events per day, the absolute overcount is bounded at 1M events. For an item with 100M actual views, this is a 1% relative error. CMS never undercounts; estimates are always >= true frequency.
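A worked check of the sizing arithmetic quoted above. Note the text's w = 2718 corresponds to e / epsilon rounded down; some references use the ceiling, giving 2719:

```python
import math

# Sketch width from the target error: w ~ e / epsilon.
epsilon = 0.001
w = int(math.e / epsilon)
print(w)  # 2718

# Worst-case absolute overcount for a window of N total events.
N = 1_000_000_000          # 1B events/day in the 24-hour window
max_overcount = epsilon * N
print(int(max_overcount))  # 1000000 events

# Relative error for a genuinely popular item.
true_views = 100_000_000
print(max_overcount / true_views)  # 0.01, i.e. 1%
```

The bound scales with total window volume, not with any single item's count, which is why the relative error shrinks for exactly the items a top-K system cares about.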