Prerequisite
Learn how to use Hive with Elastic MapReduce (EMR).
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive.html
Before launching an EMR job,
- create ${s3bucket}/emr/outputs for outputs
- optionally, create ${s3bucket}/emr/logs for logging
- put emr_hivemall_bootstrap.sh on ${s3bucket}/emr/conf (a hypothetical sketch of this script is shown below)
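The bootstrap script itself is not included in this guide. A minimal hypothetical version, assuming its only job is to fetch hivemall.jar and define-all.hive into ./tmp on the master node (the paths the Hive session below expects), might look like the following; the download URLs are placeholders for whichever Hivemall release you actually use.
#!/bin/sh
# emr_hivemall_bootstrap.sh -- hypothetical sketch, not the official script
mkdir -p /home/hadoop/tmp
cd /home/hadoop/tmp
# replace the placeholders with real artifact URLs
wget -O hivemall.jar <URL of the Hivemall jar with dependencies>
wget -O define-all.hive <URL of define-all.hive>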
Then, launch an EMR job with Hive in interactive mode. I usually launch EMR instances on cheap Spot Instances through the CLI client as follows:
./elastic-mapreduce --create --alive \
--name "Hive cluster" \
--hive-interactive --hive-versions latest \
--hive-site=s3://${s3bucket}/emr/conf/hive-site.xml \
--ami-version latest \
--instance-group master --instance-type m1.medium --instance-count 1 --bid-price 0.175 \
--instance-group core --instance-type m1.large --instance-count 3 --bid-price 0.35 \
--enable-debugging --log-uri s3n://${s3bucket}/emr/logs \
--bootstrap-action s3://elasticmapreduce/bootstrap-actions/run-if \
--args "instance.isMaster=true,s3://${s3bucket}/emr/conf/emr_hivemall_bootstrap.sh" --bootstrap-name "hivemall setup"
--bootstrap-action s3://elasticmapreduce/bootstrap-actions/install-ganglia --bootstrap-name "install ganglia"
To use YARN (Hadoop 2.x) instead of the old Hadoop (1.x), specify "--ami-version 3.0.0". Hivemall works on both.
Alternatively, launch an interactive EMR job using the EMR GUI wizard.
Data preparation
Put training and test data in a TSV format on Amazon S3, e.g., on ${s3bucket}/datasets/news20b/[train|test].
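Each line is expected to contain the row id, the label (1 or -1), and a comma-separated list of index:value features, all tab-separated. For illustration only (these are not actual news20 rows):
1	1	197:0.1,321:0.05,1024:0.02
2	-1	12:0.3,845:0.08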
create database news20;
use news20;
add jar ./tmp/hivemall.jar;
source ./tmp/define-all.hive;
set hivevar:s3bucket=YOUR_BUCKET_NAME;
-- The default input split size is often too large for Hivemall; use 64MB splits instead
set mapred.max.split.size=67108864;
Create external table news20b_train (
rowid int,
label int,
features ARRAY<STRING>
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ","
STORED AS TEXTFILE LOCATION 's3n://${s3bucket}/datasets/news20b/train';
Create external table news20b_test (
rowid int,
label int,
features ARRAY<STRING>
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ","
STORED AS TEXTFILE LOCATION 's3n://${s3bucket}/datasets/news20b/test';
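-- Amplify the training data (x3) so the online learner sees each example several
-- times in shuffled order. The commented-out variant below shuffles with an explicit
-- CLUSTER BY (an extra shuffle stage); the rand_amplify() view that follows instead
-- duplicates and shuffles rows map-locally.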
-- create or replace view news20b_train_x3
-- as
-- select
-- *
-- from (
-- select
-- amplify(3, *) as (rowid, label, features)
-- from
-- news20b_train
-- ) t
-- CLUSTER BY CAST(rand(47) * 100 as INT), CAST(rand(49) * 100 as INT), CAST(rand(50) * 100 as INT);
create or replace view news20b_train_x3
as
select
rand_amplify(3, 1000, *) as (rowid, label, features)
from
news20b_train;
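-- Flatten each test instance into (rowid, feature, value) rows so that the model
-- table can be joined per feature at prediction time. add_bias() appends the bias
-- feature, matching its use during training.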
create table news20b_test_exploded as
select
rowid,
label,
cast(split(feature,":")[0] as int) as feature,
cast(split(feature,":")[1] as float) as value
from
news20b_test LATERAL VIEW explode(add_bias(features)) t AS feature;
Adaptive Regularization of Weight Vectors (AROW)
training
DROP TABLE news20b_arow_model1;
CREATE EXTERNAL TABLE IF NOT EXISTS news20b_arow_model1 (
feature string,
weight float
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 's3://${s3bucket}/emr/outputs/news20b_arow_model1';
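-- Each map task trains an AROW model on its portion of the amplified training data;
-- voted_avg() then merges the per-task weights of each feature into the final model.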
insert overwrite table news20b_arow_model1
select
feature,
cast(voted_avg(weight) as float) as weight
from
(select
train_arow(add_bias(features),label) as (feature,weight)
from
news20b_train_x3
) t
group by feature;
prediction
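-- The predicted label is the sign of the dot product between the learned weights
-- and the test instance's feature values.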
create or replace view news20b_arow_predict1
as
select
t.rowid,
sum(m.weight * t.value) as total_weight,
case when sum(m.weight * t.value) > 0.0 then 1 else -1 end as label
from
news20b_test_exploded t LEFT OUTER JOIN
news20b_arow_model1 m ON (t.feature = m.feature)
group by
t.rowid;
evaluation
create or replace view news20b_arow_submit1 as
select
t.rowid,
t.label as actual,
pd.label as predicted
from
news20b_test t JOIN news20b_arow_predict1 pd
on (t.rowid = pd.rowid);
select count(1)/4996 from news20b_arow_submit1
where actual == predicted;
0.9659727782225781
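Here, 4996 is the number of rows in news20b_test. A variant that does not hard-code the denominator (a sketch equivalent to the query above):
select sum(if(actual = predicted, 1.0, 0.0)) / count(1)
from news20b_arow_submit1;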
Cleaning
drop table news20b_arow_model1;
drop view news20b_arow_predict1;
drop view news20b_arow_submit1;
Tips
We recommend using m1.xlarge instances when running Hivemall on EMR, as follows.
./elastic-mapreduce --create --alive \
--name "Hive cluster" \
--hive-interactive --hive-versions latest \
--ami-version latest \
--instance-group master --instance-type m1.xlarge --instance-count 1 \
--instance-group core --instance-type m1.xlarge --instance-count 8 --bid-price 0.7 \
--instance-group task --instance-type m1.xlarge --instance-count 2 --bid-price 0.7 \
--enable-debugging --log-uri s3://mybucket/emr/logs \
--bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
--args "-m,mapred.child.java.opts=-Xmx1536m,-m,mapred.tasktracker.map.tasks.maximum=7,-m,mapred.tasktracker.reduce.tasks.maximum=2,-c,fs.s3n.multipart.uploads.enable=true,-c,fs.s3n.multipart.uploads.split.size=67108864" \
--bootstrap-action s3://elasticmapreduce/bootstrap-actions/run-if \
--args "instance.isMaster=true,s3://mybucket/emr/conf/emr_hivemall_bootstrap.sh" \
--bootstrap-name "hivemall setup" \
--bootstrap-action s3://elasticmapreduce/bootstrap-actions/install-ganglia \
--bootstrap-name "install ganglia" \
--availability-zone ap-northeast-1a
Using Spot Instances for the core/task instance groups is a good way to save money.