ScaDaMaLe Course site and book

Compute RSVD

Here we read the preprocessed data and compute the RSVD.

import com.criteo.rsvd._
import scala.util.Random
import org.apache.spark.mllib.linalg.distributed.MatrixEntry
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max}
// Code snippet for saving the RSVD configuration as JSON
val config_map = Map(
  "embeddingDim" -> 100,
  "oversample" -> 30,
  "powerIter" -> 1,
  "seed" -> 0,
  "blockSize" -> 50000,
  "partitionWidthInBlocks" -> 35,
  "partitionHeightInBlocks" -> 10
)
val config_spark_save = config_map.toSeq.toDF("key", "value")
config_spark_save.write.mode("overwrite").json("/projects/group21/rsvd_config.json")
config_map: scala.collection.immutable.Map[String,Int] = Map(seed -> 0, oversample -> 30, blockSize -> 50000, partitionWidthInBlocks -> 35, partitionHeightInBlocks -> 10, powerIter -> 1, embeddingDim -> 100) config_spark_save: org.apache.spark.sql.DataFrame = [key: string, value: int]
// Load the configuration from JSON (assuming only integer values)
val config_spark = spark.read.json("/projects/group21/rsvd_config.json")
  .rdd
  .map(r => (r(0).toString -> r(1).toString.toInt))
  .collect
  .toMap
config_spark: scala.collection.immutable.Map[String,Int] = Map(seed -> 0, oversample -> 30, blockSize -> 50000, partitionWidthInBlocks -> 35, partitionHeightInBlocks -> 10, powerIter -> 1, embeddingDim -> 100)
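As a quick sanity check, the configuration read back from JSON can be compared with the original map. This is a minimal sketch, assuming both cells above were run in the same session so that config_map and config_spark are still in scope.

// Minimal round-trip check: the loaded map should equal the map that was written
assert(config_spark == config_map)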
// Create the RSVD configuration from the loaded values
val config = RSVDConfig(
  embeddingDim = config_spark("embeddingDim"),
  oversample = config_spark("oversample"),
  powerIter = config_spark("powerIter"),
  seed = config_spark("seed"),
  blockSize = config_spark("blockSize"),
  partitionWidthInBlocks = config_spark("partitionWidthInBlocks"),
  partitionHeightInBlocks = config_spark("partitionHeightInBlocks"),
  computeLeftSingularVectors = false,  // only the singular values are saved downstream
  computeRightSingularVectors = false
)
config: com.criteo.rsvd.RSVDConfig = RSVDConfig(100,30,1,0,50000,35,10,false,false)
// Build the edge-vertex incidence matrix of the graph and compute its RSVD
def computeRSVD(groupedCanonicalEdges: org.apache.spark.sql.DataFrame, config: RSVDConfig): RsvdResults = {
  val matHeight = groupedCanonicalEdges.count()
  val Row(maxValue: Int) = groupedCanonicalEdges.agg(max("dst")).head
  val matWidth = maxValue

  // Each canonical edge (src, dst, id) contributes two entries in row id-1:
  // -1 in column src-1 and +1 in column dst-1
  val incidenceMatrixEntries = groupedCanonicalEdges.rdd.flatMap {
    case Row(src: Int, dst: Int, id: Int) =>
      List(MatrixEntry(id - 1, src - 1, -1), MatrixEntry(id - 1, dst - 1, 1))
  }

  // Create the block matrix and compute the RSVD
  val matrixToDecompose = BlockMatrix.fromMatrixEntries(
    incidenceMatrixEntries,
    matHeight = matHeight,
    matWidth = matWidth,
    config.blockSize,
    config.partitionHeightInBlocks,
    config.partitionWidthInBlocks
  )
  RSVD.run(matrixToDecompose, config, sc)
}
computeRSVD: (groupedCanonicalEdges: org.apache.spark.sql.DataFrame, config: com.criteo.rsvd.RSVDConfig)com.criteo.rsvd.RsvdResults
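To make the incidence-matrix convention concrete, here is a minimal sketch on a hypothetical toy graph. The toy DataFrame, the shrunken configuration, and the assumption that the library accepts such a small matrix are illustrative only and not part of the pipeline.

// Hypothetical toy graph with columns (src, dst, id), as in the canonical edge data.
// Edge i becomes row i-1 of the incidence matrix, with -1 at column src-1 and +1 at column dst-1.
val toyEdges = Seq((1, 2, 1), (2, 3, 2), (3, 4, 3), (4, 5, 4), (1, 5, 5), (2, 4, 6)).toDF("src", "dst", "id")

// Shrink the configuration so embeddingDim + oversample does not exceed the 5 columns of the toy matrix
val toyConfig = config.copy(embeddingDim = 2, oversample = 2, blockSize = 10,
                            partitionWidthInBlocks = 1, partitionHeightInBlocks = 1)

val RsvdResults(_, toySingularValues, _) = computeRSVD(toyEdges, toyConfig)
println(toySingularValues)  // the two leading singular values of the 6 x 5 incidence matrix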
// Compute the RSVD of the Ethereum transaction graph and save its singular values
val groupedCanonicalEdges = spark.read.format("parquet")
  .load("/projects/group21/test_ethereum_canonical_edges")
  .drop("flow")
val rsvd_results_path: String = "/projects/group21/test_ethereum_"
val RsvdResults(leftSingularVectors, singularValues, rightSingularVectors) =
  computeRSVD(groupedCanonicalEdges, config)
val singularDF = sc.parallelize(singularValues.toArray).toDF()
singularDF.write.format("parquet").mode("overwrite").save(rsvd_results_path + "SingularValues")
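The saved singular values can be read back from Parquet for a quick inspection. A minimal sketch, assuming the default column name value that toDF() assigns to a collection of doubles:

// Read the saved singular values back and show the largest ones
val ethSingular = spark.read.parquet("/projects/group21/test_ethereum_SingularValues")
ethSingular.orderBy($"value".desc).show(10)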
// Repeat the computation for the ten uniform random graphs
for (i <- 0 to 9) {
  val groupedCanonicalEdges = spark.read.format("parquet")
    .load("/projects/group21/uniform_random_graph" + i)
  val rsvd_results_path: String = "/projects/group21/uniform_random_graph_"
  val RsvdResults(leftSingularVectors, singularValues, rightSingularVectors) =
    computeRSVD(groupedCanonicalEdges, config)
  val singularDF = sc.parallelize(singularValues.toArray).toDF()
  singularDF.write.format("parquet").mode("overwrite").save(rsvd_results_path + "SingularValues" + i)
}
// Repeat the computation for the ten R-MAT random graphs
for (i <- 0 to 9) {
  val groupedCanonicalEdges = spark.read.format("parquet")
    .load("/projects/group21/rmat_random_graph" + i)
  val rsvd_results_path: String = "/projects/group21/rmat_random_graph_"
  val RsvdResults(leftSingularVectors, singularValues, rightSingularVectors) =
    computeRSVD(groupedCanonicalEdges, config)
  val singularDF = sc.parallelize(singularValues.toArray).toDF()
  singularDF.write.format("parquet").mode("overwrite").save(rsvd_results_path + "SingularValues" + i)
}
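For the later comparison between the Ethereum graph and the random graphs, the ten saved spectra can be stacked into one DataFrame. A minimal sketch, where the graph index column is an illustrative addition:

// Collect the singular values of all ten R-MAT graphs into a single DataFrame,
// tagging each row with the index of the graph it came from
import org.apache.spark.sql.functions.lit
val rmatSpectra = (0 to 9).map { i =>
  spark.read.parquet("/projects/group21/rmat_random_graph_SingularValues" + i)
    .withColumn("graph", lit(i))
}.reduce(_ union _)
rmatSpectra.groupBy("graph").count().show()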