scala - How does the RDD.aggregate action work (i.e. how to understand its parameters)?
Can someone give a detailed explanation of how the aggregate action below produces the result (9, 4) in Spark?

val rdd = sc.parallelize(List(1, 2, 3, 3))
rdd.aggregate((0, 0))(
  (x, y) => (x._1 + y, x._2 + 1),
  (x, y) => (x._1 + y._1, x._2 + y._2))
// res: (9,4)
This is Spark 2.1.0 here (which should not matter much, but...)

Go to the official documentation of aggregate (aka scaladoc) and read:
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into a U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
The signature is as follows (with the implicit parameter removed, as it is not particularly interesting):

aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U

The scaladoc says:
zeroValue: the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for combining results from different partitions for the combOp operator; this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)
In this case, zeroValue is (0, 0).

seqOp: an operator used to accumulate results within a partition
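To see why the scaladoc insists on a neutral element: zeroValue seeds the accumulator once per partition and once more on the driver when merging, so a non-neutral zero gets counted several times. A plain-Scala simulation (no Spark required; the two-way split of the data is an assumption for illustration):

```scala
// seqOp and combOp as in the question; partitions is a hypothetical split.
val seqOp  = (acc: (Int, Int), elem: Int) => (acc._1 + elem, acc._2 + 1)
val combOp = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
val partitions = List(List(1, 2), List(3, 3))

// Fold each partition starting from zero, then merge, again starting from zero.
def simulate(zero: (Int, Int)): (Int, Int) =
  partitions.map(_.foldLeft(zero)(seqOp)).fold(zero)(combOp)

val neutral = simulate((0, 0))  // (9, 4), as expected
val skewed  = simulate((1, 0))  // (12, 4): the extra 1 was added three times
```

With two partitions, the non-neutral (1, 0) is folded in twice on the executors and once more in the final merge, hence the sum 12 instead of 9.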
In this case, seqOp is (x, y) => (x._1 + y, x._2 + 1), a function whose arguments are unfortunately named x and y: x is the accumulated pair (of type U) and y is a single element of the RDD (of type T). I'd give them proper names, or better use pattern matching with a partial function, i.e. { case ((sum, count), elem) => ... }.

Given you've got n partitions (which you can check using rdd.getNumPartitions), seqOp is applied once per element within each partition, producing n partial results that combOp then merges.
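The per-partition accumulation can be sketched with a plain foldLeft over an ordinary list (no Spark needed; the partition contents are an assumption for illustration):

```scala
// Simulating what seqOp does within one partition.
val zero = (0, 0)
val seqOp = (acc: (Int, Int), elem: Int) => (acc._1 + elem, acc._2 + 1)

// Suppose one partition holds the elements 1 and 2:
val partition = List(1, 2)
val perPartition = partition.foldLeft(zero)(seqOp)
// perPartition == (3, 2): running sum 3, element count 2
```

Each step adds the element to the first slot of the pair and bumps the count in the second, which is exactly how aggregate builds (sum, count) per partition.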
The scaladoc says:

combOp: an associative operator used to combine results from different partitions
which means combOp combines the per-partition results of seqOp by applying the function:

(x, y) => (x._1 + y._1, x._2 + y._2)

It's again badly written: names like x and y are what I'd call noise. I'd write the function as follows:
{ case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }

Follow the types and give them proper names, and Scala becomes much easier ;-)
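Putting it all together, the whole computation can be simulated end to end with plain Scala collections (the two-partition split is an assumption; the final result does not depend on how the elements are split):

```scala
val zero   = (0, 0)
val seqOp  = (acc: (Int, Int), elem: Int) => (acc._1 + elem, acc._2 + 1)
val combOp = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)

// Hypothetical split of List(1, 2, 3, 3) across two partitions.
val partitions   = List(List(1, 2), List(3, 3))
val perPartition = partitions.map(_.foldLeft(zero)(seqOp)) // List((3,2), (6,2))
val result       = perPartition.fold(zero)(combOp)         // (9, 4)
```

So the first slot accumulates the sum of all elements (1 + 2 + 3 + 3 = 9) and the second counts them (4), which is exactly the (9, 4) in the question; dividing the two would give the mean, the classic use of this idiom.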