发布于 2021-05-13 21:39 ,所属分类:数据库和大数据技术学习资料
本文主要包含以下几部分:
1.背景
2.Spark支持的数据类型
2.1 Local Vector(本地向量)
2.2 Labeled point(带标签的点)
2.3 Local Matrix(本地矩阵)
2.4 Distributed Matrix(分布式矩阵)
2.4.4 BlockMatrix
3.相似度计算原理探索
3.1 相似度计算
3.2 公式拆解
3.3 矩阵并行
3.4 阅读源码
4.Spark实现Item相似度计算
之前小编在计算两两用户的item重合度,根据item重合度去评估两个用户之间的相似度,根据条件进行过滤之后大概有3000个用户,但每个用户对应的item量参差不齐,有上百万的,有几千的,这样在去构建笛卡尔积的时候,进行item数据关联,得到的用户集就会特别大,spark运行的时候就会很慢,而且会出现很严重的数据倾斜。这个时候了解到了spark支持的数据类型,看到了CoordinateMatrix,然后深究其原理,便看到了这篇文章,经过整理形成了此文。
本文出自「xingoo」在原文的基础上加以小编自己的理解形成的学习笔记,希望对读者有帮助。原文链接:Spark MLlib 之 大规模数据集的相似度计算原理探索
官方文档地址:https://spark.apache.org/docs/latest/mllib-data-types.html
本地向量是从0开始的下标和double类型的数据组成,存储在本地机器上,所以称为Local Vector。它支持两种形式:
比如一个向量[1.0,0.0,3.0],用Dense表示为:[1.0,0.0,3.0],用Sparse表示为:(3,[0,2],[1.0,3.0]),其中3为向量的长度,[0,2]表示元素[1.0,3.0]的位置,可见sparse形式下0.0是不存储的。
importorg.apache.spark.mllib.linalg.VectorsvaldenseVector=Vectors.dense(1.0,0.0,3.0)valsparseVector1=Vectors.sparse(3,Array(0,2),Array(1.0,3.0))valsparseVector2=Vectors.sparse(3,Seq((0,1.0),(2,3.0)))println(s"DenseVectoris:$denseVector")println(s"DenseVectortoSparseis:${denseVector.toSparse}")println(s"sparseVector1is:$sparseVector1")println(s"sparseVector1toDenseis:${sparseVector1.toDense}")println(s"sparseVector2is:$sparseVector2")println(s"sparseVector2toDenseis:${sparseVector2.toDense}")
输出为:
DenseVectoris:[1.0,0.0,3.0]DenseVectortoSparseis:(3,[0,2],[1.0,3.0])sparseVector1is:(3,[0,2],[1.0,3.0])sparseVector1toDenseis:[1.0,0.0,3.0]sparseVector2is:(3,[0,2],[1.0,3.0])sparseVector2toDenseis:[1.0,0.0,3.0]
labeled point由本地向量组成,既可以是dense向量,也可以是sparse向量。在mllib中常用于监督类算法,使用double类型来保存该类型的数据,因为也可以用于回归和分类算法。例如二分类,label可以是0(负例)或1(正例),对于多分类,label可以是0,1,2...
importorg.apache.spark.mllib.linalg.Vectorsimportorg.apache.spark.mllib.regression.LabeledPointvalpos=LabeledPoint(1.0,Vectors.dense(1.0,0.0,3.0))valneg=LabeledPoint(0.0,Vectors.sparse(3,Array(0,2),Array(1.0,3.0)))
sparse data
稀疏数据存储是非常普遍的现象,mllib支持读取libsvm格式的数据,其数据格式如下:
labelindex1:value1,index2:value2...
其读取方式包括:
importorg.apache.spark.mllib.util.MLUtils//method1spark.read.format("libsvm").load("libsvmdatapath")//method2MLUtils.loadLibSVMFile(spark.sparkContext,"libsvmdatapath")
local matrix由行下标,列索引和double类型的值组成,存储在本地机器上,mllib支持密集矩阵和稀疏矩阵,其存储是按照列进行存储的。
例如下面的为密集矩阵:
上面两个向量(x1,y1)和(x2,y2)计算夹角的余弦值就是两个向量方向的相似度,其公式为:
其中,表示的模,即每一项的平方和再开方。
那么如果向量不只是两维,而是n维呢?比如有两个向量:
第一个向量:
第二个向量:
他们的相似度计算方法套用上面的公式为:
通过上面的公式就可以发现,夹角余弦可以拆解成每一项与另一项对应位置的乘积x1∗y1,再除以每个向量自己的
注意,矩阵里面都是一列代表一个向量....上面是创建矩阵时的三元组,如果在spark中想要创建matrix,可以这样:
valdf=spark.createDataFrame(Seq((0,0,1.0),(1,0,1.0),(2,0,1.0),(3,0,1.0),(0,1,2.0),(1,1,2.0),(2,1,1.0),(3,1,1.0),(0,2,3.0),(1,2,3.0),(2,2,3.0),(0,3,1.0),(1,3,1.0),(3,3,4.0)))valmatrix=newCoordinateMatrix(df.map(row=>MatrixEntry(row.getAs[Integer](0).toLong,row.getAs[Integer](1).toLong,row.getAs[Double](2))).toJavaRDD)
然后计算每一个向量的normL2,即平方和开根号。
以第一个和第二个向量计算为例,第一个向量为(1,1,1,1),第二个向量为(2,2,1,1),每一项除以对应的normL2,得到后面的两个向量:
两个向量最终的相似度为0.94。
那么在Spark如何快速并行处理呢?通过上面的例子,可以看到两个向量的相似度,需要把每一维度乘积后相加,但是一个向量一般都是跨RDD保存的,所以可以先计算所有向量的第一维,得出结果
最后对做一次reduceByKey累加结果即可.....
首先创建dataframe形成matrix:
importorg.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,MatrixEntry}importorg.apache.spark.sql.SparkSessionobjectMatrixSimTest{defmain(args:Array[String]):Unit={//创建dataframe,转换成matrixvalspark=SparkSession.builder().master("local[*]").appName("sim").getOrCreate()spark.sparkContext.setLogLevel("WARN")importspark.implicits._valdf=spark.createDataFrame(Seq((0,0,1.0),(1,0,1.0),(2,0,1.0),(3,0,1.0),(0,1,2.0),(1,1,2.0),(2,1,1.0),(3,1,1.0),(0,2,3.0),(1,2,3.0),(2,2,3.0),(0,3,1.0),(1,3,1.0),(3,3,4.0)))valmatrix=newCoordinateMatrix(df.map(row=>MatrixEntry(row.getAs[Integer](0).toLong,row.getAs[Integer](1).toLong,row.getAs[Double](2))).toJavaRDD)//调用sim方法valx=matrix.toRowMatrix().columnSimilarities()//得到相似度结果x.entries.collect().foreach(println)}}
得到的结果为:
MatrixEntry(0,3,0.7071067811865476)MatrixEntry(0,2,0.8660254037844386)MatrixEntry(2,3,0.2721655269759087)MatrixEntry(0,1,0.9486832980505139)MatrixEntry(1,2,0.9128709291752768)MatrixEntry(1,3,0.596284793999944)
直接进入columnSimilarities方法看看是怎么个流程吧!
defcolumnSimilarities():CoordinateMatrix={columnSimilarities(0.0)}
内部调用了带阈值的相似度方法,这里的阈值是指相似度小于该值时,输出结果时,会自动过滤掉。
defcolumnSimilarities(threshold:Double):CoordinateMatrix={//检查参数...valgamma=if(threshold<1e-6){Double.PositiveInfinity}else{10*math.log(numCols())/threshold}columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray,gamma)}
这里的gamma用于采样,具体的做法咱们来继续看源码。然后看一下computeColumnSummaryStatistics().normL2.toArray这个方法:
defcomputeColumnSummaryStatistics():MultivariateStatisticalSummary={valsummary=rows.treeAggregate(newMultivariateOnlineSummarizer)((aggregator,data)=>aggregator.add(data),(aggregator1,aggregator2)=>aggregator1.merge(aggregator2))updateNumRows(summary.count)summary}
之前有介绍这个treeAggregate是一种带“预reduce”的map-reduce,返回的summary,里面帮我们统计了每一个向量的很多指标,比如
currMean为每一个向量的平均值currM2为每个向量的每一维的平方和currL1为每个向量的绝对值的和currMax为每个向量的最大值currMin为每个向量的最小值nnz为每个向量的非0个数
这里我们只需要currM2,它是每个向量的平方和。summary调用的normL2方法:
overridedefnormL2:Vector={require(totalWeightSum>0,s"Nothinghasbeenaddedtothissummarizer.")valrealMagnitude=Array.ofDim[Double](n)vari=0vallen=currM2.lengthwhile(i<len){realMagnitude(i)=math.sqrt(currM2(i))i+=1}Vectors.dense(realMagnitude)}
上面这步就是对平方和开个根号,这样就求出来了每个向量的分母部分。下面就是最关键的地方了:
private[mllib]defcolumnSimilaritiesDIMSUM(colMags:Array[Double],gamma:Double):CoordinateMatrix={//一些参数校验//对gamma进行开方valsg=math.sqrt(gamma)//sqrt(gamma)usedmanytimes//这里把前面算的平方根的值设置一个默认值,因为如果为0,除0会报异常,所以设置为1valcolMagsCorrected=colMags.map(x=>if(x==0)1.0elsex)//把抽样概率数组和平方根数组进行广播valsc=rows.contextvalpBV=sc.broadcast(colMagsCorrected.map(c=>sg/c))valqBV=sc.broadcast(colMagsCorrected.map(c=>math.min(sg,c)))//遍历每一行,计算每个向量该维的乘积,形成三元组valsims=rows.mapPartitionsWithIndex{(indx,iter)=>valp=pBV.valuevalq=qBV.value//获得随机值valrand=newXORShiftRandom(indx)valscaled=newArray[Double](p.size)iter.flatMap{row=>rowmatch{caseSparseVector(size,indices,values)=>//如果是稀疏向量,遍历向量的每一维,除以平方根valnnz=indices.sizevark=0while(k<nnz){scaled(k)=values(k)/q(indices(k))k+=1}//遍历向量数组,计算每一个数值与其他数值的乘机。//比如向量(1,2,0,1)//得到的结果为(0,1,value)(0,3,value)(2,3,value)Iterator.tabulate(nnz){k=>valbuf=newListBuffer[((Int,Int),Double)]()vali=indices(k)valiVal=scaled(k)//判断当前列是否符合采样范围,如果小于采样值,就忽略if(iVal!=0&&rand.nextDouble()<p(i)){varl=k+1while(l<nnz){valj=indices(l)valjVal=scaled(l)if(jVal!=0&&rand.nextDouble()<p(j)){//计算每一维与其他维的值buf+=(((i,j),iVal*jVal))}l+=1}}buf}.flattencaseDenseVector(values)=>//跟稀疏同理valn=values.sizevari=0while(i<n){scaled(i)=values(i)/q(i)i+=1}Iterator.tabulate(n){i=>valbuf=newListBuffer[((Int,Int),Double)]()valiVal=scaled(i)if(iVal!=0&&rand.nextDouble()<p(i)){varj=i+1while(j<n){valjVal=scaled(j)if(jVal!=0&&rand.nextDouble()<p(j)){buf+=(((i,j),iVal*jVal))}j+=1}}buf}.flatten}}//最后再执行一个reduceBykey,累加所有的值,就是i和j的相似度}.reduceByKey(_+_).map{case((i,j),sim)=>MatrixEntry(i.toLong,j.toLong,sim)}newCoordinateMatrix(sims,numCols(),numCols())}
这样把所有向量的平方和广播后,每一行都可以在不同的节点并行处理了。
总结来说,Spark提供的这个计算相似度的方法有两点优势:
不过杰卡德目前并不能使用这种方法来计算,因为杰卡德中间有一项需要对向量求dot,这种方式就不适合了;如果杰卡德想要快速计算,可以去参考LSH局部敏感哈希算法,这里就不详细说明了。
这里使用的数据集是MovieLens,计算Item的相似度,为用户推荐部分没有实现,不过也比较简单,感兴趣的用户可以自己试着实现一下看看。
//加载数据(userid,itemid,score)=>(string,long,double)valdataPath="data/ml-100k/ua.base"valdataTemp:RDD[(String,(Long,Double))]=spark.sparkContext.textFile(dataPath).map(_.split("\t")).map(l=>(l(0),(l(1).toLong,l(2).toDouble)))//理论为上userid 可能为设备id等字符串,所以进行编码valuserIndex:RDD[(String,Long)]=dataTemp.map(_._1).distinct().zipWithIndex()//(useridindex,itemid,score)valdata:RDD[(Long,Long,Double)]=dataTemp.leftOuterJoin(userIndex).filter(_._2._2.nonEmpty).map(l=>(l._2._2.get,l._2._1._1,l._2._1._2)).persist(StorageLevel.MEMORY_AND_DISK)println(s"使用的数据条数为:${data.count()}")data.take(3).foreach(l=>println(l))valmatrix=data.map(_match{case(uuid,spuid,rate)=>MatrixEntry(uuid,spuid,rate)})//newCoordinateMatrix(matrix)除了传入一个rdd之外//还有另外两个参数,rows和cols,如果不传的话默认是i,j中的最大值valtopicSims:CoordinateMatrix=newCoordinateMatrix(matrix)//toRowMatrix()调用的是toIndexedRowMatrix().toRowMatrix()valitemSim:CoordinateMatrix=topicSims.toRowMatrix().columnSimilarities()valitemSimRDD=itemSim.entries.union(itemSim.entries.map(m=>MatrixEntry(m.j,m.i,m.value)))println("生成计算结果...")itemSimRDD.map(f=>(f.i.toLong,f.j.toLong,f.value)).take(10).foreach(l=>println(l))
Over!
—完—
人工智能机器学习全新升级版I
[数据库] ORACLE认证培训教程 9i DBA数据库管理员 Fundamentals I (76讲全)
理综高三在线大联考(新课标I卷),含详细解析![百度网盘分享]
Google I/O 新发编程语言《Kotlin从入门到放弃》
Java NIO 1.4版的I/O新特性文档
2022全国新高考I卷、乙卷、北京卷数学、语文真题附答案解析[百度网盘分享]
使用C语言实现网站开发
2022全国新高考I卷、乙卷、北京卷数学、语文真题附答案解析
教你如何使用Perl管理便捷Oracle11g Oracle专题视频课程
如何在 iOS 中使用设计模式对 app 进行架构
2016-2020年五年高考英语全国卷(I)听力录音MP3汇总
2020年高考文综全国I卷高考真题及答案Word文档
2020年高考理科数学全国I卷真题及答案Word文档
2020衡水中学备考讲义丨近3年全国I、II、III卷-常考词组、句型-整理
使用PHP+Redis实现微博的用户管理视频教程
ORACLE认证培训教程 9i DBA数据库管理员 Fundamentals II (含电子书)
[网站优化] 使用PHP+Redis实现微博的用户管理视频教程
2020年高考英语全国I卷AB卷含听力高考真题及答案Word文档
Spark 1.X 大数据平台V2
衡水中学内部讲义丨如何一遍读懂阅读理解中的「长难句」
Spark2全方位深入分析(从源码到项目实践)
谈判的艺术——如何在谈判中取得优势,成功人士教给你
相关资源