How do we count distinct rolling 90 day active users in a data set of billions?
At Pex, we actively monitor all visible content on our supported platforms – over 40 social and UGC sites – which has produced the world’s largest video and audio database. Since we’re indexing exabytes of data, our datasets contain billions of records.
Data analysis queries or workflows that run easily over a smaller dataset sometimes struggle to complete on datasets of the size we have at Pex.
I was working on this problem – counting distinct rolling 90 day active users – for Pex and was unable to find good resources on how to solve it at our scale. I eventually found a solution and wanted to share it with others facing this issue, or anyone interested in working with big data. If you’re interested in how we use data like this at Pex, check out this blog post.
Now, how do we count distinct rolling 90 day active users in a data set of billions?
Let’s imagine we have a data set that covers ~6 months of time and has the following properties:
- Every time a user uploads an asset to the platform, a new row is created (including its user_id, asset_id, and created_at timestamp).
- There are 15+ billion records in the example we are looking at, but user_id is not unique across the data set: a user can have multiple asset_ids over various days and times, even several within one day.
- user_ids are strings of 24 bytes each.
Now let’s start counting. Any time we need to count distinct values, we run into the count-distinct problem: computing an exact cardinality requires space proportional to the number of distinct values, which makes the query very memory-intensive (and therefore costly) on a dataset as large as ours.
To make it more complicated, what if we want to use this large data set to count, for each day, the distinct users active over the previous 90 days?
We will define a user as active if their user_id has an associated record with a created_at within the last 90 days, relative to the date we are looking at.
For example, for the value associated with 9/1/2019, we want to count all the distinct users between 6/4/2019 and 9/1/2019, inclusive. For 9/2/2019, the window shifts to 6/5/2019 through 9/2/2019, and so on.
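To double-check the window arithmetic, here is a small Python sketch. It assumes the window is inclusive of the reported day, so it spans that day plus the 89 days before it. `window_bounds` is a hypothetical helper for illustration only.

```python
from datetime import date, timedelta

WINDOW_DAYS = 90  # window length, including the day being reported


def window_bounds(day: date) -> tuple[date, date]:
    """Return the first and last dates of the 90-day window ending on `day`."""
    # The window holds `day` itself plus the 89 days before it.
    return day - timedelta(days=WINDOW_DAYS - 1), day


for d in (date(2019, 9, 1), date(2019, 9, 2)):
    start, end = window_bounds(d)
    print(f"{d}: {start} to {end}")  # e.g. 2019-09-01: 2019-06-04 to 2019-09-01
```

Subtracting 89 days instead of 90 is what makes 9/1/2019 start on 6/4/2019 rather than 6/3/2019.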
Step one: create a query showing the number of distinct users per day
The first step towards our goal is to create a simple query showing the number of distinct users per day.
```sql
SELECT DATE(created_at) AS day, COUNT(DISTINCT user_id) AS distinct_users
FROM `example-project.ivan/article_example_dataset`
GROUP BY day ORDER BY day
```
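As an in-memory analogue of this query, the Python sketch below groups hypothetical toy rows by day and counts distinct user_ids exactly. It is not how BigQuery executes the query. It does show why exact counting gets expensive: every distinct id has to be remembered in a set.

```python
from collections import defaultdict
from datetime import datetime


def distinct_users_per_day(rows):
    """Exact per-day distinct counts, like GROUP BY DATE(created_at) + COUNT(DISTINCT user_id).

    `rows` is an iterable of (user_id, created_at) pairs. Every distinct id seen on a
    day is kept in a set, so memory grows with the number of distinct values.
    """
    seen = defaultdict(set)  # day -> set of distinct user_ids
    for user_id, created_at in rows:
        seen[created_at.date()].add(user_id)
    return {day: len(ids) for day, ids in sorted(seen.items())}


# Hypothetical toy rows, not real data.
rows = [
    ("user-a", datetime(2019, 9, 1, 8, 0)),
    ("user-a", datetime(2019, 9, 1, 17, 30)),  # same user, same day: counted once
    ("user-b", datetime(2019, 9, 1, 12, 0)),
    ("user-a", datetime(2019, 9, 2, 9, 15)),
]
print(distinct_users_per_day(rows))
```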
BigQuery displays statistics for each query with additional details about resource consumption. This is extremely useful, because under on-demand pricing BigQuery bills by the amount of data a query processes.
BigQuery also includes an APPROX_COUNT_DISTINCT function, which can greatly increase our query speed while remaining accurate enough for our purposes.
```sql
SELECT DATE(created_at) AS day, APPROX_COUNT_DISTINCT(user_id) AS distinct_users
FROM `example-project.ivan/article_example_dataset`
GROUP BY day ORDER BY day
```
Using APPROX_COUNT_DISTINCT makes the query 2.97x faster, and the query statistics show that it is also much less resource-intensive in both slot time and bytes shuffled.
Step two: calculate active users across the 90 day rolling window
So now we have a baseline of distinct active users per day. But what if we want to calculate active users across the 90 day rolling window, as we mentioned at the start?
Let’s try to extend our query by applying the count as a window function over the last 90 days.
Since we need to group or aggregate user_id, let’s add ninety_day_window_approx to our GROUP BY clause.
This fails, because we cannot group by an analytic function in BigQuery.
To work around this, we can try to wrap most of the query in a subquery and do the grouping later.
BigQuery’s validator initially accepts this, but running the query returns an error.
Hmm… since we need to aggregate this count over the window, one might be inclined to simply SUM the counts of distinct users per day over a 90 day window. Problem solved, right?
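```sql
WITH daily AS (SELECT DATE(created_at) AS day, user_id FROM `example-project.ivan/article_example_dataset`)
SELECT day,
  SUM(APPROX_COUNT_DISTINCT(user_id)) OVER (ORDER BY day ROWS BETWEEN 89 PRECEDING AND CURRENT ROW) ninety_day_window_approx
FROM daily GROUP BY day ORDER BY day
```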
This will run, but it returns an incorrect answer to our question.
Why doesn’t SUM solve our problem?
It only sums the number of distinct users per day, so a user who is active on several days within the window is counted once for each of those days.
What if a user was active on Day 1 and Day 2?
They will be counted twice in the 90 day window count for Day 2.
So, unfortunately, this is not a valid measure of distinct users over the 90 day window.
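The double counting is easy to reproduce on a toy example. The Python sketch below uses hypothetical data: user-a is active on both 9/1 and 9/2. It compares summing the daily distinct counts with taking the size of the union of the daily user sets.

```python
from datetime import date, timedelta

WINDOW_DAYS = 90

# Hypothetical toy data: day -> set of distinct users active on that day.
daily_users = {
    date(2019, 9, 1): {"user-a", "user-b"},
    date(2019, 9, 2): {"user-a", "user-c"},
}


def days_in_window(day):
    """Days with data that fall in the 90-day window ending on `day`."""
    start = day - timedelta(days=WINDOW_DAYS - 1)
    return [d for d in daily_users if start <= d <= day]


def summed_window(day):
    """What SUM over the window of the daily distinct counts returns."""
    return sum(len(daily_users[d]) for d in days_in_window(day))


def distinct_window(day):
    """The true distinct count: the size of the union of the daily sets."""
    return len(set().union(*(daily_users[d] for d in days_in_window(day))))


day = date(2019, 9, 2)
print(summed_window(day), distinct_window(day))  # 4 3: user-a is counted twice by the sum
```

A correct answer needs a union of the daily sets rather than a sum of their sizes, and that union is the expensive part at scale.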
A new attempt
Let’s try the following approach to count the distinct user_ids over the 90 day window. It works around the fact that ORDER BY cannot be used in a window function together with COUNT(DISTINCT ...).
- First, we concatenate all the distinct user_ids for each day into a string using STRING_AGG(DISTINCT user_id).
- For each day and the 89 preceding days (a 90 day period), we concatenate the daily strings from the previous step. This stores a larger set of user_ids per window, from which we can later count the distinct values.
- Finally, we split and unnest each window string and count the distinct user_ids within it using COUNT(DISTINCT id).
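```sql
WITH
-- build a comma-separated string of distinct user ids per day
daily AS (
  SELECT DATE(created_at) AS day, STRING_AGG(DISTINCT user_id) users
  FROM `example-project.ivan/article_example_dataset`
  GROUP BY day
),
-- concatenate the daily strings across the 90 day window (will be massive)
ninety_day_window AS (
  SELECT day, STRING_AGG(users) OVER (ORDER BY UNIX_DATE(day)
    RANGE BETWEEN 89 PRECEDING AND CURRENT ROW) users
  FROM daily
)
-- split each window string back into ids and count the distinct ones
SELECT day, (SELECT COUNT(DISTINCT id) FROM UNNEST(SPLIT(users)) AS id) unique90days
FROM ninety_day_window ORDER BY day
```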
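The same three steps can be mimicked in Python on hypothetical toy data. This sketch is not BigQuery's execution plan. It highlights that the intermediate window string contains every id from every day in the window, duplicates included, so its size scales with window length times daily users.

```python
from datetime import date, timedelta

WINDOW_DAYS = 90


def daily_strings(daily_users):
    """Like STRING_AGG(DISTINCT user_id): one comma-joined string of ids per day."""
    return {d: ",".join(sorted(users)) for d, users in daily_users.items()}


def window_distinct_via_strings(day_strings, day):
    """Concatenate the window's daily strings, then split and count distinct ids.

    Returns (distinct_count, size_of_concatenated_string). The intermediate string
    holds every id from every day in the window, duplicates included.
    """
    start = day - timedelta(days=WINDOW_DAYS - 1)
    window = ",".join(s for d, s in sorted(day_strings.items()) if start <= d <= day)
    ids = set(window.split(",")) if window else set()
    return len(ids), len(window)


# Hypothetical toy data: day -> set of distinct users active on that day.
daily_users = {
    date(2019, 9, 1): {"user-a", "user-b"},
    date(2019, 9, 2): {"user-a", "user-c"},
}
print(window_distinct_via_strings(daily_strings(daily_users), date(2019, 9, 2)))  # (3, 27)
```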
This STRING_AGG query has to shuffle a lot of data.
It might work fine for a smaller dataset, but storing every distinct user_id in a string for each day, and then concatenating 90 of those strings for each window, requires so much memory that it will most likely fail on any large data set. Let’s do some math to see why.
Each user_id is 24 bytes, and each day contains ~10,000,000 distinct users, so each daily string is roughly:
(24 bytes * ~10,000,000) = ~240,000,000 bytes / 0.24 GB
Each 90 day window string could then contain up to:
(~240,000,000 bytes * 90) = ~21,600,000,000 bytes / 21.6 GB
Multiply 21.6 GB by each day in the 6 months, and we’re looking at ~3.8+ TB of data to be shuffled just to process this query.
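The same back-of-the-envelope arithmetic, written out in Python. It assumes ~180 days for "6 months" and, like the estimate above, ignores the 1-byte comma separators.

```python
USER_ID_BYTES = 24
DISTINCT_USERS_PER_DAY = 10_000_000
WINDOW_DAYS = 90
DAYS_IN_DATASET = 180  # assumption: ~6 months of daily windows

per_day = USER_ID_BYTES * DISTINCT_USERS_PER_DAY  # one daily string of ids
per_window = per_day * WINDOW_DAYS                # one 90-day window string
total = per_window * DAYS_IN_DATASET              # every window in the dataset

print(f"{per_day / 1e9:.2f} GB per day")          # 0.24 GB per day
print(f"{per_window / 1e9:.1f} GB per window")    # 21.6 GB per window
print(f"{total / 1e12:.2f} TB in total")          # 3.89 TB in total
```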
But let’s give it a try anyway:
When a slot’s resources are overwhelmed, a resources_exceeded error results. Reaching the shuffle limit for a slot (2TB in memory compressed) also causes the shuffle to write to disk and further impacts performance.
This approach might work on a smaller data set, but it won’t work at our scale.
So, how do we solve this?
Step three: use the HyperLogLog++ algorithm
How do we obtain what we are looking for (distinct active users within the 90 day window), while remaining within the resources allocated and being more efficient?
The answer is HyperLogLog!
BigQuery supports approximate aggregation functions using the HyperLogLog++ algorithm.
You can read more details in their documentation here.
Instead of storing every distinct user ID for each day and then combining those lists, we can store a sketch per day: a compact summary from which the number of distinct user_ids in that group can be estimated. We can then merge these sketches across the 90 day window, producing a sketch that represents the union of the inputs, and extract the numeric estimate from the merged sketch.
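To show why sketches can be merged, here is a minimal sketch of plain HyperLogLog in Python. It is not BigQuery's HLL++ implementation, which adds a sparse representation and bias correction. The SHA-256-based hash and the precision of 12 are illustrative assumptions, and `init`, `merge`, and `extract` are hypothetical names that only echo the HLL_COUNT functions. A union of sets becomes an element-wise max of registers, which is cheap and exact.

```python
import hashlib
import math

P = 12       # precision: 2**P registers (BigQuery defaults to 15; 12 keeps this demo small)
M = 1 << P   # number of registers


def _hash64(value: str) -> int:
    """64-bit hash taken from SHA-256 (an illustrative choice, not BigQuery's hash)."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")


def init(values):
    """Build a sketch: each register keeps the largest rank seen in its bucket."""
    registers = [0] * M
    for value in values:
        h = _hash64(value)
        index = h >> (64 - P)                    # top P bits choose the register
        rest = h & ((1 << (64 - P)) - 1)         # remaining 64 - P bits
        rank = (64 - P) - rest.bit_length() + 1  # position of the leftmost 1-bit
        registers[index] = max(registers[index], rank)
    return registers


def merge(*sketches):
    """Union of sketches: the element-wise max of their registers."""
    return [max(column) for column in zip(*sketches)]


def extract(registers):
    """Raw HyperLogLog estimate, with linear counting for small cardinalities."""
    alpha = 0.7213 / (1 + 1.079 / M)
    raw = alpha * M * M / sum(2.0 ** -r for r in registers)
    zeros = registers.count(0)
    if raw <= 2.5 * M and zeros:
        return M * math.log(M / zeros)
    return raw


day1 = init(f"user-{i}" for i in range(0, 6000))     # users 0..5999
day2 = init(f"user-{i}" for i in range(3000, 9000))  # users 3000..8999, half overlap
print(round(extract(merge(day1, day2))))             # close to the 9000 distinct users
```

Merging the two daily sketches yields exactly the sketch that would be built from the union of both days' users. This is the property that lets a rolling window reuse small per-day sketches.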
An example of what a HyperLogLog sketch in BigQuery looks like:
which represents an estimate of the cardinality of the following values:
('user-id-1-mre2-hv-24byte', 'user-id-1-mre2-hv-24byte', 'user-id-2-mre2-hv-24byte') (2 distinct strings, 3 total strings of 24 bytes each)
How accurate is this estimate?
The precision table in BigQuery’s documentation shows that at the default precision, the 95% confidence interval is 0.57%. Not bad.
Since we will need these sketches to stay small, we will stay with the default precision of 15.
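As a rough cross-check of that trade-off, the classic HyperLogLog approximation for relative error is 1.04 / sqrt(2^p), where p is the precision. At p = 15 it gives the same 0.57% figure. Treat this as an assumption about the math behind the table, not necessarily how BigQuery derives its published numbers. Each extra bit of precision doubles the registers but shrinks the error only by a factor of sqrt(2).

```python
import math


def hll_relative_error(precision: int) -> float:
    """Classic HyperLogLog error approximation: 1.04 / sqrt(number of registers)."""
    return 1.04 / math.sqrt(2 ** precision)


for p in (10, 12, 15, 18):
    print(f"precision {p}: {2 ** p} registers, ~{hll_relative_error(p):.2%} error")
```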
Let’s see how effective the HLL++ algorithm is using the three sample values above:
```sql
WITH user_ids AS (
  SELECT 'user-id-1-mre2-hv-24byte' AS user_id UNION ALL
  SELECT 'user-id-1-mre2-hv-24byte' AS user_id UNION ALL
  SELECT 'user-id-2-mre2-hv-24byte' AS user_id )
SELECT
  HLL_COUNT.INIT(user_id) AS HyperLogLog_Sketch,
  HLL_COUNT.EXTRACT(HLL_COUNT.INIT(user_id)) AS Distinct_Count_from_HyperLogLog_sketch
FROM user_ids
```
The HyperLogLog sketch is only 25 bytes, compared to 49 bytes for our previous method of concatenating each distinct user_id (two distinct 24-byte user_ids plus a 1-byte comma separator: 24 + 24 + 1 = 49 bytes).
Extracting the count from the sketch gives the estimated distinct count, which for an input this small should equal the true count of 2.
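The 49-byte figure for the concatenation approach can be checked directly. The snippet below is only a byte count of the comma-joined distinct sample values, not a model of BigQuery's storage.

```python
# The three sample values from the example above (24 bytes each).
values = ["user-id-1-mre2-hv-24byte", "user-id-1-mre2-hv-24byte", "user-id-2-mre2-hv-24byte"]

distinct = sorted(set(values))
joined = ",".join(distinct)  # the concatenation approach: ids separated by commas

print(len(distinct), len(joined.encode()))  # 2 49
```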
The true storage savings will be realized when looking at a larger data set.
A sketch representing the estimated cardinality of 50,000,000+ records can be stored in only ~32,000 bytes!
If we recall our previous idea of concatenating the distinct values into a string, this would have been (24 bytes * 50,000,000) = 1,200,000,000 bytes / 1.2 GB.
Therefore, the HyperLogLog sketch of 50M+ records is ~37,500 times smaller!
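Here is the size comparison as arithmetic. The register count at the default precision of 15 is 2^15 = 32,768. If each register takes about one byte in a dense sketch (an assumption about the encoding), that lines up with the ~32,000-byte sketch size quoted above.

```python
USER_ID_BYTES = 24
RECORDS = 50_000_000
SKETCH_BYTES = 32_000  # the approximate sketch size quoted above
PRECISION = 15         # BigQuery's default HLL_COUNT.INIT precision

concat_bytes = USER_ID_BYTES * RECORDS  # concatenating every id as a string
ratio = concat_bytes / SKETCH_BYTES
registers = 2 ** PRECISION              # registers in a dense sketch at this precision

print(concat_bytes, ratio, registers)   # 1200000000 37500.0 32768
```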
This is much more efficient and can actually run on such a large data set in BigQuery: we can now retain these sketches for our 90 day window without running out of space.
Step four: implement HyperLogLog
1. Build the HyperLogLog sketch of distinct user ids per day.
```sql
SELECT DATE(created_at) AS day, HLL_COUNT.INIT(user_id) AS users_sketch
FROM `example-project.ivan/article_example_dataset` GROUP BY 1 ORDER BY 1
```
2. Build an array containing each daily sketch in the 90 day rolling window (now possible because of the small size of the HyperLogLog sketches).
```sql
ARRAY_AGG(users_sketch) OVER (ORDER BY UNIX_DATE(day) RANGE BETWEEN 89 PRECEDING AND CURRENT ROW)
```
3. Unnest the array and merge the sketches into a final sketch representing the cardinality of the 90 day window.
4. Then extract the numeric estimate from the final sketch.
We can merge the sketches and extract the numeric estimate in the same step by using HLL_COUNT.MERGE.
If we wanted the merged sketch itself rather than its estimate, we would use HLL_COUNT.MERGE_PARTIAL instead.
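```sql
WITH daily AS (
  -- build the HLL sketch of distinct user ids per day
  SELECT DATE(created_at) AS day, HLL_COUNT.INIT(user_id) users_sketch
  FROM `example-project.ivan/article_example_dataset` GROUP BY day
),
-- build an array of the daily sketches in each 90 day window
ninety_day_window AS (
  SELECT day, ARRAY_AGG(users_sketch) OVER (ORDER BY UNIX_DATE(day)
    RANGE BETWEEN 89 PRECEDING AND CURRENT ROW) rolling_sketch_array
  FROM daily
)
-- merge each window's sketches and extract the numeric estimate
SELECT day, (SELECT HLL_COUNT.MERGE(sketches) FROM UNNEST(rolling_sketch_array) sketches) rolling_sketch
FROM ninety_day_window ORDER BY day
```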
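The window mechanics of this query can be sketched in Python, independent of the sketch type. To keep the example self-contained, it uses exact sets as stand-in "sketches": merging is set union and extracting is `len`. In BigQuery those roles are played by HLL_COUNT.INIT sketches and HLL_COUNT.MERGE. `rolling_estimates` is a hypothetical helper, and the data is a toy example.

```python
from datetime import date, timedelta

WINDOW_DAYS = 90


def rolling_estimates(daily_sketches, merge, extract):
    """For each day, merge the sketches of that day and the 89 calendar days before it.

    This mirrors ARRAY_AGG(...) OVER (ORDER BY UNIX_DATE(day) RANGE BETWEEN 89
    PRECEDING AND CURRENT ROW) followed by a merge: the window is measured in
    dates, so a day with no rows simply contributes nothing.
    """
    days = sorted(daily_sketches)
    estimates = {}
    for day in days:
        start = day - timedelta(days=WINDOW_DAYS - 1)
        window = [daily_sketches[d] for d in days if start <= d <= day]
        estimates[day] = extract(merge(window))
    return estimates


# Stand-in "sketches": exact sets, where merging is set union and extracting is len.
daily = {
    date(2019, 6, 3): {"user-x"},  # outside the window that ends on 9/1
    date(2019, 6, 4): {"user-a"},
    date(2019, 9, 1): {"user-a", "user-b"},
}
print(rolling_estimates(daily, lambda sketches: set().union(*sketches), len))
```

Because the frame is a RANGE over UNIX_DATE, 6/3 drops out of the 9/1 window even though there are only three rows. A ROWS BETWEEN 89 PRECEDING frame, like the one in the earlier SUM query, counts rows instead of days, so here it would still have included 6/3.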
Thanks to Felipe and Mikhail on StackOverflow for helping with the syntax needed to merge the final sketches!
Our query returns results in ~30 seconds and only has to shuffle ~47GB while calculating them. Better yet, it runs without reaching the shuffle limit on a slot!
A more detailed look at the worker timing shows the following:
We can see that the largest stages were in the initial steps of truncating the timestamps to dates and grouping by these days while creating the HyperLogLog sketch for each day (Stage 00), and then later, merging the sketches across the 90 day windows (Stage 05).
For more usage of HyperLogLog in BigQuery, I recommend reading through the documentation on their HyperLogLog functions.