pyspark minHash LSH 查找類似度

先看看官方文檔:

MinHash for Jaccard Distance

MinHash is an LSH family for Jaccard distance where input features are sets of natural numbers. Jaccard distance of two sets is defined by the cardinality of their intersection and union:html

d(A,B)=1|AB||AB|d(A,B)=1−|A∩B||A∪B|

MinHash applies a random hash function g to each element in the set and take the minimum of all hashed values:java

h(A)=minaA(g(a))h(A)=mina∈A(g(a))

 

The input sets for MinHash are represented as binary vectors, where the vector indices represent the elements themselves and the non-zero values in the vector represent the presence of that element in the set. While both dense and sparse vectors are supported, typically sparse vectors are recommended for efficiency. For example, Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)]) means there are 10 elements in the space. This set contains elem 2, elem 3 and elem 5. All non-zero values are treated as binary 「1」 values.python

Note: Empty sets cannot be transformed by MinHash, which means any input vector must have at least 1 non-zero entry.sql

Refer to the MinHashLSH Python docs for more details on the API.apache

from pyspark.ml.feature import MinHashLSH from pyspark.ml.linalg import Vectors from pyspark.sql.functions import col dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),), (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),), (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)] dfA = spark.createDataFrame(dataA, ["id", "features"]) dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),), (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),), (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)] dfB = spark.createDataFrame(dataB, ["id", "features"]) key = Vectors.sparse(6, [1, 3], [1.0, 1.0]) mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5) model = mh.fit(dfA) # Feature Transformation print("The hashed dataset where hashed values are stored in the column 'hashes':") model.transform(dfA).show() # Compute the locality sensitive hashes for the input rows, then perform approximate # similarity join. # We could avoid computing hashes by passing in the already-transformed dataset, e.g. # `model.approxSimilarityJoin(transformedA, transformedB, 0.6)` print("Approximately joining dfA and dfB on distance smaller than 0.6:") model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\ .select(col("datasetA.id").alias("idA"), col("datasetB.id").alias("idB"), col("JaccardDistance")).show() # Compute the locality sensitive hashes for the input rows, then perform approximate nearest # neighbor search. # We could avoid computing hashes by passing in the already-transformed dataset, e.g. # `model.approxNearestNeighbors(transformedA, key, 2)` # It may return less than 2 rows when not enough approximate near-neighbor candidates are # found. print("Approximately searching dfA for 2 nearest neighbors of the key:") model.approxNearestNeighbors(dfA, key, 2).show() 
Find full example code at "examples/src/main/python/ml/min_hash_lsh_example.py" in the Spark repo.

[PySpark] LSH類似度計算

1、問題場景

假設咱們要找海量用戶中哪些是行爲類似的——api

用戶A:數組

id: 1001
name: 用戶A
data: "07:00 吃早餐,09:00 工做,12:00 吃午餐,13:00 打王者,18:00 吃晚飯,22:00 睡覺"
mat: "1010001000010001100001101010000"

用戶B:app

id: 1002
name: 用戶B
data: "07:00 晨運,08:00 吃早餐,12:30 吃午餐,13:00 學習,19:00 吃晚飯,21:00 學習,23:00 睡覺"
mat: "1110001000010000001011101010000"

用戶C:......less

mat是對用戶的數據的特徵化描述,好比能夠定義第一位爲「早起」,第二位爲「晨運」,第三位爲「吃早餐」,那麼咱們有了這個矩陣,怎麼找到和他相近行爲習慣的人呢?dom

從描述的one-hot向量中,咱們看到A和B其實有不少類似性,但有部分不一樣,好比A打王者、可是B愛學習——

用戶A: "1010001000010000001001101010000"
用戶B: "1110001000010000000011101010000"

這就能夠用LSH大法了。

2、思路介紹

Q:LSH類似度用來幹嗎?
A:全稱是「局部敏感哈希」(Locality Sensitive Hashing)。能在特徵向量類似又不徹底相同的狀況下,找出儘量近的樣本。固然了,仍是須要先定義好特徵,再用LSH方法。

參考資料:Extracting, transforming and selecting features(特徵的提取,轉換和選擇)

工做中的問題是如何在海量數據中跑起來,pyspark實現時,有MinHashLSH, BucketedRandomProjectionLSH兩個選擇。

MinHashLSH

