each_top_k(int k, ANY group, double value, arg1, arg2, ..., argN)
returns a top-k records for each group
. It returns a relation consists of (int rank, double value, arg1, arg2, .., argN)
.
This function is particularly useful for applying a similarity/distance function where the computation complexity is O(nm).
each_top_k
is very fast when compared to other methods running top-k queries (e.g., rank/distribute by
) in Hive.
Caution
each_top_k
is supported from Hivemall v0.3.2-3 or later.- This UDTF assumes that input records are sorted by
group
. UseDISTRIBUTE BY group SORT BY group
to ensure that. Or, you can useLEFT OUTER JOIN
for certain cases. - It takes variable lengths arguments in
argN
. - The third argument
value
is used for the comparison. Any number types
ortimestamp
are accepted for the type ofvalue
.- If k is less than 0, reverse order is used and
tail-K
records are returned for eachgroup
. - Note that this function returns a pseudo ranking for top-k. It always returns
at-most K
records for each group. The ranking scheme is similar todense_rank
but slightly different in certain cases.
Usage
Efficient Top-k Query Processing using each_top_k
Efficient processing of Top-k queries is a crucial requirement in many interactive environments that involve massive amounts of data.
Our Hive extension each_top_k
helps running Top-k processing efficiently.
- Suppose the following table as the input
student | class | score |
---|---|---|
1 | b | 70 |
2 | a | 80 |
3 | a | 90 |
4 | b | 50 |
5 | a | 70 |
6 | b | 60 |
- Then, list top-2 students for each class
student | class | score | rank |
---|---|---|---|
3 | a | 90 | 1 |
2 | a | 80 | 2 |
1 | b | 70 | 1 |
6 | b | 60 | 2 |
The standard way using SQL window function would be as follows:
SELECT
student, class, score, rank
FROM (
SELECT
student, class, score,
rank() over (PARTITION BY class ORDER BY score DESC) as rank
FROM
table
) t
WHRE rank <= 2
An alternative and efficient way to compute top-k items using each_top_k
is as follows:
SELECT
each_top_k(
2, class, score,
class, student -- output other columns in addition to rank and score
) as (rank, score, class, student)
FROM (
SELECT * FROM table
CLUSTER BY class -- Mandatory for `each_top_k`
) t
Note
CLUSTER BY x
is a synonym of DISTRIBUTE BY x SORT BY x
and required when using each_top_k
.
The function signature of each_top_k
is each_top_k(int k, ANY group, double value, arg1, arg2, ..., argN)
and it returns a relation (int rank, double value, arg1, arg2, .., argN)
.
Any number types or timestamp are accepted for the type of value
but it MUST be not NULL.
Do null hanlding like if(value is null, -1, value)
to avoid null.
If k
is less than 0, reverse order is used and tail-K records are returned for each group
.
The ranking semantics of each_top_k
follows SQL's dense_rank
and then limits results by k
.
Caution
each_top_k
is benefical where the number of grouping keys are large. If the number of grouping keys are not so large (e.g., less than 100), consider using rank() over
instead.
top-k clicks
set hivevar:k=5;
select
page-id,
user-id,
clicks
from (
select
each_top_k(${k}, page-id, clicks, page-id, user-id)
as (rank, clicks, page-id, user-id)
from (
select
page-id, user-id, clicks
from
mytable
DISTRIBUTE BY page-id SORT BY page-id
) t1
) t2
order by page-id ASC, clicks DESC;
Top-k similarity computation
set hivevar:k=10;
SELECT
each_top_k(
${k}, t2.id, angular_similarity(t2.features, t1.features),
t2.id,
t1.id,
t1.y
) as (rank, similarity, base_id, neighbor_id, y)
FROM
test_hivemall t2
LEFT OUTER JOIN train_hivemall t1;
rank | similarity | base_id | neighbor_id | y |
---|---|---|---|---|
1 | 0.8594650626182556 | 12 | 10514 | 0 |
2 | 0.8585299849510193 | 12 | 11719 | 0 |
3 | 0.856602132320404 | 12 | 21009 | 0 |
4 | 0.8562054634094238 | 12 | 17582 | 0 |
5 | 0.8516314029693604 | 12 | 22006 | 0 |
6 | 0.8499397039413452 | 12 | 25364 | 0 |
7 | 0.8467264771461487 | 12 | 900 | 0 |
8 | 0.8463355302810669 | 12 | 8018 | 0 |
9 | 0.8439178466796875 | 12 | 7041 | 0 |
10 | 0.8438876867294312 | 12 | 21595 | 0 |
1 | 0.8390793800354004 | 25 | 21125 | 0 |
2 | 0.8344510793685913 | 25 | 14073 | 0 |
3 | 0.8340602517127991 | 25 | 9008 | 0 |
4 | 0.8328862190246582 | 25 | 6598 | 0 |
5 | 0.8301891088485718 | 25 | 943 | 0 |
6 | 0.8271955251693726 | 25 | 20400 | 0 |
7 | 0.8255619406700134 | 25 | 10922 | 0 |
8 | 0.8241575956344604 | 25 | 8477 | 0 |
9 | 0.822281539440155 | 25 | 25977 | 0 |
10 | 0.8205751180648804 | 25 | 21115 | 0 |
1 | 0.9761330485343933 | 34 | 2513 | 0 |
2 | 0.9536819458007812 | 34 | 8697 | 0 |
3 | 0.9531533122062683 | 34 | 7326 | 0 |
4 | 0.9493276476860046 | 34 | 15173 | 0 |
5 | 0.9480557441711426 | 34 | 19468 | 0 |
.. | .. | .. | .. | .. |
Explicit grouping using distribute by
and sort by
SELECT
each_top_k(
10, id1, angular_similarity(features1, features2),
id1,
id2,
y
) as (rank, similarity, id, other_id, y)
FROM (
select
t1.id as id1,
t2.id as id2,
t1.features as features1,
t2.features as features2,
t1.y
from
train_hivemall t1
CROSS JOIN test_hivemall t2
DISTRIBUTE BY id1 SORT BY id1
) t;
Parallelization of similarity computation using WITH clause
create table similarities
as
WITH test_rnd as (
select
rand(31) as rnd,
id,
features
from
test_hivemall
),
t01 as (
select
id,
features
from
test_rnd
where
rnd < 0.2
),
t02 as (
select
id,
features
from
test_rnd
where
rnd >= 0.2 and rnd < 0.4
),
t03 as (
select
id,
features
from
test_rnd
where
rnd >= 0.4 and rnd < 0.6
),
t04 as (
select
id,
features
from
test_rnd
where
rnd >= 0.6 and rnd < 0.8
),
t05 as (
select
id,
features
from
test_rnd
where
rnd >= 0.8
),
s01 as (
SELECT
each_top_k(
10, t2.id, angular_similarity(t2.features, t1.features),
t2.id,
t1.id,
t1.y
) as (rank, similarity, base_id, neighbor_id, y)
FROM
t01 t2
LEFT OUTER JOIN train_hivemall t1
),
s02 as (
SELECT
each_top_k(
10, t2.id, angular_similarity(t2.features, t1.features),
t2.id,
t1.id,
t1.y
) as (rank, similarity, base_id, neighbor_id, y)
FROM
t02 t2
LEFT OUTER JOIN train_hivemall t1
),
s03 as (
SELECT
each_top_k(
10, t2.id, angular_similarity(t2.features, t1.features),
t2.id,
t1.id,
t1.y
) as (rank, similarity, base_id, neighbor_id, y)
FROM
t03 t2
LEFT OUTER JOIN train_hivemall t1
),
s04 as (
SELECT
each_top_k(
10, t2.id, angular_similarity(t2.features, t1.features),
t2.id,
t1.id,
t1.y
) as (rank, similarity, base_id, neighbor_id, y)
FROM
t04 t2
LEFT OUTER JOIN train_hivemall t1
),
s05 as (
SELECT
each_top_k(
10, t2.id, angular_similarity(t2.features, t1.features),
t2.id,
t1.id,
t1.y
) as (rank, similarity, base_id, neighbor_id, y)
FROM
t05 t2
LEFT OUTER JOIN train_hivemall t1
)
select * from s01
union all
select * from s02
union all
select * from s03
union all
select * from s04
union all
select * from s05;
tail-K
set hivevar:k=-10;
SELECT
each_top_k(
${k}, t2.id, angular_similarity(t2.features, t1.features),
t2.id,
t1.id,
t1.y
) as (rank, similarity, base_id, neighbor_id, y)
FROM
test_hivemall t2
LEFT OUTER JOIN train_hivemall t1
-- limit 25
rank | similarity | base_id | neighbor_id | y |
---|---|---|---|---|
1 | 0.4383084177970886 | 1 | 7503 | 0 |
2 | 0.44166821241378784 | 1 | 10143 | 0 |
3 | 0.4424300789833069 | 1 | 11073 | 0 |
4 | 0.44254064559936523 | 1 | 17782 | 0 |
5 | 0.4442034363746643 | 1 | 18556 | 0 |
6 | 0.45163780450820923 | 1 | 3786 | 0 |
7 | 0.45244503021240234 | 1 | 10242 | 0 |
8 | 0.4525672197341919 | 1 | 21657 | 0 |
9 | 0.4527127146720886 | 1 | 17218 | 0 |
10 | 0.45314133167266846 | 1 | 25141 | 0 |
1 | 0.44030147790908813 | 2 | 3786 | 0 |
2 | 0.4408798813819885 | 2 | 23386 | 0 |
3 | 0.44112563133239746 | 2 | 11073 | 0 |
4 | 0.4415401816368103 | 2 | 22853 | 0 |
5 | 0.4422193765640259 | 2 | 21657 | 0 |
6 | 0.4429032802581787 | 2 | 10143 | 0 |
7 | 0.4435907006263733 | 2 | 24413 | 0 |
8 | 0.44569307565689087 | 2 | 7503 | 0 |
9 | 0.4460843801498413 | 2 | 25141 | 0 |
10 | 0.4464914798736572 | 2 | 24289 | 0 |
1 | 0.43862903118133545 | 3 | 23150 | 1 |
2 | 0.4398220181465149 | 3 | 9881 | 1 |
3 | 0.44283604621887207 | 3 | 27121 | 0 |
4 | 0.4432108402252197 | 3 | 26220 | 1 |
5 | 0.44323229789733887 | 3 | 18541 | 0 |
.. | .. | .. | .. | .. |
Alternative approaches
In order to utilize mapper-side aggregation and reduce computational cost of shuffling, you can use to_ordered_map
or to_ordered_list
to get top/tail-k elements instead of each_top_k
.
As long as key
is unique in each id
, the following queries return same result:
with t as (
select
each_top_k(
10, id, key,
id, value
) as (rank, key, id, value)
from (
select
*
from
test
cluster by
id
) t
)
select
id, collect_list(value) as topk
from
t
group by
id
with t as (
select
id, to_ordered_map(key, value, 10) as m
from
test
group by
id
)
select
id, collect_list(value) as topk
from
t
lateral view explode(m) t as key, value
group by
id
select
id, to_ordered_list(value, key, '-k 10') as topk
from
test
group by
id
Caution
In case that key
could duplicate in id
, to_ordered_map
behaves differently because key K
is always unique in Map<K, V>
.
Similarly to each_top_k
, tail-k can also be represented as: to_ordered_map(key, value, -10)
and to_ordered_list(value, key, '-k -10')
.