Preserving keys (index) of an RDD in PySpark
I have an RDD, rowrdd:
rowrdd=[[u'chevrolet chevelle malibu', u'18.0', u'8', u'307.0', u'130.0', u'3504.', u'12.0', u'70', u'us'], [u'amc rebel sst', u'16.0', u'8', u'304.0', u'150.0', u'3433.', u'12.0', u'70', u'us'], [u'ford torino', u'17.0', u'8', u'302.0', u'140.0', u'3449.', u'10.5', u'70', u'us']]
To this rowrdd I apply keyBy() with the keys at indices (0, 2, 8) and then reduceByKey() on the value at index 1:
def aggonvalfunc(y): return y[1]

pipe = rowrdd.keyBy(lambda w: (w[0], w[2], w[8])).mapValues(lambda aggkey: aggonvalfunc(aggkey)) \
    .mapValues(float).reduceByKey(lambda x, y: x + y)
which gives me a new RDD, pipe:
pipe=[((u'ford mustang', u'6', u'us'), 18.0), ((u'mercury capri 2000', u'4', u'us'), 23.0), ((u'chevrolet impala', u'8', u'us'), 52.0)]
I now want to apply keyBy() and reduceByKey() again on this result, i.e. on the pipe RDD, so I remap pipe into a tmp RDD as below:
def keylen(k, v):
    tup2 = ()
    if type(k) == tuple:
        for i in range(len(k)):
            tup2 += (k[i],)
        tup2 += (v,)
    else:
        tup2 += (k, v)
    return tup2

tmp = pipe.map(lambda (k, v): keylen(k, v))
which gives me the tmp RDD as:
tmp=[(u'ford mustang', u'6', u'us', 18.0), (u'mercury capri 2000', u'4', u'us', 23.0), (u'chevrolet impala', u'8', u'us', 52.0)]
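To illustrate why the positions shift: after keylen() flattens each (key, value) pair, the old key fields and the old value all share one index space in tmp.

# one flattened record of tmp
row = (u'ford mustang', u'6', u'us', 18.0)
# original key index 0 (name)   -> index 0 in tmp
# original key index 2 (cyl)    -> index 1 in tmp
# original key index 8 (origin) -> index 2 in tmp
# original value index 1 (mpg)  -> index 3 in tmp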
But when I again want to keyBy() on the older keys (0, 8) and apply reduceByKey() on the older key (1), this time on the tmp RDD, it is not possible, because the RDD has changed. To do keyBy() and reduceByKey() again I have to change the indices in keyBy() and in the value function used before reduceByKey(), as given below:
def aggonvalfunc2(y): return y[3]

pipe2 = tmp.keyBy(lambda w: (w[0], w[2])).mapValues(lambda aggkey: aggonvalfunc2(aggkey)) \
    .mapValues(float).reduceByKey(lambda x, y: x + y)
So my question is: is there a way to keep the older keys (indices) of an RDD, or to fix the keys (indices) of an RDD, so that even if operations are carried out on the RDD its keys (indices) do not change? Is that possible? I came across IndexedRDD; would that help? Any suggestions for resolving this problem would be appreciated.
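One workaround I can think of (a rough sketch, not tested against my full pipeline) is to avoid flattening pipe at all and keep it as (key, value) pairs, reshaping only the key part, so the value never changes position:

# sketch: re-aggregate pipe by (name, origin), i.e. the original
# columns 0 and 8, without flattening; kv[0] is the key tuple and
# kv[1] is the already-summed value, which keeps its position
pipe2 = pipe.map(lambda kv: ((kv[0][0], kv[0][2]), kv[1])) \
            .reduceByKey(lambda x, y: x + y)

But this still hard-codes positions inside the key tuple, so I am looking for a more general way to keep the original indices stable.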