MinHash 是一個用於Jaccard 距離的 LSH family,它的輸入特徵是天然數的集合。 兩組的Jaccard距離由它們的交集和並集的基數定義:

MinHash 將隨機哈希函數g應用於集合中的每一個元素,並取得全部哈希值中的最小值。

BucketedRandomProjectionLSH(歐幾里得度量的隨機投影)

隨機桶投影是用於歐幾里德距離的 LSH family。 歐氏度量的定義以下:

其LSH family將向量x特徵向量映射到隨機單位矢量v,並將映射結果分爲哈希桶中:

其中r是用戶定義的桶長度,桶長度可用於控制哈希桶的平均大小(所以也可用於控制桶的數量)。 較大的桶長度(即,更少的桶)增長了將特徵哈希到相同桶的機率(增長真實和假陽性的數量)。

桶隨機投影接受任意向量做爲輸入特徵,並支持稀疏和密集向量。

3、代碼實現

很少說了,直接上代碼吧。

import os import re import hashlib from pyspark import SparkContext, SparkConf from pyspark import Row from pyspark.sql import SQLContext, SparkSession from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql.functions import udf,collect_list, collect_set from pyspark.ml.feature import MinHashLSH, BucketedRandomProjectionLSH from pyspark.ml.linalg import Vectors, VectorUDT # 控制spark服務啓動 spark = SparkSession.builder.appName('app_name').getOrCreate() spark.stop() spark = SparkSession.builder.appName('app_name').getOrCreate() class PySpark(object): @staticmethod def execute(df_input): """  程序入口,需用戶重載  :return:必須返回一個DataFrame類型對象  """ # step 1:讀入DataFrame df_mid = df_input.select('id','name','data','mat') # step 2:特徵向量預處理 def mat2vec(mat): """  定義UDF函數,將特徵矩陣向量化  :return:返回類似度計算所需的VectorUDT類型  """ arr = [0.0]*len(mat) for i in range(len(mat)): if mat[i]!='0': arr[i]=1.0 return Vectors.dense(arr) udf_mat2vec = udf(mat2vec,VectorUDT()) df_mid = df_mid.withColumn('vec', udf_mat2vec('mat')).select( 'id','name','data','mat','vec') # step 3:計算類似度 ## MinHashLSH,可用EuclideanDistance minlsh = MinHashLSH(inputCol="vec", outputCol="hashes", seed=123, numHashTables=3) model_minlsh = minlsh.fit(df_mid) ## BucketedRandomProjectionLSH brplsh = BucketedRandomProjectionLSH(inputCol="vec", outputCol="hashes", seed=123, bucketLength=10.0, numHashTables=10) model_brplsh = brplsh.fit(df_mid) # step 4:計算(忽略自類似,最遠距離限制0.8) ## model_brplsh相似,可用EuclideanDistance df_ret = model_minlsh.approxSimilarityJoin(df_mid, df_mid, 0.8, distCol='JaccardDistance').select( col("datasetA.id").alias("id"), col("datasetA.name").alias("name"), col("datasetA.data").alias("data"), col("datasetA.mat").alias("mat"), col("JaccardDistance").alias("distance"), col("datasetB.id").alias("ref_id"), col("datasetB.name").alias("ref_name"), col("datasetB.data").alias("ref_data"), col("datasetB.mat").alias("ref_mat") ).filter("id=ref_id") return df_ret df_in = spark.createDataFrame([ (1001,"A","xxx","1010001000010000001001101010000"), (1002,"B","yyy","1110001000010000000011101010000"), (1003,"C","zzz","1101100101010111011101110111101")], ['id', 'name', 'data', 'mat']) df_out = PySpark.execute(df_in) df_out.show()

跑出來的效果是,MinHashLSH模式下,A和B距離是0.27,比較近,但A、B到C都是0.75左右,和預期相符。

好了,夠鐘上去舉鐵了……

MLlib支持兩種矩陣,dense密集型和sparse稀疏型。一個dense類型的向量背後其實就是一個數組,而sparse向量背後則是兩個並行數組——索引數組和值數組。好比向量(1.0, 0.0, 3.0)既能夠用密集型向量表示爲[1.0, 0.0, 3.0],也能夠用稀疏型向量表示爲(3, [0,2],[1.0,3.0]),其中3是數組的大小。

相關文章
相關標籤/搜索