E2006
https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression.html#E2006-tfidf
Data preparation
$ wget https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/E2006.train.bz2
$ wget https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/E2006.test.bz2
scala> :paste
spark.read.format("libsvm").load("E2006.train.bz2")
.select($"label", to_hivemall_features($"features").as("features"))
.createOrReplaceTempView("rawTrainTable")
val (max, min) = sql("SELECT MAX(label), MIN(label) FROM rawTrainTable").collect.map {
case Row(max: Double, min: Double) => (max, min)
}.head
// `label` must be [0.0, 1.0]
sql(s"""
CREATE OR REPLACE TEMPORARY VIEW trainTable AS
SELECT rescale(label, $min, $max) AS label, features
FROM rawTrainTable
""")
scala> trainDf.printSchema
root
|-- label: float (nullable = true)
|-- features: vector (nullable = true)
scala> :paste
spark.read.format("libsvm").load("E2006.test.bz2")
.select($"label", to_hivemall_features($"features").as("features"))
.createOrReplaceTempView("rawTestTable")
sql(s"""
CREATE OR REPLACE TEMPORARY VIEW testTable AS
SELECT
rowid() AS rowid,
rescale(label, $min, $max) AS target,
features
FROM
rawTestTable
""")
// Caches data to fix row IDs
sql("CACHE TABLE testTable")
sql("""
CREATE OR REPLACE TEMPORARY VIEW testTable_exploded AS
SELECT
rowid,
target,
extract_feature(ft) AS feature,
extract_weight(ft) AS value
FROM (
SELECT
rowid,
target,
explode(features) AS ft
FROM
testTable
""")
scala> df.printSchema
root
|-- rowid: string (nullable = true)
|-- target: float (nullable = true)
|-- feature: string (nullable = true)
|-- value: double (nullable = true)
Tutorials
[AROWe2]
Training
scala> :paste
sql("""
CREATE OR REPLACE TEMPORARY VIEW modelTable AS
SELECT
feature, AVG(weight) AS weight
FROM (
SELECT
train_arowe2_regr(add_bias(features), label) AS (feature, weight)
FROM
trainTable
)
GROUP BY
feature
""")
Test
scala> :paste
sql("""
CREATE OR REPLACE TEMPORARY VIEW predicted AS
SELECT
rowid, sum(weight * value) AS predicted
FROM
testTable_exploded t LEFT OUTER JOIN modelTable m
ON t.feature = m.feature
GROUP BY
rowid
""")
Evaluation
scala> :paste
sql(s"""
SELECT
AVG(target), AVG(predicted)
FROM
predicted p INNER JOIN testTable t
ON p.rowid = t.rowid
""")
+------------------+------------------+
| avg(target)| avg(predicted)|
+------------------+------------------+
|0.5489154884487879|0.6030108853227014|
+------------------+------------------+