Description
Company: Databricks
Problem Requirements
You need to design a "Hit Counter" that stores data in memory. This system tracks how many times an event (a "hit") happens. It must also calculate the average load, known as Queries Per Second (QPS).
The system needs to work for different time ranges, from a few seconds to several days.
Focus areas:
- Making smart decisions.
- Writing clean code.
- Understanding the pros and cons of different data structures.
What We Need to Build
Create a class called HitCounter with these methods:
- `hit(timestamp)`: Save a record that a hit happened at a specific time (in seconds).
- `get_load(seconds)`: Return the total hits that happened in the last `seconds` seconds.
- `get_qps(seconds)`: Calculate the average hits per second. This is just `get_load(seconds) / seconds`.
Start simple: First, make it work for a 5-minute (300-second) window. Later: We will extend this to handle hours or weeks.
How It Works (Example)
```python
counter = HitCounter()

# Record hits at different times
counter.hit(1)
counter.hit(2)
counter.hit(2)    # Two hits happened at the same second
counter.hit(3)
counter.hit(150)
counter.hit(301)

# Check load for the last 300 seconds (as of time 301).
# The window covers times 2 through 301 (timestamps > 301 - 300 = 1).
# It includes the hits at 2, 2, 3, 150, 301.
load = counter.get_load(300)  # Returns 5 (the hit at time 1 is too old)

# Check load for the last 200 seconds (as of time 301).
# The window covers times 102 through 301.
# It includes only the hits at 150 and 301.
load = counter.get_load(200)  # Returns 2

# Calculate QPS
qps = counter.get_qps(300)  # Returns 5/300, approximately 0.0167
```
Rules and Limitations
- Timestamps always go up (they never decrease), but duplicates are allowed.
- The time window (`seconds`) can be between 1 and 1 billion.
- High Traffic: The system must handle thousands of `hit()` calls every second.
- Memory: Don't waste RAM. It should work well whether hits are frequent or rare.
- Threads: You do not need to worry about thread safety yet (we will add that later).
Approach 1: Using a Queue (Deque)
The Idea
We use a deque (double-ended queue) to make a sliding window.
- Store data as `(timestamp, count)` tuples inside the deque.
- Each item in the deque holds all hits for one specific second.
- When a hit comes: increase the count of the last item, or append a new item if the timestamp has changed.
- When asking for load:
  - Remove items from the front if they are too old.
  - Add up the counts of all items left in the queue.
Speed (Time Complexity)
- `hit()`: O(1) on average. We just update the end of the queue.
- `get_load()`: O(n), where n is the number of distinct seconds with data. We have to loop through them to sum the total.
Memory (Space Complexity)
O(w), where w is the window size in seconds. For a 5-minute window, we store at most 300 items.
Code Solution
```python
from collections import deque
import time


class HitCounter:
    def __init__(self, window_seconds=300):
        """Initialize counter with time window in seconds."""
        self.window_seconds = window_seconds
        self.hits = deque()  # Stores (timestamp, count) tuples

    def hit(self, timestamp=None):
        """Record a hit at the given timestamp (or current time)."""
        if timestamp is None:
            timestamp = int(time.time())
        # If same timestamp as last entry, increment its count
        if self.hits and self.hits[-1][0] == timestamp:
            old_count = self.hits[-1][1]
            self.hits[-1] = (timestamp, old_count + 1)
        else:
            # New timestamp, append new entry
            self.hits.append((timestamp, 1))

    def get_load(self, seconds=None):
        """Get total hits in the past 'seconds' seconds."""
        if seconds is None:
            seconds = self.window_seconds
        if not self.hits:
            return 0
        # Current time is the most recent hit
        current_time = self.hits[-1][0]
        cutoff_time = current_time - seconds
        # Remove old entries outside the window
        while self.hits and self.hits[0][0] <= cutoff_time:
            self.hits.popleft()
        # Sum remaining hits
        return sum(count for timestamp, count in self.hits)

    def get_qps(self, seconds=None):
        """Calculate average QPS over the time window."""
        if seconds is None:
            seconds = self.window_seconds
        load = self.get_load(seconds)
        return load / seconds if seconds > 0 else 0


# Usage Example
counter = HitCounter(window_seconds=300)
counter.hit(1)
counter.hit(2)
counter.hit(2)
counter.hit(3)
counter.hit(150)
counter.hit(301)
print(counter.get_load(300))  # 5
print(counter.get_qps(300))   # ~0.0167
```
Problems with this Approach
- Reading is slow: `get_load()` takes O(n). If you query often, it is slow because it has to re-sum the list on every call.
- Stale data lingers for rare hits: if you have one hit every hour, old entries stay in the deque until the next `get_load()` call evicts them.
Approach 2: Using a Circular Array
The Idea
We use a fixed-size array as a "circular buffer."
- The array size equals the window size (e.g., 300 slots for 5 minutes).
- We find the correct slot using modulo: `index = timestamp % 300`.
- When a hit comes:
  - Go to the calculated index.
  - If the time stored there matches the current time, increase the count.
  - If the time stored is old, reset it to the current time and set the count to 1.
- When asking for load: Look at every slot in the array and add up the counts that are still valid (inside the time window).
Speed (Time Complexity)
- `hit()`: O(1). Accessing an array index is constant time.
- `get_load()`: O(w), where w is the array size. We must look at every slot.
Memory (Space Complexity)
O(w). The memory usage is fixed, no matter how many hits occur.
Code Solution
```python
import time


class HitCounterArray:
    def __init__(self, window_seconds=300):
        """Initialize with fixed-size circular array."""
        self.window_seconds = window_seconds
        # Each bucket stores (timestamp, count)
        self.buckets = [(0, 0) for _ in range(window_seconds)]
        self.current_timestamp = 0

    def hit(self, timestamp=None):
        """Record a hit at the given timestamp."""
        if timestamp is None:
            timestamp = int(time.time())
        self.current_timestamp = max(self.current_timestamp, timestamp)
        bucket_index = timestamp % self.window_seconds
        bucket_timestamp, bucket_count = self.buckets[bucket_index]
        if bucket_timestamp == timestamp:
            # Same second, increment count
            self.buckets[bucket_index] = (timestamp, bucket_count + 1)
        else:
            # Different second, reset bucket
            self.buckets[bucket_index] = (timestamp, 1)

    def get_load(self, seconds=None):
        """Get total hits in the past 'seconds' seconds."""
        if seconds is None:
            seconds = self.window_seconds
        cutoff_time = self.current_timestamp - seconds
        total = 0
        for bucket_timestamp, bucket_count in self.buckets:
            if bucket_timestamp > cutoff_time:
                total += bucket_count
        return total

    def get_qps(self, seconds=None):
        """Calculate average QPS."""
        if seconds is None:
            seconds = self.window_seconds
        load = self.get_load(seconds)
        return load / seconds if seconds > 0 else 0


# Usage Example
counter = HitCounterArray(window_seconds=300)
counter.hit(1)
counter.hit(2)
counter.hit(2)
counter.hit(3)
counter.hit(150)
counter.hit(301)
print(counter.get_load(300))  # 5
print(counter.get_qps(300))   # ~0.0167
```
Pros and Cons
Good:
- Fast writes: `hit()` is always O(1).
- Predictable memory: you know exactly how much RAM it needs.

Bad:
- Slow reads: `get_load()` scans the whole array.
- Empty space: it reserves memory for every second, even if no hits happen.
Approach 3: Optimized Array with Running Total
The Idea
This improves Approach 2. We keep a running total variable.
- Maintain a `total` variable that tracks the sum of all hits in the window.
- When a hit comes:
  - If the bucket is current: just add 1 to the bucket and to `total`.
  - If the bucket is empty or old: subtract the old bucket count from `total`, reset the bucket, and add 1 to `total`.
- When asking for load: simply return the `total` variable.
- Note: We might need to clean up very old buckets first to make sure `total` is accurate.
Speed (Time Complexity)
- `hit()`: O(1) usually; occasionally O(w) when many stale buckets must be cleaned up at once.
- `get_load()`: O(1) for the default window. We just return the pre-calculated total.
Memory (Space Complexity)
O(w). The memory usage is fixed, no matter how many hits occur.
Code Solution
```python
import time


class HitCounterOptimized:
    def __init__(self, window_seconds=300):
        """Initialize optimized counter with cached total."""
        self.window_seconds = window_seconds
        self.buckets = [(0, 0) for _ in range(window_seconds)]
        self.total = 0
        self.current_timestamp = 0

    def _cleanup_stale_buckets(self, timestamp):
        """Remove buckets outside the current window."""
        cutoff_time = timestamp - self.window_seconds
        for i in range(len(self.buckets)):
            bucket_timestamp, bucket_count = self.buckets[i]
            if 0 < bucket_timestamp <= cutoff_time:
                # Bucket is stale, clear it
                self.buckets[i] = (0, 0)
                self.total -= bucket_count

    def hit(self, timestamp=None):
        """Record a hit."""
        if timestamp is None:
            timestamp = int(time.time())
        # Clean up stale data before processing hit
        if timestamp > self.current_timestamp:
            self._cleanup_stale_buckets(timestamp)
            self.current_timestamp = timestamp
        bucket_index = timestamp % self.window_seconds
        bucket_timestamp, bucket_count = self.buckets[bucket_index]
        if bucket_timestamp == timestamp:
            # Same second, increment
            self.buckets[bucket_index] = (timestamp, bucket_count + 1)
            self.total += 1
        else:
            # New second, replace bucket
            if bucket_timestamp > 0:
                # Subtract old bucket count
                self.total -= bucket_count
            self.buckets[bucket_index] = (timestamp, 1)
            self.total += 1

    def get_load(self, seconds=None):
        """Get total hits (O(1) for the default window)."""
        if seconds is None or seconds == self.window_seconds:
            # Return cached total for default window
            return self.total
        # Variable window size requires a scan
        cutoff_time = self.current_timestamp - seconds
        total = 0
        for bucket_timestamp, bucket_count in self.buckets:
            if bucket_timestamp > cutoff_time:
                total += bucket_count
        return total

    def get_qps(self, seconds=None):
        """Calculate average QPS."""
        if seconds is None:
            seconds = self.window_seconds
        load = self.get_load(seconds)
        return load / seconds if seconds > 0 else 0


# Usage Example
counter = HitCounterOptimized(window_seconds=300)
for t in range(1, 100):
    counter.hit(t)
print(counter.get_load())  # 99 -- O(1) lookup of the cached total
print(counter.get_qps())   # 0.33 -- 99 / 300
```
Follow-Up 1: Handling Frequent Queries
Question: What if get_load() is called thousands of times per second?
Solution: Use a Cache.
- Save the result of the last query.
- If
hit()is called, the data changes, so clear the cache. - If
get_load()is called again before the nexthit, just return the saved result.
```python
class HitCounterWithCache(HitCounterOptimized):
    def __init__(self, window_seconds=300):
        super().__init__(window_seconds)
        self.cache = {}  # seconds -> (timestamp, result)

    def hit(self, timestamp=None):
        """Record hit and invalidate cache."""
        super().hit(timestamp)
        # Invalidate all cached queries
        self.cache.clear()

    def get_load(self, seconds=None):
        """Get load with caching."""
        if seconds is None:
            seconds = self.window_seconds
        # Check cache
        if seconds in self.cache:
            cached_timestamp, cached_result = self.cache[seconds]
            if cached_timestamp == self.current_timestamp:
                return cached_result
        # Compute and cache
        result = super().get_load(seconds)
        self.cache[seconds] = (self.current_timestamp, result)
        return result
```
Follow-Up 2: Different Time Ranges (Hours to Weeks)
Question: How do you handle asking for the last 5 minutes AND the last 1 week? An array with 1-second slots for a week is too big.
Solution: Multi-Level Buckets. Use different arrays with different detail levels (granularity):
- 5-minute bucket: 1 bucket per second (Size: 300).
- 24-hour bucket: 1 bucket per 30 minutes (Size: 48).
- 1-week bucket: 1 bucket per 12 hours (Size: 14).
How to query:
- Need last 5 mins? Check the 1-second array.
- Need last 24 hours? Check the 30-minute array.
```python
import time


class MultiWindowHitCounter:
    def __init__(self):
        # Different granularities
        self.buckets_1s = [(0, 0) for _ in range(300)]  # 5 min, 1s granularity
        self.buckets_30m = [(0, 0) for _ in range(48)]  # 24h, 30min granularity
        self.buckets_12h = [(0, 0) for _ in range(14)]  # 1 week, 12h granularity
        self.current_timestamp = 0

    def hit(self, timestamp=None):
        """Record hit in all bucket levels."""
        if timestamp is None:
            timestamp = int(time.time())
        self.current_timestamp = timestamp

        # Update 1-second bucket
        idx_1s = timestamp % 300
        ts, count = self.buckets_1s[idx_1s]
        if ts == timestamp:
            self.buckets_1s[idx_1s] = (ts, count + 1)
        else:
            self.buckets_1s[idx_1s] = (timestamp, 1)

        # Update 30-minute bucket
        bucket_30m = timestamp // 1800  # 1800 seconds = 30 min
        idx_30m = bucket_30m % 48
        ts, count = self.buckets_30m[idx_30m]
        if ts == bucket_30m:
            self.buckets_30m[idx_30m] = (ts, count + 1)
        else:
            self.buckets_30m[idx_30m] = (bucket_30m, 1)

        # Update 12-hour bucket
        bucket_12h = timestamp // 43200  # 43200 seconds = 12 hours
        idx_12h = bucket_12h % 14
        ts, count = self.buckets_12h[idx_12h]
        if ts == bucket_12h:
            self.buckets_12h[idx_12h] = (ts, count + 1)
        else:
            self.buckets_12h[idx_12h] = (bucket_12h, 1)

    def get_load(self, seconds):
        """Query the appropriate bucket level for the time window."""
        if seconds <= 300:
            # Use 1-second granularity buckets
            return self._query_bucket(self.buckets_1s, seconds, 1)
        elif seconds <= 86400:  # 24 hours
            # Use 30-minute granularity buckets
            return self._query_bucket(self.buckets_30m, seconds, 1800)
        else:
            # Use 12-hour granularity buckets; windows beyond 1 week
            # (604800 s) would need additional levels for accuracy
            return self._query_bucket(self.buckets_12h, seconds, 43200)

    def _query_bucket(self, buckets, seconds, granularity):
        """Helper to query a specific bucket array."""
        cutoff = self.current_timestamp - seconds
        total = 0
        for bucket_timestamp, bucket_count in buckets:
            # Convert bucket index back to seconds (for 1s buckets the
            # stored value is already the timestamp itself)
            actual_timestamp = bucket_timestamp * granularity
            if actual_timestamp > cutoff:
                total += bucket_count
        return total
```
Follow-Up 3: Handling Multiple Threads
Question: What if multiple threads call hit() at the exact same time?
Solution: Locks. Use a standard Lock (or RLock) to make sure only one thread can edit the data at a time.
```python
from threading import RLock


class ThreadSafeHitCounter(HitCounterOptimized):
    def __init__(self, window_seconds=300):
        super().__init__(window_seconds)
        self.lock = RLock()  # Reentrant lock

    def hit(self, timestamp=None):
        """Thread-safe hit recording."""
        with self.lock:
            super().hit(timestamp)

    def get_load(self, seconds=None):
        """Thread-safe load query."""
        with self.lock:
            return super().get_load(seconds)
```
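To see why the lock matters, here is a minimal self-contained sketch of the same locking pattern (the `SafeCounter` class is a simplified stand-in invented for this demo, not the full counter above): several threads increment concurrently, and the `RLock` guarantees no increments are lost.

```python
import threading

# Minimal stand-in for the counter above: a dict of timestamp -> count
# guarded by an RLock, mirroring the locking pattern of ThreadSafeHitCounter.
class SafeCounter:
    def __init__(self):
        self.hits = {}
        self.lock = threading.RLock()

    def hit(self, timestamp):
        # Read-modify-write is atomic only while the lock is held
        with self.lock:
            self.hits[timestamp] = self.hits.get(timestamp, 0) + 1

    def get_load(self):
        with self.lock:
            return sum(self.hits.values())

counter = SafeCounter()
threads = [
    threading.Thread(target=lambda: [counter.hit(100) for _ in range(1000)])
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.get_load())  # 4000: no increments lost under contention
```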
Follow-Up 4: Scaling to Many Machines
Question: How do we count hits if the system runs on 100 different servers?
Options:
- Central Server: All servers send their counts to one main server.
- Pros: Simple.
- Cons: If the main server crashes, everything breaks.
- Stream Processing (Kafka): Send all hits to a message queue like Kafka. A processing tool (like Flink) adds them up.
- Pros: Very scalable.
- Cons: Harder to set up.
- Tree Structure: Servers group up. A "parent" server adds up hits from "child" servers.
- Pros: Less network traffic.
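The central-server option can be sketched as local buffering plus periodic merging; this is an illustrative toy under invented names (`Aggregator`, `LocalShard`), not a production design. Each server buffers counts locally and flushes only the delta, which also reduces network traffic.

```python
import threading

# Sketch of central aggregation: app servers buffer hit counts locally
# and periodically flush the delta to one central aggregator.
class Aggregator:
    def __init__(self):
        self.lock = threading.Lock()
        self.counts = {}  # timestamp -> total hits across all servers

    def merge(self, deltas):
        with self.lock:
            for ts, n in deltas.items():
                self.counts[ts] = self.counts.get(ts, 0) + n

    def get_load(self, now, seconds):
        with self.lock:
            return sum(n for ts, n in self.counts.items() if ts > now - seconds)

class LocalShard:
    def __init__(self, aggregator):
        self.aggregator = aggregator
        self.pending = {}  # hits not yet sent to the aggregator

    def hit(self, timestamp):
        self.pending[timestamp] = self.pending.get(timestamp, 0) + 1

    def flush(self):
        # In production this would run on a timer (e.g. every second)
        self.aggregator.merge(self.pending)
        self.pending = {}

agg = Aggregator()
shards = [LocalShard(agg) for _ in range(3)]
for shard in shards:
    shard.hit(100)
    shard.hit(101)
    shard.flush()

print(agg.get_load(now=101, seconds=300))  # 6: two hits from each of 3 shards
```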
Follow-Up 5: Saving Memory for Low Traffic
Question: What if the time window is huge (24 hours) but you only get 10 hits a day? An array would be mostly empty zeros.
Solution: Use a HashMap. Don't use a fixed array. Use a dictionary (HashMap) to store timestamp -> count. Only store seconds where hits actually happened.
```python
import time


class SparseHitCounter:
    def __init__(self, window_seconds=300):
        self.window_seconds = window_seconds
        self.hits = {}  # timestamp -> count
        self.current_timestamp = 0

    def hit(self, timestamp=None):
        """Record hit in sparse map."""
        if timestamp is None:
            timestamp = int(time.time())
        self.current_timestamp = max(self.current_timestamp, timestamp)
        self.hits[timestamp] = self.hits.get(timestamp, 0) + 1

    def get_load(self, seconds=None):
        """Query with cleanup of expired entries."""
        if seconds is None:
            seconds = self.window_seconds
        cutoff = self.current_timestamp - seconds
        # Clean up old entries
        to_delete = [ts for ts in self.hits if ts <= cutoff]
        for ts in to_delete:
            del self.hits[ts]
        return sum(self.hits.values())
```
Special Cases
When writing your code, watch out for these situations:
- Empty Data: What happens if `get_load()` is called before any hits? (It should return 0.)
- Duplicates: Multiple hits in the exact same second.
- Clock Skew: If one server's clock is behind another server's clock.
- Huge Windows: If the window is too large, the array might consume too much memory.
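The first three cases are cheap to verify directly. Here is a sketch that reproduces the Approach 1 counter compactly (so the snippet runs standalone) and asserts the empty-data, duplicate, and aging behavior:

```python
from collections import deque

# Compact copy of the deque-based HitCounter from Approach 1,
# used here only to exercise the edge cases above.
class HitCounter:
    def __init__(self, window_seconds=300):
        self.window_seconds = window_seconds
        self.hits = deque()  # (timestamp, count) tuples

    def hit(self, timestamp):
        if self.hits and self.hits[-1][0] == timestamp:
            self.hits[-1] = (timestamp, self.hits[-1][1] + 1)
        else:
            self.hits.append((timestamp, 1))

    def get_load(self, seconds=None):
        seconds = seconds or self.window_seconds
        if not self.hits:
            return 0  # empty data: no hits recorded yet
        cutoff = self.hits[-1][0] - seconds
        while self.hits and self.hits[0][0] <= cutoff:
            self.hits.popleft()
        return sum(count for _, count in self.hits)

counter = HitCounter()
assert counter.get_load() == 0      # empty data returns 0, no crash
counter.hit(10)
counter.hit(10)
counter.hit(10)
assert counter.get_load() == 3      # duplicates in the same second all count
counter.hit(10_000)
assert counter.get_load(300) == 1   # old hits age out of the window
print("all edge cases pass")
```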
Important Takeaways
- Speed vs. Memory: Arrays are fast but can waste space. HashMaps save space but can be slower to process.
- Granularity: You don't always need 1-second precision. For a yearly report, 1-day precision is fine.
- Caching: If you read data more than you write it, caching makes things much faster.
- Real World Uses: This logic is used for Rate Limiting (stopping spammers), Metrics (tracking server load), and Analytics.
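As a closing illustration of the rate-limiting use case, the same sliding-window idea turns into a limiter with only a small twist: instead of summing hits, we check the count against a cap before accepting. This is a sketch (the `RateLimiter` class name and API are invented here), reusing the deque eviction pattern from Approach 1.

```python
from collections import deque

# Sliding-window rate limiter: allow at most `limit` requests
# per `window_seconds` seconds.
class RateLimiter:
    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window_seconds = window_seconds
        self.requests = deque()  # timestamps of accepted requests

    def allow(self, timestamp):
        cutoff = timestamp - self.window_seconds
        # Evict requests that fell out of the window
        while self.requests and self.requests[0] <= cutoff:
            self.requests.popleft()
        if len(self.requests) < self.limit:
            self.requests.append(timestamp)
            return True
        return False

limiter = RateLimiter(limit=3, window_seconds=10)
print([limiter.allow(t) for t in (1, 2, 3, 4)])  # [True, True, True, False]
print(limiter.allow(12))                         # True: the hit at t=1 expired
```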