python - Join two RDDs on custom function - SPARK
Is it possible to join 2 RDDs in Spark on a custom function? I have 2 big RDDs with string keys. I want to join them, not with the classic join but with a custom function like:
def my_func(a, b):
    return lev.distance(a, b) < 2

result_rdd = rdd1.join(rdd2, my_func)
If it's not possible, is there an alternative that still uses the benefits of a Spark cluster? I wrote the following in PySpark, but it is not able to distribute the work on my small cluster.
def custom_join(rdd1, rdd2, my_func):
    a = rdd1.sortByKey().collect()
    b = rdd2.sortByKey().collect()
    i = 0
    j = 0
    res = []
    while i < len(a) and j < len(b):
        if my_func(a[i][0], b[j][0]):
            res += [((a[i][0], b[j][0]), (a[i][1], b[j][1]))]
            i += 1
            j += 1
        elif a[i][0] < b[j][0]:
            i += 1
        else:
            j += 1
    return sc.parallelize(res)
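For illustration, here is a minimal usage sketch of this custom_join, with invented sample data and a simple prefix check standing in for the Levenshtein predicate (sc is assumed to be an existing SparkContext):

# Hypothetical sample data, purely for illustration.
rdd1 = sc.parallelize([("apple", 1), ("banana", 2)])
rdd2 = sc.parallelize([("appla", 10), ("banana", 20)])

def close_enough(a, b):
    # stand-in for lev.distance(a, b) < 2
    return a[:3] == b[:3]

print(custom_join(rdd1, rdd2, close_enough).collect())
# [(('apple', 'appla'), (1, 10)), (('banana', 'banana'), (2, 20))]

Note that custom_join collects both RDDs to the driver, so the matching itself is not distributed, which is exactly the problem described above.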
Thanks in advance (and sorry for my English, I'm Italian).
You can use cartesian and then filter based on conditions.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("b", 3)])

def customFunc(pair):
    # you may use any condition here; this one keeps pairs whose keys match
    return pair[0][0] == pair[1][0]

print(x.join(y).collect())  # normal join

# replicating the join with cartesian + filter
print(x.cartesian(y).filter(customFunc).flatMap(lambda p: p).groupByKey().mapValues(tuple).collect())
Output:
[('b', (4, 3)), ('a', (1, 2))]
[('a', (1, 2)), ('b', (4, 3))]
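Applying the same cartesian + filter idea to the Levenshtein condition from the question might look like the sketch below. It assumes the python-Levenshtein package is available (imported as lev, as in the question) and uses invented sample data; because the fuzzy keys can differ, both keys are kept in the result instead of grouping by a single key:

import Levenshtein as lev  # assumption: python-Levenshtein is installed

rdd1 = sc.parallelize([("apple", 1), ("banana", 2)])   # invented sample data
rdd2 = sc.parallelize([("appla", 10), ("cherry", 20)])

def fuzzy_match(pair):
    # pair is ((key1, value1), (key2, value2)) from the cartesian product
    return lev.distance(pair[0][0], pair[1][0]) < 2

fuzzy_joined = (rdd1.cartesian(rdd2)
                    .filter(fuzzy_match)
                    .map(lambda p: ((p[0][0], p[1][0]), (p[0][1], p[1][1]))))

print(fuzzy_joined.collect())
# [(('apple', 'appla'), (1, 10))]

Keep in mind that cartesian materialises all n * m candidate pairs, so this stays distributed but can be expensive on two large RDDs.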