scala - How does RDD.aggregate action work (i.e. how to understand the parameters)?
Can someone give a detailed explanation of how the aggregate action below in Spark produces the result (9,4)?

val rdd = sc.parallelize(List(1, 2, 3, 3))
rdd.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
res: (9,4)
This is Spark 2.1.0 here (which should not matter much, but...).

Go to the official documentation of aggregate (aka scaladoc) and read:

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
The signature is as follows (I removed the implicit parameter, which is not particularly interesting here):

aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
The scaladoc says:

zeroValue — the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for combining the results from different partitions for the combOp operator. This will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation).
In this case, zeroValue is (0, 0).
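To see why (0, 0) is the right zero here, note that U is (Int, Int), a running (sum, count) pair, while T is Int. A minimal plain-Scala sketch (the names zero, seqOp and combOp are mine, mirroring the signature above):

```scala
// U = (Int, Int): a running (sum, count) pair; T = Int.
val zero = (0, 0)

// The two operators from the question, given explicit types.
val seqOp: ((Int, Int), Int) => (Int, Int) =
  (acc, elem) => (acc._1 + elem, acc._2 + 1)
val combOp: ((Int, Int), (Int, Int)) => (Int, Int) =
  (a, b) => (a._1 + b._1, a._2 + b._2)

// (0, 0) is neutral: merging it into any partial result changes nothing.
println(combOp((9, 4), zero))
println(seqOp(zero, 1))
```

If the zero were anything other than the neutral element, it would be folded in once per partition and skew the final sum and count.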
seqOp — an operator used to accumulate results within a partition.

In this case, seqOp is:

(x, y) => (x._1 + y, x._2 + 1)

It is a function accepting the accumulator pair and an element, unfortunately named x and y (which I'd call acc and elem at least, or better yet use pattern matching with a partial function, i.e. case ((sum, count), elem) => ...).
Given you've got n partitions (you can check that with rdd.getNumPartitions), the seqOp fold is going to run n times, once per partition, starting from zeroValue and applying the function to every element in that partition.
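To see what one of those per-partition folds does, here is a plain-Scala trace with no Spark involved (which elements land in which partition is an assumption for illustration; the actual split depends on sc.parallelize):

```scala
// Hypothetical: suppose one partition of the RDD holds the elements 1 and 2.
val partitionElems = List(1, 2)

val seqOp: ((Int, Int), Int) => (Int, Int) =
  (acc, elem) => (acc._1 + elem, acc._2 + 1)

// Within the partition, seqOp folds every element into the accumulator,
// starting from zeroValue = (0, 0):
//   (0,0) -> (0+1, 0+1) = (1,1) -> (1+2, 1+1) = (3,2)
val partial = partitionElems.foldLeft((0, 0))(seqOp)
println(partial) // (3,2)
```

Each partition produces one such partial (sum, count) pair, and those pairs are what combOp later merges.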
The scaladoc says:

combOp — an associative operator used to combine results from different partitions.
which means that combOp combines the results of seqOp by applying the function:

(x, y) => (x._1 + y._1, x._2 + y._2)

Again, the names x and y are noise. I'd write the function as follows:

{ case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }

Follow the types and give things proper names, and Scala becomes easier ;-)