scala - "Immortal" Spark Streaming Job? -
all right, i've asked similar question related how spark handles exceptions internally, example had wasn't clear or complete. answer there pointed me in direction can't explain things.
i've setup dummy spark streaming app , in transform stage have russian-roulette expression, might or not throw exception. if exception thrown, stop spark streaming context. that's it, no other logic, no rdd
transformation.
object immortalstreamingjob extends app { val conf = new sparkconf().setappname("fun-spark").setmaster("local[*]") val ssc = new streamingcontext(conf, seconds(1)) val elems = (1 1000).grouped(10) .map(seq => ssc.sparkcontext.parallelize(seq)) .toseq val stream = ssc.queuestream(mutable.queue[rdd[int]](elems: _*)) val transformed = stream.transform { rdd => try { if (random.nextint(6) == 5) throw new runtimeexception("boom") else println("lucky bastard") rdd } catch { case e: throwable => println("stopping streaming context", e) ssc.stop(stopsparkcontext = true, stopgracefully = false) throw e } } transformed.foreachrdd { rdd => println(rdd.collect().mkstring(",")) } ssc.start() ssc.awaittermination() }
running in intellij throw exception @ point. fun part:
- if exception thrown in first transformation (when first rdd processed), spark context stopped , app dies, want
- if exception thrown after @ least 1
rdd
has been processed, app hangs after printing error message , never stops, not want
why app hang instead of dying in second case?
i'm running spark 2.1.0 on scala 2.11.8. getting out try-catch solves problem (spark stops itself). also, moving out try-catch inside foreachrdd
solves problem.
however i'm looking answer can me understand what's going on in particular example.
you see exceptions manifest in actions (like foreachrdd
in case) rather transformations (like transform
in case) because actions execute transformations lazily. means transformations won't occur until action. reason why necessary demands changing mental model of how distributed processing works.
consider conventional, single-threaded program. code proceeds line-by-line, , if exception thrown , not handled, subsequent lines of code don't execute. in distributed system same spark transformations running in parallel on multiple machines (and @ different paces), should happen when exception thrown? it's not simple since exception on 1 machine independent of code proceeding on other machines, how want it. want independent tasks spread throughout cluster shut down on exception single-machine thinking doesn't translate distributed paradigm. how driver supposed deal that?
according matei zaharia, of databricks , 1 of creators of spark @ berkeley, "exceptions should sent driver program , logged there (with sparkexception
thrown if task fails more 4 times)." (incidentally, default number of retries can changed spark.task.maxfailures
.). if log4j configured on executors, exception logged there; serialized , sent driver, try again 3 more times default.
in particular situation, guess have couple of things going on. first, running on single machine, give misleading picture of how exception handling works in distributed model. second, stopping context prematurely. stopping context is extremely destructive operation, includes stopping listeners , dagscheduler
. frankly, don't know how can expect spark wrap neatly when you've turned out lights.
finally, mention more elegant exception handling model might executing transformations inside try
. end potentially more cumbersome code in transformations return rdd[try[t]]
or dstream[try[t]]
, means have handle success
, failure
cases each element. able propagate success , error information downstream benefits monad provides including mapping rdd[try[a]] => rdd[try[b]]
, using for
comprehensions (by virtue of flatmap
).
Comments
Post a Comment