a9a
https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#a9a
Data preparation
$ wget https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a9a
$ wget https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a9a.t
scala> :paste
park.read.format("libsvm").load("a9a")
.select($"label", to_hivemall_features($"features").as("features"))
.createOrReplaceTempView("rawTrainTable")
val (max, min) = sql("SELECT MAX(label), MIN(label) FROM rawTrainTable").collect.map {
case Row(max: Double, min: Double) => (max, min)
}.head
// `label` must be [0.0, 1.0]
sql(s"""
CREATE OR REPLACE TEMPORARY VIEW trainTable AS
SELECT rescale(label, $min, $max) AS label, features
FROM rawTrainTable
""")
scala> trainDf.printSchema
root
|-- label: float (nullable = true)
|-- features: vector (nullable = true)
scala> :paste
spark.read.format("libsvm").load("a9a.t")
.select($"label", to_hivemall_features($"features").as("features"))
.createOrReplaceTempView("rawTestTable")
sql(s"""
CREATE OR REPLACE TEMPORARY VIEW testTable AS
SELECT
rowid() AS rowid,
rescale(label, $min, $max) AS target,
features
FROM
rawTestTable
""")
// Caches data to fix row IDs
sql("CACHE TABLE testTable")
sql("""
CREATE OR REPLACE TEMPORARY VIEW testTable_exploded AS
SELECT
rowid,
target,
extract_feature(ft) AS feature,
extract_weight(ft) AS value
FROM (
SELECT
rowid,
target,
explode(features) AS ft
FROM
testTable
)
""")
scala> testDf.printSchema
root
|-- rowid: string (nullable = true)
|-- target: float (nullable = true)
|-- feature: string (nullable = true)
|-- value: double (nullable = true)
Tutorials
[Logistic Regression]
Training
scala> :paste
sql("""
CREATE OR REPLACE TEMPORARY VIEW modelTable AS
SELECT
feature, AVG(weight) AS weight
FROM (
SELECT
train_logistic_regr(add_bias(features), label) AS (feature, weight)
FROM
trainTable
)
GROUP BY
feature
""")
Test
scala> :paste
sql("""
CREATE OR REPLACE TEMPORARY VIEW predicted AS
SELECT
rowid,
CASE
WHEN sigmoid(sum(weight * value)) > 0.50 THEN 1.0
ELSE 0.0
END AS predicted
FROM
testTable_exploded t LEFT OUTER JOIN modelTable m
ON t.feature = m.feature
GROUP BY
rowid
""")
Evaluation
val num_test_instances = spark.table("testTable").count
sql(s"""
SELECT
count(1) / $num_test_instances AS eval
FROM
predicted p INNER JOIN testTable t
ON p.rowid = t.rowid
WHERE
p.predicted = t.target
""")
+------------------+
| eval|
+------------------+
|0.8327921286841418|
+------------------+